Files
shaggy-solar/.claude/skills/lib/ha-history

185 lines
7.0 KiB
Plaintext
Raw Permalink Normal View History

#!/usr/bin/env python3
"""ha-history — pull state history for HA entities from the Home Assistant
recorder, and print a compact change-point timeline. Companion to solar-snapshot
(which only sees live values); this is the historic-lookback tool.
Auth: reads a long-lived access token from ~/.config/ha/token (or $HA_TOKEN).
No secret is ever passed on the command line or hardcoded.
Base URL: $HA_URL, else ~/.config/ha/url, else http://10.0.0.41:8123.
Usage:
ha-history [-s SINCE] [-e END] [-m REGEX] [-a] ENTITY [ENTITY ...]
ENTITY entity_id; a bare name with no dot is auto-prefixed `sensor.`
e.g. `lvx6048_1_device_mode` -> `sensor.lvx6048_1_device_mode`
-s SINCE start of window. default "7 days ago".
accepts: "7 days ago", "7d", "36h", "90m", an ISO timestamp,
or a date "2026-06-16".
-e END end of window. default: now. same formats as -s.
-m REGEX only show change-points whose state matches REGEX (case-insensitive);
the per-entity header still reports the full count. e.g. -m fault
-a show every recorded point, not just state *changes*.
Examples:
ha-history lvx6048_1_device_mode lvx6048_2_device_mode
ha-history -s "10 days ago" -m fault lvx6048_1_fault_code lvx6048_2_fault_code
ha-history -s 2026-06-16 -e 2026-06-23 lvx6048_1_device_mode
"""
import sys, os, re, json, argparse, urllib.request, urllib.parse, urllib.error
from datetime import datetime, timedelta, timezone
# Per-user config directory; load_token() reads `token` and base_url() reads
# `url` from here.
CONF_DIR = os.path.expanduser("~/.config/ha")
# Base URL returned by base_url() when neither $HA_URL nor the `url` file
# provides a non-empty value.
DEFAULT_URL = "http://10.0.0.41:8123"
def die(msg, code=1):
    """Report a fatal error on stderr and terminate the process.

    msg   text printed after the `ha-history: ` prefix.
    code  process exit status (default 1).

    Never returns; raises SystemExit.
    """
    sys.stderr.write(f"ha-history: {msg}\n")
    raise SystemExit(code)
def load_token():
    """Return the Home Assistant long-lived access token.

    Lookup order:
      1. $HA_TOKEN, if it contains anything other than whitespace;
      2. the `token` file inside CONF_DIR.

    The returned token has surrounding whitespace removed. A missing,
    unreadable or empty token file terminates the program via die().
    """
    # Strip *before* the truthiness test: a whitespace-only $HA_TOKEN must not
    # be returned as an empty bearer token (that only yields a confusing 401).
    tok = os.environ.get("HA_TOKEN", "").strip()
    if tok:
        return tok
    path = os.path.join(CONF_DIR, "token")
    # Open directly rather than exists()-then-open(): no race, and a directory
    # or permission problem is reported cleanly instead of as a traceback.
    try:
        with open(path, encoding="utf-8") as f:
            tok = f.read().strip()
    except FileNotFoundError:
        die("no token. Create a Long-Lived Access Token in HA "
            "(Profile -> Security), then:\n"
            " mkdir -p ~/.config/ha && install -m600 /dev/stdin ~/.config/ha/token\n"
            "or set $HA_TOKEN.")
    except OSError as e:
        die(f"cannot read {path}: {e.strerror or e}")
    if not tok:
        die(f"{path} is empty")
    return tok
def base_url():
    """Resolve the Home Assistant base URL, without trailing slashes.

    Precedence: a non-empty $HA_URL, then the non-empty contents of the `url`
    file inside CONF_DIR, and finally DEFAULT_URL.
    """
    from_env = os.environ.get("HA_URL")
    if from_env:
        return from_env.rstrip("/")

    url_file = os.path.join(CONF_DIR, "url")
    if os.path.exists(url_file):
        with open(url_file) as fh:
            stored = fh.read().strip()
        if stored:
            return stored.rstrip("/")

    return DEFAULT_URL
def parse_when(s, *, default_now=False):
    """Turn a user-supplied time expression into a timezone-aware datetime.

    s            None, or one of:
                   - a relative age: "7 days ago", "7d", "36h", "90m", "2w"
                     (weeks / days / hours / minutes, optional long form,
                     optional trailing "ago", case-insensitive);
                   - an ISO timestamp (a trailing "Z" is accepted as UTC);
                   - a bare date "YYYY-MM-DD" (midnight).
    default_now  when s is None, return the current local time instead of None.

    Naive ISO values are interpreted in the local timezone. Unparseable or
    out-of-range input terminates the program via die().
    """
    now = datetime.now(timezone.utc).astimezone()
    if s is None:
        return now if default_now else None
    s = s.strip()
    # Every unit carries its own optional long form, so a mismatched spelling
    # such as "7dours" is rejected instead of being silently read as days.
    m = re.fullmatch(
        r"(\d+)\s*(w(?:eeks?)?|d(?:ays?)?|h(?:ours?)?|m(?:in(?:ute)?s?)?)(?:\s*ago)?",
        s, re.I)
    if m:
        field = {"w": "weeks", "d": "days", "h": "hours",
                 "m": "minutes"}[m.group(2)[0].lower()]
        # Build only the timedelta that is needed; an absurdly large count
        # overflows timedelta or the datetime range and is reported cleanly.
        try:
            return now - timedelta(**{field: int(m.group(1))})
        except OverflowError:
            die(f"time {s!r} is out of range.")
    # ISO timestamp or bare date. datetime.fromisoformat() only understands a
    # trailing "Z" from Python 3.11 on, so normalise it to an explicit offset.
    iso = (s[:-1] + "+00:00") if s[-1:] in ("Z", "z") else s
    try:
        dt = datetime.fromisoformat(iso)
    except ValueError:
        die(f"can't parse time {s!r}. Use '7 days ago', '36h', ISO, or 'YYYY-MM-DD'.")
    if dt.tzinfo is None:  # assume local tz
        dt = dt.astimezone()
    return dt
def fetch(url, token):
    """GET `url` with bearer-token auth and return the decoded JSON body.

    url    full request URL.
    token  access token sent as `Authorization: Bearer <token>`.

    Any failure (HTTP error status, unreachable host, timeout or dropped
    connection while reading, non-JSON body) terminates the program via die()
    with a one-line message rather than a traceback.
    """
    req = urllib.request.Request(url, headers={"Authorization": f"Bearer {token}"})
    host = url.split('/api')[0]
    try:
        with urllib.request.urlopen(req, timeout=30) as r:
            body = r.read()
    except urllib.error.HTTPError as e:
        # HTTPError is a URLError subclass, so it must be handled first.
        if e.code == 401:
            die("401 Unauthorized — token rejected. Regenerate it in HA and rewrite "
                "~/.config/ha/token.")
        die(f"HTTP {e.code} from HA: {e.reason}")
    except urllib.error.URLError as e:
        die(f"cannot reach HA at {host}: {e.reason}")
    except OSError as e:
        # Timeouts / resets while *reading* the body are not wrapped in URLError.
        die(f"connection to HA at {host} failed: {e}")
    # Decode separately so a parse failure is never confused with a transport
    # problem (e.g. a reverse proxy answering 200 with an HTML page).
    try:
        return json.loads(body)
    except ValueError as e:
        die(f"HA at {host} returned a non-JSON response: {e}")
def fmt_local(iso):
    """Render an ISO timestamp string in local time as `YYYY-MM-DD HH:MM:SS`.

    Anything that cannot be parsed (bad string, None, non-string) is returned
    unchanged as str(iso).
    """
    layout = "%Y-%m-%d %H:%M:%S"
    try:
        local = datetime.fromisoformat(iso).astimezone()
        return local.strftime(layout)
    except (ValueError, TypeError):
        return str(iso)
def _change_points(series, keep_all):
    """Reduce one entity's history rows to a list of (timestamp, state) pairs.

    Consecutive rows carrying the same state are collapsed to the first one
    unless keep_all is true. The timestamp is `last_changed`, falling back to
    `last_updated`.
    """
    points, prev = [], object()  # fresh sentinel: never equal to a real state
    for item in series:
        st = item.get("state")
        if keep_all or st != prev:
            points.append((item.get("last_changed") or item.get("last_updated"), st))
        prev = st
    return points


def _fmt_duration(start_iso, end_iso):
    """Format the gap between two ISO timestamps as ` (HhMMm)` or ` (Mm)`.

    Returns "" when there is no end timestamp or either value is unparseable.
    """
    if not end_iso:
        return ""
    try:
        gap = datetime.fromisoformat(end_iso) - datetime.fromisoformat(start_iso)
    except (ValueError, TypeError):
        return ""
    secs = int(gap.total_seconds())
    if secs >= 3600:
        return f" ({secs//3600}h{secs%3600//60:02d}m)"
    return f" ({secs//60}m)"


def main():
    """CLI entry point: parse arguments, query HA history, print timelines.

    Exits 0 after printing help for -h, 2 when no entity was given, and via
    die() for an inverted time window, a bad -m regex, or an unexpected
    response shape.
    """
    ap = argparse.ArgumentParser(add_help=False)
    ap.add_argument("-s", "--since", default="7 days ago")
    ap.add_argument("-e", "--end", default=None)
    ap.add_argument("-m", "--match", default=None)
    ap.add_argument("-a", "--all-points", action="store_true")
    ap.add_argument("-h", "--help", action="store_true")
    ap.add_argument("entities", nargs="*")
    a = ap.parse_args()
    if a.help or not a.entities:
        print(__doc__.strip())
        sys.exit(0 if a.help else 2)

    # dict.fromkeys de-duplicates while keeping the order given on the command
    # line, so a repeated entity is not queried and printed twice.
    ents = list(dict.fromkeys(e if "." in e else f"sensor.{e}" for e in a.entities))
    start = parse_when(a.since)
    end = parse_when(a.end, default_now=True)
    if start >= end:
        die(f"start of window ({start.isoformat()}) is not before its end "
            f"({end.isoformat()})")

    matcher = None
    if a.match:
        try:
            matcher = re.compile(a.match, re.I)
        except re.error as e:
            die(f"bad -m regex {a.match!r}: {e}")

    token = load_token()
    root = base_url()  # resolve once so the request and the header agree
    url = (f"{root}/api/history/period/"
           f"{urllib.parse.quote(start.isoformat())}"
           f"?end_time={urllib.parse.quote(end.isoformat())}"
           f"&filter_entity_id={urllib.parse.quote(','.join(ents))}"
           f"&minimal_response&no_attributes")
    data = fetch(url, token)
    if data is None:
        data = []
    if not isinstance(data, list):
        die(f"unexpected response from HA: {type(data).__name__} instead of a list")

    print(f"# HA history {start.strftime('%Y-%m-%d %H:%M')} -> "
          f"{end.strftime('%Y-%m-%d %H:%M')} ({root})\n")

    # The response is a list of per-entity series; index them by the
    # entity_id found on each series' first row.
    by_id = {}
    for series in data:
        if series:
            by_id[series[0].get("entity_id")] = series

    fault_re = re.compile(r"fault", re.I)
    for ent in ents:
        series = by_id.get(ent)
        if not series:
            print(f"{ent}\n (no recorded history in window — entity wrong, "
                  f"excluded from recorder, or purged)\n")
            continue
        points = _change_points(series, a.all_points)
        shown = [(ts, st) for ts, st in points
                 if not matcher or matcher.search(str(st))]
        label = "points" if a.all_points else "changes"
        extra = f", {len(shown)} match /{a.match}/" if matcher else ""
        print(f"{ent} ({len(points)} {label}{extra})")
        if not shown:
            print(" (nothing matched)\n")
            continue
        for i, (ts, st) in enumerate(shown):
            mark = " <<< FAULT" if fault_re.search(str(st)) and st != "No fault" else ""
            # Duration until the next change-point in the *full* timeline.
            # Only computed without -m: then `shown` is identical to `points`
            # and index i is valid in both lists.
            dur = ""
            if not matcher:
                nxt = points[i + 1][0] if i + 1 < len(points) else None
                dur = _fmt_duration(ts, nxt)
            print(f" {fmt_local(ts)} {st}{dur}{mark}")
        print()
# Run the CLI only when executed as a script, never on import.
if __name__ == "__main__":
    main()