3xbatt-2xinverter
This commit is contained in:
@@ -73,7 +73,9 @@ install -m 600 "$BASE/config/eg4-battery.yaml.example" \
|
|||||||
|
|
||||||
- **`modbus_per_pack`** (default / recommended). Each pack listed with its own
|
- **`modbus_per_pack`** (default / recommended). Each pack listed with its own
|
||||||
`port:`, `address:` and `baud:` — the daemon opens one serial port per pack
|
`port:`, `address:` and `baud:` — the daemon opens one serial port per pack
|
||||||
and polls each independently.
|
and polls each independently. **Two Modbus reads per cycle per pack**:
|
||||||
|
block 1 (39 regs at 0x0000, live status) + block 2 (91 regs at 0x002D,
|
||||||
|
counters + model + firmware strings). ~150 ms per pack at 9600 baud.
|
||||||
```yaml
|
```yaml
|
||||||
bus:
|
bus:
|
||||||
mode: modbus_per_pack
|
mode: modbus_per_pack
|
||||||
|
|||||||
@@ -23,7 +23,29 @@
|
|||||||
- C++ symbols `BmsDatalog::allFunctionModbusAnalysis`, `BmsMonitoring::allFunctionModbusAnalysis`, `usModbusAskRegBase` etc. → confirms Modbus RTU fn 0x03 read-holding-regs.
|
- C++ symbols `BmsDatalog::allFunctionModbusAnalysis`, `BmsMonitoring::allFunctionModbusAnalysis`, `usModbusAskRegBase` etc. → confirms Modbus RTU fn 0x03 read-holding-regs.
|
||||||
4. **Register map construction.** Correlated live register values (cell voltages, pack V) against the SQL field list to derive the 47-reg map below. High-confidence fields promoted to named HA entities; unknowns still emitted as `register_NN` for future correlation.
|
4. **Register map construction.** Correlated live register values (cell voltages, pack V) against the SQL field list to derive the 47-reg map below. High-confidence fields promoted to named HA entities; unknowns still emitted as `register_NN` for future correlation.
|
||||||
|
|
||||||
## Register map (Modbus fn 0x03, start 0x0000, count 47)
|
## Modbus polling — two reads per cycle
|
||||||
|
|
||||||
|
Reverse-engineered from `lv_host.app`'s `MainWindow::vsSerialSend`: the
|
||||||
|
BMS Tool uses three distinct Modbus query frames (cases in a jump table),
|
||||||
|
two of which our daemon now mirrors:
|
||||||
|
|
||||||
|
| Block | Modbus frame | What it covers |
|
||||||
|
|-------|-------------------------------|-----------------------------------------------------------|
|
||||||
|
| 1 | `fn=03 start=0x0000 count=39` | regs 0-38 — live status (V, I, cells, temps, alarms) |
|
||||||
|
| 2 | `fn=03 start=0x002D count=91` | regs 45-135 — counters at 45-46, model + FW strings at 105-123 |
|
||||||
|
| 0 (skipped) | `fn=03 start=0x0069 count=48` | regs 105-152 — overlaps block 2's range; no new content |
|
||||||
|
|
||||||
|
Block 2 is sparse (regs 47-104 and 124-135 read as zero on a healthy idle
|
||||||
|
pack), but it's where the **model string** and **firmware version+date**
|
||||||
|
live, encoded as ASCII u16 words. We do both reads every cycle; combined
|
||||||
|
they take ~150 ms per pack at 9600 baud.
|
||||||
|
|
||||||
|
There are also two **dynamic** query frames (cases 3 & 4 in the BMS Tool)
|
||||||
|
built at runtime from object state. We haven't analyzed those yet —
|
||||||
|
they're presumably involved in writing config / fetching the unique pack
|
||||||
|
serial number (`BMS_ISDN`). Outstanding work.
|
||||||
|
|
||||||
|
## Register map (block 1 + block 2)
|
||||||
|
|
||||||
From live observation + lv_host.app schema:
|
From live observation + lv_host.app schema:
|
||||||
|
|
||||||
@@ -55,6 +77,12 @@ From live observation + lv_host.app schema:
|
|||||||
| 42 | 0x07ff | BMS_Version (lo) | | | `bms_version_lo` |
|
| 42 | 0x07ff | BMS_Version (lo) | | | `bms_version_lo` |
|
||||||
| 43-45 | — | *?* | | | (register_NN only) |
|
| 43-45 | — | *?* | | | (register_NN only) |
|
||||||
| 46 | +1.25 Hz | runtime counter | × 0.1 s?| | `uptime_ds` |
|
| 46 | +1.25 Hz | runtime counter | × 0.1 s?| | `uptime_ds` |
|
||||||
|
| 47-104| 0 | sparse / unused | | | (raw `register_NN`) |
|
||||||
|
| 105-114| ASCII | model string | | | `model` (e.g. "LFP-51.2V100Ah-V1.0") |
|
||||||
|
| 115-116| 0 | padding | | | (raw) |
|
||||||
|
| 117-119| ASCII | firmware version | | | `firmware_version` ("Z03T21") |
|
||||||
|
| 120-123| ASCII | firmware build date | | | `firmware_date` ("YYYYMMDD") |
|
||||||
|
| 124-135| 0 | unused | | | (raw) |
|
||||||
|
|
||||||
**Confidence levels**: Bold-worthy certain (confirmed by live values + UI labels): pack_voltage, cells 01-16, SOC, SOH, cell_count, capacity_ah. Medium (fits data, unverified): temps, current, bitfields, Battery_Mode. Unknown: regs 32, 35 (probably Error_Code but value always 0 so far), 38-40, 43-45.
|
**Confidence levels**: Bold-worthy certain (confirmed by live values + UI labels): pack_voltage, cells 01-16, SOC, SOH, cell_count, capacity_ah. Medium (fits data, unverified): temps, current, bitfields, Battery_Mode. Unknown: regs 32, 35 (probably Error_Code but value always 0 so far), 38-40, 43-45.
|
||||||
|
|
||||||
|
|||||||
@@ -5,24 +5,28 @@ RS-485 and publishes per-pack telemetry to MQTT with HA auto-discovery.
|
|||||||
|
|
||||||
## Status: live
|
## Status: live
|
||||||
|
|
||||||
As of 2026-04-24, `bat1` is live via `modbus_per_pack` mode on its RS485 port,
|
All 3 packs publishing in `modbus_per_pack` mode, each on its own FTDI
|
||||||
reporting all ~65 entities into HA:
|
RS-485 adapter. Per pack, ~70 named entities + 136 raw `register_NN` series:
|
||||||
|
|
||||||
```
|
```
|
||||||
lifepower4_1_pack_voltage 52.56 V (16 cells × 3.285 V)
|
lifepower4_1_pack_voltage 52.56 V (16 cells × 3.285 V)
|
||||||
lifepower4_1_cell_01_voltage 3.285 V
|
lifepower4_1_cell_01_voltage 3.285 V
|
||||||
lifepower4_1_cell_16_voltage 3.285 V
|
|
||||||
lifepower4_1_cell_voltage_delta_mv 2 (outstanding balance)
|
lifepower4_1_cell_voltage_delta_mv 2 (outstanding balance)
|
||||||
lifepower4_1_soc 100 %
|
lifepower4_1_soc 100 %
|
||||||
lifepower4_1_capacity_ah 100.0 Ah
|
lifepower4_1_capacity_ah 100.0 Ah
|
||||||
lifepower4_1_temperature_01 21 °C
|
|
||||||
lifepower4_1_temperature_pcb 55 °C
|
lifepower4_1_temperature_pcb 55 °C
|
||||||
... plus 14 warning bits, 14 protection bits, all 47 raw registers
|
lifepower4_1_model "LFP-51.2V100Ah-V1.0"
|
||||||
|
lifepower4_1_firmware_version "Z03T21"
|
||||||
|
lifepower4_1_firmware_date "20260206"
|
||||||
|
... plus 14 warning bits, 14 protection bits, all 136 raw registers
|
||||||
```
|
```
|
||||||
|
|
||||||
`bat2` and `bat3` are wired but unpowered — the daemon logs one warning per
|
The decoder maps registers to fields per a layout reverse-engineered from
|
||||||
unreachable pack per startup and keeps retrying silently. They'll come online
|
the EG4 BMS Tool's Mach-O binary (see [`NOTES.md`](./NOTES.md) §"Modbus
|
||||||
automatically when the user powers them up.
|
polling" and §"Register map"). Each cycle, the daemon issues two Modbus
|
||||||
|
fn=0x03 reads per pack — block 1 (39 regs at 0x0000) for live status, and
|
||||||
|
block 2 (91 regs at 0x002D) for counters + model + firmware strings —
|
||||||
|
mirroring what the vendor BMS Tool itself does.
|
||||||
|
|
||||||
## Modes
|
## Modes
|
||||||
|
|
||||||
|
|||||||
@@ -356,17 +356,52 @@ def decode_eg4_modbus_regs(regs: list[int]) -> dict[str, Any]:
|
|||||||
|
|
||||||
# reg 46 increments ~1.25 Hz on live bus — likely uptime in deciseconds
|
# reg 46 increments ~1.25 Hz on live bus — likely uptime in deciseconds
|
||||||
out["uptime_ds"] = regs[46]
|
out["uptime_ds"] = regs[46]
|
||||||
|
|
||||||
|
# --- block-2 strings (regs 105..123) — fetched on the second Modbus read ---
|
||||||
|
if len(regs) >= 124:
|
||||||
|
out["model"] = _ascii_from_regs(regs, 105, 10) # 20 chars
|
||||||
|
out["firmware_version"] = _ascii_from_regs(regs, 117, 3) # 6 chars (e.g. "Z03T21")
|
||||||
|
out["firmware_date"] = _ascii_from_regs(regs, 120, 4) # 8 chars (e.g. "20260206")
|
||||||
|
|
||||||
return out
|
return out
|
||||||
|
|
||||||
|
|
||||||
class ModbusActivePoller:
|
def _ascii_from_regs(regs: list[int], start: int, count_regs: int) -> str:
|
||||||
"""One instance per pack. Opens its own serial port, issues a single
|
"""Convert `count_regs` u16 values into an ASCII string (high byte first
|
||||||
read-holding-regs fn=0x03 on every `poll()` call, returns raw registers
|
per Modbus convention). Trailing nulls and non-printable trailing junk
|
||||||
(or raises). Graceful: a pack whose port doesn't exist or whose BMS is
|
are stripped."""
|
||||||
off will raise on poll, and main loop catches + rate-limits the noise."""
|
chars: list[str] = []
|
||||||
|
for r in regs[start:start + count_regs]:
|
||||||
|
for ch in ((r >> 8) & 0xFF, r & 0xFF):
|
||||||
|
if ch == 0:
|
||||||
|
break
|
||||||
|
if 32 <= ch < 127:
|
||||||
|
chars.append(chr(ch))
|
||||||
|
else:
|
||||||
|
continue
|
||||||
|
break
|
||||||
|
return "".join(chars).rstrip()
|
||||||
|
|
||||||
READ_START = 0x0000
|
|
||||||
READ_COUNT = 47
|
class ModbusActivePoller:
|
||||||
|
"""One instance per pack. Opens its own serial port, issues two
|
||||||
|
read-holding-regs fn=0x03 queries per `poll()` call, and returns a
|
||||||
|
sparse 136-register list (indices 0-38 from the first read, 45-135
|
||||||
|
from the second, gap at 39-44 zero-padded).
|
||||||
|
|
||||||
|
The two reads mirror what lv_host.app's BMS Tool issues in its
|
||||||
|
monitoring loop:
|
||||||
|
block 1 — count=39 @ 0 (live status)
|
||||||
|
block 2 — count=91 @ 0x2d (counters + model + firmware strings)
|
||||||
|
|
||||||
|
Graceful: a pack whose port doesn't exist or whose BMS is off will
|
||||||
|
raise on poll, and the main loop catches + rate-limits the noise."""
|
||||||
|
|
||||||
|
BLOCK_1_START = 0x0000
|
||||||
|
BLOCK_1_COUNT = 39
|
||||||
|
BLOCK_2_START = 0x002D # = 45
|
||||||
|
BLOCK_2_COUNT = 91 # covers regs 45..135
|
||||||
|
TOTAL_REG_COUNT = BLOCK_2_START + BLOCK_2_COUNT # 136
|
||||||
|
|
||||||
def __init__(self, port: str, baud: int, address: int, timeout_s: float = 1.0):
|
def __init__(self, port: str, baud: int, address: int, timeout_s: float = 1.0):
|
||||||
self._port_path = port
|
self._port_path = port
|
||||||
@@ -382,11 +417,9 @@ class ModbusActivePoller:
|
|||||||
bytesize=8, parity="N", stopbits=1,
|
bytesize=8, parity="N", stopbits=1,
|
||||||
)
|
)
|
||||||
|
|
||||||
def poll(self) -> list[int]:
|
def _read_block(self, start: int, count: int) -> list[int]:
|
||||||
self._open()
|
|
||||||
body = bytes([self._address, 0x03,
|
body = bytes([self._address, 0x03,
|
||||||
self.READ_START >> 8, self.READ_START & 0xFF,
|
start >> 8, start & 0xFF, count >> 8, count & 0xFF])
|
||||||
self.READ_COUNT >> 8, self.READ_COUNT & 0xFF])
|
|
||||||
crc = crc16_modbus(body)
|
crc = crc16_modbus(body)
|
||||||
frame = body + bytes([crc & 0xFF, crc >> 8])
|
frame = body + bytes([crc & 0xFF, crc >> 8])
|
||||||
|
|
||||||
@@ -394,7 +427,7 @@ class ModbusActivePoller:
|
|||||||
self._ser.reset_input_buffer()
|
self._ser.reset_input_buffer()
|
||||||
self._ser.write(frame)
|
self._ser.write(frame)
|
||||||
|
|
||||||
expected = 3 + self.READ_COUNT * 2 + 2 # addr + func + bc + data + crc
|
expected = 3 + count * 2 + 2
|
||||||
buf = bytearray()
|
buf = bytearray()
|
||||||
deadline = time.monotonic() + self._timeout_s
|
deadline = time.monotonic() + self._timeout_s
|
||||||
while time.monotonic() < deadline and len(buf) < expected:
|
while time.monotonic() < deadline and len(buf) < expected:
|
||||||
@@ -402,18 +435,33 @@ class ModbusActivePoller:
|
|||||||
if chunk:
|
if chunk:
|
||||||
buf.extend(chunk)
|
buf.extend(chunk)
|
||||||
raw = bytes(buf)
|
raw = bytes(buf)
|
||||||
log.debug("pack 0x%02x tx=%s rx=%s", self._address, frame.hex(" "), raw.hex(" "))
|
log.debug("pack 0x%02x [%d@%d] tx=%s rx=%s",
|
||||||
|
self._address, count, start, frame.hex(" "), raw.hex(" "))
|
||||||
|
|
||||||
if len(raw) < 5 or raw[0] != self._address or raw[1] != 0x03:
|
if len(raw) < 5 or raw[0] != self._address or raw[1] != 0x03:
|
||||||
raise RuntimeError(f"no/bad response ({len(raw)} B)")
|
raise RuntimeError(f"no/bad response ({len(raw)} B) for read({count}@{start})")
|
||||||
bc = raw[2]
|
bc = raw[2]
|
||||||
if len(raw) < 3 + bc + 2:
|
if len(raw) < 3 + bc + 2:
|
||||||
raise RuntimeError(f"truncated response ({len(raw)} B, expected {3 + bc + 2})")
|
raise RuntimeError(f"truncated response for read({count}@{start})")
|
||||||
if not _crc_ok(raw, 0, 3 + bc + 2):
|
if not _crc_ok(raw, 0, 3 + bc + 2):
|
||||||
raise RuntimeError("CRC mismatch")
|
raise RuntimeError(f"CRC mismatch for read({count}@{start})")
|
||||||
data = raw[3:3 + bc]
|
data = raw[3:3 + bc]
|
||||||
return [(data[i] << 8) | data[i + 1] for i in range(0, len(data), 2)]
|
return [(data[i] << 8) | data[i + 1] for i in range(0, len(data), 2)]
|
||||||
|
|
||||||
|
def poll(self) -> list[int]:
|
||||||
|
self._open()
|
||||||
|
# Build a sparse 136-element register array.
|
||||||
|
regs = [0] * self.TOTAL_REG_COUNT
|
||||||
|
block1 = self._read_block(self.BLOCK_1_START, self.BLOCK_1_COUNT)
|
||||||
|
for i, v in enumerate(block1):
|
||||||
|
regs[self.BLOCK_1_START + i] = v
|
||||||
|
# Brief gap between queries — RS-485 silence + give BMS a moment
|
||||||
|
time.sleep(0.05)
|
||||||
|
block2 = self._read_block(self.BLOCK_2_START, self.BLOCK_2_COUNT)
|
||||||
|
for i, v in enumerate(block2):
|
||||||
|
regs[self.BLOCK_2_START + i] = v
|
||||||
|
return regs
|
||||||
|
|
||||||
def close(self) -> None:
|
def close(self) -> None:
|
||||||
if self._ser is not None and self._ser.is_open:
|
if self._ser is not None and self._ser.is_open:
|
||||||
self._ser.close()
|
self._ser.close()
|
||||||
@@ -637,6 +685,9 @@ _FIELD_META.update({
|
|||||||
"bms_version_hi": (None, None, None, "mdi:chip"),
|
"bms_version_hi": (None, None, None, "mdi:chip"),
|
||||||
"bms_version_lo": (None, None, None, "mdi:chip"),
|
"bms_version_lo": (None, None, None, "mdi:chip"),
|
||||||
"uptime_ds": (None, None, "total_increasing", "mdi:timer-outline"),
|
"uptime_ds": (None, None, "total_increasing", "mdi:timer-outline"),
|
||||||
|
"model": (None, None, None, "mdi:battery-outline"),
|
||||||
|
"firmware_version": (None, None, None, "mdi:chip"),
|
||||||
|
"firmware_date": (None, None, None, "mdi:calendar"),
|
||||||
})
|
})
|
||||||
for _name in _WARNING_BITS:
|
for _name in _WARNING_BITS:
|
||||||
_FIELD_META[f"warning_{_name}"] = (None, None, None, "mdi:alert")
|
_FIELD_META[f"warning_{_name}"] = (None, None, None, "mdi:alert")
|
||||||
@@ -882,13 +933,14 @@ def run_modbus_per_pack(cfg: AppConfig, publisher: MQTTPublisher,
|
|||||||
|
|
||||||
|
|
||||||
def _mock_modbus_regs(address: int, tick: int) -> list[int]:
|
def _mock_modbus_regs(address: int, tick: int) -> list[int]:
|
||||||
"""Synthesise 47 realistic-looking registers for dry-run mode."""
|
"""Synthesise 136 realistic-looking registers for dry-run mode (covers
|
||||||
|
both block 1 [0..38] and block 2 [45..135] reads of the live daemon)."""
|
||||||
rng = random.Random(address * 1000 + tick)
|
rng = random.Random(address * 1000 + tick)
|
||||||
base_mv = 3280 + rng.randint(-3, 3)
|
base_mv = 3280 + rng.randint(-3, 3)
|
||||||
cells_mv = [base_mv + rng.randint(-8, 8) for _ in range(16)]
|
cells_mv = [base_mv + rng.randint(-8, 8) for _ in range(16)]
|
||||||
regs: list[int] = [0] * 47
|
regs: list[int] = [0] * 136
|
||||||
regs[0] = sum(cells_mv) // 10 # pack voltage × 100
|
regs[0] = sum(cells_mv) // 10 # pack voltage × 100
|
||||||
regs[1] = (30000 - rng.randint(-500, 2000)) & 0xFFFF # current (×100 biased)
|
regs[1] = (30000 - rng.randint(-500, 2000)) & 0xFFFF
|
||||||
for i, mv in enumerate(cells_mv, start=2):
|
for i, mv in enumerate(cells_mv, start=2):
|
||||||
regs[i] = mv
|
regs[i] = mv
|
||||||
regs[18] = 21 + rng.randint(-1, 1)
|
regs[18] = 21 + rng.randint(-1, 1)
|
||||||
@@ -907,6 +959,17 @@ def _mock_modbus_regs(address: int, tick: int) -> list[int]:
|
|||||||
regs[36] = 16 # cell count
|
regs[36] = 16 # cell count
|
||||||
regs[37] = 1000 # 100.0 Ah
|
regs[37] = 1000 # 100.0 Ah
|
||||||
regs[46] = (tick * 5) & 0xFFFF # runtime counter
|
regs[46] = (tick * 5) & 0xFFFF # runtime counter
|
||||||
|
|
||||||
|
# block-2 strings, packed high-byte-first per Modbus convention
|
||||||
|
def _pack_str(target: list[int], offset: int, s: str) -> None:
|
||||||
|
for i in range(0, len(s), 2):
|
||||||
|
hi = ord(s[i])
|
||||||
|
lo = ord(s[i + 1]) if i + 1 < len(s) else 0
|
||||||
|
target[offset + i // 2] = (hi << 8) | lo
|
||||||
|
|
||||||
|
_pack_str(regs, 105, "LFP-51.2V100Ah-V1.0")
|
||||||
|
_pack_str(regs, 117, "Z03T21")
|
||||||
|
_pack_str(regs, 120, "20260206")
|
||||||
return regs
|
return regs
|
||||||
|
|
||||||
|
|
||||||
|
|||||||
@@ -26,10 +26,11 @@ but the short version:
|
|||||||
- **Tier 2 — keep short**: all 14 `warning_*` + 14 `protection_*`,
|
- **Tier 2 — keep short**: all 14 `warning_*` + 14 `protection_*`,
|
||||||
`error_code`, `remaining_ah`, `heater`, the derived `temperature_max`
|
`error_code`, `remaining_ah`, `heater`, the derived `temperature_max`
|
||||||
and `pack_power`.
|
and `pack_power`.
|
||||||
- **Tier 3 — exclude** (the `recorder.yaml` here does this): all 47 raw
|
- **Tier 3 — exclude** (the `recorder.yaml` here does this): all 136 raw
|
||||||
`register_NN` entities, the 16 individual `cell_NN_voltage` series,
|
`register_NN` entities per pack, the 16 individual `cell_NN_voltage`
|
||||||
static metadata (`bms_version_*`, `battery_mode`, `cell_count`, etc.),
|
series, static metadata (`bms_version_*`, `model`, `firmware_version`,
|
||||||
and the `uptime_ds` counter that increments every second.
|
`firmware_date`, `battery_mode`, `cell_count`, etc.), and the `uptime_ds`
|
||||||
|
counter that increments every second.
|
||||||
|
|
||||||
## Enabling in HA
|
## Enabling in HA
|
||||||
|
|
||||||
|
|||||||
@@ -114,3 +114,9 @@ views:
|
|||||||
# name: Cell Δ
|
# name: Cell Δ
|
||||||
# - entity: sensor.lifepower4_1_cycle_count
|
# - entity: sensor.lifepower4_1_cycle_count
|
||||||
# name: Cycles
|
# name: Cycles
|
||||||
|
# - type: section
|
||||||
|
# label: Identity
|
||||||
|
# - entity: sensor.lifepower4_1_model
|
||||||
|
# name: Model
|
||||||
|
# - entity: sensor.lifepower4_1_firmware_version
|
||||||
|
# name: Firmware
|
||||||
|
|||||||
@@ -46,6 +46,9 @@ exclude:
|
|||||||
- sensor.lifepower4_*_cell_lowest
|
- sensor.lifepower4_*_cell_lowest
|
||||||
- sensor.lifepower4_*_battery_mode
|
- sensor.lifepower4_*_battery_mode
|
||||||
- sensor.lifepower4_*_max_current_limit
|
- sensor.lifepower4_*_max_current_limit
|
||||||
|
- sensor.lifepower4_*_model # ASCII string from regs 105-114
|
||||||
|
- sensor.lifepower4_*_firmware_version # ASCII string from regs 117-119
|
||||||
|
- sensor.lifepower4_*_firmware_date # ASCII string from regs 120-123
|
||||||
|
|
||||||
# uptime counter — increments every second, kills the recorder's write cache
|
# uptime counter — increments every second, kills the recorder's write cache
|
||||||
- sensor.lifepower4_*_uptime_ds
|
- sensor.lifepower4_*_uptime_ds
|
||||||
|
|||||||
281
eg4battery/tmp/lv-disasm
Executable file
281
eg4battery/tmp/lv-disasm
Executable file
@@ -0,0 +1,281 @@
|
|||||||
|
#!/usr/bin/env -S uv run --script
|
||||||
|
# /// script
|
||||||
|
# requires-python = ">=3.11"
|
||||||
|
# dependencies = [
|
||||||
|
# "lief>=0.14",
|
||||||
|
# "capstone>=5.0",
|
||||||
|
# ]
|
||||||
|
# ///
|
||||||
|
"""
|
||||||
|
lv-disasm — refine the EG4 LP4V2 register map by static analysis of
|
||||||
|
lv_host.app's Mach-O binary.
|
||||||
|
|
||||||
|
Three layers, runnable separately or all at once:
|
||||||
|
--l1 load-offset histogram from BmsMonitoring::allFunctionModbusAnalysis
|
||||||
|
--l2 __DATA scan for usModbusReg* / register-base constants
|
||||||
|
--l3 Qt widget map (lb_cell_N, etc.) cross-referenced with parser code
|
||||||
|
|
||||||
|
--all run all three (default)
|
||||||
|
"""
|
||||||
|
from __future__ import annotations
|
||||||
|
|
||||||
|
import argparse
|
||||||
|
import re
|
||||||
|
import sys
|
||||||
|
from collections import Counter, defaultdict
|
||||||
|
from pathlib import Path
|
||||||
|
|
||||||
|
import lief
|
||||||
|
import capstone
|
||||||
|
|
||||||
|
BIN = Path("/home/noise/solar/eg4battery/tmp/bms-tool-ref/lv_host.app/Contents/MacOS/lv_host")
|
||||||
|
|
||||||
|
|
||||||
|
def load_binary(path: Path) -> lief.MachO.Binary:
|
||||||
|
fat = lief.MachO.parse(str(path))
|
||||||
|
if fat is None:
|
||||||
|
sys.exit(f"could not parse {path}")
|
||||||
|
if hasattr(fat, "at"):
|
||||||
|
return fat.at(0) # FatBinary
|
||||||
|
return fat # already a single Binary
|
||||||
|
|
||||||
|
|
||||||
|
def find_func(binary: lief.MachO.Binary, *needles: str) -> list[lief.MachO.Symbol]:
|
||||||
|
"""Return symbols whose name contains any of the given substrings."""
|
||||||
|
out: list[lief.MachO.Symbol] = []
|
||||||
|
for sym in binary.symbols:
|
||||||
|
if not sym.name:
|
||||||
|
continue
|
||||||
|
for n in needles:
|
||||||
|
if n in sym.name:
|
||||||
|
out.append(sym)
|
||||||
|
break
|
||||||
|
return out
|
||||||
|
|
||||||
|
|
||||||
|
def func_bytes(binary: lief.MachO.Binary, sym: lief.MachO.Symbol) -> tuple[int, bytes] | None:
|
||||||
|
"""Return (vaddr, code_bytes) for a function symbol. Walks until the next
|
||||||
|
symbol on the same section or end of section."""
|
||||||
|
addr = sym.value
|
||||||
|
text_section = None
|
||||||
|
for s in binary.sections:
|
||||||
|
if s.virtual_address <= addr < s.virtual_address + s.size:
|
||||||
|
text_section = s
|
||||||
|
break
|
||||||
|
if text_section is None:
|
||||||
|
return None
|
||||||
|
# walk symbols sorted by addr in same section, find next one after this
|
||||||
|
others = sorted(
|
||||||
|
(s for s in binary.symbols
|
||||||
|
if s.value > addr and text_section.virtual_address <= s.value < text_section.virtual_address + text_section.size
|
||||||
|
and s.value != addr),
|
||||||
|
key=lambda s: s.value,
|
||||||
|
)
|
||||||
|
end = others[0].value if others else text_section.virtual_address + text_section.size
|
||||||
|
file_off = addr - text_section.virtual_address + text_section.offset
|
||||||
|
return addr, bytes(binary.get_content_from_virtual_address(addr, end - addr))
|
||||||
|
|
||||||
|
|
||||||
|
def disasm_x86_64(addr: int, code: bytes) -> list[tuple[int, str, str]]:
|
||||||
|
md = capstone.Cs(capstone.CS_ARCH_X86, capstone.CS_MODE_64)
|
||||||
|
md.detail = True
|
||||||
|
return [(i.address, i.mnemonic, i.op_str) for i in md.disasm(code, addr)]
|
||||||
|
|
||||||
|
|
||||||
|
# ============================================================================
|
||||||
|
# === Layer 1: load-offset histogram =========================================
|
||||||
|
# ============================================================================
|
||||||
|
|
||||||
|
|
||||||
|
# Capture u16-load patterns. We care about every read of a 2-byte word from
|
||||||
|
# a memory operand with displacement — especially `movzx eax, word ptr [reg + N]`.
|
||||||
|
_LOAD_PATTERNS = [
|
||||||
|
# mov ax, word ptr [reg + imm] or movzx eax, word ptr [reg + imm]
|
||||||
|
re.compile(r"word ptr \[\w+ ([+\-] 0x[0-9a-fA-F]+|[+\-] \d+)\]"),
|
||||||
|
# mov ax, word ptr [reg + reg2 + imm]
|
||||||
|
re.compile(r"word ptr \[\w+ \+ \w+ ([+\-] 0x[0-9a-fA-F]+|[+\-] \d+)\]"),
|
||||||
|
]
|
||||||
|
|
||||||
|
|
||||||
|
def layer1(binary: lief.MachO.Binary) -> None:
|
||||||
|
print("=" * 72)
|
||||||
|
print("LAYER 1 — load-offset histogram in *ModbusAnalysis* functions")
|
||||||
|
print("=" * 72)
|
||||||
|
|
||||||
|
targets = find_func(binary, "ModbusAnalysis", "allFunctionModbus")
|
||||||
|
if not targets:
|
||||||
|
print(" no Modbus-analysis symbols found")
|
||||||
|
return
|
||||||
|
|
||||||
|
for sym in targets:
|
||||||
|
result = func_bytes(binary, sym)
|
||||||
|
if result is None:
|
||||||
|
print(f"\n {sym.name}: no code bytes")
|
||||||
|
continue
|
||||||
|
addr, code = result
|
||||||
|
print(f"\n function: {sym.name}")
|
||||||
|
print(f" vaddr: 0x{addr:08x}")
|
||||||
|
print(f" bytes: {len(code)}")
|
||||||
|
if not code:
|
||||||
|
continue
|
||||||
|
instrs = disasm_x86_64(addr, code)
|
||||||
|
if not instrs:
|
||||||
|
print(" (capstone returned no instructions)")
|
||||||
|
continue
|
||||||
|
|
||||||
|
# Count word-pointer loads — these read register values from the response buffer
|
||||||
|
offsets: Counter[int] = Counter()
|
||||||
|
for ia, mnem, ops in instrs:
|
||||||
|
if "word ptr" not in ops:
|
||||||
|
continue
|
||||||
|
# extract displacement
|
||||||
|
m = re.search(r"\[[^\]]+([+\-]) (?:0x([0-9a-fA-F]+)|(\d+))\]", ops)
|
||||||
|
if m:
|
||||||
|
sign = -1 if m.group(1) == "-" else 1
|
||||||
|
imm = int(m.group(2), 16) if m.group(2) else int(m.group(3))
|
||||||
|
offsets[sign * imm] += 1
|
||||||
|
print(f" instructions: {len(instrs)}")
|
||||||
|
print(f" distinct word-ptr offsets observed: {len(offsets)}")
|
||||||
|
if offsets:
|
||||||
|
# print all positive offsets sorted
|
||||||
|
print(" offset hex reg# (offset/2 if data starts at 0) count")
|
||||||
|
for off, cnt in sorted(offsets.items()):
|
||||||
|
if off < 0 or off > 200:
|
||||||
|
continue
|
||||||
|
reg_hint = f"reg {off // 2}" if off % 2 == 0 else "(odd offset)"
|
||||||
|
print(f" {off:>6} 0x{off:04x} {reg_hint:<35} {cnt}")
|
||||||
|
|
||||||
|
|
||||||
|
# ============================================================================
|
||||||
|
# === Layer 2: data-section scan for register-base constants =================
|
||||||
|
# ============================================================================
|
||||||
|
|
||||||
|
|
||||||
|
def layer2(binary: lief.MachO.Binary) -> None:
|
||||||
|
print()
|
||||||
|
print("=" * 72)
|
||||||
|
print("LAYER 2 — globals: usModbusReg* and friends")
|
||||||
|
print("=" * 72)
|
||||||
|
|
||||||
|
needles = ["usModbus", "RegBase", "regBase"]
|
||||||
|
syms = find_func(binary, *needles)
|
||||||
|
if not syms:
|
||||||
|
print(" no Modbus register-base symbols found")
|
||||||
|
return
|
||||||
|
|
||||||
|
for sym in syms:
|
||||||
|
addr = sym.value
|
||||||
|
# try to read up to 64 bytes (32 u16 entries) starting at this symbol
|
||||||
|
try:
|
||||||
|
data = bytes(binary.get_content_from_virtual_address(addr, 64))
|
||||||
|
except Exception as e:
|
||||||
|
print(f" {sym.name} @ 0x{addr:x}: cannot read ({e})")
|
||||||
|
continue
|
||||||
|
# interpret as u16 little-endian
|
||||||
|
u16s = [int.from_bytes(data[i:i + 2], "little") for i in range(0, len(data) - 1, 2)]
|
||||||
|
# find the trailing run of zeros and trim
|
||||||
|
while u16s and u16s[-1] == 0:
|
||||||
|
u16s.pop()
|
||||||
|
print(f"\n {sym.name} @ 0x{addr:08x}")
|
||||||
|
print(f" size : {len(data)} bytes")
|
||||||
|
print(f" u16[]: {[f'0x{v:04x}' for v in u16s[:24]]}")
|
||||||
|
|
||||||
|
|
||||||
|
# ============================================================================
|
||||||
|
# === Layer 3: Qt widget names + cross-reference =============================
|
||||||
|
# ============================================================================
|
||||||
|
|
||||||
|
|
||||||
|
def layer3(binary: lief.MachO.Binary) -> None:
|
||||||
|
print()
|
||||||
|
print("=" * 72)
|
||||||
|
print("LAYER 3 — Qt widget names + cross-reference with parser code")
|
||||||
|
print("=" * 72)
|
||||||
|
|
||||||
|
# Pull the binary's full string table (cstrings section)
|
||||||
|
text_strs: list[str] = []
|
||||||
|
cstrings = next((s for s in binary.sections if s.name == "__cstring"), None)
|
||||||
|
if cstrings is None:
|
||||||
|
print(" no __cstring section?")
|
||||||
|
return
|
||||||
|
raw = bytes(cstrings.content)
|
||||||
|
# extract null-terminated ASCII strings
|
||||||
|
cur = bytearray()
|
||||||
|
for b in raw:
|
||||||
|
if 32 <= b < 127:
|
||||||
|
cur.append(b)
|
||||||
|
else:
|
||||||
|
if len(cur) >= 4:
|
||||||
|
text_strs.append(cur.decode("ascii"))
|
||||||
|
cur = bytearray()
|
||||||
|
if cur:
|
||||||
|
if len(cur) >= 4:
|
||||||
|
text_strs.append(cur.decode("ascii"))
|
||||||
|
|
||||||
|
# Identify candidate widget object names — Qt's setObjectName values typically
|
||||||
|
# follow snake_case_with_index patterns
|
||||||
|
widgets: dict[str, list[str]] = defaultdict(list)
|
||||||
|
pattern_groups = {
|
||||||
|
"cell_N": re.compile(r"^lb_cell_\d+$|^cell_\d+$"),
|
||||||
|
"warning_N": re.compile(r"^warning_\d+$"),
|
||||||
|
"protection_N": re.compile(r"^protection_\d+$"),
|
||||||
|
"error_N": re.compile(r"^error_\d+$"),
|
||||||
|
"temp_N": re.compile(r"^temp_\d+$|^Temp0\d+$"),
|
||||||
|
"named_field_label": re.compile(
|
||||||
|
r"^(model|com_state|serial_num|ver|cell_num|capacity|"
|
||||||
|
r"voltage|current|temperature|soc|soh|cycle_count)$",
|
||||||
|
re.I,
|
||||||
|
),
|
||||||
|
}
|
||||||
|
for s in text_strs:
|
||||||
|
for kind, rx in pattern_groups.items():
|
||||||
|
if rx.match(s):
|
||||||
|
widgets[kind].append(s)
|
||||||
|
break
|
||||||
|
|
||||||
|
for kind, names in widgets.items():
|
||||||
|
print(f"\n {kind:<22} ({len(names)} found):")
|
||||||
|
for n in sorted(set(names)):
|
||||||
|
print(f" {n}")
|
||||||
|
|
||||||
|
# also list all object names that look like Qt widget identifiers
|
||||||
|
qt_widget = re.compile(r"^(lb_|cb_|le_|pb_|sb_|btn_|gridLayout|horizontalLayout|verticalLayout)")
|
||||||
|
qt_names = sorted({s for s in text_strs if qt_widget.match(s)})
|
||||||
|
print(f"\n Other Qt-widget-like names ({len(qt_names)}):")
|
||||||
|
for n in qt_names[:30]:
|
||||||
|
print(f" {n}")
|
||||||
|
if len(qt_names) > 30:
|
||||||
|
print(f" ... and {len(qt_names) - 30} more")
|
||||||
|
|
||||||
|
|
||||||
|
# ============================================================================
|
||||||
|
# === main ===================================================================
|
||||||
|
# ============================================================================
|
||||||
|
|
||||||
|
|
||||||
|
def main() -> int:
|
||||||
|
ap = argparse.ArgumentParser()
|
||||||
|
ap.add_argument("--bin", default=str(BIN), help="path to lv_host Mach-O")
|
||||||
|
ap.add_argument("--l1", action="store_true")
|
||||||
|
ap.add_argument("--l2", action="store_true")
|
||||||
|
ap.add_argument("--l3", action="store_true")
|
||||||
|
ap.add_argument("--all", action="store_true")
|
||||||
|
args = ap.parse_args()
|
||||||
|
if not (args.l1 or args.l2 or args.l3 or args.all):
|
||||||
|
args.all = True
|
||||||
|
|
||||||
|
print(f"Binary: {args.bin}")
|
||||||
|
binary = load_binary(Path(args.bin))
|
||||||
|
print(f"Loaded: {len(binary.symbols)} symbols, {len(binary.sections)} sections")
|
||||||
|
|
||||||
|
if args.l1 or args.all:
|
||||||
|
layer1(binary)
|
||||||
|
if args.l2 or args.all:
|
||||||
|
layer2(binary)
|
||||||
|
if args.l3 or args.all:
|
||||||
|
layer3(binary)
|
||||||
|
return 0
|
||||||
|
|
||||||
|
|
||||||
|
if __name__ == "__main__":
|
||||||
|
sys.exit(main())
|
||||||
Reference in New Issue
Block a user