initialize
This commit is contained in:
134
battery/eg4_lifepower.py
Normal file
134
battery/eg4_lifepower.py
Normal file
@@ -0,0 +1,134 @@
|
||||
#!/usr/bin/env python3
|
||||
"""Standalone EG4 LifePower4 (V1/V2) decoder over USB-RS485.
|
||||
|
||||
Adapted from Louisvdw/dbus-serialbattery (bms/lifepower.py) stripped of its
|
||||
Victron dbus dependencies. Pure pyserial + stdlib.
|
||||
|
||||
Frame format (observed on wire, not standard Modbus):
|
||||
request : 7E <addr> <cmd> 00 <chk> 0D (6 bytes)
|
||||
reply : 7E ... 0D (variable, status payload
|
||||
contains 10 groups of
|
||||
16-bit unsigned ints)
|
||||
|
||||
Verified commands: general status (01), firmware ver (33), hardware ver (42).
|
||||
Address byte is typically 0x01 for a single battery; multipack setups have the
|
||||
master at 0x01 and only the master answers external RS485.
|
||||
|
||||
Usage:
|
||||
python3 eg4_lifepower.py # auto-detect port
|
||||
python3 eg4_lifepower.py /dev/tty.usbserial-XXX # explicit port
|
||||
"""
|
||||
from __future__ import annotations
|
||||
|
||||
import glob
|
||||
import sys
|
||||
import time
|
||||
from dataclasses import dataclass, field
|
||||
from struct import unpack_from
|
||||
|
||||
import serial # pip install pyserial
|
||||
|
||||
BAUD = 9600
|
||||
|
||||
CMD_GENERAL = bytes([0x7E, 0x01, 0x01, 0x00, 0xFE, 0x0D])
|
||||
CMD_HW_VER = bytes([0x7E, 0x01, 0x42, 0x00, 0xFC, 0x0D])
|
||||
CMD_FW_VER = bytes([0x7E, 0x01, 0x33, 0x00, 0xFE, 0x0D])
|
||||
|
||||
|
||||
@dataclass
|
||||
class Status:
|
||||
cell_voltages: list[float] = field(default_factory=list) # volts
|
||||
current: float = 0.0 # amps (signed, +charge)
|
||||
soc: float = 0.0 # percent
|
||||
capacity_ah: float = 0.0
|
||||
temps_c: list[int] = field(default_factory=list) # up to 6
|
||||
cycles: int = 0
|
||||
pack_voltage: float = 0.0
|
||||
alarms: dict = field(default_factory=dict)
|
||||
|
||||
def summary(self) -> str:
|
||||
cells = self.cell_voltages
|
||||
cmin, cmax = (min(cells), max(cells)) if cells else (0, 0)
|
||||
return (
|
||||
f"pack={self.pack_voltage:6.2f}V I={self.current:+7.2f}A "
|
||||
f"SoC={self.soc:5.1f}% cap={self.capacity_ah:6.2f}Ah "
|
||||
f"cells={len(cells)} ({cmin:.3f}..{cmax:.3f}V Δ={cmax-cmin:.3f}) "
|
||||
f"temps={self.temps_c} cycles={self.cycles} alarms={self.alarms}"
|
||||
)
|
||||
|
||||
|
||||
def send(port: serial.Serial, cmd: bytes, read_n: int = 256) -> bytes:
|
||||
port.reset_input_buffer()
|
||||
port.write(cmd)
|
||||
time.sleep(0.2)
|
||||
return port.read(read_n)
|
||||
|
||||
|
||||
def parse_status(data: bytes) -> Status:
|
||||
"""Parse the general-status reply. Raises ValueError on bad framing."""
|
||||
if not data or data[0] != 0x7E or data[-1] != 0x0D:
|
||||
raise ValueError(f"bad framing: {data.hex(' ')}")
|
||||
|
||||
groups: list[list[int]] = []
|
||||
i = 4 # skip 7E <addr> <cmd> <len> — payload starts at byte 4
|
||||
for _ in range(10):
|
||||
if i + 2 > len(data):
|
||||
raise ValueError("truncated payload")
|
||||
group_len = data[i + 1]
|
||||
end = i + 2 + (group_len * 2)
|
||||
payload = data[i + 2 : end]
|
||||
values = [unpack_from(">H", payload, k)[0] for k in range(0, len(payload), 2)]
|
||||
groups.append(values)
|
||||
i = end
|
||||
|
||||
s = Status()
|
||||
s.cell_voltages = [(v & 0x7FFF) / 1000 for v in groups[0]]
|
||||
s.current = (30000 - groups[1][0]) / 100
|
||||
s.soc = groups[2][0] / 100
|
||||
s.capacity_ah = groups[3][0] / 100
|
||||
s.temps_c = [(t & 0xFF) - 50 for t in groups[4][:6]]
|
||||
flags = groups[5][1] if len(groups[5]) > 1 else 0
|
||||
s.alarms = {
|
||||
"current_over": bool(flags & 0b00001000),
|
||||
"voltage_high": bool(flags & 0b00010000),
|
||||
"voltage_low": bool(flags & 0b00100000),
|
||||
"temp_high_chg": bool(flags & 0b01000000),
|
||||
"temp_low_chg": bool(flags & 0b10000000),
|
||||
}
|
||||
s.cycles = groups[6][0]
|
||||
s.pack_voltage = groups[7][0] / 100
|
||||
return s
|
||||
|
||||
|
||||
def decode_ascii(data: bytes) -> str:
|
||||
return data.decode("ascii", errors="ignore").strip()
|
||||
|
||||
|
||||
def autodetect() -> str | None:
|
||||
for pat in ("/dev/tty.usbserial*", "/dev/tty.usbmodem*", "/dev/ttyUSB*", "/dev/ttyACM*"):
|
||||
hits = glob.glob(pat)
|
||||
if hits:
|
||||
return hits[0]
|
||||
return None
|
||||
|
||||
|
||||
def main() -> None:
|
||||
port_path = sys.argv[1] if len(sys.argv) > 1 else autodetect()
|
||||
if not port_path:
|
||||
sys.exit("no serial port found; pass one explicitly")
|
||||
print(f"opening {port_path} @ {BAUD} 8N1")
|
||||
with serial.Serial(port_path, BAUD, bytesize=8, parity="N", stopbits=1, timeout=1.5) as p:
|
||||
hw = send(p, CMD_HW_VER)
|
||||
fw = send(p, CMD_FW_VER)
|
||||
if hw:
|
||||
print(f"hw: {decode_ascii(hw)}")
|
||||
if fw:
|
||||
print(f"fw: {decode_ascii(fw)}")
|
||||
raw = send(p, CMD_GENERAL)
|
||||
print(f"raw ({len(raw)}B): {raw.hex(' ')}")
|
||||
if raw:
|
||||
print(parse_status(raw).summary())
|
||||
|
||||
|
||||
if __name__ == "__main__":
|
||||
main()
|
||||
Reference in New Issue
Block a user