commissioned
This commit is contained in:
@@ -102,6 +102,7 @@ class AppConfig:
|
||||
packs: list[PackConfig]
|
||||
cell_count: int = 16 # active mode only
|
||||
expose_raw_registers: bool = False # publish register_NN entities (modbus_per_pack)
|
||||
soc_estimator: bool = False # publish independent coulomb-count soc_estimated
|
||||
|
||||
|
||||
def load_config(path: Path) -> AppConfig:
|
||||
@@ -128,6 +129,7 @@ def load_config(path: Path) -> AppConfig:
|
||||
packs=[PackConfig(**p) for p in raw["packs"]],
|
||||
cell_count=raw.get("cell_count", 16),
|
||||
expose_raw_registers=raw.get("expose_raw_registers", False),
|
||||
soc_estimator=raw.get("soc_estimator", False),
|
||||
)
|
||||
|
||||
|
||||
@@ -733,6 +735,9 @@ _FIELD_META.update({
|
||||
"model": (None, None, None, "mdi:battery-outline"),
|
||||
"firmware_version": (None, None, None, "mdi:chip"),
|
||||
"firmware_date": (None, None, None, "mdi:calendar"),
|
||||
# independent coulomb-count SoC estimator (opt-in; see estimate_soc)
|
||||
"soc_estimated": ("%", "battery", "measurement", "mdi:battery-sync"),
|
||||
"soc_est_anchor": (None, None, None, "mdi:anchor"),
|
||||
})
|
||||
def field_meta(key: str) -> tuple[str | None, str | None, str | None, str | None]:
|
||||
if key.startswith("register_"):
|
||||
@@ -759,6 +764,7 @@ _FIELD_PRECISION: dict[str, int] = {
|
||||
"capacity_ah": 1,
|
||||
"remaining_ah": 1,
|
||||
"error_code": 0,
|
||||
"soc_estimated": 1,
|
||||
}
|
||||
for _i in range(1, 17):
|
||||
_FIELD_PRECISION[f"cell_{_i:02d}_voltage"] = 3
|
||||
@@ -863,11 +869,70 @@ class _PackState:
|
||||
consecutive_errors: int = 0
|
||||
response_count: int = 0
|
||||
first_seen_logged: bool = False
|
||||
# independent SoC estimator (opt-in; see estimate_soc)
|
||||
soc_est: float | None = None
|
||||
last_est_ts: float | None = None
|
||||
|
||||
|
||||
_FAIL_HEARTBEAT_CYCLES = 360 # re-log a stuck failure every ~hour at 10 s cadence
|
||||
|
||||
|
||||
# --- independent SoC estimator (coulomb count + voltage anchoring) ----------
|
||||
# The BMS reg21 SoC tracks current well (validated 2026-06-15: reported dSoC
|
||||
# matched the integral of pack_current to <1%/hr across all 6 packs) but only
|
||||
# re-anchors at a full-charge termination the bank rarely reaches while it
|
||||
# cycles mid-range — so the six counters free-run-drift apart (observed
|
||||
# 29-60% at an identical 3.33 V/cell). This estimator integrates the same
|
||||
# trusted current but adds the anchors the BMS lacks: snap to 100% at a
|
||||
# detected full charge, snap to a low ref at the bottom knee. Published as a
|
||||
# separate `soc_estimated` entity; it never replaces the raw `soc`. Pure
|
||||
# function of (readings, prior per-pack state, now) so it can be replayed
|
||||
# offline against captured current logs.
|
||||
SOC_EST_CFG = dict(
|
||||
capacity_ah=100.0, # LP4V2 nameplate (per-pack capacity_ah reg reads 100.0)
|
||||
coulombic_eff=0.99, # charge-acceptance efficiency, applied to + current
|
||||
v_full=3.450, # cell Vmax at/above this ...
|
||||
i_taper=3.0, # ... while charge current has tapered below this (A) => full
|
||||
soc_full=100.0,
|
||||
v_empty=3.000, # cell Vmin at/below this (under load) ...
|
||||
soc_empty=5.0, # ... => bottom anchor
|
||||
max_gap_s=300.0, # ignore integration across gaps longer than this (restarts)
|
||||
)
|
||||
|
||||
|
||||
def estimate_soc(readings: dict[str, Any], st: "_PackState", now: float,
|
||||
cfg: dict = SOC_EST_CFG) -> None:
|
||||
"""Add `soc_estimated` + `soc_est_anchor` to `readings`, coulomb-counting
|
||||
pack_current between cycles and re-anchoring at full / empty. No-op for the
|
||||
cycle if pack_current is missing (can't integrate)."""
|
||||
cur = readings.get("pack_current")
|
||||
if cur is None:
|
||||
return
|
||||
vmax = readings.get("cell_voltage_max")
|
||||
vmin = readings.get("cell_voltage_min")
|
||||
|
||||
if st.soc_est is None: # seed from the BMS on first sight
|
||||
bms = readings.get("soc")
|
||||
st.soc_est = float(bms) if bms is not None else 50.0
|
||||
st.last_est_ts = now
|
||||
|
||||
dt = 0.0 if st.last_est_ts is None else min(now - st.last_est_ts, cfg["max_gap_s"])
|
||||
st.last_est_ts = now
|
||||
if dt > 0: # integrate charge (Ah) -> % of pack
|
||||
eff = cfg["coulombic_eff"] if cur > 0 else 1.0
|
||||
st.soc_est += (cur * eff * dt / 3600.0) / cfg["capacity_ah"] * 100.0
|
||||
|
||||
anchor = "coulomb" # anchors override the free-run integral
|
||||
if vmax is not None and 0.0 <= cur < cfg["i_taper"] and vmax >= cfg["v_full"]:
|
||||
st.soc_est, anchor = cfg["soc_full"], "full"
|
||||
elif vmin is not None and vmin <= cfg["v_empty"]:
|
||||
st.soc_est, anchor = cfg["soc_empty"], "empty"
|
||||
|
||||
st.soc_est = max(0.0, min(100.0, st.soc_est))
|
||||
readings["soc_estimated"] = round(st.soc_est, 1)
|
||||
readings["soc_est_anchor"] = anchor
|
||||
|
||||
|
||||
def _resolve_pack_name(addr: int, packs: list[PackConfig]) -> str:
|
||||
for p in packs:
|
||||
if p.address == addr:
|
||||
@@ -996,6 +1061,8 @@ def run_modbus_per_pack(cfg: AppConfig, publisher: MQTTPublisher,
|
||||
raise RuntimeError(f"no poller configured for {p.name}")
|
||||
regs = pollers[p.name].poll()
|
||||
readings = decode_eg4_modbus_regs(regs, expose_raw=cfg.expose_raw_registers)
|
||||
if cfg.soc_estimator:
|
||||
estimate_soc(readings, st, time.monotonic())
|
||||
publisher.publish_pack(p.name, readings)
|
||||
st.response_count += 1
|
||||
if not st.ok and st.consecutive_errors > 0:
|
||||
|
||||
Reference in New Issue
Block a user