Files
shaggy-solar/LVX6048/tmp/lvx-purge-orphans

80 lines
2.4 KiB
Plaintext
Raw Normal View History

2026-04-26 08:49:05 -04:00
#!/usr/bin/env python3
"""lvx-purge-orphans — remove deprecated HA-discovery entities from MQTT.
After the powermon-patches (g) cleanup + GS excl_filter additions:
- several fields are no longer published (always-zero / dead on this hardware)
- one entity got renamed (dc-ac_power_direction → dc_ac_power_direction)
This tool publishes empty retained payloads to those orphan config topics so
HA forgets the entities cleanly.
Idempotent. Compatible with paho-mqtt 1.x and 2.x.
Usage:
lvx-purge-orphans <broker> <user> <password> [--dry-run]
"""
from __future__ import annotations
import sys
import argparse
import paho.mqtt.client as mqtt
INVERTERS = ["lvx6048_1", "lvx6048_2"]
# Keys that disappeared from publication after this cleanup pass.
DEPRECATED_KEYS: list[str] = [
"battery_voltage_from_scc",
"battery_voltage_from_scc2",
"mppt2_charger_temperature",
"mppt2_charger_status",
"mppt2_input_power",
"mppt2_input_voltage",
"setting_value_configuration_state",
"dc-ac_power_direction", # renamed → dc_ac_power_direction
]
PREFIX = "homeassistant/sensor"
def main() -> int:
ap = argparse.ArgumentParser()
ap.add_argument("host")
ap.add_argument("user")
ap.add_argument("password")
ap.add_argument("--dry-run", action="store_true",
help="print topics that would be purged, don't publish")
ap.add_argument("--port", type=int, default=1883)
args = ap.parse_args()
topics = [f"{PREFIX}/{inv}_{key}/config"
for inv in INVERTERS for key in DEPRECATED_KEYS]
print(f"will purge {len(topics)} topic(s) "
f"({len(DEPRECATED_KEYS)} keys × {len(INVERTERS)} inverters)")
if args.dry_run:
for t in topics:
print(f" (dry-run) {t}")
return 0
try:
c = mqtt.Client(mqtt.CallbackAPIVersion.VERSION2, client_id="lvx-purge")
except AttributeError:
c = mqtt.Client(client_id="lvx-purge")
c.username_pw_set(args.user, args.password)
c.connect(args.host, args.port, keepalive=30)
c.loop_start()
try:
for i, t in enumerate(topics, start=1):
info = c.publish(t, payload="", qos=0, retain=True)
info.wait_for_publish(2)
print(f" {i:>3}/{len(topics)} {t}")
print(f"done — {len(topics)} retained configs cleared")
finally:
c.loop_stop()
c.disconnect()
return 0
if __name__ == "__main__":
sys.exit(main())