209 lines
7.9 KiB
Python
209 lines
7.9 KiB
Python
#!/usr/bin/env python3
|
|
"""Address sweep + protocol probe for EG4 LifePower4 over USB-RS485.
|
|
|
|
Tries three candidate protocols across a range of unit addresses:
  1. Legacy 7E/0D binary framing — addresses 0x00..0x10
  2. PACE BMS V25-style framing (ASCII-hex inside 7E/0D) — addresses
     0x01..0x10 plus broadcast 0xFF. Requests are sent with VER=0x20
     (the V20 EG4/Narada variant).
     Used by Pylontech, Solark, Deye, Lux, Growatt, MegaRev, etc.
  3. Standard Modbus RTU (fn 0x03) — addresses 1..16, reads 39 holding regs

Any non-empty reply is printed raw. The first bytes tell you which protocol:
  - 7E + ASCII-hex payload = PACE V25
  - 7E + binary payload = legacy
  - matching addr byte + 0x03 = Modbus

Usage:
    python3 sweep.py                     # auto-detect port, quick baud scan, then sweep at 9600
    python3 sweep.py /dev/ttyUSB0        # explicit port, quick baud scan, then sweep at 9600
    python3 sweep.py /dev/ttyUSB0 19200  # explicit port + baud, sweep only
    python3 sweep.py --scan [PORT]       # quick baud scan only, no sweep
    python3 sweep.py --listen [PORT]     # passive listen at each common baud, nothing transmitted
|
|
"""
|
|
import glob
|
|
import sys
|
|
import time
|
|
|
|
import serial # pip install pyserial
|
|
|
|
# Baud rate used for the full sweep when none is given on the command line.
DEFAULT_BAUD = 9600
|
|
|
|
|
|
def autodetect() -> str | None:
|
|
for pat in ("/dev/tty.usbserial*", "/dev/tty.usbmodem*", "/dev/ttyUSB*", "/dev/ttyACM*"):
|
|
hits = glob.glob(pat)
|
|
if hits:
|
|
return hits[0]
|
|
return None
|
|
|
|
|
|
def frame_7e(addr: int, cmd: int = 0x01, length: int = 0x00) -> bytes:
    """Build a 6-byte legacy binary request: 7E, addr, cmd, length, checksum, 0D.

    The checksum is the two's complement (mod 256) of ``addr + cmd + length``,
    so those three bytes plus the checksum add up to zero modulo 256.
    """
    fields = (addr, cmd, length)
    checksum = -sum(fields) & 0xFF
    return b"\x7e" + bytes(fields) + bytes((checksum, 0x0D))
|
|
|
|
|
|
def crc16_modbus(data: bytes) -> int:
    """Return the 16-bit Modbus CRC of *data*.

    Bitwise, LSB-first implementation: initial value 0xFFFF, reflected
    polynomial 0xA001, no final XOR. Empty input yields 0xFFFF.
    """
    register = 0xFFFF
    for byte in data:
        register ^= byte
        for _bit in range(8):
            carry = register & 1
            register >>= 1
            if carry:
                register ^= 0xA001
    return register
|
|
|
|
|
|
def frame_modbus(addr: int, func: int = 0x03, start: int = 0x0000, count: int = 39) -> bytes:
    """Build a Modbus RTU request frame.

    Layout: unit address, function code, start register (big-endian),
    register count (big-endian), then the CRC of those six bytes.
    """
    start_hi, start_lo = divmod(start, 0x100)
    count_hi, count_lo = divmod(count, 0x100)
    pdu = bytes((addr, func, start_hi, start_lo, count_hi, count_lo))
    # The Modbus CRC goes on the wire low byte first.
    return pdu + crc16_modbus(pdu).to_bytes(2, "little")
|
|
|
|
|
|
def _pace_lenid(n: int) -> int:
|
|
"""V25 LENID = (lchecksum << 12) | (n & 0x0FFF); lchecksum per PACE spec."""
|
|
s = ((n >> 8) & 0x0F) + ((n >> 4) & 0x0F) + (n & 0x0F)
|
|
lchk = ((~s) + 1) & 0x0F
|
|
return (lchk << 12) | (n & 0x0FFF)
|
|
|
|
|
|
def frame_pace(addr: int, ver: int = 0x20, cid1: int = 0x4A, cid2: int = 0x42, info: bytes = b"") -> bytes:
    """Build a PACE BMS request frame (ASCII-hex between ``~`` and ``\\r``).

    The frame is ``~`` + VER + ADR + CID1 + CID2 + LENID + INFO + CHKSUM + CR,
    with every field rendered as upper-case ASCII hex. CHKSUM is the two's
    complement (mod 65536) of the sum of the ASCII bytes between ``~`` and the
    checksum itself.

    Defaults reproduce the EG4 LifePower4 V20 read-analog frame verified against
    nkinnan/esphome-pace-bms: ver=0x20 (V20), cid1=0x4A (EG4/Narada family),
    cid2=0x42 (read analog), empty INFO payload (EG4 variant — busId is *not*
    repeated in the payload, unlike generic PACE). Known-good at addr 1:
        ~20014A420000FDA2\\r
    """
    payload = info.hex().upper().encode()
    header = "".join(f"{field:02X}" for field in (ver, addr, cid1, cid2))
    header += f"{_pace_lenid(len(payload)):04X}"
    body = header.encode() + payload
    checksum = -sum(body) & 0xFFFF
    return b"~%b%04X\r" % (body, checksum)
|
|
|
|
|
|
def exchange(port: serial.Serial, tx: bytes, wait: float = 0.25, read_n: int = 256) -> bytes:
    """Send *tx* on *port* and return whatever is read back afterwards.

    Pending input is discarded first so stale bytes are not mistaken for a
    reply. After writing, the function sleeps ``wait`` seconds and then reads
    up to ``read_n`` bytes; how long that read blocks is governed by the
    timeout the port was opened with.
    """
    port.reset_input_buffer()  # drop leftovers from a previous exchange
    port.write(tx)
    time.sleep(wait)  # give the device time to answer
    reply = port.read(read_n)
    return reply
|
|
|
|
|
|
def classify_7e(rx: bytes) -> str:
    """Return a short annotation for a reply to a legacy 7E request.

    Empty input gives an empty string; a reply that starts with 0x7E and ends
    with 0x0D is reported as framed; anything else is "unexpected".
    """
    if not rx:
        return ""
    framed = rx.startswith(b"\x7e") and rx.endswith(b"\r")
    return " <- 7E framed reply" if framed else " <- unexpected bytes"
|
|
|
|
|
|
def classify_pace(rx: bytes) -> str:
    """Return a short annotation for a reply to a PACE request.

    A reply framed by ``~`` ... ``\\r`` whose interior consists solely of
    ASCII hex digits is reported as PACE; a framed reply with any other byte
    inside is flagged as probably legacy binary; everything else non-empty is
    "unexpected".
    """
    if not rx:
        return ""
    if not (rx.startswith(b"~") and rx.endswith(b"\r")):
        return " <- unexpected bytes"
    # Delete every hex digit from the interior; anything left is non-hex.
    leftover = rx[1:-1].translate(None, b"0123456789ABCDEFabcdef")
    if leftover:
        return " <- 7E-framed but non-ASCII (likely legacy, not V25)"
    return " <- PACE V25 reply"
|
|
|
|
|
|
def classify_modbus(rx: bytes, addr: int, func: int = 0x03) -> str:
    """Return a short annotation for a reply to a Modbus RTU request.

    Only the first bytes are inspected; the trailing CRC is not verified.

    Args:
        rx: Raw bytes read back from the bus.
        addr: Unit address the request was sent to.
        func: Function code of the request. Defaults to 0x03, matching the
            default of ``frame_modbus``.

    Returns:
        "" for an empty reply; a "Modbus reply" tag when the reply echoes
        ``addr`` and ``func``; a "Modbus exception" tag (with the exception
        code taken from the third byte) when the reply echoes ``addr`` and
        ``func`` with the high bit set; otherwise an "unexpected bytes" tag.
    """
    if not rx:
        return ""
    if len(rx) >= 3 and rx[0] == addr:
        if rx[1] == func:
            return " <- Modbus reply"
        # An exception response echoes the function code with bit 7 set.
        if rx[1] == (func | 0x80):
            return f" <- Modbus exception (code {rx[2]:#04x})"
    return " <- unexpected bytes"
|
|
|
|
|
|
# Baud rates tried, in this order, by passive_listen() and quick_baud_scan().
COMMON_BAUDS = (9600, 19200, 38400, 57600, 115200)
|
|
|
|
|
|
def passive_listen(port_path: str, seconds: int = 15) -> None:
    """Listen without transmitting, at each common baud rate in turn.

    The port is opened at every rate in COMMON_BAUDS for ``seconds`` seconds
    and whatever arrives unsolicited is dumped as hex and as ASCII. If the
    pack is in broadcast mode (Pylontech CAN-style, Victron, some Growatt
    modes), frames show up without being asked for. A port that cannot be
    opened at some rate is reported and the next rate is tried.
    """
    print(f"passive listen on {port_path} — {seconds}s per baud")
    for baud in COMMON_BAUDS:
        try:
            with serial.Serial(port_path, baud, bytesize=8, parity="N", stopbits=1, timeout=0.25) as ser:
                ser.reset_input_buffer()
                captured = bytearray()
                stop_at = time.monotonic() + seconds
                while time.monotonic() < stop_at:
                    # A timed-out read returns b"", which extends by nothing.
                    captured += ser.read(256)
            summary = f" @ {baud:6d} 8N1: {len(captured):4d}B"
            if not captured:
                print(summary)
                continue
            print(summary + " <- traffic!")
            print(f" hex: {captured.hex(' ')}")
            print(f" ascii: {captured.decode('ascii', errors='replace')!r}")
        except serial.SerialException as err:
            print(f" @ {baud}: {err}")
|
|
|
|
|
|
def quick_baud_scan(port_path: str) -> None:
    """Probe unit address 0x01 with one frame per protocol at each baud rate.

    For every rate in COMMON_BAUDS one legacy, one PACE and one Modbus request
    is sent and the raw reply printed. Any non-empty reply means the baud rate
    and a protocol the pack answers have been found — then rerun the full
    sweep at that baud. A port that cannot be opened at some rate is reported
    and the next rate is tried.
    """
    print(f"opening {port_path} — scanning baud rates {COMMON_BAUDS}")
    # The probe frames do not depend on the baud rate, so build them once.
    probes = (
        ("legacy", frame_7e(0x01)),
        ("pace", frame_pace(0x01)),
        ("modbus", frame_modbus(0x01)),
    )
    for baud in COMMON_BAUDS:
        try:
            with serial.Serial(port_path, baud, bytesize=8, parity="N", stopbits=1, timeout=2.0) as ser:
                print(f"\n @ {baud} 8N1:")
                for label, request in probes:
                    reply = exchange(ser, request, wait=0.3)
                    marker = " <- REPLY!" if reply else ""
                    print(f" {label:7s}: tx {request.hex(' ')} rx ({len(reply):3d}B) {reply.hex(' ')}{marker}")
        except serial.SerialException as err:
            print(f" @ {baud}: {err}")
|
|
|
|
|
|
def sweep(port_path: str, baud: int) -> None:
    """Run the full address sweep for all three protocols at one baud rate.

    Opens the port once (8N1, 2 s read timeout) and, in order, probes:
    legacy 7E frames at addresses 0x00..0x10, PACE frames at 0x01..0x10 plus
    0xFF, and Modbus RTU reads at addresses 1..16. Every request and raw reply
    is printed together with the matching classifier's annotation.
    """
    print(f"opening {port_path} @ {baud} 8N1 (2s timeout)")
    with serial.Serial(port_path, baud, bytesize=8, parity="N", stopbits=1, timeout=2.0) as ser:
        print("\n[7E protocol — general-status sweep]")
        for addr in range(0x11):
            request = frame_7e(addr)
            reply = exchange(ser, request)
            note = classify_7e(reply)
            print(f" addr 0x{addr:02X}: tx {request.hex(' ')} rx ({len(reply):3d}B) {reply.hex(' ')}{note}")

        print("\n[PACE V20 (EG4/Narada variant) — read-analog-data sweep]")
        for addr in (*range(0x01, 0x11), 0xFF):  # 0xFF is the broadcast address
            request = frame_pace(addr)
            reply = exchange(ser, request, wait=0.35)
            shown = request.decode('ascii', errors='replace').strip()
            note = classify_pace(reply)
            print(f" addr 0x{addr:02X}: tx {shown!r} rx ({len(reply):3d}B) {reply.hex(' ')}{note}")

        print("\n[Modbus RTU — read 39 holding regs sweep]")
        for addr in range(1, 17):
            request = frame_modbus(addr)
            reply = exchange(ser, request)
            note = classify_modbus(reply, addr)
            print(f" addr 0x{addr:02X}: tx {request.hex(' ')} rx ({len(reply):3d}B) {reply.hex(' ')}{note}")
|
|
|
|
|
|
def _main(argv: list[str]) -> None:
    """Parse the command line and run the listen, scan and/or sweep modes.

    Positional arguments are ``[PORT [BAUD]]``; ``--scan`` and ``--listen``
    may appear anywhere.

    * ``--listen``: passive listen only, then return.
    * ``--scan``: quick baud scan only, then return.
    * no BAUD given: quick baud scan, then full sweep at DEFAULT_BAUD.
    * BAUD given: full sweep at that baud only.

    Exits with an error message (status 1) when no port can be found, when
    BAUD is not an integer, or when the port cannot be used for the sweep.
    """
    scan_only = "--scan" in argv
    listen_only = "--listen" in argv
    positional = [a for a in argv if a not in ("--scan", "--listen")]

    port = positional[0] if positional else autodetect()
    if not port:
        sys.exit("no serial port found; pass one explicitly")

    if listen_only:
        passive_listen(port)
        return

    if scan_only or len(positional) < 2:
        quick_baud_scan(port)
        if scan_only:
            return

    baud = DEFAULT_BAUD
    if len(positional) > 1:
        try:
            baud = int(positional[1])
        except ValueError:
            # A typo in the baud argument should read as a usage error,
            # not as a traceback.
            sys.exit(f"invalid baud rate {positional[1]!r}; expected an integer")

    try:
        sweep(port, baud)
    except serial.SerialException as err:
        # passive_listen() and quick_baud_scan() already report port errors
        # themselves; do the same for the sweep.
        sys.exit(f"cannot sweep {port} @ {baud}: {err}")


if __name__ == "__main__":
    _main(sys.argv[1:])
|