302 lines
14 KiB
Plaintext
Executable File
302 lines
14 KiB
Plaintext
Executable File
#!/usr/bin/env -S uv run --script
|
|
# /// script
|
|
# requires-python = ">=3.11"
|
|
# dependencies = [
|
|
# "paho-mqtt>=2.0",
|
|
# "pyyaml>=6.0",
|
|
# ]
|
|
# ///
|
|
"""openevse-publish-discovery — wire an already-running OpenEVSE charger into
|
|
Home Assistant via retained MQTT-discovery configs.
|
|
|
|
The OpenEVSE WiFi firmware publishes raw values to `openevse/<key>` (no JSON
|
|
wrapping, no HA discovery). HA can't see them unless either (a) someone adds
|
|
manual `mqtt: sensor:` blocks in `configuration.yaml` or (b) retained
|
|
`homeassistant/sensor/openevse_<key>/config` payloads exist on the broker.
|
|
This tool publishes the latter as a one-shot, idempotent operation.
|
|
|
|
After running, HA discovers the charger as a single device and exposes the
|
|
~22 entities below. Re-run any time to refresh; use --purge to remove all
|
|
of them (e.g. before uninstalling).
|
|
|
|
Discovery prefix and device id are read from the OpenEVSE announce topic
|
|
(`openevse/announce/<id>`, retained at connect-time) so we follow whatever
|
|
the firmware reports rather than hard-coding. Falls back to CLI flags if
|
|
no announce is on the broker (e.g. EVSE was offline when you ran this).
|
|
|
|
Defaults: broker host/user/pass loaded from
|
|
~/.config/powermon/powermon.yaml if present (since powermon already
|
|
needs them). Override on the CLI as needed.
|
|
"""
|
|
from __future__ import annotations
|
|
|
|
import argparse
|
|
import json
|
|
import os
|
|
import sys
|
|
import time
|
|
from pathlib import Path
|
|
|
|
import paho.mqtt.client as mqtt
|
|
import yaml
|
|
|
|
# Topic prefix assumed for the OpenEVSE when --prefix is not given.
DEFAULT_PREFIX = "openevse"
# Home Assistant discovery prefix assumed when --ha-prefix is not given.
DEFAULT_HA_PREFIX = "homeassistant"
# powermon's config file; broker host/port/credentials default from it.
DEFAULT_BROKER_CONFIG = Path.home().joinpath(".config", "powermon", "powermon.yaml")
|
|
|
|
# Sensor table. Each row is
#   (key, friendly name, unit, device_class, state_class,
#    value_template, icon, suggested_display_precision)
# and a None field is left out of the discovery payload.
#
# value_template is Jinja applied to the raw *string* the OpenEVSE publishes:
#   - openevse/amp        → milliamps (e.g. "17380") — converted to A
#   - openevse/temp[_max] → tenths-°C (e.g. "208")   — converted to °C
# All other fields already arrive in the unit HA expects.
SENSORS: list[tuple] = [
    # Live charging.
    ("power", "Power", "W", "power", "measurement", None, "mdi:flash", 0),
    ("voltage", "Voltage", "V", "voltage", "measurement", None, "mdi:lightning-bolt", 0),
    (
        "amp", "Current", "A", "current", "measurement",
        "{{ (value | float / 1000) | round(2) }}",
        "mdi:current-ac", 2,
    ),
    ("pilot", "Pilot Current", "A", "current", "measurement", None, "mdi:current-ac", 0),
    ("max_current", "Max Current", "A", "current", "measurement", None, "mdi:current-ac", 0),
    # Energy counters.
    (
        "session_energy", "Session Energy", "Wh", "energy", "total_increasing",
        None, "mdi:battery-charging", 1,
    ),
    ("total_energy", "Total Energy", "Wh", "energy", "total_increasing", None, "mdi:counter", 1),
    ("total_day", "Energy Today", "kWh", "energy", "total_increasing", None, "mdi:counter", 3),
    ("total_week", "Energy This Week", "kWh", "energy", "total_increasing", None, "mdi:counter", 3),
    ("total_month", "Energy This Month", "kWh", "energy", "total_increasing", None, "mdi:counter", 3),
    ("total_year", "Energy This Year", "kWh", "energy", "total_increasing", None, "mdi:counter", 3),
    # Time.
    (
        "session_elapsed", "Session Elapsed", "s", "duration", "measurement",
        None, "mdi:timer-outline", 0,
    ),
    ("uptime", "Uptime", "s", "duration", "measurement", None, "mdi:clock-outline", 0),
    # Temperature.
    (
        "temp", "Temperature", "°C", "temperature", "measurement",
        "{{ (value | float / 10) | round(1) }}",
        "mdi:thermometer", 1,
    ),
    (
        "temp_max", "Max Temperature", "°C", "temperature", "measurement",
        "{{ (value | float / 10) | round(1) }}",
        "mdi:thermometer-alert", 1,
    ),
    # Status.
    ("status", "Status", None, None, None, None, "mdi:ev-station", None),
    ("state", "State Code", None, None, "measurement", None, "mdi:counter", 0),
    # Diagnostics.
    ("srssi", "Wi-Fi RSSI", "dBm", "signal_strength", "measurement", None, "mdi:wifi", 0),
    ("freeram", "Free RAM", "B", "data_size", "measurement", None, "mdi:memory", 0),
    ("total_switches", "Total Switches", None, None, "total_increasing", None, "mdi:counter", 0),
]
|
|
|
|
# Binary-sensor table; the OpenEVSE publishes these keys as "0"/"1" strings.
# Row layout: (key, friendly name, device_class, icon)
BINARY_SENSORS: list[tuple] = [
    (
        "vehicle",
        "Vehicle Connected",
        "plug",
        "mdi:car-electric",
    ),
    (
        "evse_connected",
        "EVSE Connected",
        "connectivity",
        "mdi:ev-plug-type1",
    ),
    (
        "manual_override",
        "Manual Override",
        None,
        "mdi:hand-back-right",
    ),
]
|
|
|
|
# Keys whose entities get entity_category "diagnostic" in their config.
ENTITY_CATEGORY_DIAG = {
    "uptime",
    "srssi",
    "freeram",
    "total_switches",
    "evse_connected",
}
|
|
|
|
|
|
def _load_broker_defaults(path: Path) -> dict[str, object]:
|
|
"""Pick host/port/user/password from powermon.yaml if present."""
|
|
if not path.is_file():
|
|
return {}
|
|
try:
|
|
data = yaml.safe_load(path.read_text()) or {}
|
|
except yaml.YAMLError:
|
|
return {}
|
|
broker = data.get("mqttbroker") or {}
|
|
out = {}
|
|
if isinstance(broker.get("name"), str): out["host"] = broker["name"]
|
|
if isinstance(broker.get("port"), int): out["port"] = broker["port"]
|
|
if isinstance(broker.get("username"), str): out["user"] = broker["username"]
|
|
if isinstance(broker.get("password"), str): out["password"] = broker["password"]
|
|
return out
|
|
|
|
|
|
def _make_client(client_id: str) -> mqtt.Client:
    """Create a paho MQTT client named *client_id*.

    The callback-API-version form of the constructor is tried first; if that
    raises ``AttributeError`` the client is built with ``client_id`` alone.
    """
    try:
        client = mqtt.Client(mqtt.CallbackAPIVersion.VERSION2, client_id=client_id)
    except AttributeError:
        # paho-mqtt 1.x
        client = mqtt.Client(client_id=client_id)
    return client
|
|
|
|
|
|
def _read_announce(host: str, port: int, user: str, password: str,
                   prefix: str, timeout: float = 3.0) -> dict | None:
    """Subscribe to the announce wildcard and grab the retained payload, if any.

    Connects to the broker, subscribes to ``<prefix>/announce/+`` and waits up
    to *timeout* seconds for a message whose payload decodes to a JSON object.

    Returns that object with the topic it arrived on stored under
    ``"_topic"``, or ``None`` if nothing usable arrived in time. Payloads that
    are not UTF-8, not JSON, or JSON but not an object are ignored. Errors
    raised by the client's ``connect()`` propagate to the caller.
    """
    seen: dict | None = None

    c = _make_client("openevse-announce-reader")
    c.username_pw_set(user, password)

    def on_message(_client, _ud, msg):
        nonlocal seen
        try:
            doc = json.loads(msg.payload.decode())
        except (UnicodeDecodeError, json.JSONDecodeError):
            return
        # Valid JSON need not be an object (e.g. `[]` or `42`); callers rely
        # on dict methods, so anything else is treated as "no announce".
        if not isinstance(doc, dict):
            return
        doc["_topic"] = msg.topic
        # Publish to the waiting thread only once the dict is complete.
        seen = doc

    c.on_message = on_message
    c.connect(host, port, keepalive=10)
    try:
        c.subscribe(f"{prefix}/announce/+", qos=0)
        c.loop_start()
        deadline = time.monotonic() + timeout
        while seen is None and time.monotonic() < deadline:
            time.sleep(0.05)
    finally:
        # Always tear down the network thread and socket, even if the wait
        # is interrupted (e.g. Ctrl-C).
        c.loop_stop()
        c.disconnect()
    return seen
|
|
|
|
|
|
def _build_device(announce: dict | None, device_id: str) -> dict:
|
|
"""Build the HA `device` block, populated from announce when available."""
|
|
name = (announce or {}).get("name") or f"openevse-{device_id}"
|
|
http = (announce or {}).get("http")
|
|
dev: dict = {
|
|
"identifiers": [f"openevse_{device_id}"],
|
|
"name": name,
|
|
"manufacturer": "OpenEVSE",
|
|
"model": "OpenEVSE WiFi (ESP32)",
|
|
}
|
|
if http:
|
|
dev["configuration_url"] = http
|
|
return dev
|
|
|
|
|
|
def _common_disco(prefix: str, key: str, device_id: str, dev: dict,
|
|
ann_topic: str) -> dict:
|
|
return {
|
|
"name": None, # set per entity
|
|
"state_topic": f"{prefix}/{key}",
|
|
"unique_id": f"openevse_{device_id}_{key}",
|
|
"object_id": f"openevse_{key}",
|
|
"device": dev,
|
|
"availability": [{
|
|
"topic": ann_topic,
|
|
"value_template": '{{ "online" if value_json.state == "connected" else "offline" }}',
|
|
"payload_available": "online",
|
|
"payload_not_available": "offline",
|
|
}],
|
|
}
|
|
|
|
|
|
def _sensor_topic(ha_prefix: str, key: str) -> str:
|
|
return f"{ha_prefix}/sensor/openevse_{key}/config"
|
|
|
|
|
|
def _binary_topic(ha_prefix: str, key: str) -> str:
|
|
return f"{ha_prefix}/binary_sensor/openevse_{key}/config"
|
|
|
|
|
|
def build_payloads(prefix: str, ha_prefix: str, device_id: str,
                   announce: dict | None) -> list[tuple[str, dict]]:
    """Return [(topic, payload_dict), ...] for every discovery config.

    One entry is produced per row of SENSORS, followed by one per row of
    BINARY_SENSORS. All entries share the same device block, and use the
    announce's own topic for availability when it is known, falling back to
    ``<prefix>/announce/<device_id>``.
    """
    device = _build_device(announce, device_id)
    availability_topic = (announce or {}).get("_topic") or f"{prefix}/announce/{device_id}"
    payloads: list[tuple[str, dict]] = []

    for key, name, unit, dclass, sclass, template, icon, precision in SENSORS:
        cfg = _common_disco(prefix, key, device_id, device, availability_topic)
        cfg["name"] = name
        optional = (
            ("unit_of_measurement", unit),
            ("device_class", dclass),
            ("state_class", sclass),
            ("value_template", template),
            ("icon", icon),
            ("suggested_display_precision", precision),
        )
        # Only fields the table actually specifies go into the payload.
        cfg.update((field, value) for field, value in optional if value is not None)
        if key in ENTITY_CATEGORY_DIAG:
            cfg["entity_category"] = "diagnostic"
        payloads.append((_sensor_topic(ha_prefix, key), cfg))

    for key, name, dclass, icon in BINARY_SENSORS:
        cfg = _common_disco(prefix, key, device_id, device, availability_topic)
        cfg.update(name=name, payload_on="1", payload_off="0")
        if dclass is not None:
            cfg["device_class"] = dclass
        if icon is not None:
            cfg["icon"] = icon
        if key in ENTITY_CATEGORY_DIAG:
            cfg["entity_category"] = "diagnostic"
        payloads.append((_binary_topic(ha_prefix, key), cfg))

    return payloads
|
|
|
|
|
|
def main() -> int:
    """Parse the CLI, then publish (or purge) the retained discovery configs.

    With ``--dry-run`` the payloads are only printed and no broker connection
    is made, so no credentials are required.

    Returns the process exit status: 0 on success; 1 when the broker cannot
    be reached, does not accept the connection in time, or at least one
    publish could not be confirmed. Usage errors exit through
    ``ArgumentParser.error``.
    """
    defaults = _load_broker_defaults(DEFAULT_BROKER_CONFIG)

    ap = argparse.ArgumentParser(
        description=__doc__,
        formatter_class=argparse.RawDescriptionHelpFormatter,
    )
    ap.add_argument("--host", default=defaults.get("host"),
                    help=f"broker host (default: from {DEFAULT_BROKER_CONFIG} → mqttbroker.name)")
    ap.add_argument("--port", type=int, default=defaults.get("port", 1883))
    ap.add_argument("--user", default=defaults.get("user"))
    ap.add_argument("--password", default=defaults.get("password"))
    ap.add_argument("--prefix", default=DEFAULT_PREFIX,
                    help=f"OpenEVSE MQTT topic prefix (default: {DEFAULT_PREFIX})")
    ap.add_argument("--ha-prefix", default=DEFAULT_HA_PREFIX,
                    help=f"HA discovery prefix (default: {DEFAULT_HA_PREFIX})")
    ap.add_argument("--device-id",
                    help="device id; if omitted, read from announce topic")
    ap.add_argument("--purge", action="store_true",
                    help="publish empty retained payloads to clear all entities")
    ap.add_argument("--dry-run", action="store_true",
                    help="print what would be published, don't connect")
    args = ap.parse_args()

    if not args.host:
        ap.error("no --host given and no broker config found")
    # Credentials only matter when we actually talk to the broker.
    if not args.dry_run and (args.user is None or args.password is None):
        ap.error("--user / --password required (or populate ~/.config/powermon/powermon.yaml)")

    announce = None
    if not args.dry_run:
        try:
            announce = _read_announce(args.host, args.port, args.user, args.password,
                                      args.prefix)
        except OSError as exc:
            # Refused connection, DNS failure, timeout, ...: report, don't trace.
            print(f"error: cannot reach broker {args.host}:{args.port}: {exc}",
                  file=sys.stderr)
            return 1
    device_id = args.device_id
    if not device_id and announce:
        # announce topic is openevse/announce/<id>
        topic = announce.get("_topic", "")
        device_id = topic.rsplit("/", 1)[-1] or None
    if not device_id:
        ap.error("could not determine device id from announce topic; "
                 "pass --device-id (e.g. last 4 of MAC, like 'a048')")

    payloads = build_payloads(args.prefix, args.ha_prefix, device_id, announce)

    action = "purge" if args.purge else "publish"
    print(f"{action} {len(payloads)} HA discovery config(s) for openevse "
          f"device_id={device_id!r}, broker={args.host}:{args.port}")
    if announce is not None:
        print(f" announce: {announce.get('name', '?')} ({announce.get('http', 'no http')})")
    elif not args.dry_run:
        print(" (no retained announce found — entities will be marked unavailable "
              "until OpenEVSE next reconnects to the broker)")

    if args.dry_run:
        for topic, body in payloads:
            shown = "<purge>" if args.purge else json.dumps(body, separators=(",", ":"))
            print(f" {topic}\n {shown}")
        return 0

    c = _make_client("openevse-publish-discovery")
    c.username_pw_set(args.user, args.password)
    try:
        c.connect(args.host, args.port, keepalive=30)
    except OSError as exc:
        print(f"error: cannot reach broker {args.host}:{args.port}: {exc}",
              file=sys.stderr)
        return 1

    failed = 0
    c.loop_start()
    try:
        # connect() only opens the socket; the broker's accept/reject answer
        # is handled by the network loop. Wait for it so rejected credentials
        # are reported instead of being followed by a false "done".
        connect_deadline = time.monotonic() + 5.0
        while not c.is_connected() and time.monotonic() < connect_deadline:
            time.sleep(0.05)
        if not c.is_connected():
            print(f"error: broker {args.host}:{args.port} did not accept the "
                  "connection (check --user / --password)", file=sys.stderr)
            return 1

        for i, (topic, body) in enumerate(payloads, start=1):
            payload = "" if args.purge else json.dumps(body, separators=(",", ":"))
            info = c.publish(topic, payload=payload, qos=0, retain=True)
            try:
                info.wait_for_publish(2)
                delivered = info.is_published()
            except (RuntimeError, ValueError):
                # paho signals a publish that was rejected or never queued
                # by raising from these calls.
                delivered = False
            if delivered:
                print(f" {i:>3}/{len(payloads)} {topic}")
            else:
                failed += 1
                print(f" {i:>3}/{len(payloads)} {topic} FAILED", file=sys.stderr)
    finally:
        c.loop_stop()
        c.disconnect()

    if failed:
        print(f"error: {failed} of {len(payloads)} config(s) could not be published",
              file=sys.stderr)
        return 1
    print(f"done — {len(payloads)} retained config(s) {'cleared' if args.purge else 'written'}")
    return 0
|
|
|
|
|
|
# Script entry point: main()'s return value becomes the process exit status.
if __name__ == "__main__":
    exit_status = main()
    sys.exit(exit_status)
|