"""Match FIT files to activity rows by content (session.start_time). The Garmin Connect data export embeds activity IDs in FIT filenames (`_.fit`), so `ingest_export.py` can index them straight from the filename. The newer Garmin **Takeout** dump uses upload IDs instead (`_.fit`), which are a different ID space, so filename-based linking fails. This script does the linking via FIT content: it opens each FIT, reads the `session.start_time` (UTC), and matches it against `activities.start_time_gmt` within a small tolerance. Usage: uv run link_fit_files.py uv run link_fit_files.py --dry-run --min-size-kb 50 Defaults: Only parses FITs >= 50 KB (workout FITs; smaller files are HRV/sleep/body-battery monitoring snapshots, ~99% of the dump by count). Tolerance: 60 seconds. Garmin sometimes rounds session start to the second while activities.start_time_gmt is also second-precision. Re-running is safe — the table is upsert by activity_id. """ from __future__ import annotations import argparse import sqlite3 import sys from datetime import datetime, timezone from pathlib import Path import fitparse from db import connect def _parse_session_start(fit_path: Path) -> datetime | None: """Return the session start_time as a UTC datetime, or None if the file has none.""" try: fit = fitparse.FitFile(str(fit_path)) for msg in fit.get_messages("session"): vals = msg.get_values() ts = vals.get("start_time") if ts is None: continue if isinstance(ts, datetime): # fitparse returns naive datetimes in UTC for FIT timestamps. return ts.replace(tzinfo=timezone.utc) if ts.tzinfo is None else ts.astimezone(timezone.utc) return None except Exception as exc: # noqa: BLE001 print(f" ! parse failed for {fit_path.name}: {exc}", file=sys.stderr) return None def _load_activity_index(conn: sqlite3.Connection) -> dict[int, int]: """Map (epoch_seconds_utc) -> activity_id for every activity with a start_time_gmt.""" idx: dict[int, int] = {} for aid, gmt in conn.execute("SELECT activity_id, start_time_gmt FROM activities"): if gmt is None: continue # start_time_gmt may be an ISO string, ms epoch number, or sec epoch. try: if isinstance(gmt, (int, float)): v = float(gmt) # heuristic: >1e12 means ms epoch, otherwise seconds secs = int(v / 1000) if v > 1e12 else int(v) else: s = str(gmt).strip().rstrip("Z") secs = int(datetime.fromisoformat(s).replace(tzinfo=timezone.utc).timestamp()) except (ValueError, TypeError): continue idx[secs] = aid return idx def link(export_root: Path, *, dry_run: bool, min_size_kb: int, tolerance_s: int) -> None: if not export_root.exists(): sys.exit(f"path does not exist: {export_root}") conn = connect() index = _load_activity_index(conn) if not index: sys.exit("no activities with start_time_gmt — ingest activities first") sorted_keys = sorted(index.keys()) print(f"loaded {len(index):,} activity start times") candidates = [p for p in export_root.rglob("*.fit") if p.stat().st_size >= min_size_kb * 1024] print(f"scanning {len(candidates):,} FIT files ≥ {min_size_kb} KB") linked = unmatched = parse_failed = 0 for i, p in enumerate(candidates, 1): if i % 50 == 0: print(f" … {i}/{len(candidates)} processed (linked={linked}, unmatched={unmatched})") start = _parse_session_start(p) if start is None: parse_failed += 1 continue target = int(start.timestamp()) # Binary search the closest key within tolerance. from bisect import bisect_left pos = bisect_left(sorted_keys, target) candidates_near: list[int] = [] if pos < len(sorted_keys): candidates_near.append(sorted_keys[pos]) if pos > 0: candidates_near.append(sorted_keys[pos - 1]) best = min((c for c in candidates_near if abs(c - target) <= tolerance_s), default=None, key=lambda c: abs(c - target)) if best is None: unmatched += 1 continue aid = index[best] if not dry_run: rel = p.relative_to(export_root) conn.execute( """INSERT OR REPLACE INTO activity_fit_files (activity_id, fit_path, indexed_at) VALUES (?, ?, datetime('now'))""", (aid, str(rel)), ) linked += 1 if not dry_run: conn.commit() conn.close() print() print("=== linker summary ===") print(f" candidates scanned: {len(candidates):,}") print(f" linked : {linked:,}" + (" (dry-run)" if dry_run else "")) print(f" unmatched start : {unmatched:,} (no activity within ±{tolerance_s}s)") print(f" parse failures : {parse_failed:,}") def main() -> None: parser = argparse.ArgumentParser(description=__doc__) parser.add_argument("export_root", help="Path to the unzipped Garmin export folder") parser.add_argument("--dry-run", action="store_true") parser.add_argument("--min-size-kb", type=int, default=50, help="Skip FITs smaller than this; default 50 KB filters out HRV/sleep snapshots") parser.add_argument("--tolerance-s", type=int, default=60, help="Max seconds between FIT session start and activity start to count as a match") args = parser.parse_args() link(Path(args.export_root).expanduser().resolve(), dry_run=args.dry_run, min_size_kb=args.min_size_kb, tolerance_s=args.tolerance_s) if __name__ == "__main__": main()