Cleaned up battery mqtt topics

This commit is contained in:
2026-04-26 08:18:57 -04:00
parent e893be8c35
commit b526b10cf5
3 changed files with 218 additions and 18 deletions

View File

@@ -96,6 +96,7 @@ class AppConfig:
mqtt: MQTTConfig
packs: list[PackConfig]
cell_count: int = 16 # active mode only
expose_raw_registers: bool = False # publish register_NN entities (modbus_per_pack)
def load_config(path: Path) -> AppConfig:
@@ -121,6 +122,7 @@ def load_config(path: Path) -> AppConfig:
mqtt=MQTTConfig(**mqtt_raw),
packs=[PackConfig(**p) for p in raw["packs"]],
cell_count=raw.get("cell_count", 16),
expose_raw_registers=raw.get("expose_raw_registers", False),
)
@@ -304,14 +306,15 @@ def _signed16(v: int) -> int:
return v - 0x10000 if v & 0x8000 else v
def decode_eg4_modbus_regs(regs: list[int]) -> dict[str, Any]:
"""Decode the 47-reg read-holding-regs response from an LP4V2 BMS.
Emits named HA entities where meaning is known; raw register_NN
passthrough for the rest."""
def decode_eg4_modbus_regs(regs: list[int], expose_raw: bool = False) -> dict[str, Any]:
"""Decode the read-holding-regs response from an LP4V2 BMS.
Emits named HA entities. If `expose_raw` is True, also emits
`register_NN` entities for every position — useful when refining
the register map; defaults to off to keep HA Devices uncluttered."""
out: dict[str, Any] = {}
# always emit raw registers — invaluable for future refinement
for i, v in enumerate(regs):
out[f"register_{i:02d}"] = v
if expose_raw:
for i, v in enumerate(regs):
out[f"register_{i:02d}"] = v
if len(regs) < 47:
return out
@@ -365,13 +368,9 @@ def decode_eg4_modbus_regs(regs: list[int]) -> dict[str, Any]:
out["cycle_count"] = regs[39]
out["battery_mode"] = regs[40]
# BMS firmware version — regs 41 & 42 appear to hold version codes; emit
# the raw u16s alongside a decimal representation for easier HA display
out["bms_version_hi"] = regs[41]
out["bms_version_lo"] = regs[42]
# reg 46 increments ~1.25 Hz on live bus — likely uptime in deciseconds
out["uptime_ds"] = regs[46]
# regs 41-42: u16 version codes — superseded by `firmware_version` ASCII
# decode below; available via expose_raw if needed
# reg 46: ~1.25 Hz uptime counter — noisy, available via expose_raw if needed
# --- block-2 strings (regs 105..123) — fetched on the second Modbus read ---
if len(regs) >= 124:
@@ -698,9 +697,6 @@ _FIELD_META.update({
"cell_count": (None, None, "measurement", "mdi:numeric"),
"remaining_ah": ("Ah", None, "measurement", "mdi:battery-clock"),
"battery_mode": (None, None, None, "mdi:state-machine"),
"bms_version_hi": (None, None, None, "mdi:chip"),
"bms_version_lo": (None, None, None, "mdi:chip"),
"uptime_ds": (None, None, "total_increasing", "mdi:timer-outline"),
"model": (None, None, None, "mdi:battery-outline"),
"firmware_version": (None, None, None, "mdi:chip"),
"firmware_date": (None, None, None, "mdi:calendar"),
@@ -717,6 +713,38 @@ def field_meta(key: str) -> tuple[str | None, str | None, str | None, str | None
return _FIELD_META.get(key, (None, None, None, None))
# HA frontend display precision per field. Drives `suggested_display_precision`
# in the discovery config so the UI shows e.g. "3.285 V" instead of "3 V".
_FIELD_PRECISION: dict[str, int] = {
"pack_voltage": 2,
"pack_current": 2,
"max_current_limit": 2,
"soc": 0,
"soh": 0,
"cycle_count": 0,
"cell_voltage_min": 3,
"cell_voltage_max": 3,
"cell_voltage_delta_mv": 0,
"cell_lowest": 0,
"cell_highest": 0,
"cell_count": 0,
"capacity_ah": 1,
"remaining_ah": 1,
"error_code": 0,
"battery_mode": 0,
}
for _i in range(1, 17):
_FIELD_PRECISION[f"cell_{_i:02d}_voltage"] = 3
for _i in range(1, 7):
_FIELD_PRECISION[f"temperature_{_i}"] = 0
_FIELD_PRECISION["temperature_pcb"] = 0
def field_precision(key: str) -> int | None:
"""How many decimals HA's frontend should render. None = HA default."""
return _FIELD_PRECISION.get(key)
class MQTTPublisher:
def __init__(self, cfg: MQTTConfig, dry_run: bool = False):
self._cfg = cfg
@@ -765,6 +793,8 @@ class MQTTPublisher:
if device_class is not None: cfg["device_class"] = device_class
if state_class is not None: cfg["state_class"] = state_class
if icon is not None: cfg["icon"] = icon
precision = field_precision(key)
if precision is not None: cfg["suggested_display_precision"] = precision
topic = f"{self._cfg.discovery_prefix}/sensor/{pack_name}_{key}/config"
payload = json.dumps(cfg)
if self._dry_run:
@@ -920,7 +950,7 @@ def run_modbus_per_pack(cfg: AppConfig, publisher: MQTTPublisher,
if p.name not in pollers:
raise RuntimeError(f"no poller configured for {p.name}")
regs = pollers[p.name].poll()
readings = decode_eg4_modbus_regs(regs)
readings = decode_eg4_modbus_regs(regs, expose_raw=cfg.expose_raw_registers)
publisher.publish_pack(p.name, readings)
st.response_count += 1
if not st.ok and st.consecutive_errors > 0: