#!/usr/bin/env python3 """ha-history — pull state history for HA entities from the Home Assistant recorder, and print a compact change-point timeline. Companion to solar-snapshot (which only sees live values); this is the historic-lookback tool. Auth: reads a long-lived access token from ~/.config/ha/token (or $HA_TOKEN). No secret is ever passed on the command line or hardcoded. Base URL: $HA_URL, else ~/.config/ha/url, else http://10.0.0.41:8123. Usage: ha-history [-s SINCE] [-e END] [-m REGEX] [-a] ENTITY [ENTITY ...] ENTITY entity_id; a bare name with no dot is auto-prefixed `sensor.` e.g. `lvx6048_1_device_mode` -> `sensor.lvx6048_1_device_mode` -s SINCE start of window. default "7 days ago". accepts: "7 days ago", "7d", "36h", "90m", an ISO timestamp, or a date "2026-06-16". -e END end of window. default: now. same formats as -s. -m REGEX only show change-points whose state matches REGEX (case-insensitive); the per-entity header still reports the full count. e.g. -m fault -a show every recorded point, not just state *changes*. Examples: ha-history lvx6048_1_device_mode lvx6048_2_device_mode ha-history -s "10 days ago" -m fault lvx6048_1_fault_code lvx6048_2_fault_code ha-history -s 2026-06-16 -e 2026-06-23 lvx6048_1_device_mode """ import sys, os, re, json, argparse, urllib.request, urllib.parse, urllib.error from datetime import datetime, timedelta, timezone CONF_DIR = os.path.expanduser("~/.config/ha") DEFAULT_URL = "http://10.0.0.41:8123" def die(msg, code=1): print(f"ha-history: {msg}", file=sys.stderr) sys.exit(code) def load_token(): tok = os.environ.get("HA_TOKEN") if tok: return tok.strip() path = os.path.join(CONF_DIR, "token") if not os.path.exists(path): die("no token. Create a Long-Lived Access Token in HA " "(Profile -> Security), then:\n" " mkdir -p ~/.config/ha && install -m600 /dev/stdin ~/.config/ha/token\n" "or set $HA_TOKEN.") with open(path) as f: tok = f.read().strip() if not tok: die(f"{path} is empty") return tok def base_url(): if os.environ.get("HA_URL"): return os.environ["HA_URL"].rstrip("/") p = os.path.join(CONF_DIR, "url") if os.path.exists(p): with open(p) as f: u = f.read().strip() if u: return u.rstrip("/") return DEFAULT_URL def parse_when(s, *, default_now=False): if s is None: return datetime.now(timezone.utc).astimezone() if default_now else None s = s.strip() m = re.fullmatch(r"(\d+)\s*(d|h|m)(?:ays?|ours?|in(?:ute)?s?)?(?:\s*ago)?", s, re.I) if m: n, unit = int(m.group(1)), m.group(2).lower() delta = {"d": timedelta(days=n), "h": timedelta(hours=n), "m": timedelta(minutes=n)}[unit] return datetime.now(timezone.utc).astimezone() - delta # ISO timestamp or bare date try: dt = datetime.fromisoformat(s) except ValueError: die(f"can't parse time {s!r}. Use '7 days ago', '36h', ISO, or 'YYYY-MM-DD'.") if dt.tzinfo is None: # assume local tz dt = dt.astimezone() return dt def fetch(url, token): req = urllib.request.Request(url, headers={"Authorization": f"Bearer {token}"}) try: with urllib.request.urlopen(req, timeout=30) as r: return json.load(r) except urllib.error.HTTPError as e: if e.code == 401: die("401 Unauthorized — token rejected. Regenerate it in HA and rewrite " "~/.config/ha/token.") die(f"HTTP {e.code} from HA: {e.reason}") except urllib.error.URLError as e: die(f"cannot reach HA at {url.split('/api')[0]}: {e.reason}") def fmt_local(iso): """HA returns UTC ISO; show local time, second precision.""" try: return datetime.fromisoformat(iso).astimezone().strftime("%Y-%m-%d %H:%M:%S") except (ValueError, TypeError): return str(iso) def main(): ap = argparse.ArgumentParser(add_help=False) ap.add_argument("-s", "--since", default="7 days ago") ap.add_argument("-e", "--end", default=None) ap.add_argument("-m", "--match", default=None) ap.add_argument("-a", "--all-points", action="store_true") ap.add_argument("-h", "--help", action="store_true") ap.add_argument("entities", nargs="*") a = ap.parse_args() if a.help or not a.entities: print(__doc__.strip()) sys.exit(0 if a.help else 2) ents = [e if "." in e else f"sensor.{e}" for e in a.entities] start = parse_when(a.since) end = parse_when(a.end, default_now=True) matcher = re.compile(a.match, re.I) if a.match else None token = load_token() url = (f"{base_url()}/api/history/period/" f"{urllib.parse.quote(start.isoformat())}" f"?end_time={urllib.parse.quote(end.isoformat())}" f"&filter_entity_id={urllib.parse.quote(','.join(ents))}" f"&minimal_response&no_attributes") data = fetch(url, token) print(f"# HA history {start.strftime('%Y-%m-%d %H:%M')} -> " f"{end.strftime('%Y-%m-%d %H:%M')} ({base_url()})\n") by_id = {} for series in data or []: if series: by_id[series[0].get("entity_id")] = series for ent in ents: series = by_id.get(ent) if not series: print(f"{ent}\n (no recorded history in window — entity wrong, " f"excluded from recorder, or purged)\n") continue # Build (time, state) points, collapsing consecutive identical states # unless --all-points. points, prev = [], object() for item in series: st = item.get("state") ts = item.get("last_changed") or item.get("last_updated") if a.all_points or st != prev: points.append((ts, st)) prev = st shown = [(ts, st) for ts, st in points if not matcher or matcher.search(str(st))] label = "points" if a.all_points else "changes" extra = f", {len(shown)} match /m" if matcher else "" print(f"{ent} ({len(points)} {label}{extra})") if not shown: print(" (nothing matched)\n") continue for i, (ts, st) in enumerate(shown): mark = " <<< FAULT" if re.search(r"fault", str(st), re.I) and st not in ("No fault",) else "" # duration until next change-point in the *full* timeline dur = "" if not matcher: nxt = points[i + 1][0] if i + 1 < len(points) else None if nxt: try: d = datetime.fromisoformat(nxt) - datetime.fromisoformat(ts) secs = int(d.total_seconds()) dur = f" ({secs//3600}h{secs%3600//60:02d}m)" if secs >= 3600 else f" ({secs//60}m)" except (ValueError, TypeError): pass print(f" {fmt_local(ts)} {st}{dur}{mark}") print() if __name__ == "__main__": main()