2026-04-24 16:34:10 -04:00
#!/home/noise/.local/share/uv/tools/powermon/bin/python
"""
lvx - flash — apply a YAML settings profile to an LVX6048 via PI18 over USB - HID .
Usage :
. / flash . py dump - - device / dev / lvx6048 - 1 - - out profiles / current . yaml
. / flash . py diff - - device / dev / lvx6048 - 1 - - profile profiles / X . yaml
. / flash . py apply - - device / dev / lvx6048 - 1 - - profile profiles / X . yaml - - confirm
. / flash . py compare - - device - a / dev / lvx6048 - 1 - - device - b / dev / lvx6048 - 2
. / flash . py sync - check - - device - a / dev / lvx6048 - 1 - - device - b / dev / lvx6048 - 2
Safety :
- ` apply ` stops powermon . service for the duration and restarts it afterwards .
- Every set is followed by a PIRI readback that must match , or the run aborts .
- Values are range - checked before any write .
- Settings are applied in a safe order ( cutoff < stop - discharge < float < bulk ) .
"""
from __future__ import annotations
import argparse
import asyncio
import subprocess
import sys
from dataclasses import dataclass
from pathlib import Path
from typing import Any , Callable
import yaml
from powermon . commands . command import Command
from powermon . commands . result import ResultType
from powermon . protocols import get_protocol_definition
from powermon . ports . usbport import USBPort
PROTOCOL = " PI18 "
UNIT_INDEX = 0 # parallel-stack unit index for PCP / MCHGC / MUCHGC. Master = 0.
# Default serial numbers for this stack. Used when --device / --serial are
# omitted; each command will glob /dev/hidraw* and probe PI18 ID on each to
# find the matching unit, so cable/hub-port moves do not require reconfig.
SERIAL_UNIT_1 = " 1496142109100037000000 "
SERIAL_UNIT_2 = " 1496142408100255000000 "
# -------- profile-key → PI18 encoding + PIRI readback index --------
# PIRI test response has 26 fields; indices below are 0-based.
# Source: powermon/protocols/pi18.py :: QUERY_COMMANDS["PIRI"].reading_definitions
PIRI = {
" battery_voltage " : 7 , # V (rated, read-only)
" stop_discharge_voltage " : 8 , # V (BUCD field 1, a.k.a. "re-charge")
" stop_charge_voltage " : 9 , # V (BUCD field 2, a.k.a. "re-discharge"; 0 = Full)
" cutoff_voltage " : 10 , # V (PSDV)
" bulk_voltage " : 11 , # V (MCHGV field 1)
" float_voltage " : 12 , # V (MCHGV field 2)
" battery_type " : 13 , # enum index (PBT)
" max_utility_charging_current " : 14 , # A (MUCHGC)
" max_charging_current " : 15 , # A (MCHGC)
" output_source_priority " : 17 , # enum index (POP)
" charger_priority " : 18 , # enum index (PCP)
" machine_type " : 20 , # enum index (PEI/PDI: 0=Off Grid, 1=Grid Tie)
" solar_power_priority " : 23 , # enum index (PSP)
}
# Per-key guidance emitted as inline comments by `dump`.
# Keep short — one line each, phrased as constraints.
KEY_DOCS : dict [ str , str ] = {
" battery_type " : " enum: AGM | FLOODED | USER " ,
" cutoff_voltage " : " V 40.0..48.0 — hard shutdown; must be < stop_discharge_voltage. Only honored when battery_type=USER. " ,
" stop_discharge_voltage " : " V 44.0..51.0 — switch battery→grid below this (pair: stop_charge_voltage). Only honored when battery_type=USER. " ,
" stop_charge_voltage " : " V 0 (=Full) or 48.0..58.0 — switch grid→battery above this (pair: stop_discharge_voltage). Only honored when battery_type=USER. " ,
" bulk_voltage " : " V 48.0..58.4 — CC→CV transition (pair: float_voltage; must be >= float_voltage). Only honored when battery_type=USER. " ,
" float_voltage " : " V 48.0..58.4 — held while on grid (pair: bulk_voltage; must be <= bulk_voltage). Only honored when battery_type=USER. " ,
" max_charging_current " : " A 10,20,30,40,50,60,70,80 — combined solar+AC cap " ,
" max_utility_charging_current " : " A 2,10,20,30,40,50,60,70,80 — grid-side cap only " ,
" output_source_priority " : " enum: solar_utility_battery | solar_battery_utility " ,
" charger_priority " : " enum: solar_first | solar_and_utility | solar_only " ,
" solar_power_priority " : " enum: battery_load_utility_ac | load_battery_utility " ,
" grid_tie " : " enum: enabled | disabled (PEI/PDI) " ,
}
POP_MAP = { " solar_utility_battery " : " 0 " , " solar_battery_utility " : " 01 " }
PCP_MAP = { " solar_first " : " 0 " , " solar_and_utility " : " 1 " , " solar_only " : " 2 " }
PSP_MAP = { " battery_load_utility_ac " : " 0 " , " load_battery_utility " : " 1 " }
PBT_MAP = { " AGM " : " 0 " , " FLOODED " : " 1 " , " USER " : " 2 " }
# enum indices in PIRI (for readback verification)
POP_PIRI = { " 0 " : " solar_utility_battery " , " 1 " : " solar_battery_utility " }
PCP_PIRI = { " 0 " : " solar_first " , " 1 " : " solar_and_utility " , " 2 " : " solar_only " }
PSP_PIRI = { " 0 " : " battery_load_utility_ac " , " 1 " : " load_battery_utility " }
PBT_PIRI = { " 0 " : " AGM " , " 1 " : " FLOODED " , " 2 " : " USER " }
MACHINE_PIRI = { " 0 " : " off_grid " , " 1 " : " grid_tie " }
@dataclass
class Setting :
key : str
encoder : Callable [ [ Any ] , str ] # profile value -> PI18 raw command (no prefix/CRC)
decoder : Callable [ [ str ] , Any ] # PIRI raw field -> profile value
piri_field : int
pair_keys : tuple [ str , . . . ] = ( ) # if set, these keys are encoded together in one command
def _v_to_tenths ( v : float | int ) - > str :
return f " { int ( round ( float ( v ) * 10 ) ) : 03d } "
def _tenths_to_v ( raw : str ) - > float :
return int ( raw ) / 10.0
def _amps_to_str ( a : int ) - > str :
return f " { int ( a ) : 03d } "
def _amps_from_str ( raw : str ) - > int :
return int ( raw )
# Applied in this order. Safe: set low-range protections before high-range; currents before priorities.
SCHEDULE : list [ Setting ] = [
Setting ( " battery_type " ,
encoder = lambda v : f " PBT { PBT_MAP [ v ] } " ,
decoder = lambda r : PBT_PIRI . get ( r . lstrip ( " 0 " ) or " 0 " , r ) ,
piri_field = PIRI [ " battery_type " ] ) ,
Setting ( " cutoff_voltage " ,
encoder = lambda v : f " PSDV { _v_to_tenths ( v ) } " ,
decoder = _tenths_to_v ,
piri_field = PIRI [ " cutoff_voltage " ] ) ,
# BUCD sets stop-discharge and stop-charge together
Setting ( " stop_discharge_voltage " ,
encoder = lambda pair : f " BUCD { _v_to_tenths ( pair [ 0 ] ) } , { ' 000 ' if pair [ 1 ] in ( 0 , 0.0 ) else _v_to_tenths ( pair [ 1 ] ) } " ,
decoder = _tenths_to_v ,
piri_field = PIRI [ " stop_discharge_voltage " ] ,
pair_keys = ( " stop_discharge_voltage " , " stop_charge_voltage " ) ) ,
# MCHGV sets bulk and float together
Setting ( " bulk_voltage " ,
encoder = lambda pair : f " MCHGV { _v_to_tenths ( pair [ 0 ] ) } , { _v_to_tenths ( pair [ 1 ] ) } " ,
decoder = _tenths_to_v ,
piri_field = PIRI [ " bulk_voltage " ] ,
pair_keys = ( " bulk_voltage " , " float_voltage " ) ) ,
Setting ( " max_utility_charging_current " ,
encoder = lambda a : f " MUCHGC { UNIT_INDEX } , { _amps_to_str ( a ) } " ,
decoder = _amps_from_str ,
piri_field = PIRI [ " max_utility_charging_current " ] ) ,
Setting ( " max_charging_current " ,
encoder = lambda a : f " MCHGC { UNIT_INDEX } , { _amps_to_str ( a ) } " ,
decoder = _amps_from_str ,
piri_field = PIRI [ " max_charging_current " ] ) ,
Setting ( " output_source_priority " ,
encoder = lambda v : f " POP { POP_MAP [ v ] } " ,
decoder = lambda r : POP_PIRI . get ( r . lstrip ( " 0 " ) or " 0 " , r ) ,
piri_field = PIRI [ " output_source_priority " ] ) ,
Setting ( " charger_priority " ,
encoder = lambda v : f " PCP { UNIT_INDEX } , { PCP_MAP [ v ] } " ,
decoder = lambda r : PCP_PIRI . get ( r . lstrip ( " 0 " ) or " 0 " , r ) ,
piri_field = PIRI [ " charger_priority " ] ) ,
Setting ( " solar_power_priority " ,
encoder = lambda v : f " PSP { PSP_MAP [ v ] } " ,
decoder = lambda r : PSP_PIRI . get ( r . lstrip ( " 0 " ) or " 0 " , r ) ,
piri_field = PIRI [ " solar_power_priority " ] ) ,
Setting ( " grid_tie " ,
encoder = lambda v : " PEI " if v == " enabled " else " PDI " ,
decoder = lambda r : " grid_tie " if r . lstrip ( " 0 " ) == " 1 " else " off_grid " ,
piri_field = PIRI [ " machine_type " ] ) ,
]
# -------- range / consistency validation --------
def _validate ( profile : dict ) - > list [ str ] :
errs : list [ str ] = [ ]
def rng ( key , lo , hi ) :
if key in profile and not ( lo < = profile [ key ] < = hi ) :
errs . append ( f " { key } = { profile [ key ] } out of range [ { lo } , { hi } ] " )
rng ( " cutoff_voltage " , 40.0 , 48.0 )
rng ( " stop_discharge_voltage " , 44.0 , 51.0 )
if " stop_charge_voltage " in profile and profile [ " stop_charge_voltage " ] != 0 :
rng ( " stop_charge_voltage " , 48.0 , 58.0 )
rng ( " bulk_voltage " , 48.0 , 58.4 )
rng ( " float_voltage " , 48.0 , 58.4 )
rng ( " max_charging_current " , 10 , 80 )
rng ( " max_utility_charging_current " , 2 , 80 )
if " cutoff_voltage " in profile and " stop_discharge_voltage " in profile :
if profile [ " cutoff_voltage " ] > = profile [ " stop_discharge_voltage " ] :
errs . append ( " cutoff_voltage must be < stop_discharge_voltage " )
if " float_voltage " in profile and " bulk_voltage " in profile :
if profile [ " float_voltage " ] > profile [ " bulk_voltage " ] :
errs . append ( " float_voltage must be <= bulk_voltage " )
if " max_charging_current " in profile and profile [ " max_charging_current " ] not in ( 10 , 20 , 30 , 40 , 50 , 60 , 70 , 80 ) :
errs . append ( " max_charging_current must be one of 10,20,...,80 " )
if " max_utility_charging_current " in profile and profile [ " max_utility_charging_current " ] not in ( 2 , 10 , 20 , 30 , 40 , 50 , 60 , 70 , 80 ) :
errs . append ( " max_utility_charging_current must be one of 2,10,20,...,80 " )
for k , allowed in [ ( " output_source_priority " , POP_MAP ) , ( " charger_priority " , PCP_MAP ) ,
( " solar_power_priority " , PSP_MAP ) , ( " battery_type " , PBT_MAP ) ] :
if k in profile and profile [ k ] not in allowed :
errs . append ( f " { k } = { profile [ k ] !r} not in { list ( allowed ) } " )
if " grid_tie " in profile and profile [ " grid_tie " ] not in ( " enabled " , " disabled " ) :
errs . append ( " grid_tie must be ' enabled ' or ' disabled ' " )
return errs
# -------- powermon glue --------
async def _open_port ( path : str ) - > USBPort :
protocol = get_protocol_definition ( protocol = PROTOCOL )
port = USBPort ( path = path , protocol = protocol )
port . path = path
await port . connect ( )
if not port . is_connected ( ) :
raise RuntimeError ( f " could not open { path } " )
return port
async def _resolve_path ( device : str | None , serial : str | None ) - > str :
""" Return a concrete device path.
If ` serial ` is given , glob / dev / hidraw * and probe each candidate via PI18 ID
until one matches . Otherwise return ` device ` verbatim ( typically the
resolver - maintained / dev / lvx6048 - N symlink ) . - - serial takes precedence for
cases where the resolver hasn ' t run (e.g. early boot, bare hidraw probing).
"""
import glob as _glob
if serial :
pass # fall through to probe
elif device :
return device
else :
raise RuntimeError ( " must supply --device or --serial " )
candidates = sorted ( _glob . glob ( " /dev/hidraw* " ) )
if not candidates :
raise RuntimeError ( " no /dev/hidraw* devices present " )
for path in candidates :
try :
port = await _open_port ( path )
except Exception :
continue
try :
parts = await _read_raw_parts ( port , " ID " )
# ID returns one field, so parts[0] is the inverter serial
if parts and parts [ 0 ] == str ( serial ) :
return path
except Exception :
pass
finally :
await port . disconnect ( )
raise RuntimeError ( f " no device at /dev/hidraw* responds with serial { serial !r} " )
def _build_command ( code : str , proto ) - > Command :
cd = proto . get_command_definition ( code )
if cd is None :
raise RuntimeError ( f " unknown PI18 command: { code !r} " )
cmd = Command ( code = code , commandtype = " basic " , outputs = [ ] , trigger = None )
cmd . command_definition = cd
cmd . full_command = proto . get_full_command ( code )
return cmd
async def _send ( port : USBPort , code : str ) :
cmd = _build_command ( code , port . protocol )
result = await port . send_and_receive ( cmd )
return result
async def _read_piri ( port : USBPort , retries : int = 3 ) - > dict [ str , str ] :
""" Return { name: raw_string} for each PIRI field.
Retries on transient decode failures — powermon ' s parser can IndexError when
the hidraw fd still holds leftover bytes from a prior command ( e . g . running
multiple queries back - to - back in sync - check ) .
"""
cmd = _build_command ( " PIRI " , port . protocol )
last_err : Any = None
for _ in range ( retries ) :
try :
result = await port . send_and_receive ( cmd )
except ( IndexError , KeyError , ValueError ) as e :
last_err = e
await asyncio . sleep ( 0.3 )
continue
if result is not None and getattr ( result , " is_valid " , True ) :
raw = result . raw_response
if raw . startswith ( b " ^D " ) :
raw = raw [ 5 : ] # ^D + 3-digit length
if raw . endswith ( b " \r " ) :
raw = raw [ : - 3 ] # 2-byte CRC + \r
parts = raw . decode ( " ascii " , errors = " replace " ) . split ( " , " )
return { i : p for i , p in enumerate ( parts ) } | { " _parts " : parts }
last_err = getattr ( result , " error_messages " , None ) or result
await asyncio . sleep ( 0.3 )
raise RuntimeError ( f " PIRI read failed after { retries } attempts: { last_err } " )
def _piri_raw ( piri : dict , idx : int ) - > str :
return piri [ " _parts " ] [ idx ]
async def _wait_ack ( port : USBPort , code : str ) - > bool :
""" Send a setter. Return True on ^1, False on ^0 / unknown. """
cmd = _build_command ( code , port . protocol )
result = await port . send_and_receive ( cmd )
raw = getattr ( result , " raw_response " , b " " ) or b " "
# setter response framed as ^Dlll^1... or similar. Just look for ^1 / ^0 anywhere.
if b " ^1 " in raw :
return True
if b " ^0 " in raw :
return False
# fallback: inspect parsed readings
for r in ( result . readings or [ ] ) :
v = str ( r . data_value ) . lower ( )
if " succeed " in v :
return True
if " fail " in v :
return False
return False
# -------- dump / diff / apply --------
def _dump_profile ( piri_raw : list [ str ] ) - > dict :
""" Convert PIRI raw fields into a profile dict, using the SCHEDULE decoders. """
out : dict [ str , Any ] = { }
# walk SCHEDULE once; pairs already have both halves
for s in SCHEDULE :
if s . key == " grid_tie " :
mt = piri_raw [ PIRI [ " machine_type " ] ]
out [ " grid_tie " ] = " enabled " if mt . lstrip ( " 0 " ) == " 1 " else " disabled "
continue
out [ s . key ] = s . decoder ( piri_raw [ s . piri_field ] )
# fill pair's partner keys too (bulk/float already via bulk_voltage row; pair_keys picks up the second)
# BUCD pair
out [ " stop_discharge_voltage " ] = _tenths_to_v ( piri_raw [ PIRI [ " stop_discharge_voltage " ] ] )
out [ " stop_charge_voltage " ] = _tenths_to_v ( piri_raw [ PIRI [ " stop_charge_voltage " ] ] )
out [ " bulk_voltage " ] = _tenths_to_v ( piri_raw [ PIRI [ " bulk_voltage " ] ] )
out [ " float_voltage " ] = _tenths_to_v ( piri_raw [ PIRI [ " float_voltage " ] ] )
return out
def _diff ( want : dict , have : dict ) - > list [ tuple [ str , Any , Any ] ] :
diffs = [ ]
for k , v in want . items ( ) :
if k not in have :
continue
hv = have [ k ]
if isinstance ( v , float ) or isinstance ( hv , float ) :
if abs ( float ( v ) - float ( hv ) ) > 0.05 :
diffs . append ( ( k , hv , v ) )
else :
if v != hv :
diffs . append ( ( k , hv , v ) )
return diffs
def _systemctl ( action : str ) - > None :
# Stop both units so neither holds the hidraw fd for the one we're writing to.
subprocess . run ( [ " sudo " , " systemctl " , action , " powermon.service " , " powermon2.service " ] , check = True )
async def cmd_dump ( args ) - > int :
path = await _resolve_path ( args . device , args . serial )
port = await _open_port ( path )
try :
piri = await _read_piri ( port )
finally :
await port . disconnect ( )
prof = _dump_profile ( piri [ " _parts " ] )
# Drop any keys whose values aren't round-trippable via their PI18 setter.
errs = _validate ( prof )
skipped : list [ str ] = [ ]
for e in errs :
for k in list ( prof ) :
if e . startswith ( k + " = " ) or e . startswith ( k + " " ) :
if k in prof :
skipped . append ( f " { k } = { prof . pop ( k ) !r} ( { e } ) " )
break
out_path = Path ( args . out )
out_path . parent . mkdir ( parents = True , exist_ok = True )
with out_path . open ( " w " ) as f :
f . write ( " # LVX6048 settings profile. Edit freely; all keys are optional. \n " )
f . write ( " # `flash.py diff` previews changes; `flash.py apply --confirm` writes. \n " )
if skipped :
f . write ( " # \n # skipped (read-only or out of settable range): \n " )
for s in skipped :
f . write ( f " # { s } \n " )
f . write ( " \n " )
for k , v in prof . items ( ) :
doc = KEY_DOCS . get ( k )
if doc :
f . write ( f " # { doc } \n " )
# one-key dump preserves native YAML formatting for scalars
f . write ( yaml . safe_dump ( { k : v } , sort_keys = False , default_flow_style = False ) )
f . write ( " \n " )
print ( f " wrote { out_path } with { len ( prof ) } keys " + ( f " ( { len ( skipped ) } skipped) " if skipped else " " ) )
return 0
async def cmd_diff ( args ) - > int :
with open ( args . profile ) as f :
want = yaml . safe_load ( f ) or { }
errs = _validate ( want )
if errs :
for e in errs :
print ( f " INVALID: { e } " )
return 2
path = await _resolve_path ( args . device , args . serial )
port = await _open_port ( path )
try :
piri = await _read_piri ( port )
finally :
await port . disconnect ( )
have = _dump_profile ( piri [ " _parts " ] )
diffs = _diff ( want , have )
if not diffs :
print ( " no diff — profile matches device " )
return 0
print ( f " { len ( diffs ) } setting(s) would change: " )
for k , hv , wv in diffs :
print ( f " { k } : { hv !r} -> { wv !r} " )
return 0
async def cmd_apply ( args ) - > int :
with open ( args . profile ) as f :
want = yaml . safe_load ( f ) or { }
errs = _validate ( want )
if errs :
for e in errs :
print ( f " INVALID: { e } " )
return 2
if not args . confirm :
print ( " refusing to write without --confirm (use `diff` to preview) " )
return 2
path = await _resolve_path ( args . device , args . serial )
print ( " stopping powermon.service... " )
_systemctl ( " stop " )
try :
port = await _open_port ( path )
try :
piri = await _read_piri ( port )
have = _dump_profile ( piri [ " _parts " ] )
diffs = _diff ( want , have )
if not diffs :
print ( " nothing to do " )
return 0
pending = { k for k , _ , _ in diffs }
applied : list [ str ] = [ ]
for s in SCHEDULE :
# handle pair settings once
if s . pair_keys :
a , b = s . pair_keys
if a not in pending and b not in pending :
continue
if a not in want or b not in want :
print ( f " SKIP { a } / { b } : pair requires both keys in profile " )
continue
code = s . encoder ( ( want [ a ] , want [ b ] ) )
print ( f " -> { code } ( { a } = { want [ a ] } , { b } = { want [ b ] } ) " )
ok = await _wait_ack ( port , code )
if not ok :
print ( f " FAIL: inverter NAK on { code } " )
return 3
# readback
piri2 = await _read_piri ( port )
new_a = _tenths_to_v ( piri2 [ " _parts " ] [ PIRI [ a ] ] )
new_b = _tenths_to_v ( piri2 [ " _parts " ] [ PIRI [ b ] ] )
if abs ( new_a - float ( want [ a ] ) ) > 0.05 or abs ( new_b - float ( want [ b ] ) ) > 0.05 :
print ( f " FAIL: readback mismatch: { a } = { new_a } , { b } = { new_b } " )
return 3
applied + = [ a , b ]
continue
if s . key not in pending :
continue
code = s . encoder ( want [ s . key ] )
print ( f " -> { code } ( { s . key } = { want [ s . key ] } ) " )
ok = await _wait_ack ( port , code )
if not ok :
print ( f " FAIL: inverter NAK on { code } " )
return 3
piri2 = await _read_piri ( port )
actual = s . decoder ( piri2 [ " _parts " ] [ s . piri_field ] )
expected = want [ s . key ]
if isinstance ( actual , float ) :
match = abs ( float ( actual ) - float ( expected ) ) < = 0.05
else :
match = actual == expected or ( s . key == " grid_tie " and actual == ( " grid_tie " if expected == " enabled " else " off_grid " ) )
if not match :
print ( f " FAIL: readback mismatch on { s . key } : got { actual !r} , want { expected !r} " )
return 3
applied . append ( s . key )
print ( f " OK — applied { len ( applied ) } setting(s): { ' , ' . join ( applied ) } " )
return 0
finally :
await port . disconnect ( )
finally :
print ( " restarting powermon.service... " )
_systemctl ( " start " )
# -------- sync-check: are the two units in a valid parallel state? --------
# PI18 MOD codes
MOD_NAMES = {
" 00 " : " Power on " , " 01 " : " Standby " , " 02 " : " Bypass " ,
" 03 " : " Battery " , " 04 " : " Fault " , " 05 " : " Hybrid " ,
}
# PI18 FWS fault-code map (cross-referenced with PI30 QPGS fault codes)
FAULT_NAMES = {
" 00 " : " No fault " ,
" 01 " : " Fan is locked " ,
" 02 " : " Over temperature " ,
" 03 " : " Battery voltage is too high " ,
" 04 " : " Battery voltage is too low " ,
" 05 " : " Output short circuited or Over temperature " ,
" 06 " : " Output voltage is too high " ,
" 07 " : " Over load time out " ,
" 08 " : " Bus voltage is too high " ,
" 09 " : " Bus soft start failed " ,
" 11 " : " Main relay failed " ,
" 51 " : " Over current inverter " ,
" 52 " : " Bus soft start failed " ,
" 53 " : " Inverter soft start failed " ,
" 54 " : " Self-test failed " ,
" 55 " : " Over DC voltage on output of inverter " ,
" 56 " : " Battery connection is open " ,
" 57 " : " Current sensor failed " ,
" 58 " : " Output voltage is too low " ,
" 60 " : " Inverter negative power " ,
" 71 " : " Parallel version different " ,
" 72 " : " Output circuit failed " ,
" 80 " : " CAN communication failed " ,
" 81 " : " Parallel host line lost " ,
" 82 " : " Parallel synchronized signal lost " ,
" 83 " : " Parallel battery voltage detect different " ,
" 84 " : " Parallel Line voltage or frequency detect different " ,
" 85 " : " Parallel Line input current unbalanced " ,
" 86 " : " Parallel output setting different " ,
}
# GS field indices (see powermon/protocols/pi18.py :: QUERY_COMMANDS["GS"])
GS_AC_OUTPUT_V = 2
GS_AC_OUTPUT_HZ = 3
2026-04-26 19:26:44 -04:00
GS_PARALLEL_INDEX = 27 # 0 = master, 1+ = slaves; one unit per cluster reports 0
2026-04-24 16:34:10 -04:00
async def _read_raw_parts ( port : USBPort , code : str , retries : int = 3 ) - > list [ str ] :
cmd = _build_command ( code , port . protocol )
last_err : Any = None
for _ in range ( retries ) :
result = await port . send_and_receive ( cmd )
if result is not None and getattr ( result , " is_valid " , False ) :
raw = result . raw_response
if raw . startswith ( b " ^D " ) :
raw = raw [ 5 : ] # ^D + 3-digit length
if raw . endswith ( b " \r " ) :
raw = raw [ : - 3 ] # 2-byte CRC + \r
return raw . decode ( " ascii " , errors = " replace " ) . split ( " , " )
last_err = getattr ( result , " error_messages " , None ) or result
await asyncio . sleep ( 0.3 )
raise RuntimeError ( f " { code } read failed after { retries } attempts: { last_err } " )
async def _snapshot_sync ( path : str ) - > dict [ str , Any ] :
port = await _open_port ( path )
try :
gs = await _read_raw_parts ( port , " GS " )
fws = await _read_raw_parts ( port , " FWS " )
mod = await _read_raw_parts ( port , " MOD " )
vfw = await _read_raw_parts ( port , " VFW " )
finally :
await port . disconnect ( )
return {
2026-04-26 19:26:44 -04:00
" parallel_index " : int ( gs [ GS_PARALLEL_INDEX ] ) ,
2026-04-24 16:34:10 -04:00
" ac_output_v " : _tenths_to_v ( gs [ GS_AC_OUTPUT_V ] ) ,
" ac_output_hz " : int ( gs [ GS_AC_OUTPUT_HZ ] ) / 10.0 ,
" fault_code " : fws [ 0 ] ,
" fault_name " : FAULT_NAMES . get ( fws [ 0 ] , f " unknown ( { fws [ 0 ] } ) " ) ,
" mode_raw " : mod [ 0 ] ,
" mode " : MOD_NAMES . get ( mod [ 0 ] , mod [ 0 ] ) ,
" main_cpu " : vfw [ 0 ] ,
" slave_cpu " : vfw [ 1 ] ,
}
async def cmd_sync_check ( args ) - > int :
path_a = await _resolve_path ( args . device_a , args . serial_a )
path_b = await _resolve_path ( args . device_b , args . serial_b )
a = await _snapshot_sync ( path_a )
b = await _snapshot_sync ( path_b )
def _row ( label : str , s : dict ) - > str :
2026-04-26 19:26:44 -04:00
idx = s [ " parallel_index " ]
role = f " instance { idx } { ' (master) ' if idx == 0 else ' ' } "
2026-04-24 16:34:10 -04:00
return ( f " { label } : fw= { s [ ' main_cpu ' ] } / { s [ ' slave_cpu ' ] } mode= { s [ ' mode ' ] } "
2026-04-26 19:26:44 -04:00
f " parallel= { role } fault= { s [ ' fault_name ' ] } "
2026-04-24 16:34:10 -04:00
f " vac= { s [ ' ac_output_v ' ] } V fac= { s [ ' ac_output_hz ' ] } Hz " )
print ( _row ( path_a , a ) )
print ( _row ( path_b , b ) )
issues : list [ str ] = [ ]
if a [ " main_cpu " ] != b [ " main_cpu " ] or a [ " slave_cpu " ] != b [ " slave_cpu " ] :
issues . append ( f " firmware mismatch: { a [ ' main_cpu ' ] } / { a [ ' slave_cpu ' ] } vs { b [ ' main_cpu ' ] } / { b [ ' slave_cpu ' ] } — parallel requires matching firmware on both units " )
2026-04-26 19:26:44 -04:00
if a [ " parallel_index " ] == b [ " parallel_index " ] :
issues . append ( f " both units report the same parallel instance index ( { a [ ' parallel_index ' ] } ); should differ — cluster handshake incomplete " )
if 0 not in ( a [ " parallel_index " ] , b [ " parallel_index " ] ) :
issues . append ( f " neither unit reports instance 0 — cluster has no elected master (got { a [ ' parallel_index ' ] } , { b [ ' parallel_index ' ] } ) " )
2026-04-24 16:34:10 -04:00
if a [ " fault_code " ] != " 00 " :
issues . append ( f " { path_a } : active fault { a [ ' fault_code ' ] } ( { a [ ' fault_name ' ] } ) " )
if b [ " fault_code " ] != " 00 " :
issues . append ( f " { path_b } : active fault { b [ ' fault_code ' ] } ( { b [ ' fault_name ' ] } ) " )
if a [ " mode_raw " ] != b [ " mode_raw " ] :
issues . append ( f " mode differs: { a [ ' mode ' ] } vs { b [ ' mode ' ] } " )
if abs ( a [ " ac_output_hz " ] - b [ " ac_output_hz " ] ) > 0.1 and a [ " ac_output_hz " ] > 0 and b [ " ac_output_hz " ] > 0 :
issues . append ( f " AC output frequency diverges: { a [ ' ac_output_hz ' ] } Hz vs { b [ ' ac_output_hz ' ] } Hz (>0.1Hz = not phase-locked) " )
if abs ( a [ " ac_output_v " ] - b [ " ac_output_v " ] ) > 2.0 and a [ " ac_output_v " ] > 0 and b [ " ac_output_v " ] > 0 :
issues . append ( f " AC output voltage diverges: { a [ ' ac_output_v ' ] } V vs { b [ ' ac_output_v ' ] } V (>2V gap) " )
if a [ " ac_output_v " ] == 0 and b [ " ac_output_v " ] == 0 :
issues . append ( " both units AC output = 0 V (idle / not producing); frequency/voltage sync cannot be verified until at least one is inverting " )
print ( )
if not issues :
print ( " SYNC OK " )
return 0
print ( f " { len ( issues ) } issue(s): " )
for i in issues :
print ( f " - { i } " )
return 1
async def cmd_compare ( args ) - > int :
async def _snapshot ( path : str ) - > dict :
port = await _open_port ( path )
try :
piri = await _read_piri ( port )
finally :
await port . disconnect ( )
return _dump_profile ( piri [ " _parts " ] )
path_a = await _resolve_path ( args . device_a , args . serial_a )
path_b = await _resolve_path ( args . device_b , args . serial_b )
a = await _snapshot ( path_a )
b = await _snapshot ( path_b )
shared = sorted ( set ( a ) & set ( b ) )
diffs : list [ tuple [ str , Any , Any ] ] = [ ]
for k in shared :
av , bv = a [ k ] , b [ k ]
if isinstance ( av , float ) or isinstance ( bv , float ) :
if abs ( float ( av ) - float ( bv ) ) > 0.05 :
diffs . append ( ( k , av , bv ) )
elif av != bv :
diffs . append ( ( k , av , bv ) )
a_only = sorted ( set ( a ) - set ( b ) )
b_only = sorted ( set ( b ) - set ( a ) )
if not diffs and not a_only and not b_only :
print ( f " MATCH — { len ( shared ) } settings identical on { path_a } and { path_b } " )
return 0
if diffs :
kw = max ( len ( k ) for k , _ , _ in diffs )
print ( f " { len ( diffs ) } setting(s) differ (of { len ( shared ) } shared): " )
for k , av , bv in diffs :
print ( f " { k : < { kw } } { path_a } = { av !r} { path_b } = { bv !r} " )
if a_only :
print ( f " only on { path_a } : { ' , ' . join ( a_only ) } " )
if b_only :
print ( f " only on { path_b } : { ' , ' . join ( b_only ) } " )
return 1
def main ( ) - > int :
ap = argparse . ArgumentParser ( description = " Flash LVX6048 settings profiles via PI18. " )
sub = ap . add_subparsers ( dest = " cmd " , required = True )
def _add_common ( p ) :
# --device uses the resolver-maintained symlink by default. --serial is
# an explicit fallback that probes /dev/hidraw* via PI18 ID — only use
# when the resolver hasn't run (e.g. early boot / debugging).
p . add_argument ( " --device " , default = " /dev/lvx6048-1 " ,
help = " device path (default: /dev/lvx6048-1 symlink) " )
p . add_argument ( " --serial " , default = None ,
help = " override --device by probing /dev/hidraw* for this PI18 serial " )
d = sub . add_parser ( " dump " , help = " read current settings into a YAML profile " )
_add_common ( d )
d . add_argument ( " --out " , required = True )
df = sub . add_parser ( " diff " , help = " show what would change if a profile were applied " )
_add_common ( df )
df . add_argument ( " --profile " , required = True )
ap_ = sub . add_parser ( " apply " , help = " apply a profile (requires --confirm) " )
_add_common ( ap_ )
ap_ . add_argument ( " --profile " , required = True )
ap_ . add_argument ( " --confirm " , action = " store_true " )
def _add_pair ( p ) :
p . add_argument ( " --device-a " , default = " /dev/lvx6048-1 " , help = " device path for unit A " )
p . add_argument ( " --device-b " , default = " /dev/lvx6048-2 " , help = " device path for unit B " )
p . add_argument ( " --serial-a " , default = None , help = " override --device-a by probing PI18 serial " )
p . add_argument ( " --serial-b " , default = None , help = " override --device-b by probing PI18 serial " )
cp = sub . add_parser ( " compare " , help = " diff live settings between two inverters " )
_add_pair ( cp )
sc = sub . add_parser ( " sync-check " , help = " verify two paralleled inverters are in sync " )
_add_pair ( sc )
args = ap . parse_args ( )
handler = {
" dump " : cmd_dump , " diff " : cmd_diff , " apply " : cmd_apply ,
" compare " : cmd_compare , " sync-check " : cmd_sync_check ,
} [ args . cmd ]
return asyncio . run ( handler ( args ) )
if __name__ == " __main__ " :
sys . exit ( main ( ) )