#!/usr/bin/env python3 """lvx-resolve-links — create /dev/lvx6048-N symlinks keyed to PI18 serial. Globs /dev/hidraw*, sends PI18 `ID` to each, and creates /dev/lvx6048-1 -> /dev/hidrawX where X's serial matches SERIAL_UNIT_1 /dev/lvx6048-2 -> /dev/hidrawX where X's serial matches SERIAL_UNIT_2 Must run as root. Intended as a systemd oneshot before powermon*.service. Runs a single discovery pass with exclusive access — unlike powermon's native resolve_path which each service performs independently at startup, causing collisions when a sibling service is already holding the target hidraw. Edit SERIAL_UNIT_1 / SERIAL_UNIT_2 when a unit is replaced. """ from __future__ import annotations import asyncio import glob import os import sys SERIAL_UNIT_1 = "1496142109100037000000" SERIAL_UNIT_2 = "1496142408100255000000" LINK_FOR_SERIAL = { SERIAL_UNIT_1: "/dev/lvx6048-1", SERIAL_UNIT_2: "/dev/lvx6048-2", } sys.path.insert(0, "/home/noise/.local/share/uv/tools/powermon/lib/python3.11/site-packages") from powermon.protocols import get_protocol_definition # noqa: E402 from powermon.ports.usbport import USBPort # noqa: E402 async def probe_serial(path: str) -> str | None: proto = get_protocol_definition(protocol="PI18") port = USBPort(path=path, protocol=proto) port.path = path try: await port.connect() if not port.is_connected(): return None cmd = proto.get_id_command() res = await port.send_and_receive(command=cmd) await port.disconnect() except Exception as e: print(f" probe {path}: {e.__class__.__name__}: {e}", file=sys.stderr) return None if res is None or not getattr(res, "is_valid", False) or not res.readings: return None return str(res.readings[0].data_value) def _relink(link: str, target: str) -> None: target_basename = os.path.basename(target) try: if os.path.islink(link) or os.path.exists(link): os.unlink(link) except FileNotFoundError: pass os.symlink(target_basename, link) async def main() -> int: # Hot-plug case: udev fires this script as soon as one hidraw appears, but # a sibling inverter coming up at nearly the same moment may still be # enumerating. Retry-probe up to ~10 s waiting for all expected serials, # so a transient single-device sighting doesn't leave one symlink missing. expected = set(LINK_FOR_SERIAL.keys()) deadline = asyncio.get_event_loop().time() + 10.0 sn_to_path: dict[str, str] = {} seen_paths: set[str] = set() while True: candidates = sorted(glob.glob("/dev/hidraw*")) for p in candidates: if p in seen_paths: continue seen_paths.add(p) sn = await probe_serial(p) if sn: print(f"{p}: serial {sn}") sn_to_path[sn] = p else: print(f"{p}: no PI18 response (probably not an LVX6048)") if expected.issubset(sn_to_path): break if asyncio.get_event_loop().time() >= deadline: break await asyncio.sleep(0.5) # Always-best-effort policy: exit 0 even if no expected serials were # found, so a transient inverter blip (e.g. the unit being mid-cold-start # at sunrise) doesn't permanently latch dependent services into a failed # state via systemd `Requires=`. The periodic timer + powermon's own # Restart=always handles convergence once the inverter recovers. changed = False for sn, link in LINK_FOR_SERIAL.items(): if sn in sn_to_path: target = sn_to_path[sn] current = os.readlink(link) if os.path.islink(link) else None current_full = os.path.join(os.path.dirname(link), current) if current else None if current_full != target: _relink(link, target) print(f"symlink {link} -> {os.path.basename(target)}") changed = True else: print(f"symlink {link} -> {os.path.basename(target)} (unchanged)") else: if os.path.islink(link): try: os.unlink(link) changed = True except FileNotFoundError: pass print(f"WARNING: {link} serial {sn} not found on any /dev/hidraw*") # If symlinks actually moved (e.g. hidraw indices flipped post-power-cycle), # bounce powermon so its open hidraw fds re-bind to the right physical unit. # Idempotent runs (no symlink change) leave powermon alone. if changed: print("symlinks changed — restarting powermon services") import subprocess subprocess.Popen( ["/bin/systemctl", "--no-block", "restart", "powermon.service", "powermon2.service"] ) return 0 if __name__ == "__main__": sys.exit(asyncio.run(main()))