#!/usr/bin/env python3 """lvx-resolve-links — create /dev/lvx6048-N symlinks keyed to PI18 serial. Globs /dev/hidraw*, sends PI18 `ID` to each, and creates /dev/lvx6048-1 -> /dev/hidrawX where X's serial matches SERIAL_UNIT_1 /dev/lvx6048-2 -> /dev/hidrawX where X's serial matches SERIAL_UNIT_2 Must run as root. Intended as a systemd oneshot before powermon*.service. Runs a single discovery pass with exclusive access — unlike powermon's native resolve_path which each service performs independently at startup, causing collisions when a sibling service is already holding the target hidraw. Edit SERIAL_UNIT_1 / SERIAL_UNIT_2 when a unit is replaced. """ from __future__ import annotations import asyncio import glob import os import re import sys SERIAL_UNIT_1 = "1496142109100037000000" SERIAL_UNIT_2 = "1496142408100255000000" # Real LVX PI18 serials are long ASCII digit strings (the two known units are # 22 digits each). Anything else — null-byte garbage, the literal "Invalid # response …" error wrapper string, an empty payload, etc. — is a stuck-firmware # response that must be classified as a failed probe so we'll retry it. _SERIAL_RE = re.compile(r"^\d{18,24}$") LINK_FOR_SERIAL = { SERIAL_UNIT_1: "/dev/lvx6048-1", SERIAL_UNIT_2: "/dev/lvx6048-2", } sys.path.insert(0, "/home/noise/.local/share/uv/tools/powermon/lib/python3.11/site-packages") from powermon.protocols import get_protocol_definition # noqa: E402 from powermon.ports.usbport import USBPort # noqa: E402 async def probe_serial(path: str) -> str | None: proto = get_protocol_definition(protocol="PI18") port = USBPort(path=path, protocol=proto) port.path = path try: await port.connect() if not port.is_connected(): return None cmd = proto.get_id_command() res = await port.send_and_receive(command=cmd) await port.disconnect() except Exception as e: print(f" probe {path}: {e.__class__.__name__}: {e}", file=sys.stderr) return None if res is None or not getattr(res, "is_valid", False) or not res.readings: return None sn = str(res.readings[0].data_value) # Filter out malformed responses (null-byte garbage, error-wrapper strings) # — caller treats None as "retry this path on the next tick". if not _SERIAL_RE.fullmatch(sn): return None return sn def _relink(link: str, target: str) -> None: target_basename = os.path.basename(target) try: if os.path.islink(link) or os.path.exists(link): os.unlink(link) except FileNotFoundError: pass os.symlink(target_basename, link) async def main() -> int: # Hot-plug case: udev fires this script as soon as one hidraw appears, but # a sibling inverter coming up at nearly the same moment may still be # enumerating. Also covers the stuck-HID-endpoint case: a unit that just # came up may answer the first PI18 ID query with null bytes for a few # seconds before its firmware populates the response. Retry-probe up to # ~20 s with per-path exponential backoff (1 s → 2 → 4 → 8 cap). Backing # off between failed probes — rather than hammering at fixed 0.5 s pacing # — gives a confused HID endpoint time to recover instead of compounding # the confusion. expected = set(LINK_FOR_SERIAL.keys()) loop_time = asyncio.get_event_loop().time deadline = loop_time() + 20.0 sn_to_path: dict[str, str] = {} # Per-path retry state. Paths are removed from `attempts` once they # yield a recognized serial; paths still in it get re-probed when their # `next_attempt_at` falls due. Unknown / not-an-LVX paths stay in the # map and back off to 8 s, so we don't busy-poll dead ports either. attempts: dict[str, int] = {} next_attempt_at: dict[str, float] = {} while True: now = loop_time() resolved_paths = {p for sn, p in sn_to_path.items() if sn in expected} for p in sorted(glob.glob("/dev/hidraw*")): if p in resolved_paths: continue if next_attempt_at.get(p, 0.0) > now: continue sn = await probe_serial(p) n = attempts[p] = attempts.get(p, 0) + 1 if sn: print(f"{p}: serial {sn}") sn_to_path[sn] = p next_attempt_at.pop(p, None) else: backoff = min(2 ** (n - 1), 8) # 1, 2, 4, 8, 8, … next_attempt_at[p] = now + backoff print(f"{p}: no valid PI18 serial (attempt {n}, retry in {backoff}s)") if expected.issubset(sn_to_path): break if loop_time() >= deadline: break await asyncio.sleep(0.5) # Always-best-effort policy: exit 0 even if no expected serials were # found, so a transient inverter blip (e.g. the unit being mid-cold-start # at sunrise) doesn't permanently latch dependent services into a failed # state via systemd `Requires=`. The periodic timer + powermon's own # Restart=always handles convergence once the inverter recovers. changed = False for sn, link in LINK_FOR_SERIAL.items(): if sn in sn_to_path: target = sn_to_path[sn] current = os.readlink(link) if os.path.islink(link) else None current_full = os.path.join(os.path.dirname(link), current) if current else None if current_full != target: _relink(link, target) print(f"symlink {link} -> {os.path.basename(target)}") changed = True else: print(f"symlink {link} -> {os.path.basename(target)} (unchanged)") else: if os.path.islink(link): try: os.unlink(link) changed = True except FileNotFoundError: pass print(f"WARNING: {link} serial {sn} not found on any /dev/hidraw*") # If symlinks actually moved (e.g. hidraw indices flipped post-power-cycle), # bounce powermon so its open hidraw fds re-bind to the right physical unit. # Idempotent runs (no symlink change) leave powermon alone. if changed: print("symlinks changed — restarting powermon services") import subprocess subprocess.Popen( ["/bin/systemctl", "--no-block", "restart", "powermon.service", "powermon2.service"] ) return 0 if __name__ == "__main__": sys.exit(asyncio.run(main()))