#!/usr/bin/env python3 """Standalone EG4 LifePower4 (V1/V2) decoder over USB-RS485. Adapted from Louisvdw/dbus-serialbattery (bms/lifepower.py) stripped of its Victron dbus dependencies. Pure pyserial + stdlib. Frame format (observed on wire, not standard Modbus): request : 7E 00 0D (6 bytes) reply : 7E ... 0D (variable, status payload contains 10 groups of 16-bit unsigned ints) Verified commands: general status (01), firmware ver (33), hardware ver (42). Address byte is typically 0x01 for a single battery; multipack setups have the master at 0x01 and only the master answers external RS485. Usage: python3 eg4_lifepower.py # auto-detect port python3 eg4_lifepower.py /dev/tty.usbserial-XXX # explicit port """ from __future__ import annotations import glob import sys import time from dataclasses import dataclass, field from struct import unpack_from import serial # pip install pyserial BAUD = 9600 CMD_GENERAL = bytes([0x7E, 0x01, 0x01, 0x00, 0xFE, 0x0D]) CMD_HW_VER = bytes([0x7E, 0x01, 0x42, 0x00, 0xFC, 0x0D]) CMD_FW_VER = bytes([0x7E, 0x01, 0x33, 0x00, 0xFE, 0x0D]) @dataclass class Status: cell_voltages: list[float] = field(default_factory=list) # volts current: float = 0.0 # amps (signed, +charge) soc: float = 0.0 # percent capacity_ah: float = 0.0 temps_c: list[int] = field(default_factory=list) # up to 6 cycles: int = 0 pack_voltage: float = 0.0 alarms: dict = field(default_factory=dict) def summary(self) -> str: cells = self.cell_voltages cmin, cmax = (min(cells), max(cells)) if cells else (0, 0) return ( f"pack={self.pack_voltage:6.2f}V I={self.current:+7.2f}A " f"SoC={self.soc:5.1f}% cap={self.capacity_ah:6.2f}Ah " f"cells={len(cells)} ({cmin:.3f}..{cmax:.3f}V Δ={cmax-cmin:.3f}) " f"temps={self.temps_c} cycles={self.cycles} alarms={self.alarms}" ) def send(port: serial.Serial, cmd: bytes, read_n: int = 256) -> bytes: port.reset_input_buffer() port.write(cmd) time.sleep(0.2) return port.read(read_n) def parse_status(data: bytes) -> Status: """Parse the general-status reply. Raises ValueError on bad framing.""" if not data or data[0] != 0x7E or data[-1] != 0x0D: raise ValueError(f"bad framing: {data.hex(' ')}") groups: list[list[int]] = [] i = 4 # skip 7E — payload starts at byte 4 for _ in range(10): if i + 2 > len(data): raise ValueError("truncated payload") group_len = data[i + 1] end = i + 2 + (group_len * 2) payload = data[i + 2 : end] values = [unpack_from(">H", payload, k)[0] for k in range(0, len(payload), 2)] groups.append(values) i = end s = Status() s.cell_voltages = [(v & 0x7FFF) / 1000 for v in groups[0]] s.current = (30000 - groups[1][0]) / 100 s.soc = groups[2][0] / 100 s.capacity_ah = groups[3][0] / 100 s.temps_c = [(t & 0xFF) - 50 for t in groups[4][:6]] flags = groups[5][1] if len(groups[5]) > 1 else 0 s.alarms = { "current_over": bool(flags & 0b00001000), "voltage_high": bool(flags & 0b00010000), "voltage_low": bool(flags & 0b00100000), "temp_high_chg": bool(flags & 0b01000000), "temp_low_chg": bool(flags & 0b10000000), } s.cycles = groups[6][0] s.pack_voltage = groups[7][0] / 100 return s def decode_ascii(data: bytes) -> str: return data.decode("ascii", errors="ignore").strip() def autodetect() -> str | None: for pat in ("/dev/tty.usbserial*", "/dev/tty.usbmodem*", "/dev/ttyUSB*", "/dev/ttyACM*"): hits = glob.glob(pat) if hits: return hits[0] return None def main() -> None: port_path = sys.argv[1] if len(sys.argv) > 1 else autodetect() if not port_path: sys.exit("no serial port found; pass one explicitly") print(f"opening {port_path} @ {BAUD} 8N1") with serial.Serial(port_path, BAUD, bytesize=8, parity="N", stopbits=1, timeout=1.5) as p: hw = send(p, CMD_HW_VER) fw = send(p, CMD_FW_VER) if hw: print(f"hw: {decode_ascii(hw)}") if fw: print(f"fw: {decode_ascii(fw)}") raw = send(p, CMD_GENERAL) print(f"raw ({len(raw)}B): {raw.hex(' ')}") if raw: print(parse_status(raw).summary()) if __name__ == "__main__": main()