1049 lines
42 KiB
Plaintext
Executable File
1049 lines
42 KiB
Plaintext
Executable File
#!/usr/bin/env -S uv run --script
|
||
# /// script
|
||
# requires-python = ">=3.11"
|
||
# dependencies = [
|
||
# "pyserial>=3.5",
|
||
# "paho-mqtt>=2.0",
|
||
# "pyyaml>=6.0",
|
||
# ]
|
||
# ///
|
||
"""
|
||
eg4-battery — telemetry bridge from EG4 LifePower4 v2 BMSes to MQTT/HA.
|
||
|
||
Three modes, selected via `bus.mode` in the config:
|
||
|
||
modbus_per_pack — RECOMMENDED. One FTDI RS-485 adapter per pack. Each pack
|
||
has its own (port, address, baud) in the `packs:` list.
|
||
Uses Modbus RTU fn=0x03 read-47-regs at 0x0000. Decoder
|
||
extracts named fields (pack V, 16 cell voltages, temps,
|
||
SoC, SoH, Capacity, warnings, protections) — register
|
||
map reverse-engineered from the EG4 `lv_host.app` BMS
|
||
Tool's SQLite schema + UI labels.
|
||
|
||
active — LEGACY. Single FTDI adapter on a dedicated bus, EG4
|
||
7E/0D protocol at 9600 baud. Was ported from the V1
|
||
firmware via `battery/eg4_lifepower.py`; V2 hardware
|
||
doesn't speak this protocol in practice. Kept for
|
||
reference / possible V1 deployments.
|
||
|
||
passive — LEGACY. Listen-only Modbus-RTU sniffer at 19200 baud.
|
||
Originally targeted the LVX6048 BMS bus; LVX6048 doesn't
|
||
poll EG4 packs that way, so the mode is diagnostic only.
|
||
|
||
Usage:
|
||
eg4-battery -C <config.yaml>
|
||
eg4-battery -C <config.yaml> --dry-run # mock bus, print, exit
|
||
eg4-battery -C <config.yaml> --trace # log every frame
|
||
"""
|
||
from __future__ import annotations
|
||
|
||
import argparse
|
||
import asyncio
|
||
import dataclasses
|
||
import json
|
||
import logging
|
||
import random
|
||
import struct
|
||
import sys
|
||
import time
|
||
from pathlib import Path
|
||
from struct import unpack_from
|
||
from typing import Any, Iterator
|
||
|
||
import paho.mqtt.client as mqtt
|
||
import serial
|
||
import yaml
|
||
|
||
log = logging.getLogger("eg4-battery")
|
||
|
||
|
||
# =============================================================================
|
||
# === config ==================================================================
|
||
# =============================================================================
|
||
|
||
|
||
@dataclasses.dataclass
class PackConfig:
    """Settings for one battery pack (one entry of the `packs:` list).

    name    -- HA entity prefix / device identifier (e.g. "lifepower4_1").
    address -- protocol-level address (Modbus slave ID, or EG4 7E address).
    port    -- per-pack serial port (modbus_per_pack mode only).
    baud    -- per-pack baud override (modbus_per_pack mode only).
    """

    name: str
    address: int
    port: str | None = None
    baud: int | None = None
|
||
|
||
|
||
@dataclasses.dataclass
class BusConfig:
    """Bus-level settings, built from the `bus:` mapping of the config file."""

    # Operating mode: "modbus_per_pack" | "active" | "passive".
    mode: str
    # Transport backend: "serial" | "mock".
    transport: str = "serial"
    # Shared serial port (active / passive modes).
    port: str = ""
    baud: int = 9600
    read_chunk: int = 512
    # Per-query timeout.
    timeout_s: float = 1.5
    # Target duration of one full round-robin cycle.
    poll_interval_s: float = 10.0
|
||
|
||
|
||
@dataclasses.dataclass
class MQTTConfig:
    """MQTT broker connection settings, built from the `mqtt:` mapping.

    `discovery_prefix` is the topic root used for both the Home Assistant
    discovery documents and the state topics.
    """

    host: str
    port: int
    username: str
    password: str
    discovery_prefix: str = "homeassistant"
|
||
|
||
|
||
@dataclasses.dataclass
class AppConfig:
    """Top-level configuration: bus settings, MQTT settings and the pack list."""

    bus: BusConfig
    mqtt: MQTTConfig
    packs: list[PackConfig]
    # Number of cells decoded per pack; only used by active mode.
    cell_count: int = 16
|
||
|
||
|
||
def load_config(path: Path) -> AppConfig:
    """Read the YAML file at `path` and build the `AppConfig`.

    The MQTT host, port, username and password may be overridden through the
    environment (MQTT_HOST, MQTT_PORT, MQTT_USERNAME, MQTT_PASSWORD);
    MQTT_PORT is converted to int.  This suits Docker deployments where
    secrets shouldn't live in the YAML, and HA addons that translate
    addon-options into environment variables at runtime.
    """
    document = yaml.safe_load(path.read_text())

    import os

    env_overrides = {
        "host": "MQTT_HOST",
        "port": "MQTT_PORT",
        "username": "MQTT_USERNAME",
        "password": "MQTT_PASSWORD",
    }
    mqtt_settings = dict(document["mqtt"])
    for field, variable in env_overrides.items():
        override = os.environ.get(variable)
        if override is None:
            continue
        mqtt_settings[field] = int(override) if field == "port" else override

    bus = BusConfig(**document["bus"])
    broker = MQTTConfig(**mqtt_settings)
    packs = [PackConfig(**entry) for entry in document["packs"]]
    return AppConfig(
        bus=bus,
        mqtt=broker,
        packs=packs,
        cell_count=document.get("cell_count", 16),
    )
|
||
|
||
|
||
# =============================================================================
|
||
# === active mode: EG4 7E/0D protocol =========================================
|
||
# =============================================================================
|
||
# Verified against `battery/eg4_lifepower.py`. Frame:
|
||
# request (6 bytes): 7E <addr> <cmd> 00 <chk> 0D
|
||
# chk = (0x100 - (addr + cmd + len)) & 0xFF
|
||
# reply (variable): 7E <addr> <cmd> <len> [10 groups] <chk> 0D
|
||
# each group: <type_byte> <count> <count × big-endian uint16>
|
||
|
||
|
||
# EG4 7E/0D command bytes.
# General status: cells, V, I, SoC, cap, temps, cycles, alarms.
CMD_GENERAL_STATUS = 0x01
CMD_FW_VER = 0x33
CMD_HW_VER = 0x42
|
||
|
||
|
||
def encode_eg4_request(address: int, cmd: int, length: int = 0) -> bytes:
    """Build the 6-byte EG4 request frame `7E <addr> <cmd> <len> <chk> 0D`.

    The checksum byte is the two's complement of (address + cmd + length),
    truncated to 8 bits.
    """
    checksum = -(address + cmd + length) & 0xFF
    return bytes((0x7E, address, cmd, length, checksum, 0x0D))
|
||
|
||
|
||
def decode_eg4_general_status(data: bytes, cell_count: int) -> dict[str, Any]:
    """Turn a general-status reply into a flat dict keyed for HA.

    Follows `battery/eg4_lifepower.py::parse_status`.  Framing is checked
    permissively (0x7E header, 0x0D footer, at least 6 bytes); the reply
    checksum is not validated — upstream doesn't either, and the algorithm
    is not known yet.

    Raises ValueError when the framing is wrong or when one of the ten
    groups runs past the end of the frame.
    """
    well_framed = bool(data) and len(data) >= 6 and data[0] == 0x7E and data[-1] == 0x0D
    if not well_framed:
        raise ValueError(f"bad framing: {data.hex(' ')[:120]}")

    # Walk the ten <type> <count> <count x big-endian u16> groups that follow
    # the 4-byte header (7E <addr> <cmd> <len>).
    groups: list[list[int]] = []
    cursor = 4
    while len(groups) < 10:
        if cursor + 2 > len(data):
            raise ValueError(f"truncated payload at group {len(groups)}")
        word_count = data[cursor + 1]
        stop = cursor + 2 + word_count * 2
        if stop > len(data):
            raise ValueError(f"group {len(groups)} overruns frame (end={stop}, len={len(data)})")
        groups.append([
            int.from_bytes(data[pos:pos + 2], "big")
            for pos in range(cursor + 2, stop, 2)
        ])
        cursor = stop

    # groups 8-9 are undecoded; left as future work
    (cell_words, current_words, soc_words, capacity_words,
     temp_words, alarm_words, cycle_words, pack_v_words) = groups[:8]

    out: dict[str, Any] = {}

    # group 0 — cell voltages in mV (0x7FFF mask per upstream; top bit is some flag)
    cells = [(word & 0x7FFF) / 1000.0 for word in cell_words[:cell_count]]
    for number, volts in enumerate(cells, start=1):
        out[f"cell_{number:02d}_voltage"] = round(volts, 3)
    if cells:
        lowest, highest = min(cells), max(cells)
        out.update({
            "cell_voltage_min": round(lowest, 3),
            "cell_voltage_max": round(highest, 3),
            "cell_voltage_delta_mv": round((highest - lowest) * 1000),
            "cell_lowest": cells.index(lowest) + 1,
            "cell_highest": cells.index(highest) + 1,
        })

    # group 1 — current, encoded as 30000 - A x 100 (positive = charge)
    if current_words:
        out["current"] = round((30000 - current_words[0]) / 100.0, 2)
    # group 2 — SoC x 100
    if soc_words:
        out["soc"] = round(soc_words[0] / 100.0, 1)
    # group 3 — capacity, Ah x 100
    if capacity_words:
        out["capacity_ah"] = round(capacity_words[0] / 100.0, 2)
    # group 4 — temperatures: low byte minus 50 °C, at most six sensors
    for number, word in enumerate(temp_words[:6], start=1):
        out[f"temperature_{number}"] = (word & 0xFF) - 50

    # group 5 — alarm bitfield lives in the second word (per upstream)
    flags = alarm_words[1] if len(alarm_words) > 1 else 0
    alarm_masks = (
        ("alarm_current_over", 0b00001000),
        ("alarm_voltage_high", 0b00010000),
        ("alarm_voltage_low", 0b00100000),
        ("alarm_temp_high_chg", 0b01000000),
        ("alarm_temp_low_chg", 0b10000000),
    )
    for key, mask in alarm_masks:
        out[key] = "on" if flags & mask else "off"

    # group 6 — cycle count
    if cycle_words:
        out["cycle_count"] = cycle_words[0]
    # group 7 — pack voltage, V x 100
    if pack_v_words:
        out["pack_voltage"] = round(pack_v_words[0] / 100.0, 2)
    return out
|
||
|
||
|
||
# =============================================================================
|
||
# === passive mode: Modbus RTU framing ========================================
|
||
# =============================================================================
|
||
|
||
|
||
def crc16_modbus(data: bytes) -> int:
    """Return the Modbus CRC-16 of `data` (initial value 0xFFFF, reflected polynomial 0xA001)."""
    register = 0xFFFF
    for byte in data:
        register ^= byte
        for _bit in range(8):
            carry = register & 1
            register >>= 1
            if carry:
                register ^= 0xA001
    return register
|
||
|
||
|
||
def _crc_ok(buf: bytes, start: int, length: int) -> bool:
    """Check the trailing CRC of the `length`-byte frame at `buf[start:]`.

    The last two bytes of the span hold the CRC, low byte first.  Returns
    False when the span extends past the end of `buf`.
    """
    end = start + length
    if end > len(buf):
        return False
    payload = buf[start:end - 2]
    crc_lo, crc_hi = buf[end - 2], buf[end - 1]
    return crc16_modbus(payload) == (crc_lo | (crc_hi << 8))
|
||
|
||
|
||
# Function codes accepted when validating an exception response: the reply's
# function byte with the 0x80 bit cleared must be a member of this set.
_MODBUS_FUNCS = {0x01, 0x02, 0x03, 0x04, 0x05, 0x06, 0x0F, 0x10, 0x16, 0x17}
|
||
|
||
|
||
def parse_modbus_frame_at(buf: bytes, start: int) -> tuple[int, str] | None:
    """Try to recognise a CRC-valid Modbus RTU frame beginning at `buf[start]`.

    Returns `(length, kind)` with kind one of "exception", "query" or
    "response", or None when no valid frame starts there.  Only exception
    replies and fn=0x03 queries/responses are recognised.
    """
    if len(buf) - start < 4:
        return None
    func = buf[start + 1]

    # Exception response: 5 bytes, only for legitimate function codes and a
    # plausible exception code (1..11).
    looks_like_exception = (
        func >= 0x80
        and (func & 0x7F) in _MODBUS_FUNCS
        and start + 5 <= len(buf)
        and 1 <= buf[start + 2] <= 11
    )
    if looks_like_exception and _crc_ok(buf, start, 5):
        return (5, "exception")

    if func != 0x03:
        return None

    # A read-holding-registers query is always 8 bytes.
    if _crc_ok(buf, start, 8):
        return (8, "query")

    # Response layout: addr + func + byte_count + data + 2 CRC bytes.
    byte_count = buf[start + 2]
    if byte_count < 2 or byte_count > 250 or byte_count % 2 != 0:
        return None
    total = 3 + byte_count + 2
    if _crc_ok(buf, start, total):
        return (total, "response")
    return None
|
||
|
||
|
||
@dataclasses.dataclass
class ModbusFrame:
    """One Modbus RTU frame: slave address, function code, kind label and raw bytes."""

    address: int
    function: int
    kind: str
    raw: bytes

    @property
    def registers(self) -> list[int]:
        """Big-endian u16 registers of a fn=0x03 response; [] for any other frame."""
        is_read_reply = self.kind == "response" and self.function == 0x03
        if not is_read_reply:
            return []
        byte_count = self.raw[2]
        payload = self.raw[3:3 + byte_count]
        return [
            payload[pos] * 256 + payload[pos + 1]
            for pos in range(0, len(payload), 2)
        ]
|
||
|
||
|
||
def decode_modbus_response(frame: ModbusFrame) -> dict[str, Any]:
    """Dump the frame's registers as `register_NN` entries.

    Raw passthrough only; fields get promoted to names once the layout is known.
    """
    readings: dict[str, Any] = {}
    for index, value in enumerate(frame.registers):
        readings[f"register_{index:02d}"] = value
    return readings
|
||
|
||
|
||
# ---- modbus_per_pack active-poll decoder (EG4 LP4V2) -----------------------
|
||
# Register map derived from lv_host.app BMS Tool SQLite schema + UI labels +
|
||
# live probing of a single pack. See ../NOTES.md "Register map" section.
|
||
# High-confidence fields are promoted to named entities. Every register is
# also emitted raw as register_NN, so those without a named field yet
# (regs 25-29, 32, 43-45) stay available for correlation.
|
||
|
||
# Names for the bits of the warning word; list index == bit number.
_WARNING_BITS = [
    "pack_ov",        # bit 0
    "cell_ov",        # bit 1
    "pack_uv",        # bit 2
    "cell_uv",        # bit 3
    "charge_oc",      # bit 4
    "discharge_oc",   # bit 5
    "temp_anomaly",   # bit 6
    "mos_ot",         # bit 7
    "charge_ot",      # bit 8
    "discharge_ot",   # bit 9
    "charge_ut",      # bit 10
    "discharge_ut",   # bit 11
    "low_capacity",   # bit 12
    "other_error",    # bit 13
]

# Names for the bits of the protection word; same layout as the warnings
# except for the last two bits.
_PROTECTION_BITS = [
    "pack_ov",        # bit 0
    "cell_ov",        # bit 1
    "pack_uv",        # bit 2
    "cell_uv",        # bit 3
    "charge_oc",      # bit 4
    "discharge_oc",   # bit 5
    "temp_anomaly",   # bit 6
    "mos_ot",         # bit 7
    "charge_ot",      # bit 8
    "discharge_ot",   # bit 9
    "charge_ut",      # bit 10
    "discharge_ut",   # bit 11
    "float_stopped",  # bit 12
    "discharge_sc",   # bit 13
]
|
||
|
||
|
||
def _signed16(v: int) -> int:
    """Reinterpret an unsigned 16-bit value as two's-complement signed."""
    if v & 0x8000:
        return v - 0x10000
    return v
|
||
|
||
|
||
def decode_eg4_modbus_regs(regs: list[int]) -> dict[str, Any]:
    """Decode a holding-register image from an LP4V2 BMS into HA fields.

    Every register is always emitted raw as `register_NN`.  Named fields are
    added only when at least 47 registers are present, and the block-2
    strings (model, firmware version, firmware date) only when at least 124
    are present.

    NOTE(review): `ModbusActivePoller` documents regs 39-44 as zero-padded
    (they fall between its two reads).  If this decoder is fed from that
    poller, cycle_count, battery_mode and bms_version_hi/lo would always be
    0 — confirm against the caller.
    """
    # Raw passthrough first — invaluable for future refinement.
    out: dict[str, Any] = {f"register_{n:02d}": word for n, word in enumerate(regs)}
    if len(regs) < 47:
        return out

    # Pack-level voltage (reg 0, /100) and signed current (reg 1, /100).
    out["pack_voltage"] = round(regs[0] / 100.0, 2)
    out["pack_current"] = round(_signed16(regs[1]) / 100.0, 2)

    # 16 cell voltages (regs 2-17), reported in mV.
    cell_volts = [millivolts / 1000.0 for millivolts in regs[2:18]]
    for number, volts in enumerate(cell_volts, start=1):
        out[f"cell_{number:02d}_voltage"] = round(volts, 3)
    lowest, highest = min(cell_volts), max(cell_volts)
    out["cell_voltage_min"] = round(lowest, 3)
    out["cell_voltage_max"] = round(highest, 3)
    out["cell_voltage_delta_mv"] = round((highest - lowest) * 1000)
    out["cell_lowest"] = cell_volts.index(lowest) + 1
    out["cell_highest"] = cell_volts.index(highest) + 1

    # Temperatures: regs 18-21 = Temp_01..04, reg 24 = Temp_PCB.
    # NOTE(review): values are passed through unsigned — confirm how the BMS
    # encodes sub-zero readings.
    for key, index in (
        ("temperature_01", 18),
        ("temperature_02", 19),
        ("temperature_03", 20),
        ("temperature_04", 21),
        ("temperature_pcb", 24),
    ):
        out[key] = regs[index]

    # SoC / SoH (regs 22, 23).
    out["soc"] = regs[22]
    out["soh"] = regs[23]

    # Heater / status (regs 25-30): reg 30 has been observed = 1 on a healthy
    # pack; its lowest bit is treated as a binary flag.
    out["heater"] = "on" if regs[30] & 0x01 else "off"

    # Max charge/discharge current limit (reg 31), A.
    out["max_current_limit"] = round(regs[31] / 100.0, 2)

    # Bitfields: warnings (reg 33), protections (reg 34); error code (reg 35).
    warning_word = regs[33]
    for bit, name in enumerate(_WARNING_BITS):
        out[f"warning_{name}"] = "on" if (warning_word >> bit) & 1 else "off"
    protection_word = regs[34]
    for bit, name in enumerate(_PROTECTION_BITS):
        out[f"protection_{name}"] = "on" if (protection_word >> bit) & 1 else "off"
    out["error_code"] = regs[35]

    # Static-ish values and counters (regs 36-40).
    out["cell_count"] = regs[36]
    out["capacity_ah"] = round(regs[37] / 10.0, 1)
    out["remaining_ah"] = round(regs[38] / 100.0, 2)
    out["cycle_count"] = regs[39]
    out["battery_mode"] = regs[40]

    # BMS firmware version: regs 41 & 42 appear to hold version codes; the
    # raw u16s are emitted as-is.
    out["bms_version_hi"] = regs[41]
    out["bms_version_lo"] = regs[42]

    # reg 46 increments ~1.25 Hz on a live bus — likely an uptime counter.
    out["uptime_ds"] = regs[46]

    # Block-2 strings (regs 105..123), available after the second Modbus read.
    if len(regs) >= 124:
        out["model"] = _ascii_from_regs(regs, 105, 10)            # 20 chars
        out["firmware_version"] = _ascii_from_regs(regs, 117, 3)  # 6 chars (e.g. "Z03T21")
        out["firmware_date"] = _ascii_from_regs(regs, 120, 4)     # 8 chars (e.g. "20260206")

    return out
|
||
|
||
|
||
def _ascii_from_regs(regs: list[int], start: int, count_regs: int) -> str:
    """Decode `count_regs` u16 registers starting at `start` as ASCII text.

    Each register contributes its high byte, then its low byte (Modbus
    convention).  Decoding stops at the first NUL byte; bytes outside the
    printable range 32..126 are dropped and trailing whitespace is stripped.
    """
    raw = bytearray()
    for word in regs[start:start + count_regs]:
        raw.append((word >> 8) & 0xFF)
        raw.append(word & 0xFF)
    text_bytes = raw.split(b"\x00", 1)[0]
    return "".join(chr(b) for b in text_bytes if 32 <= b < 127).rstrip()
|
||
|
||
|
||
class ModbusActivePoller:
    """One instance per pack. Opens its own serial port, issues two
    read-holding-regs fn=0x03 queries per `poll()` call, and returns a
    sparse 136-register list (indices 0-38 from the first read, 45-135
    from the second, gap at 39-44 zero-padded).

    The two reads mirror what lv_host.app's BMS Tool issues in its
    monitoring loop:
      block 1 — count=39 @ 0     (live status)
      block 2 — count=91 @ 0x2d  (counters + model + firmware strings)

    Graceful: a pack whose port doesn't exist or whose BMS is off will
    raise on poll, and the main loop catches + rate-limits the noise.
    After a serial-level failure the port handle is dropped, so the next
    `poll()` reopens the device instead of reusing a dead handle.
    """

    BLOCK_1_START = 0x0000
    BLOCK_1_COUNT = 39
    BLOCK_2_START = 0x002D  # = 45
    BLOCK_2_COUNT = 91      # covers regs 45..135
    TOTAL_REG_COUNT = BLOCK_2_START + BLOCK_2_COUNT  # 136

    def __init__(self, port: str, baud: int, address: int, timeout_s: float = 1.0):
        """Store the connection parameters; the port is opened lazily by `poll()`."""
        self._port_path = port
        self._baud = baud
        self._address = address
        self._timeout_s = timeout_s
        self._ser: serial.Serial | None = None

    def _open(self) -> None:
        """Open the serial port (8N1) unless a handle is already open."""
        if self._ser is None or not self._ser.is_open:
            self._ser = serial.Serial(
                port=self._port_path, baudrate=self._baud, timeout=0.2,
                bytesize=8, parity="N", stopbits=1,
            )

    def _discard_port(self) -> None:
        """Close and forget the serial handle so the next poll reopens it."""
        ser, self._ser = self._ser, None
        if ser is None:
            return
        try:
            ser.close()
        except OSError:
            # Best effort: the device may already be gone.
            pass

    def _read_block(self, start: int, count: int) -> list[int]:
        """Read `count` holding registers starting at `start`.

        Returns the registers as big-endian u16 values.  Raises RuntimeError
        when the port is not open, or when the reply is missing, malformed,
        of the wrong size, truncated, or fails its CRC.
        """
        if self._ser is None:
            # Explicit check rather than `assert`, which is stripped under -O.
            raise RuntimeError("serial port is not open")

        body = bytes([self._address, 0x03,
                      start >> 8, start & 0xFF, count >> 8, count & 0xFF])
        crc = crc16_modbus(body)
        frame = body + bytes([crc & 0xFF, crc >> 8])

        self._ser.reset_input_buffer()
        self._ser.write(frame)

        # addr + func + byte_count + data + 2 CRC bytes
        expected = 3 + count * 2 + 2
        buf = bytearray()
        deadline = time.monotonic() + self._timeout_s
        while time.monotonic() < deadline and len(buf) < expected:
            chunk = self._ser.read(expected - len(buf))
            if chunk:
                buf.extend(chunk)
        raw = bytes(buf)
        log.debug("pack 0x%02x [%d@%d] tx=%s rx=%s",
                  self._address, count, start, frame.hex(" "), raw.hex(" "))

        if len(raw) < 5 or raw[0] != self._address or raw[1] != 0x03:
            raise RuntimeError(f"no/bad response ({len(raw)} B) for read({count}@{start})")
        bc = raw[2]
        if bc != count * 2:
            # A fn=0x03 reply carries exactly two bytes per requested
            # register; anything else would otherwise be published as
            # zero-padded (or crash on an odd count).
            raise RuntimeError(f"unexpected byte count {bc} for read({count}@{start})")
        if len(raw) < 3 + bc + 2:
            raise RuntimeError(f"truncated response for read({count}@{start})")
        if not _crc_ok(raw, 0, 3 + bc + 2):
            raise RuntimeError(f"CRC mismatch for read({count}@{start})")
        data = raw[3:3 + bc]
        return [(data[i] << 8) | data[i + 1] for i in range(0, len(data), 2)]

    def poll(self) -> list[int]:
        """Run both block reads and return the sparse 136-element register list.

        Serial-level errors are re-raised after the port handle is dropped;
        protocol errors (RuntimeError) leave the port open.
        """
        try:
            self._open()
            regs = [0] * self.TOTAL_REG_COUNT
            block1 = self._read_block(self.BLOCK_1_START, self.BLOCK_1_COUNT)
            regs[self.BLOCK_1_START:self.BLOCK_1_START + len(block1)] = block1
            # Brief gap between queries — RS-485 silence + give BMS a moment
            time.sleep(0.05)
            block2 = self._read_block(self.BLOCK_2_START, self.BLOCK_2_COUNT)
            regs[self.BLOCK_2_START:self.BLOCK_2_START + len(block2)] = block2
        except (serial.SerialException, OSError):
            # The handle may still report is_open after an I/O failure (e.g.
            # an unplugged adapter); drop it so the next poll reopens.
            self._discard_port()
            raise
        return regs

    def close(self) -> None:
        """Close the serial port if it is open."""
        if self._ser is not None and self._ser.is_open:
            self._ser.close()
|
||
|
||
|
||
# =============================================================================
|
||
# === transports ==============================================================
|
||
# =============================================================================
|
||
# Two abstractions; main loop picks the right one based on bus.mode.
|
||
|
||
|
||
class ActiveTransport:
    """Request-response transport interface for active mode."""

    def query_general(self, address: int) -> bytes:
        """Return the raw general-status reply from the pack at `address`.

        Subclasses must override this; the base implementation raises
        NotImplementedError.
        """
        raise NotImplementedError

    def close(self) -> None:
        """Release transport resources; the base implementation does nothing."""
        return None
|
||
|
||
|
||
class PassiveListener:
    """Continuous frame-iterator interface for passive mode."""

    def frames(self) -> Iterator[ModbusFrame]:
        """Return an iterator over observed frames.

        Subclasses must override this; the base implementation raises
        NotImplementedError.
        """
        raise NotImplementedError

    def close(self) -> None:
        """Release listener resources; the base implementation does nothing."""
        return None
|
||
|
||
|
||
# --- active: serial + mock --------------------------------------------------
|
||
|
||
|
||
class SerialActiveTransport(ActiveTransport):
    """Active-mode transport that talks EG4 7E/0D over a real serial port."""

    def __init__(self, port: str, baud: int, timeout_s: float):
        """Open `port` at `baud` (8N1); `timeout_s` bounds each query."""
        self._timeout_s = timeout_s
        self._ser = serial.Serial(
            port=port, baudrate=baud, timeout=0.25,
            bytesize=8, parity="N", stopbits=1,
        )

    def query_general(self, address: int) -> bytes:
        """Send a general-status request and return whatever bytes arrived.

        Reading stops once the buffer starts with 0x7E and ends with 0x0D, or
        when the per-query timeout expires; the result may be empty or partial.

        NOTE(review): a 0x0D data byte that happens to end a read chunk would
        stop the loop early — confirm whether the length byte can be used.
        """
        request = encode_eg4_request(address, CMD_GENERAL_STATUS)
        log.debug("TX addr=0x%02x: %s", address, request.hex())
        self._ser.reset_input_buffer()
        self._ser.write(request)

        reply = bytearray()
        give_up_at = time.monotonic() + self._timeout_s
        while time.monotonic() < give_up_at:
            received = self._ser.read(256)
            if not received:
                continue
            reply += received
            if reply.startswith(b"\x7E") and reply.endswith(b"\x0D"):
                break

        result = bytes(reply)
        log.debug("RX addr=0x%02x: %s", address, result.hex())
        return result

    def close(self) -> None:
        """Close the serial port."""
        self._ser.close()
|
||
|
||
|
||
class MockActiveTransport(ActiveTransport):
    """Synthetic source of EG4 7E/0D replies for dry runs.

    Values drift from call to call so HA dashboards look alive; the random
    generator is seeded from the address and a per-instance call counter.
    """

    def __init__(self, cell_count: int = 16):
        self._cell_count = cell_count
        self._call = 0

    @staticmethod
    def _group(type_byte: int, words: list[int]) -> bytes:
        """Encode one reply group: <type> <count> <count x big-endian u16>."""
        header = bytes([type_byte, len(words)])
        return header + struct.pack(f">{len(words)}H", *(w & 0xFFFF for w in words))

    def query_general(self, address: int) -> bytes:
        """Return a fabricated general-status reply frame for `address`."""
        self._call += 1
        rng = random.Random(address * 1000 + self._call)

        # The order of rng draws below determines the generated values.
        nominal_mv = 3280 + rng.randint(-5, 5)
        cells_mv: list[int] = []
        for _ in range(self._cell_count):
            cells_mv.append(max(0, min(0x7FFF, nominal_mv + rng.randint(-8, 8))))
        current_x100 = rng.randint(-500, 2000)
        soc_x100 = (50 + rng.randint(-2, 2)) * 100
        cap_ah_x100 = 5000 + rng.randint(-10, 10)
        temps_raw = [50 + 25 + rng.randint(-3, 3) for _ in range(4)]

        group = self._group
        body = b"".join((
            group(0x01, cells_mv),
            group(0x02, [30000 - current_x100]),
            group(0x03, [soc_x100]),
            group(0x04, [cap_ah_x100]),
            group(0x05, temps_raw),
            group(0x06, [0, 0]),  # alarms = clear
            group(0x07, [42 + address]),
            group(0x08, [round(sum(cells_mv) / 10)]),
            group(0x09, []),
            group(0x0A, []),
        ))

        # checksum byte tolerated as 0x00 by the upstream parser
        header = bytes([0x7E, address, CMD_GENERAL_STATUS, len(body) & 0xFF])
        return header + body + bytes([0x00, 0x0D])
|
||
|
||
|
||
# --- passive: serial + mock -------------------------------------------------
|
||
|
||
|
||
class SerialPassiveListener(PassiveListener):
    """Listen-only sniffer that extracts Modbus frames from a serial byte stream."""

    # Upper bound on buffered bytes; beyond it the oldest half is dropped.
    _BUF_MAX = 4096

    def __init__(self, port: str, baud: int, read_chunk: int = 512):
        """Open `port` at `baud` (8N1); `read_chunk` is the size of each read."""
        self._read_chunk = read_chunk
        self._ser = serial.Serial(
            port=port, baudrate=baud, timeout=0.1,
            bytesize=8, parity="N", stopbits=1,
        )
        self._buf = bytearray()

    def frames(self) -> Iterator[ModbusFrame]:
        """Yield frames forever as bytes arrive on the port."""
        while True:
            incoming = self._ser.read(self._read_chunk)
            if not incoming:
                continue
            self._buf += incoming
            if len(self._buf) > self._BUF_MAX:
                del self._buf[:self._BUF_MAX // 2]
            yield from self._extract()

    def _extract(self) -> Iterator[ModbusFrame]:
        """Yield every frame currently recognisable in the buffer.

        Each yielded frame is removed from the buffer together with any
        unparsed bytes that preceded it.
        """
        offset = 0
        while offset < len(self._buf) - 4:
            parsed = parse_modbus_frame_at(self._buf, offset)
            if parsed is None:
                offset += 1
                continue
            size, kind = parsed
            frame_bytes = bytes(self._buf[offset:offset + size])
            yield ModbusFrame(
                address=frame_bytes[0], function=frame_bytes[1],
                kind=kind, raw=frame_bytes,
            )
            del self._buf[:offset + size]
            offset = 0

    def close(self) -> None:
        """Close the serial port."""
        self._ser.close()
|
||
|
||
|
||
class MockPassiveListener(PassiveListener):
    """Synthetic frame source: an endless query/response pair per configured pack."""

    def __init__(self, packs: list[PackConfig], gap_s: float = 0.5):
        self._packs = packs
        self._gap_s = gap_s
        self._tick = 0

    def frames(self) -> Iterator[ModbusFrame]:
        """Yield a fn=0x03 query, then its response, for each pack, forever."""
        while True:
            for pack in self._packs:
                self._tick += 1
                addr = pack.address
                yield ModbusFrame(address=addr, function=0x03, kind="query",
                                  raw=self._build_query(addr))
                time.sleep(0.05)
                yield ModbusFrame(address=addr, function=0x03, kind="response",
                                  raw=self._build_response(addr))
                time.sleep(self._gap_s)

    @staticmethod
    def _with_crc(body: bytes) -> bytes:
        """Append the Modbus CRC-16 to `body`, low byte first."""
        crc = crc16_modbus(body)
        return body + bytes([crc & 0xFF, crc >> 8])

    def _build_query(self, addr: int) -> bytes:
        """Build a read-holding-registers query for 0x2F registers at 0x0000."""
        return self._with_crc(bytes([addr, 0x03, 0x00, 0x00, 0x00, 0x2F]))

    def _build_response(self, addr: int) -> bytes:
        """Build a 47-register response whose values depend on `addr` and the tick."""
        rng = random.Random(addr * 1000 + self._tick)
        # The order of rng draws below determines the generated values.
        regs = [3280 + rng.randint(-5, 5) for _ in range(16)]
        regs.extend([round(52.48 * 100), 50_00, rng.randint(0, 100)])
        while len(regs) < 47:
            regs.append(rng.randint(0, 100))
        payload = b"".join(struct.pack(">H", word & 0xFFFF) for word in regs)
        return self._with_crc(bytes([addr, 0x03, len(regs) * 2]) + payload)
|
||
|
||
|
||
# =============================================================================
|
||
# === MQTT publisher (HA auto-discovery) ======================================
|
||
# =============================================================================
|
||
|
||
|
||
# Field metadata: key -> (unit, device_class, state_class, icon).
# The active (EG4 7E/0D) and modbus_per_pack decoders share several keys
# (pack_voltage, soc, capacity_ah, cycle_count, cell_*); keys without an entry
# fall back to all-None in field_meta(), and register_NN keys are handled
# there as well.
_FIELD_META: dict[str, tuple[str | None, str | None, str | None, str | None]] = {
    # active mode (EG4 7E/0D decoded)
    "pack_voltage": ("V", "voltage", "measurement", "mdi:battery-outline"),
    "current": ("A", "current", "measurement", "mdi:current-dc"),
    "soc": ("%", "battery", "measurement", "mdi:battery-70"),
    "capacity_ah": ("Ah", None, "measurement", "mdi:battery-clock"),
    "cycle_count": (None, None, "total", "mdi:counter"),
    "cell_voltage_min": ("V", "voltage", "measurement", "mdi:arrow-down-bold"),
    "cell_voltage_max": ("V", "voltage", "measurement", "mdi:arrow-up-bold"),
    "cell_voltage_delta_mv": ("mV", None, "measurement", "mdi:sine-wave"),
    "cell_lowest": (None, None, "measurement", "mdi:numeric"),
    "cell_highest": (None, None, "measurement", "mdi:numeric"),
    "alarm_current_over": (None, None, None, "mdi:alert-octagon"),
    "alarm_voltage_high": (None, None, None, "mdi:alert"),
    "alarm_voltage_low": (None, None, None, "mdi:alert"),
    "alarm_temp_high_chg": (None, None, None, "mdi:thermometer-alert"),
    "alarm_temp_low_chg": (None, None, None, "mdi:thermometer-alert"),
}
for _i in range(1, 33):
    _FIELD_META[f"cell_{_i:02d}_voltage"] = ("V", "voltage", "measurement", "mdi:battery-outline")
for _i in range(1, 7):
    _FIELD_META[f"temperature_{_i}"] = ("°C", "temperature", "measurement", "mdi:thermometer")
# modbus_per_pack named fields (EG4 register map)
_FIELD_META.update({
    "pack_current": ("A", "current", "measurement", "mdi:current-dc"),
    "temperature_01": ("°C", "temperature", "measurement", "mdi:thermometer"),
    "temperature_02": ("°C", "temperature", "measurement", "mdi:thermometer"),
    "temperature_03": ("°C", "temperature", "measurement", "mdi:thermometer"),
    "temperature_04": ("°C", "temperature", "measurement", "mdi:thermometer"),
    "temperature_pcb": ("°C", "temperature", "measurement", "mdi:chip"),
    # State of health is emitted by the register decoder next to soc; without
    # this entry it was discovered with no unit and no state_class.
    "soh": ("%", None, "measurement", "mdi:battery-heart-variant"),
    "heater": (None, None, None, "mdi:heating-coil"),
    "max_current_limit": ("A", "current", "measurement", "mdi:current-dc"),
    "error_code": (None, None, None, "mdi:alert-octagon"),
    "cell_count": (None, None, "measurement", "mdi:numeric"),
    "remaining_ah": ("Ah", None, "measurement", "mdi:battery-clock"),
    "battery_mode": (None, None, None, "mdi:state-machine"),
    "bms_version_hi": (None, None, None, "mdi:chip"),
    "bms_version_lo": (None, None, None, "mdi:chip"),
    "uptime_ds": (None, None, "total_increasing", "mdi:timer-outline"),
    "model": (None, None, None, "mdi:battery-outline"),
    "firmware_version": (None, None, None, "mdi:chip"),
    "firmware_date": (None, None, None, "mdi:calendar"),
})
for _name in _WARNING_BITS:
    _FIELD_META[f"warning_{_name}"] = (None, None, None, "mdi:alert")
for _name in _PROTECTION_BITS:
    _FIELD_META[f"protection_{_name}"] = (None, None, None, "mdi:shield-alert")
|
||
|
||
|
||
def field_meta(key: str) -> tuple[str | None, str | None, str | None, str | None]:
    """Return `(unit, device_class, state_class, icon)` for a reading key.

    Raw `register_*` keys get a generic numeric measurement entry; keys with
    no metadata entry yield four Nones.
    """
    is_raw_register = key.startswith("register_")
    if not is_raw_register:
        return _FIELD_META.get(key, (None, None, None, None))
    return (None, None, "measurement", "mdi:numeric")
|
||
|
||
|
||
class MQTTPublisher:
    """Publish pack readings over MQTT with Home Assistant auto-discovery.

    In dry-run mode no broker connection is made; every discovery document
    and state update is printed instead.
    """

    def __init__(self, cfg: MQTTConfig, dry_run: bool = False):
        """Connect to the broker and start its network loop, unless `dry_run`."""
        self._cfg = cfg
        self._dry_run = dry_run
        self._client: mqtt.Client | None = None
        # (pack_name, key) pairs whose discovery document was already sent.
        self._discovered: set[tuple[str, str]] = set()
        if dry_run:
            return
        client = mqtt.Client(mqtt.CallbackAPIVersion.VERSION2, client_id="eg4-battery")
        client.username_pw_set(cfg.username, cfg.password)
        client.connect(cfg.host, cfg.port, keepalive=60)
        client.loop_start()
        self._client = client
        log.info("connected to MQTT %s:%d", cfg.host, cfg.port)

    def publish_pack(self, pack_name: str, readings: dict[str, Any]) -> None:
        """Publish every reading of one pack, in dict order."""
        for key in readings:
            self._publish_one(pack_name, key, readings[key])

    def _publish_one(self, pack_name: str, key: str, value: Any) -> None:
        """Publish one state value, preceded by its discovery document on first use."""
        state_topic = f"{self._cfg.discovery_prefix}/sensor/{pack_name}_{key}/state"
        if (pack_name, key) not in self._discovered:
            self._publish_discovery(pack_name, key, state_topic)
            self._discovered.add((pack_name, key))

        # dicts and lists go out as JSON; everything else as plain text
        if isinstance(value, (dict, list)):
            payload = json.dumps(value)
        else:
            payload = str(value)

        if not self._dry_run:
            self._client.publish(state_topic, payload, qos=0, retain=False)
        else:
            print(f" {state_topic} {payload}")

    def _publish_discovery(self, pack_name: str, key: str, state_topic: str) -> None:
        """Publish the retained HA discovery document for one sensor."""
        unit, device_class, state_class, icon = field_meta(key)
        document = {
            "name": f"{pack_name} {key}",
            "state_topic": state_topic,
            "unique_id": f"{pack_name}_{key}_eg4",
            "device": {
                "name": f"EG4 LifePower4 {pack_name}",
                "identifiers": [pack_name],
                "model": "LifePower4 48V 100Ah v2 Auto-Addressing",
                "manufacturer": "EG4 Electronics",
            },
        }
        # Optional attributes are included only when metadata defines them.
        optional = (
            ("unit_of_measurement", unit),
            ("device_class", device_class),
            ("state_class", state_class),
            ("icon", icon),
        )
        for attribute, setting in optional:
            if setting is not None:
                document[attribute] = setting

        topic = f"{self._cfg.discovery_prefix}/sensor/{pack_name}_{key}/config"
        payload = json.dumps(document)
        if not self._dry_run:
            self._client.publish(topic, payload, qos=0, retain=True)
        else:
            print(f" [discovery] {topic} {payload}")

    def close(self) -> None:
        """Stop the network loop and disconnect, if a client exists."""
        client = self._client
        if client is None:
            return
        client.loop_stop()
        client.disconnect()
|
||
|
||
|
||
# =============================================================================
|
||
# === per-pack state & rate-limited logging ===================================
|
||
# =============================================================================
|
||
|
||
|
||
@dataclasses.dataclass
class _PackState:
    """Per-pack bookkeeping used by the main loops to rate-limit log output."""

    # True after the most recent successful poll, False after a failure.
    ok: bool = False
    # Category string of the most recent failure.
    last_error_category: str = ""
    # Failures since the last success.
    consecutive_errors: int = 0
    # Number of responses handled for this pack.
    response_count: int = 0
    # Set once the first-response message has been logged (passive mode).
    first_seen_logged: bool = False
|
||
|
||
|
||
# While a pack keeps failing with the same error category, the warning is
# re-logged every this-many consecutive failures (~hourly at a 10 s cadence).
_FAIL_HEARTBEAT_CYCLES = 360
|
||
|
||
|
||
def _resolve_pack_name(addr: int, packs: list[PackConfig]) -> str:
    """Return the configured name of the first pack at `addr`.

    Falls back to an auto-generated `lifepower4_addr_XX` name (two-digit
    lower-case hex) when no configured pack uses that address.
    """
    configured = (pack.name for pack in packs if pack.address == addr)
    return next(configured, f"lifepower4_addr_{addr:02x}")
|
||
|
||
|
||
# =============================================================================
|
||
# === main loops ==============================================================
|
||
# =============================================================================
|
||
|
||
|
||
def run_active(transport: ActiveTransport, publisher: MQTTPublisher, cfg: AppConfig,
               states: dict[str, _PackState], one_cycle: bool = False) -> None:
    """Poll every configured pack in turn, publish its readings, and repeat.

    Failures are logged once per pack and error category, then re-logged
    only every `_FAIL_HEARTBEAT_CYCLES` consecutive failures; a recovery is
    logged once.  With `one_cycle` the function returns after one pass,
    otherwise it sleeps out the rest of `cfg.bus.poll_interval_s` and loops
    forever.
    """
    while True:
        started = time.monotonic()
        for pack in cfg.packs:
            state = states.setdefault(pack.name, _PackState())
            try:
                reply = transport.query_general(pack.address)
                if not reply:
                    raise RuntimeError(f"empty response from addr=0x{pack.address:02x}")
                readings = decode_eg4_general_status(reply, cell_count=cfg.cell_count)
                publisher.publish_pack(pack.name, readings)
            except Exception as exc:
                # Category = exception type plus the message up to its first colon.
                category = f"{type(exc).__name__}:{str(exc).split(':', 1)[0]}"
                first_of_its_kind = state.ok or category != state.last_error_category
                heartbeat_due = (
                    state.consecutive_errors > 0
                    and state.consecutive_errors % _FAIL_HEARTBEAT_CYCLES == 0
                )
                if first_of_its_kind:
                    log.warning("pack %s (0x%02x): %s", pack.name, pack.address, exc)
                elif heartbeat_due:
                    log.warning("pack %s (0x%02x): still failing (%d cycles): %s",
                                pack.name, pack.address, state.consecutive_errors, exc)
                state.ok = False
                state.last_error_category = category
                state.consecutive_errors += 1
            else:
                state.response_count += 1
                if state.consecutive_errors > 0 and not state.ok:
                    log.info("pack %s (0x%02x): recovered after %d failed cycle(s)",
                             pack.name, pack.address, state.consecutive_errors)
                state.ok = True
                state.consecutive_errors = 0

        if one_cycle:
            return
        remaining = cfg.bus.poll_interval_s - (time.monotonic() - started)
        time.sleep(max(0.0, remaining))
|
||
|
||
|
||
def run_passive(listener: PassiveListener, publisher: MQTTPublisher, cfg: AppConfig,
                trace: bool, max_frames: int | None = None) -> None:
    """Consume frames as they arrive; publish on every fn=0x03 response.

    Args:
        listener: Source of frames; ``listener.frames()`` is iterated until
            it is exhausted or the frame limit is reached.
        publisher: Receives ``publish_pack(name, readings)`` for each
            successfully decoded response.
        cfg: Application config; ``cfg.packs`` maps addresses to names.
        trace: When true, every frame (of any kind) is logged at DEBUG.
        max_frames: Stop after this many frames of any kind have been seen.
            ``None`` means no limit.

    Frames that are not function-0x03 responses are counted but otherwise
    ignored.  The first response from each address is logged once (info for
    configured packs, warning for unconfigured ones).  A frame that fails to
    decode is logged and skipped without being published.
    """
    # Per-address state; `first_seen_logged` guarantees the first-contact
    # message is emitted at most once per address.
    states: dict[int, _PackState] = {}
    configured = {p.address for p in cfg.packs}

    for n, frame in enumerate(listener.frames(), start=1):
        if trace:
            log.debug("%r raw=%s", frame, frame.raw.hex(" "))

        if frame.kind == "response" and frame.function == 0x03:
            name = _resolve_pack_name(frame.address, cfg.packs)
            st = states.setdefault(frame.address, _PackState())
            st.response_count += 1
            if not st.first_seen_logged:
                if frame.address in configured:
                    log.info("first response from configured pack 0x%02x (%s)",
                             frame.address, name)
                else:
                    log.warning("response from unconfigured slave 0x%02x — auto-naming as %s",
                                frame.address, name)
                st.first_seen_logged = True
            try:
                readings = decode_modbus_response(frame)
            except Exception as e:
                log.warning("decode failed for addr 0x%02x: %s (raw=%s)",
                            frame.address, e, frame.raw.hex(" "))
            else:
                publisher.publish_pack(name, readings)

        # Single exit point for the frame limit, so it is honoured on every
        # path, including a decode failure on the final frame.
        if max_frames is not None and n >= max_frames:
            return
|
||
|
||
|
||
def run_modbus_per_pack(cfg: AppConfig, publisher: MQTTPublisher,
                        states: dict[str, _PackState], one_cycle: bool = False,
                        dry_run: bool = False) -> None:
    """One adapter per pack. Each `PackConfig` must have `port` and `baud`
    set. Round-robin poll every pack on its own serial port; decode
    Modbus response into named HA entities + raw register_NN dump.

    Args:
        cfg: Application config; ``cfg.packs`` lists the packs to poll and
            ``cfg.bus`` supplies the fallback baud, the timeout and the poll
            interval.
        publisher: Receives ``publish_pack(name, readings)`` per good poll.
        states: Per-pack bookkeeping keyed by pack name, created on first
            use; drives the rate-limited failure / recovery logging.
        one_cycle: Return after a single pass over all packs.
        dry_run: Do not open any serial port; registers come from
            ``_mock_modbus_regs`` instead.

    A pack whose poller could not be created (no ``port`` configured, or the
    constructor raised) is reported as failing on every cycle.  All pollers
    that were opened are closed on exit, however the function is left.
    """
    pollers: dict[str, ModbusActivePoller] = {}
    mock_regs_call: dict[str, int] = {}

    def make_poller(p: PackConfig) -> ModbusActivePoller | None:
        if dry_run:
            return None  # mock path, no real poller
        if not p.port:
            log.warning("pack %s: no `port` set in config; skipping", p.name)
            return None
        baud = p.baud or cfg.bus.baud
        try:
            return ModbusActivePoller(p.port, baud, p.address, cfg.bus.timeout_s)
        except Exception as e:
            log.warning("pack %s: could not open %s: %s", p.name, p.port, e)
            return None

    try:
        # Open inside the try so an interrupt while opening a later port still
        # closes the ones that were already opened.
        for p in cfg.packs:
            pl = make_poller(p)
            if pl is not None:
                pollers[p.name] = pl

        while True:
            cycle_start = time.monotonic()
            for p in cfg.packs:
                st = states.setdefault(p.name, _PackState())
                try:
                    if dry_run:
                        mock_regs_call[p.name] = mock_regs_call.get(p.name, 0) + 1
                        regs = _mock_modbus_regs(p.address, mock_regs_call[p.name])
                    else:
                        if p.name not in pollers:
                            raise RuntimeError(f"no poller configured for {p.name}")
                        regs = pollers[p.name].poll()
                    readings = decode_eg4_modbus_regs(regs)
                    publisher.publish_pack(p.name, readings)
                    st.response_count += 1
                    if not st.ok and st.consecutive_errors > 0:
                        log.info("pack %s: recovered after %d failed cycle(s)",
                                 p.name, st.consecutive_errors)
                    st.ok = True
                    st.consecutive_errors = 0
                except Exception as e:
                    # Category = exception type plus the message text before
                    # the first colon; repeats of one category are throttled.
                    category = f"{type(e).__name__}:{str(e).split(':', 1)[0]}"
                    if st.ok or category != st.last_error_category:
                        log.warning("pack %s (0x%02x): %s", p.name, p.address, e)
                    elif st.consecutive_errors > 0 \
                            and st.consecutive_errors % _FAIL_HEARTBEAT_CYCLES == 0:
                        log.warning("pack %s (0x%02x): still failing (%d cycles): %s",
                                    p.name, p.address, st.consecutive_errors, e)
                    st.ok = False
                    st.last_error_category = category
                    st.consecutive_errors += 1
            if one_cycle:
                return
            elapsed = time.monotonic() - cycle_start
            time.sleep(max(0.0, cfg.bus.poll_interval_s - elapsed))
    finally:
        # Best-effort cleanup: one failing close() must not keep the remaining
        # ports open or replace the exception that is already propagating.
        for name, pl in pollers.items():
            try:
                pl.close()
            except Exception as e:
                log.warning("pack %s: error while closing poller: %s", name, e)
|
||
|
||
|
||
def _mock_modbus_regs(address: int, tick: int) -> list[int]:
    """Build a 136-register mock image for dry-run mode.

    The image spans both block 1 [0..38] and block 2 [45..135] of the live
    daemon's reads.  Output is fully determined by ``(address, tick)``: the
    random generator is seeded with ``address * 1000 + tick``.
    """
    rng = random.Random(address * 1000 + tick)

    def jitter(spread: int) -> int:
        return rng.randint(-spread, spread)

    # NOTE: the order of the rng draws below is part of the output contract.
    nominal_mv = 3280 + jitter(3)
    cell_mv = [nominal_mv + jitter(8) for _ in range(16)]

    image: list[int] = [0] * 136
    image[0] = sum(cell_mv) // 10                      # pack voltage × 100
    image[1] = (30000 - rng.randint(-500, 2000)) & 0xFFFF
    image[2:18] = cell_mv                              # 16 cell voltages

    for index, centre in ((18, 21), (19, 21), (20, 20), (21, 54)):
        image[index] = centre + jitter(1)

    constants = {
        22: 100,
        23: 100,
        24: 55,
        30: 1,
        31: 5493,
        32: 10752,
        33: 0,      # no warnings
        34: 0,      # no protections
        35: 0,      # error code
        36: 16,     # cell count
        37: 1000,   # 100.0 Ah
    }
    for index, value in constants.items():
        image[index] = value
    image[46] = (tick * 5) & 0xFFFF                    # runtime counter

    # block-2 strings: two characters per register, first character in the
    # high byte; an odd-length string gets a zero low byte in its last word.
    for start, text in ((105, "LFP-51.2V100Ah-V1.0"),
                        (117, "Z03T21"),
                        (120, "20260206")):
        padded = text + "\0" * (len(text) % 2)
        for word, pos in enumerate(range(0, len(padded), 2)):
            image[start + word] = (ord(padded[pos]) << 8) | ord(padded[pos + 1])

    return image
|
||
|
||
|
||
def main() -> int:
    """Command-line entry point: parse arguments, load config, run one mode.

    Returns:
        ``0`` on normal completion or on Ctrl-C.

    Raises:
        SystemExit: if ``bus.mode`` or ``bus.transport`` in the config is not
            one of the accepted values.

    The mock bus is used when ``--dry-run`` is given or when the config sets
    ``bus.transport: mock``; this applies to all three modes.  ``--dry-run``
    additionally limits the run to one cycle (a bounded number of frames in
    passive mode).  The publisher is always closed on the way out.
    """
    ap = argparse.ArgumentParser(
        description="EG4 LifePower4 v2 → MQTT bridge.")
    ap.add_argument("-C", "--config", required=True, type=Path)
    ap.add_argument("--dry-run", action="store_true",
                    help="Mock-bus smoke test — one cycle, print, exit.")
    ap.add_argument("--trace", action="store_true", help="Log every frame.")
    args = ap.parse_args()

    logging.basicConfig(
        level=logging.DEBUG if args.trace else logging.INFO,
        format="%(asctime)s %(levelname)s %(name)s: %(message)s",
    )

    cfg = load_config(args.config)
    valid_modes = {"modbus_per_pack", "active", "passive"}
    if cfg.bus.mode not in valid_modes:
        # Sorted so the message is identical from run to run (set iteration
        # order of strings is not stable across interpreter invocations).
        raise SystemExit(
            f"bus.mode must be one of {sorted(valid_modes)}, got {cfg.bus.mode!r}")
    if cfg.bus.transport not in {"serial", "mock"}:
        raise SystemExit(f"bus.transport must be 'serial' or 'mock', got {cfg.bus.transport!r}")

    publisher = MQTTPublisher(cfg.mqtt, dry_run=args.dry_run)
    log.info("eg4-battery starting: mode=%s %d configured pack(s)",
             cfg.bus.mode, len(cfg.packs))

    use_mock = args.dry_run or cfg.bus.transport == "mock"
    if use_mock and not args.dry_run:
        # Make it obvious in the log that published readings are synthetic.
        log.warning("bus.transport is 'mock': readings are synthetic, no serial port is opened")

    try:
        if cfg.bus.mode == "modbus_per_pack":
            # `dry_run` here selects the mock register source; pass `use_mock`
            # so `bus.transport: mock` is honoured like in the other modes.
            run_modbus_per_pack(cfg, publisher, states={},
                                one_cycle=args.dry_run, dry_run=use_mock)
        elif cfg.bus.mode == "active":
            transport: ActiveTransport
            transport = (MockActiveTransport(cell_count=cfg.cell_count) if use_mock
                         else SerialActiveTransport(cfg.bus.port, cfg.bus.baud, cfg.bus.timeout_s))
            try:
                run_active(transport, publisher, cfg, states={}, one_cycle=args.dry_run)
            finally:
                transport.close()
        else:  # passive
            listener: PassiveListener
            listener = (MockPassiveListener(cfg.packs) if use_mock
                        else SerialPassiveListener(cfg.bus.port, cfg.bus.baud, cfg.bus.read_chunk))
            try:
                run_passive(listener, publisher, cfg, trace=args.trace,
                            max_frames=(2 * len(cfg.packs) if args.dry_run else None))
            finally:
                listener.close()
        return 0
    except KeyboardInterrupt:
        return 0
    finally:
        publisher.close()
|
||
|
||
|
||
if __name__ == "__main__":
    # Script entry: main()'s return value becomes the process exit status.
    raise SystemExit(main())
|