#!/usr/bin/env python3
|
|
"""lvx-resolve-links — create /dev/lvx6048-N symlinks keyed to PI18 serial.

Globs /dev/hidraw*, sends the PI18 `ID` command to each node, and creates

    /dev/lvx6048-1 -> /dev/hidrawX  where X's serial matches SERIAL_UNIT_1
    /dev/lvx6048-2 -> /dev/hidrawX  where X's serial matches SERIAL_UNIT_2

Must run as root. Intended as a systemd oneshot before powermon*.service.

Runs a single discovery pass with exclusive access — unlike powermon's native
resolve_path, which each service performs independently at startup, causing
collisions when a sibling service is already holding the target hidraw.

Edit SERIAL_UNIT_1 / SERIAL_UNIT_2 when a unit is replaced.
"""
|
|
from __future__ import annotations
|
|
|
|
import asyncio
|
|
import glob
|
|
import os
|
|
import re
|
|
import sys
|
|
|
|
# Serial numbers reported by each inverter in response to the PI18 `ID`
# command. Edit these when a unit is replaced.
SERIAL_UNIT_1 = "1496142109100037000000"
SERIAL_UNIT_2 = "1496142408100255000000"

# Real LVX PI18 serials are long ASCII digit strings (the two known units are
# 22 digits each). Anything else — null-byte garbage, the literal "Invalid
# response …" error wrapper string, an empty payload, etc. — is a stuck-firmware
# response that must be classified as a failed probe so we'll retry it.
#
# `[0-9]` rather than `\d`: on a str pattern `\d` also accepts non-ASCII
# Unicode decimal digits, which would let mis-decoded garbage pass as a serial.
# `\A`/`\Z` anchor the whole string regardless of which match method is used.
_SERIAL_RE = re.compile(r"\A[0-9]{18,24}\Z")

# Stable symlink path to maintain for each known serial.
LINK_FOR_SERIAL = {
    SERIAL_UNIT_1: "/dev/lvx6048-1",
    SERIAL_UNIT_2: "/dev/lvx6048-2",
}
|
|
|
|
sys.path.insert(0, "/home/noise/.local/share/uv/tools/powermon/lib/python3.11/site-packages")
|
|
from powermon.protocols import get_protocol_definition # noqa: E402
|
|
from powermon.ports.usbport import USBPort # noqa: E402
|
|
|
|
|
|
async def probe_serial(path: str) -> str | None:
    """Ask the device at *path* for its PI18 serial number.

    Opens *path* through powermon's ``USBPort``, sends the PI18 ID command
    and validates the first reading against ``_SERIAL_RE``.

    Args:
        path: hidraw device node to probe, e.g. ``/dev/hidraw0``.

    Returns:
        The serial as a string, or ``None`` when the port could not be
        connected, the exchange raised, the response was invalid or empty,
        or the value does not look like a serial. Callers treat ``None`` as
        "retry this path later".
    """
    proto = get_protocol_definition(protocol="PI18")
    port = USBPort(path=path, protocol=proto)
    # NOTE(review): presumably re-asserted because USBPort may rewrite the
    # path it was constructed with — confirm against powermon.
    port.path = path
    res = None
    try:
        try:
            await port.connect()
            if not port.is_connected():
                return None
            cmd = proto.get_id_command()
            res = await port.send_and_receive(command=cmd)
        finally:
            # Always release the port, including when the exchange raised:
            # a handle left open here would stay open while the caller
            # retries and while sibling services try to open the same node.
            await port.disconnect()
    except Exception as e:
        # Best-effort probe: report the failure and let the caller retry.
        print(f"  probe {path}: {e.__class__.__name__}: {e}", file=sys.stderr)
        return None
    if res is None or not getattr(res, "is_valid", False) or not res.readings:
        return None
    sn = str(res.readings[0].data_value)
    # Filter out malformed responses (null-byte garbage, error-wrapper strings)
    # — caller treats None as "retry this path on the next tick".
    if not _SERIAL_RE.fullmatch(sn):
        return None
    return sn
|
|
|
|
|
|
def _relink(link: str, target: str) -> None:
    """Atomically point the symlink *link* at the basename of *target*.

    The link is relative (``hidrawX``, not ``/dev/hidrawX``), so it resolves
    within the directory that contains *link*.

    The new symlink is created under a temporary name next to *link* and
    then renamed over it. Unlike unlink-then-symlink, there is no moment at
    which *link* is missing, so a reader opening it concurrently sees either
    the old target or the new one.

    Args:
        link: path of the symlink to create or replace.
        target: device path whose basename becomes the link target.

    Raises:
        OSError: if the temporary symlink cannot be created or renamed
            (for example when *link* is an existing directory).
    """
    target_basename = os.path.basename(target)
    tmp = f"{link}.tmp.{os.getpid()}"
    # Clear a stale temp link left behind by an interrupted earlier run.
    try:
        os.unlink(tmp)
    except FileNotFoundError:
        pass
    os.symlink(target_basename, tmp)
    try:
        # rename(2) replaces the destination entry itself; it does not
        # follow an existing symlink at *link*.
        os.replace(tmp, link)
    except OSError:
        # Don't leave the temporary link behind on failure.
        try:
            os.unlink(tmp)
        except FileNotFoundError:
            pass
        raise
|
|
|
|
|
|
async def main() -> int:
    """Discover which hidraw node belongs to which inverter and sync symlinks.

    Probes every ``/dev/hidraw*`` node for its PI18 serial, retrying failed
    probes with per-path backoff for up to ~20 s, then creates, updates or
    removes the links listed in ``LINK_FOR_SERIAL``. If any link changed,
    the powermon services are restarted.

    Returns:
        Always ``0`` (best-effort policy, see the comment above the link
        sync below).
    """
    # Hot-plug case: udev fires this script as soon as one hidraw appears, but
    # a sibling inverter coming up at nearly the same moment may still be
    # enumerating. Also covers the stuck-HID-endpoint case: a unit that just
    # came up may answer the first PI18 ID query with null bytes for a few
    # seconds before its firmware populates the response. Retry-probe up to
    # ~20 s with per-path exponential backoff (1 s → 2 → 4 → 8 cap). Backing
    # off between failed probes — rather than hammering at fixed 0.5 s pacing
    # — gives a confused HID endpoint time to recover instead of compounding
    # the confusion.
    expected = set(LINK_FOR_SERIAL)
    loop_time = asyncio.get_running_loop().time
    deadline = loop_time() + 20.0
    sn_to_path: dict[str, str] = {}
    # Paths that have already answered with a well-formed serial. They are
    # never probed again in this run, whether or not the serial is one we
    # expect — re-querying an identified device every tick gains nothing.
    identified: set[str] = set()
    # Per-path retry state for paths that have not yet yielded a serial.
    # Entries are dropped once the path is identified; paths still present
    # get re-probed when their `next_attempt_at` falls due. Not-an-LVX paths
    # stay in the maps and back off to 8 s, so we don't busy-poll dead ports.
    attempts: dict[str, int] = {}
    next_attempt_at: dict[str, float] = {}

    while True:
        for p in sorted(glob.glob("/dev/hidraw*")):
            if p in identified:
                continue
            # Compare against the current time, not a per-pass snapshot:
            # earlier probes in this pass may each have taken a while.
            if next_attempt_at.get(p, 0.0) > loop_time():
                continue
            sn = await probe_serial(p)
            n = attempts[p] = attempts.get(p, 0) + 1
            if sn:
                print(f"{p}: serial {sn}")
                sn_to_path[sn] = p
                identified.add(p)
                attempts.pop(p, None)
                next_attempt_at.pop(p, None)
            else:
                backoff = min(2 ** (n - 1), 8)  # 1, 2, 4, 8, 8, …
                # Start the backoff when the probe finished, so a slow
                # probe does not eat into the device's recovery time.
                next_attempt_at[p] = loop_time() + backoff
                print(f"{p}: no valid PI18 serial (attempt {n}, retry in {backoff}s)")
        if expected.issubset(sn_to_path):
            break
        if loop_time() >= deadline:
            break
        await asyncio.sleep(0.5)

    # Always-best-effort policy: exit 0 even if no expected serials were
    # found, so a transient inverter blip (e.g. the unit being mid-cold-start
    # at sunrise) doesn't permanently latch dependent services into a failed
    # state via systemd `Requires=`. The periodic timer + powermon's own
    # Restart=always handles convergence once the inverter recovers.
    changed = False
    for sn, link in LINK_FOR_SERIAL.items():
        target = sn_to_path.get(sn)
        if target is None:
            # Unit not found: drop a stale link rather than leave it
            # pointing at whatever now owns that hidraw index.
            if os.path.islink(link):
                try:
                    os.unlink(link)
                    changed = True
                except FileNotFoundError:
                    pass
            print(f"WARNING: {link} serial {sn} not found on any /dev/hidraw*")
            continue
        current = os.readlink(link) if os.path.islink(link) else None
        # Links are stored relative; resolve against the link's directory
        # before comparing with the absolute probed path.
        current_full = os.path.join(os.path.dirname(link), current) if current else None
        if current_full != target:
            _relink(link, target)
            print(f"symlink {link} -> {os.path.basename(target)}")
            changed = True
        else:
            print(f"symlink {link} -> {os.path.basename(target)} (unchanged)")

    # If symlinks actually moved (e.g. hidraw indices flipped post-power-cycle),
    # bounce powermon so its open hidraw fds re-bind to the right physical unit.
    # Idempotent runs (no symlink change) leave powermon alone.
    if changed:
        print("symlinks changed — restarting powermon services")
        import subprocess

        # Wait for systemctl to hand the job to systemd instead of
        # fire-and-forget: this script exits right after, and a child still
        # running at that point may not get to deliver the request
        # (NOTE(review): under a oneshot unit with the default KillMode,
        # leftover processes are expected to be killed on exit — confirm).
        # `--no-block` keeps the call short; the timeout is a safety net.
        try:
            subprocess.run(
                ["/bin/systemctl", "--no-block", "restart",
                 "powermon.service", "powermon2.service"],
                check=False,
                timeout=30,
            )
        except (OSError, subprocess.SubprocessError) as e:
            # Keep the best-effort policy: report, don't crash.
            print(f"WARNING: could not restart powermon services: {e}", file=sys.stderr)

    return 0
|
|
|
|
|
|
# Script entry point: run the async discovery pass to completion and use its
# return value as the process exit status.
if __name__ == "__main__":
    exit_status = asyncio.run(main())
    sys.exit(exit_status)
|