185 lines
7.0 KiB
Plaintext
185 lines
7.0 KiB
Plaintext
|
|
#!/usr/bin/env python3
|
||
|
|
"""ha-history — pull state history for HA entities from the Home Assistant
|
||
|
|
recorder, and print a compact change-point timeline. Companion to solar-snapshot
|
||
|
|
(which only sees live values); this is the historic-lookback tool.
|
||
|
|
|
||
|
|
Auth: reads a long-lived access token from ~/.config/ha/token (or $HA_TOKEN).
|
||
|
|
No secret is ever passed on the command line or hardcoded.
|
||
|
|
Base URL: $HA_URL, else ~/.config/ha/url, else http://10.0.0.41:8123.
|
||
|
|
|
||
|
|
Usage:
|
||
|
|
ha-history [-s SINCE] [-e END] [-m REGEX] [-a] ENTITY [ENTITY ...]
|
||
|
|
|
||
|
|
ENTITY entity_id; a bare name with no dot is auto-prefixed `sensor.`
|
||
|
|
e.g. `lvx6048_1_device_mode` -> `sensor.lvx6048_1_device_mode`
|
||
|
|
-s SINCE start of window. default "7 days ago".
|
||
|
|
accepts: "7 days ago", "7d", "36h", "90m", an ISO timestamp,
|
||
|
|
or a date "2026-06-16".
|
||
|
|
-e END end of window. default: now. same formats as -s.
|
||
|
|
-m REGEX only show change-points whose state matches REGEX (case-insensitive);
|
||
|
|
the per-entity header still reports the full count. e.g. -m fault
|
||
|
|
-a show every recorded point, not just state *changes*.
|
||
|
|
|
||
|
|
Examples:
|
||
|
|
ha-history lvx6048_1_device_mode lvx6048_2_device_mode
|
||
|
|
ha-history -s "10 days ago" -m fault lvx6048_1_fault_code lvx6048_2_fault_code
|
||
|
|
ha-history -s 2026-06-16 -e 2026-06-23 lvx6048_1_device_mode
|
||
|
|
"""
|
||
|
|
import sys, os, re, json, argparse, urllib.request, urllib.parse, urllib.error
|
||
|
|
from datetime import datetime, timedelta, timezone
|
||
|
|
|
||
|
|
CONF_DIR = os.path.expanduser("~/.config/ha")
|
||
|
|
DEFAULT_URL = "http://10.0.0.41:8123"
|
||
|
|
|
||
|
|
|
||
|
|
def die(msg, code=1):
|
||
|
|
print(f"ha-history: {msg}", file=sys.stderr)
|
||
|
|
sys.exit(code)
|
||
|
|
|
||
|
|
|
||
|
|
def load_token():
|
||
|
|
tok = os.environ.get("HA_TOKEN")
|
||
|
|
if tok:
|
||
|
|
return tok.strip()
|
||
|
|
path = os.path.join(CONF_DIR, "token")
|
||
|
|
if not os.path.exists(path):
|
||
|
|
die("no token. Create a Long-Lived Access Token in HA "
|
||
|
|
"(Profile -> Security), then:\n"
|
||
|
|
" mkdir -p ~/.config/ha && install -m600 /dev/stdin ~/.config/ha/token\n"
|
||
|
|
"or set $HA_TOKEN.")
|
||
|
|
with open(path) as f:
|
||
|
|
tok = f.read().strip()
|
||
|
|
if not tok:
|
||
|
|
die(f"{path} is empty")
|
||
|
|
return tok
|
||
|
|
|
||
|
|
|
||
|
|
def base_url():
|
||
|
|
if os.environ.get("HA_URL"):
|
||
|
|
return os.environ["HA_URL"].rstrip("/")
|
||
|
|
p = os.path.join(CONF_DIR, "url")
|
||
|
|
if os.path.exists(p):
|
||
|
|
with open(p) as f:
|
||
|
|
u = f.read().strip()
|
||
|
|
if u:
|
||
|
|
return u.rstrip("/")
|
||
|
|
return DEFAULT_URL
|
||
|
|
|
||
|
|
|
||
|
|
def parse_when(s, *, default_now=False):
|
||
|
|
if s is None:
|
||
|
|
return datetime.now(timezone.utc).astimezone() if default_now else None
|
||
|
|
s = s.strip()
|
||
|
|
m = re.fullmatch(r"(\d+)\s*(d|h|m)(?:ays?|ours?|in(?:ute)?s?)?(?:\s*ago)?", s, re.I)
|
||
|
|
if m:
|
||
|
|
n, unit = int(m.group(1)), m.group(2).lower()
|
||
|
|
delta = {"d": timedelta(days=n), "h": timedelta(hours=n), "m": timedelta(minutes=n)}[unit]
|
||
|
|
return datetime.now(timezone.utc).astimezone() - delta
|
||
|
|
# ISO timestamp or bare date
|
||
|
|
try:
|
||
|
|
dt = datetime.fromisoformat(s)
|
||
|
|
except ValueError:
|
||
|
|
die(f"can't parse time {s!r}. Use '7 days ago', '36h', ISO, or 'YYYY-MM-DD'.")
|
||
|
|
if dt.tzinfo is None: # assume local tz
|
||
|
|
dt = dt.astimezone()
|
||
|
|
return dt
|
||
|
|
|
||
|
|
|
||
|
|
def fetch(url, token):
|
||
|
|
req = urllib.request.Request(url, headers={"Authorization": f"Bearer {token}"})
|
||
|
|
try:
|
||
|
|
with urllib.request.urlopen(req, timeout=30) as r:
|
||
|
|
return json.load(r)
|
||
|
|
except urllib.error.HTTPError as e:
|
||
|
|
if e.code == 401:
|
||
|
|
die("401 Unauthorized — token rejected. Regenerate it in HA and rewrite "
|
||
|
|
"~/.config/ha/token.")
|
||
|
|
die(f"HTTP {e.code} from HA: {e.reason}")
|
||
|
|
except urllib.error.URLError as e:
|
||
|
|
die(f"cannot reach HA at {url.split('/api')[0]}: {e.reason}")
|
||
|
|
|
||
|
|
|
||
|
|
def fmt_local(iso):
|
||
|
|
"""HA returns UTC ISO; show local time, second precision."""
|
||
|
|
try:
|
||
|
|
return datetime.fromisoformat(iso).astimezone().strftime("%Y-%m-%d %H:%M:%S")
|
||
|
|
except (ValueError, TypeError):
|
||
|
|
return str(iso)
|
||
|
|
|
||
|
|
|
||
|
|
def main():
|
||
|
|
ap = argparse.ArgumentParser(add_help=False)
|
||
|
|
ap.add_argument("-s", "--since", default="7 days ago")
|
||
|
|
ap.add_argument("-e", "--end", default=None)
|
||
|
|
ap.add_argument("-m", "--match", default=None)
|
||
|
|
ap.add_argument("-a", "--all-points", action="store_true")
|
||
|
|
ap.add_argument("-h", "--help", action="store_true")
|
||
|
|
ap.add_argument("entities", nargs="*")
|
||
|
|
a = ap.parse_args()
|
||
|
|
if a.help or not a.entities:
|
||
|
|
print(__doc__.strip())
|
||
|
|
sys.exit(0 if a.help else 2)
|
||
|
|
|
||
|
|
ents = [e if "." in e else f"sensor.{e}" for e in a.entities]
|
||
|
|
start = parse_when(a.since)
|
||
|
|
end = parse_when(a.end, default_now=True)
|
||
|
|
matcher = re.compile(a.match, re.I) if a.match else None
|
||
|
|
|
||
|
|
token = load_token()
|
||
|
|
url = (f"{base_url()}/api/history/period/"
|
||
|
|
f"{urllib.parse.quote(start.isoformat())}"
|
||
|
|
f"?end_time={urllib.parse.quote(end.isoformat())}"
|
||
|
|
f"&filter_entity_id={urllib.parse.quote(','.join(ents))}"
|
||
|
|
f"&minimal_response&no_attributes")
|
||
|
|
data = fetch(url, token)
|
||
|
|
|
||
|
|
print(f"# HA history {start.strftime('%Y-%m-%d %H:%M')} -> "
|
||
|
|
f"{end.strftime('%Y-%m-%d %H:%M')} ({base_url()})\n")
|
||
|
|
|
||
|
|
by_id = {}
|
||
|
|
for series in data or []:
|
||
|
|
if series:
|
||
|
|
by_id[series[0].get("entity_id")] = series
|
||
|
|
|
||
|
|
for ent in ents:
|
||
|
|
series = by_id.get(ent)
|
||
|
|
if not series:
|
||
|
|
print(f"{ent}\n (no recorded history in window — entity wrong, "
|
||
|
|
f"excluded from recorder, or purged)\n")
|
||
|
|
continue
|
||
|
|
# Build (time, state) points, collapsing consecutive identical states
|
||
|
|
# unless --all-points.
|
||
|
|
points, prev = [], object()
|
||
|
|
for item in series:
|
||
|
|
st = item.get("state")
|
||
|
|
ts = item.get("last_changed") or item.get("last_updated")
|
||
|
|
if a.all_points or st != prev:
|
||
|
|
points.append((ts, st))
|
||
|
|
prev = st
|
||
|
|
shown = [(ts, st) for ts, st in points if not matcher or matcher.search(str(st))]
|
||
|
|
label = "points" if a.all_points else "changes"
|
||
|
|
extra = f", {len(shown)} match /m" if matcher else ""
|
||
|
|
print(f"{ent} ({len(points)} {label}{extra})")
|
||
|
|
if not shown:
|
||
|
|
print(" (nothing matched)\n")
|
||
|
|
continue
|
||
|
|
for i, (ts, st) in enumerate(shown):
|
||
|
|
mark = " <<< FAULT" if re.search(r"fault", str(st), re.I) and st not in ("No fault",) else ""
|
||
|
|
# duration until next change-point in the *full* timeline
|
||
|
|
dur = ""
|
||
|
|
if not matcher:
|
||
|
|
nxt = points[i + 1][0] if i + 1 < len(points) else None
|
||
|
|
if nxt:
|
||
|
|
try:
|
||
|
|
d = datetime.fromisoformat(nxt) - datetime.fromisoformat(ts)
|
||
|
|
secs = int(d.total_seconds())
|
||
|
|
dur = f" ({secs//3600}h{secs%3600//60:02d}m)" if secs >= 3600 else f" ({secs//60}m)"
|
||
|
|
except (ValueError, TypeError):
|
||
|
|
pass
|
||
|
|
print(f" {fmt_local(ts)} {st}{dur}{mark}")
|
||
|
|
print()
|
||
|
|
|
||
|
|
|
||
|
|
if __name__ == "__main__":
|
||
|
|
main()
|