Files

209 lines
7.9 KiB
Python
Raw Permalink Normal View History

2026-04-24 16:34:10 -04:00
#!/usr/bin/env python3
"""Address sweep + protocol probe for EG4 LifePower4 over USB-RS485.
Tries three candidate protocols across a range of unit addresses:
1. Legacy 7E/0D binary framing addresses 0x00..0x10
2. PACE BMS V25 (ASCII-hex inside 7E/0D) addresses 0x01..0x10
Used by Pylontech, Solark, Deye, Lux, Growatt, MegaRev, etc.
3. Standard Modbus RTU (fn 0x03) addresses 1..16, reads 39 holding regs
Any non-empty reply is printed raw. First byte tells you which protocol:
- 7E + ASCII-hex payload = PACE V25
- 7E + binary payload = legacy
- matching addr byte + 0x03 = Modbus
Usage:
python3 sweep.py # auto-detect port, 9600
python3 sweep.py /dev/ttyUSB0 # explicit port
python3 sweep.py /dev/ttyUSB0 19200 # explicit port + baud
"""
import glob
import sys
import time
import serial # pip install pyserial
DEFAULT_BAUD = 9600
def autodetect() -> str | None:
for pat in ("/dev/tty.usbserial*", "/dev/tty.usbmodem*", "/dev/ttyUSB*", "/dev/ttyACM*"):
hits = glob.glob(pat)
if hits:
return hits[0]
return None
def frame_7e(addr: int, cmd: int = 0x01, length: int = 0x00) -> bytes:
chk = (0x100 - (addr + cmd + length)) & 0xFF
return bytes([0x7E, addr, cmd, length, chk, 0x0D])
def crc16_modbus(data: bytes) -> int:
crc = 0xFFFF
for b in data:
crc ^= b
for _ in range(8):
crc = (crc >> 1) ^ 0xA001 if crc & 1 else crc >> 1
return crc
def frame_modbus(addr: int, func: int = 0x03, start: int = 0x0000, count: int = 39) -> bytes:
body = bytes([addr, func, start >> 8, start & 0xFF, count >> 8, count & 0xFF])
crc = crc16_modbus(body)
return body + bytes([crc & 0xFF, crc >> 8]) # Modbus CRC is LSB-first on wire
def _pace_lenid(n: int) -> int:
"""V25 LENID = (lchecksum << 12) | (n & 0x0FFF); lchecksum per PACE spec."""
s = ((n >> 8) & 0x0F) + ((n >> 4) & 0x0F) + (n & 0x0F)
lchk = ((~s) + 1) & 0x0F
return (lchk << 12) | (n & 0x0FFF)
def frame_pace(addr: int, ver: int = 0x20, cid1: int = 0x4A, cid2: int = 0x42, info: bytes = b"") -> bytes:
"""Build a PACE BMS request frame (ASCII-hex inside ~…\\r).
Defaults reproduce the EG4 LifePower4 V20 read-analog frame verified against
nkinnan/esphome-pace-bms: ver=0x20 (V20), cid1=0x4A (EG4/Narada family),
cid2=0x42 (read analog), empty INFO payload (EG4 variant busId is *not*
repeated in the payload, unlike generic PACE). Known-good at addr 1:
~20014A420000FDA2\\r
"""
info_ascii = info.hex().upper().encode()
lenid = _pace_lenid(len(info_ascii))
body = f"{ver:02X}{addr:02X}{cid1:02X}{cid2:02X}{lenid:04X}".encode() + info_ascii
chksum = ((-sum(body)) & 0xFFFF)
return b"~" + body + f"{chksum:04X}".encode() + b"\r"
def exchange(port: serial.Serial, tx: bytes, wait: float = 0.25, read_n: int = 256) -> bytes:
port.reset_input_buffer()
port.write(tx)
time.sleep(wait)
return port.read(read_n)
def classify_7e(rx: bytes) -> str:
if not rx:
return ""
if rx[0] == 0x7E and rx.endswith(b"\x0D"):
return " <- 7E framed reply"
return " <- unexpected bytes"
def classify_pace(rx: bytes) -> str:
if not rx:
return ""
if rx[0] == 0x7E and rx.endswith(b"\r"):
# V25 payload is ASCII-hex between ~ and \r
mid = rx[1:-1]
if mid.isascii() and all(c in b"0123456789ABCDEFabcdef" for c in mid):
return " <- PACE V25 reply"
return " <- 7E-framed but non-ASCII (likely legacy, not V25)"
return " <- unexpected bytes"
def classify_modbus(rx: bytes, addr: int) -> str:
if not rx:
return ""
if len(rx) >= 3 and rx[0] == addr and rx[1] == 0x03:
return " <- Modbus reply"
if len(rx) >= 3 and rx[0] == addr and rx[1] == 0x83:
return f" <- Modbus exception (code {rx[2]:#04x})"
return " <- unexpected bytes"
COMMON_BAUDS = (9600, 19200, 38400, 57600, 115200)
def passive_listen(port_path: str, seconds: int = 15) -> None:
"""Open at each baud for N seconds and dump whatever the pack transmits
unsolicited. If the pack is in broadcast mode (Pylontech CAN-style,
Victron, some Growatt modes), we'll see frames arrive without asking.
"""
print(f"passive listen on {port_path}{seconds}s per baud")
for baud in COMMON_BAUDS:
try:
with serial.Serial(port_path, baud, bytesize=8, parity="N", stopbits=1, timeout=0.25) as p:
p.reset_input_buffer()
deadline = time.monotonic() + seconds
buf = bytearray()
while time.monotonic() < deadline:
chunk = p.read(256)
if chunk:
buf.extend(chunk)
ascii_preview = buf.decode("ascii", errors="replace") if buf else ""
tag = " <- traffic!" if buf else ""
print(f" @ {baud:6d} 8N1: {len(buf):4d}B{tag}")
if buf:
print(f" hex: {buf.hex(' ')}")
print(f" ascii: {ascii_preview!r}")
except serial.SerialException as e:
print(f" @ {baud}: {e}")
def quick_baud_scan(port_path: str) -> None:
"""At addr 0x01, send one legacy + one V25 + one Modbus frame at each baud.
Any non-empty reply means we've found the right baud + a protocol that the
pack is willing to answer then rerun the full sweep at that baud.
"""
print(f"opening {port_path} — scanning baud rates {COMMON_BAUDS}")
for baud in COMMON_BAUDS:
try:
with serial.Serial(port_path, baud, bytesize=8, parity="N", stopbits=1, timeout=2.0) as p:
print(f"\n @ {baud} 8N1:")
for label, tx in (
("legacy", frame_7e(0x01)),
("pace", frame_pace(0x01)),
("modbus", frame_modbus(0x01)),
):
rx = exchange(p, tx, wait=0.3)
tag = ""
if rx:
tag = " <- REPLY!"
print(f" {label:7s}: tx {tx.hex(' ')} rx ({len(rx):3d}B) {rx.hex(' ')}{tag}")
except serial.SerialException as e:
print(f" @ {baud}: {e}")
def sweep(port_path: str, baud: int) -> None:
print(f"opening {port_path} @ {baud} 8N1 (2s timeout)")
with serial.Serial(port_path, baud, bytesize=8, parity="N", stopbits=1, timeout=2.0) as p:
print("\n[7E protocol — general-status sweep]")
for addr in range(0x00, 0x11):
tx = frame_7e(addr)
rx = exchange(p, tx)
print(f" addr 0x{addr:02X}: tx {tx.hex(' ')} rx ({len(rx):3d}B) {rx.hex(' ')}{classify_7e(rx)}")
print("\n[PACE V20 (EG4/Narada variant) — read-analog-data sweep]")
for addr in list(range(0x01, 0x11)) + [0xFF]: # include broadcast 0xFF
tx = frame_pace(addr)
rx = exchange(p, tx, wait=0.35)
print(f" addr 0x{addr:02X}: tx {tx.decode('ascii', errors='replace').strip()!r} rx ({len(rx):3d}B) {rx.hex(' ')}{classify_pace(rx)}")
print("\n[Modbus RTU — read 39 holding regs sweep]")
for addr in range(1, 17):
tx = frame_modbus(addr)
rx = exchange(p, tx)
print(f" addr 0x{addr:02X}: tx {tx.hex(' ')} rx ({len(rx):3d}B) {rx.hex(' ')}{classify_modbus(rx, addr)}")
if __name__ == "__main__":
args = sys.argv[1:]
scan_only = "--scan" in args
listen_only = "--listen" in args
args = [a for a in args if a not in ("--scan", "--listen")]
port = args[0] if args else autodetect()
if not port:
sys.exit("no serial port found; pass one explicitly")
if listen_only:
passive_listen(port)
sys.exit(0)
if scan_only or len(args) < 2:
quick_baud_scan(port)
if scan_only:
sys.exit(0)
baud = int(args[1]) if len(args) > 1 else DEFAULT_BAUD
sweep(port, baud)