135 lines
4.6 KiB
Python
135 lines
4.6 KiB
Python
#!/usr/bin/env python3
|
|
"""Standalone EG4 LifePower4 (V1/V2) decoder over USB-RS485.
|
|
|
|
Adapted from Louisvdw/dbus-serialbattery (bms/lifepower.py) stripped of its
|
|
Victron dbus dependencies. Pure pyserial + stdlib.
|
|
|
|
Frame format (observed on wire, not standard Modbus):
|
|
request : 7E <addr> <cmd> 00 <chk> 0D (6 bytes)
|
|
reply : 7E ... 0D (variable, status payload
|
|
contains 10 groups of
|
|
16-bit unsigned ints)
|
|
|
|
Verified commands: general status (01), firmware ver (33), hardware ver (42).
|
|
Address byte is typically 0x01 for a single battery; multipack setups have the
|
|
master at 0x01 and only the master answers external RS485.
|
|
|
|
Usage:
|
|
python3 eg4_lifepower.py # auto-detect port
|
|
python3 eg4_lifepower.py /dev/tty.usbserial-XXX # explicit port
|
|
"""
|
|
from __future__ import annotations
|
|
|
|
import glob
|
|
import sys
|
|
import time
|
|
from dataclasses import dataclass, field
|
|
from struct import unpack_from
|
|
|
|
import serial # pip install pyserial
|
|
|
|
BAUD = 9600
|
|
|
|
CMD_GENERAL = bytes([0x7E, 0x01, 0x01, 0x00, 0xFE, 0x0D])
|
|
CMD_HW_VER = bytes([0x7E, 0x01, 0x42, 0x00, 0xFC, 0x0D])
|
|
CMD_FW_VER = bytes([0x7E, 0x01, 0x33, 0x00, 0xFE, 0x0D])
|
|
|
|
|
|
@dataclass
|
|
class Status:
|
|
cell_voltages: list[float] = field(default_factory=list) # volts
|
|
current: float = 0.0 # amps (signed, +charge)
|
|
soc: float = 0.0 # percent
|
|
capacity_ah: float = 0.0
|
|
temps_c: list[int] = field(default_factory=list) # up to 6
|
|
cycles: int = 0
|
|
pack_voltage: float = 0.0
|
|
alarms: dict = field(default_factory=dict)
|
|
|
|
def summary(self) -> str:
|
|
cells = self.cell_voltages
|
|
cmin, cmax = (min(cells), max(cells)) if cells else (0, 0)
|
|
return (
|
|
f"pack={self.pack_voltage:6.2f}V I={self.current:+7.2f}A "
|
|
f"SoC={self.soc:5.1f}% cap={self.capacity_ah:6.2f}Ah "
|
|
f"cells={len(cells)} ({cmin:.3f}..{cmax:.3f}V Δ={cmax-cmin:.3f}) "
|
|
f"temps={self.temps_c} cycles={self.cycles} alarms={self.alarms}"
|
|
)
|
|
|
|
|
|
def send(port: serial.Serial, cmd: bytes, read_n: int = 256) -> bytes:
|
|
port.reset_input_buffer()
|
|
port.write(cmd)
|
|
time.sleep(0.2)
|
|
return port.read(read_n)
|
|
|
|
|
|
def parse_status(data: bytes) -> Status:
|
|
"""Parse the general-status reply. Raises ValueError on bad framing."""
|
|
if not data or data[0] != 0x7E or data[-1] != 0x0D:
|
|
raise ValueError(f"bad framing: {data.hex(' ')}")
|
|
|
|
groups: list[list[int]] = []
|
|
i = 4 # skip 7E <addr> <cmd> <len> — payload starts at byte 4
|
|
for _ in range(10):
|
|
if i + 2 > len(data):
|
|
raise ValueError("truncated payload")
|
|
group_len = data[i + 1]
|
|
end = i + 2 + (group_len * 2)
|
|
payload = data[i + 2 : end]
|
|
values = [unpack_from(">H", payload, k)[0] for k in range(0, len(payload), 2)]
|
|
groups.append(values)
|
|
i = end
|
|
|
|
s = Status()
|
|
s.cell_voltages = [(v & 0x7FFF) / 1000 for v in groups[0]]
|
|
s.current = (30000 - groups[1][0]) / 100
|
|
s.soc = groups[2][0] / 100
|
|
s.capacity_ah = groups[3][0] / 100
|
|
s.temps_c = [(t & 0xFF) - 50 for t in groups[4][:6]]
|
|
flags = groups[5][1] if len(groups[5]) > 1 else 0
|
|
s.alarms = {
|
|
"current_over": bool(flags & 0b00001000),
|
|
"voltage_high": bool(flags & 0b00010000),
|
|
"voltage_low": bool(flags & 0b00100000),
|
|
"temp_high_chg": bool(flags & 0b01000000),
|
|
"temp_low_chg": bool(flags & 0b10000000),
|
|
}
|
|
s.cycles = groups[6][0]
|
|
s.pack_voltage = groups[7][0] / 100
|
|
return s
|
|
|
|
|
|
def decode_ascii(data: bytes) -> str:
|
|
return data.decode("ascii", errors="ignore").strip()
|
|
|
|
|
|
def autodetect() -> str | None:
|
|
for pat in ("/dev/tty.usbserial*", "/dev/tty.usbmodem*", "/dev/ttyUSB*", "/dev/ttyACM*"):
|
|
hits = glob.glob(pat)
|
|
if hits:
|
|
return hits[0]
|
|
return None
|
|
|
|
|
|
def main() -> None:
|
|
port_path = sys.argv[1] if len(sys.argv) > 1 else autodetect()
|
|
if not port_path:
|
|
sys.exit("no serial port found; pass one explicitly")
|
|
print(f"opening {port_path} @ {BAUD} 8N1")
|
|
with serial.Serial(port_path, BAUD, bytesize=8, parity="N", stopbits=1, timeout=1.5) as p:
|
|
hw = send(p, CMD_HW_VER)
|
|
fw = send(p, CMD_FW_VER)
|
|
if hw:
|
|
print(f"hw: {decode_ascii(hw)}")
|
|
if fw:
|
|
print(f"fw: {decode_ascii(fw)}")
|
|
raw = send(p, CMD_GENERAL)
|
|
print(f"raw ({len(raw)}B): {raw.hex(' ')}")
|
|
if raw:
|
|
print(parse_status(raw).summary())
|
|
|
|
|
|
if __name__ == "__main__":
|
|
main()
|