Files
shaggy-solar/openevse/bin/openevse-publish-discovery

302 lines
14 KiB
Plaintext
Raw Normal View History

2026-05-01 16:56:37 -04:00
#!/usr/bin/env -S uv run --script
# /// script
# requires-python = ">=3.11"
# dependencies = [
# "paho-mqtt>=2.0",
# "pyyaml>=6.0",
# ]
# ///
"""openevse-publish-discovery — wire an already-running OpenEVSE charger into
Home Assistant via retained MQTT-discovery configs.
The OpenEVSE WiFi firmware publishes raw values to `openevse/<key>` (no JSON
wrapping, no HA discovery). HA can't see them unless either (a) someone adds
manual `mqtt: sensor:` blocks in `configuration.yaml` or (b) retained
`homeassistant/sensor/openevse_<key>/config` payloads exist on the broker.
This tool publishes the latter — a one-shot, idempotent.
After running, HA discovers the charger as a single device and exposes the
~22 entities below. Re-run any time to refresh; use --purge to remove all
of them (e.g. before uninstalling).
Discovery prefix and device id are read from the OpenEVSE announce topic
(`openevse/announce/<id>`, retained at connect-time) so we follow whatever
the firmware reports rather than hard-coding. Falls back to CLI flags if
no announce is on the broker (e.g. EVSE was offline when you ran this).
Defaults: broker host/user/pass loaded from
~/.config/powermon/powermon.yaml if present (since powermon already
needs them). Override on the CLI as needed.
"""
from __future__ import annotations
import argparse
import json
import os
import sys
import time
from pathlib import Path
import paho.mqtt.client as mqtt
import yaml
DEFAULT_PREFIX = "openevse"
DEFAULT_HA_PREFIX = "homeassistant"
DEFAULT_BROKER_CONFIG = Path.home() / ".config" / "powermon" / "powermon.yaml"
# (key, friendly name, unit, device_class, state_class, value_template, icon, suggested_display_precision)
#
# value_template is Jinja applied to the *string* the OpenEVSE publishes:
# - openevse/amp → milliamps (e.g. "17380") — convert to A
# - openevse/temp[_max] → tenths-°C (e.g. "208") — convert to °C
# Other fields are already in the unit HA expects.
SENSORS: list[tuple] = [
# --- live charging ----------------------------------------------------
("power", "Power", "W", "power", "measurement", None, "mdi:flash", 0),
("voltage", "Voltage", "V", "voltage", "measurement", None, "mdi:lightning-bolt", 0),
("amp", "Current", "A", "current", "measurement",
"{{ (value | float / 1000) | round(2) }}", "mdi:current-ac", 2),
("pilot", "Pilot Current", "A", "current", "measurement", None, "mdi:current-ac", 0),
("max_current", "Max Current", "A", "current", "measurement", None, "mdi:current-ac", 0),
# --- energy counters --------------------------------------------------
("session_energy","Session Energy", "Wh", "energy", "total_increasing", None, "mdi:battery-charging", 1),
("total_energy", "Total Energy", "Wh", "energy", "total_increasing", None, "mdi:counter", 1),
("total_day", "Energy Today", "kWh", "energy", "total_increasing", None, "mdi:counter", 3),
("total_week", "Energy This Week", "kWh", "energy", "total_increasing", None, "mdi:counter", 3),
("total_month", "Energy This Month","kWh", "energy", "total_increasing", None, "mdi:counter", 3),
("total_year", "Energy This Year", "kWh", "energy", "total_increasing", None, "mdi:counter", 3),
# --- time -------------------------------------------------------------
("session_elapsed","Session Elapsed", "s", "duration", "measurement", None, "mdi:timer-outline", 0),
("uptime", "Uptime", "s", "duration", "measurement", None, "mdi:clock-outline", 0),
# --- temperature ------------------------------------------------------
("temp", "Temperature", "°C", "temperature", "measurement",
"{{ (value | float / 10) | round(1) }}", "mdi:thermometer", 1),
("temp_max", "Max Temperature", "°C", "temperature", "measurement",
"{{ (value | float / 10) | round(1) }}", "mdi:thermometer-alert", 1),
# --- status -----------------------------------------------------------
("status", "Status", None, None, None, None, "mdi:ev-station", None),
("state", "State Code", None, None, "measurement", None, "mdi:counter", 0),
# --- diagnostics ------------------------------------------------------
("srssi", "Wi-Fi RSSI", "dBm", "signal_strength","measurement", None, "mdi:wifi", 0),
("freeram", "Free RAM", "B", "data_size", "measurement", None, "mdi:memory", 0),
("total_switches","Total Switches", None, None, "total_increasing",None, "mdi:counter", 0),
]
# Binary sensors: OpenEVSE publishes "0"/"1" strings.
# (key, friendly name, device_class, icon)
BINARY_SENSORS: list[tuple] = [
("vehicle", "Vehicle Connected", "plug", "mdi:car-electric"),
("evse_connected", "EVSE Connected", "connectivity", "mdi:ev-plug-type1"),
("manual_override", "Manual Override", None, "mdi:hand-back-right"),
]
ENTITY_CATEGORY_DIAG = {
"uptime", "srssi", "freeram", "total_switches", "evse_connected",
}
def _load_broker_defaults(path: Path) -> dict[str, object]:
"""Pick host/port/user/password from powermon.yaml if present."""
if not path.is_file():
return {}
try:
data = yaml.safe_load(path.read_text()) or {}
except yaml.YAMLError:
return {}
broker = data.get("mqttbroker") or {}
out = {}
if isinstance(broker.get("name"), str): out["host"] = broker["name"]
if isinstance(broker.get("port"), int): out["port"] = broker["port"]
if isinstance(broker.get("username"), str): out["user"] = broker["username"]
if isinstance(broker.get("password"), str): out["password"] = broker["password"]
return out
def _make_client(client_id: str) -> mqtt.Client:
try:
return mqtt.Client(mqtt.CallbackAPIVersion.VERSION2, client_id=client_id)
except AttributeError: # paho-mqtt 1.x
return mqtt.Client(client_id=client_id)
def _read_announce(host: str, port: int, user: str, password: str,
prefix: str, timeout: float = 3.0) -> dict | None:
"""Subscribe to the announce wildcard and grab the retained payload, if any."""
seen: dict | None = None
c = _make_client("openevse-announce-reader")
c.username_pw_set(user, password)
def on_message(_client, _ud, msg):
nonlocal seen
try:
seen = json.loads(msg.payload.decode())
seen["_topic"] = msg.topic
except (UnicodeDecodeError, json.JSONDecodeError):
pass
c.on_message = on_message
c.connect(host, port, keepalive=10)
c.subscribe(f"{prefix}/announce/+", qos=0)
c.loop_start()
deadline = time.monotonic() + timeout
while seen is None and time.monotonic() < deadline:
time.sleep(0.05)
c.loop_stop()
c.disconnect()
return seen
def _build_device(announce: dict | None, device_id: str) -> dict:
"""Build the HA `device` block, populated from announce when available."""
name = (announce or {}).get("name") or f"openevse-{device_id}"
http = (announce or {}).get("http")
dev: dict = {
"identifiers": [f"openevse_{device_id}"],
"name": name,
"manufacturer": "OpenEVSE",
"model": "OpenEVSE WiFi (ESP32)",
}
if http:
dev["configuration_url"] = http
return dev
def _common_disco(prefix: str, key: str, device_id: str, dev: dict,
ann_topic: str) -> dict:
return {
"name": None, # set per entity
"state_topic": f"{prefix}/{key}",
"unique_id": f"openevse_{device_id}_{key}",
"object_id": f"openevse_{key}",
"device": dev,
"availability": [{
"topic": ann_topic,
"value_template": '{{ "online" if value_json.state == "connected" else "offline" }}',
"payload_available": "online",
"payload_not_available": "offline",
}],
}
def _sensor_topic(ha_prefix: str, key: str) -> str:
return f"{ha_prefix}/sensor/openevse_{key}/config"
def _binary_topic(ha_prefix: str, key: str) -> str:
return f"{ha_prefix}/binary_sensor/openevse_{key}/config"
def build_payloads(prefix: str, ha_prefix: str, device_id: str,
announce: dict | None) -> list[tuple[str, dict]]:
"""Return [(topic, payload_dict), ...] for every discovery config."""
dev = _build_device(announce, device_id)
ann_topic = (announce or {}).get("_topic") or f"{prefix}/announce/{device_id}"
out: list[tuple[str, dict]] = []
for key, name, unit, dclass, sclass, vt, icon, prec in SENSORS:
cfg = _common_disco(prefix, key, device_id, dev, ann_topic)
cfg["name"] = name
if unit is not None: cfg["unit_of_measurement"] = unit
if dclass is not None: cfg["device_class"] = dclass
if sclass is not None: cfg["state_class"] = sclass
if vt is not None: cfg["value_template"] = vt
if icon is not None: cfg["icon"] = icon
if prec is not None: cfg["suggested_display_precision"] = prec
if key in ENTITY_CATEGORY_DIAG:
cfg["entity_category"] = "diagnostic"
out.append((_sensor_topic(ha_prefix, key), cfg))
for key, name, dclass, icon in BINARY_SENSORS:
cfg = _common_disco(prefix, key, device_id, dev, ann_topic)
cfg["name"] = name
cfg["payload_on"] = "1"
cfg["payload_off"] = "0"
if dclass is not None: cfg["device_class"] = dclass
if icon is not None: cfg["icon"] = icon
if key in ENTITY_CATEGORY_DIAG:
cfg["entity_category"] = "diagnostic"
out.append((_binary_topic(ha_prefix, key), cfg))
return out
def main() -> int:
defaults = _load_broker_defaults(DEFAULT_BROKER_CONFIG)
ap = argparse.ArgumentParser(
description=__doc__,
formatter_class=argparse.RawDescriptionHelpFormatter,
)
ap.add_argument("--host", default=defaults.get("host"),
help=f"broker host (default: from {DEFAULT_BROKER_CONFIG} → mqttbroker.name)")
ap.add_argument("--port", type=int, default=defaults.get("port", 1883))
ap.add_argument("--user", default=defaults.get("user"))
ap.add_argument("--password", default=defaults.get("password"))
ap.add_argument("--prefix", default=DEFAULT_PREFIX,
help=f"OpenEVSE MQTT topic prefix (default: {DEFAULT_PREFIX})")
ap.add_argument("--ha-prefix", default=DEFAULT_HA_PREFIX,
help=f"HA discovery prefix (default: {DEFAULT_HA_PREFIX})")
ap.add_argument("--device-id",
help="device id; if omitted, read from announce topic")
ap.add_argument("--purge", action="store_true",
help="publish empty retained payloads to clear all entities")
ap.add_argument("--dry-run", action="store_true",
help="print what would be published, don't connect")
args = ap.parse_args()
if not args.host:
ap.error("no --host given and no broker config found")
if args.user is None or args.password is None:
ap.error("--user / --password required (or populate ~/.config/powermon/powermon.yaml)")
announce = None
if not args.dry_run:
announce = _read_announce(args.host, args.port, args.user, args.password,
args.prefix)
device_id = args.device_id
if not device_id and announce:
# announce topic is openevse/announce/<id>
topic = announce.get("_topic", "")
device_id = topic.rsplit("/", 1)[-1] or None
if not device_id:
ap.error("could not determine device id from announce topic; "
"pass --device-id (e.g. last 4 of MAC, like 'a048')")
payloads = build_payloads(args.prefix, args.ha_prefix, device_id, announce)
action = "purge" if args.purge else "publish"
print(f"{action} {len(payloads)} HA discovery config(s) for openevse "
f"device_id={device_id!r}, broker={args.host}:{args.port}")
if announce is not None:
print(f" announce: {announce.get('name', '?')} ({announce.get('http', 'no http')})")
elif not args.dry_run:
print(" (no retained announce found — entities will be marked unavailable "
"until OpenEVSE next reconnects to the broker)")
if args.dry_run:
for topic, body in payloads:
shown = "<purge>" if args.purge else json.dumps(body, separators=(",", ":"))
print(f" {topic}\n {shown}")
return 0
c = _make_client("openevse-publish-discovery")
c.username_pw_set(args.user, args.password)
c.connect(args.host, args.port, keepalive=30)
c.loop_start()
try:
for i, (topic, body) in enumerate(payloads, start=1):
payload = "" if args.purge else json.dumps(body, separators=(",", ":"))
info = c.publish(topic, payload=payload, qos=0, retain=True)
info.wait_for_publish(2)
print(f" {i:>3}/{len(payloads)} {topic}")
finally:
c.loop_stop()
c.disconnect()
print(f"done — {len(payloads)} retained config(s) {'cleared' if args.purge else 'written'}")
return 0
if __name__ == "__main__":
sys.exit(main())