1.x updates
This commit is contained in:
497
ingest_export.py
497
ingest_export.py
@@ -1,498 +1,5 @@
|
||||
"""Ingest a Garmin Connect data export (zip or unzipped directory) into SQLite.
|
||||
|
||||
Usage:
|
||||
uv run ingest_export.py path/to/export.zip
|
||||
uv run ingest_export.py path/to/unzipped_export_dir/
|
||||
|
||||
Garmin's export contains a tree like:
|
||||
DI_CONNECT/
|
||||
DI-Connect-Fitness/ # activity JSONs + .fit files
|
||||
DI-Connect-Wellness/ # daily wellness JSONs
|
||||
DI-Connect-Aggregator/ # rolled-up summaries
|
||||
DI-Connect-User/ # profile
|
||||
...
|
||||
|
||||
File names vary by account / export date. We dispatch on filename substrings
|
||||
and log anything unrecognized so you can tell us about new shapes.
|
||||
"""
|
||||
|
||||
from __future__ import annotations
|
||||
|
||||
import argparse
|
||||
import json
|
||||
import sqlite3
|
||||
import sys
|
||||
import tempfile
|
||||
import zipfile
|
||||
from collections import defaultdict
|
||||
from datetime import datetime
|
||||
from pathlib import Path
|
||||
from typing import Any, Callable, Iterable
|
||||
|
||||
from db import connect, set_state
|
||||
|
||||
|
||||
# ---------------------------------------------------------------------------
|
||||
# helpers
|
||||
# ---------------------------------------------------------------------------
|
||||
|
||||
def _load_json(path: Path) -> Any:
|
||||
try:
|
||||
with path.open() as fh:
|
||||
return json.load(fh)
|
||||
except (OSError, json.JSONDecodeError) as exc:
|
||||
print(f" ! failed to read {path.name}: {exc}", file=sys.stderr)
|
||||
return None
|
||||
|
||||
|
||||
def _as_list(payload: Any) -> list[dict]:
|
||||
"""Garmin JSON files are sometimes a list, sometimes a {"key": [...]} envelope."""
|
||||
if isinstance(payload, list):
|
||||
return [x for x in payload if isinstance(x, dict)]
|
||||
if isinstance(payload, dict):
|
||||
for v in payload.values():
|
||||
if isinstance(v, list):
|
||||
return [x for x in v if isinstance(x, dict)]
|
||||
return [payload]
|
||||
return []
|
||||
|
||||
|
||||
def _dump(obj: Any) -> str:
|
||||
return json.dumps(obj, separators=(",", ":"), default=str)
|
||||
|
||||
|
||||
def _first(d: dict, *keys: str) -> Any:
|
||||
"""Pull the first non-null value among candidate keys (Garmin renames fields between exports)."""
|
||||
for k in keys:
|
||||
if k in d and d[k] is not None:
|
||||
return d[k]
|
||||
return None
|
||||
|
||||
|
||||
def _date_key(d: dict) -> str | None:
    """Extract the calendar-date key of a daily record.

    Looks at ``calendarDate``, ``date``, ``summaryDate`` and
    ``statisticsStartDate`` in that order and returns the first 10
    characters of the value's text form (the ``YYYY-MM-DD`` part of an ISO
    timestamp). Returns ``None`` when no candidate holds a truthy value.
    The result is not validated as a date.
    """
    found = _first(d, "calendarDate", "date", "summaryDate", "statisticsStartDate")
    if not found:
        return None
    text = found if isinstance(found, str) else str(found)
    return text[:10]
|
||||
|
||||
|
||||
# ---------------------------------------------------------------------------
|
||||
# handlers — one per data category
|
||||
# ---------------------------------------------------------------------------
|
||||
|
||||
def handle_activities(conn: sqlite3.Connection, path: Path) -> int:
    """Upsert the activity summaries found in one export JSON file.

    Accepted payload shapes:

    * ``[{"summarizedActivitiesExport": [{...}, {...}]}]``
    * a plain list of activity dicts
    * a dict, unwrapped with ``_as_list``

    Entries that are not dicts, or that have neither ``activityId`` nor
    ``activityIdLocal``, are skipped. An envelope whose value is not a list
    is treated as empty instead of raising.

    Returns the number of rows written (0 when the file cannot be read).
    """
    payload = _load_json(path)
    if payload is None:
        return 0

    if isinstance(payload, list) and payload and isinstance(payload[0], dict):
        envelope = payload[0]
        if "summarizedActivitiesExport" in envelope:
            candidates = envelope["summarizedActivitiesExport"]
        else:
            candidates = payload
    elif isinstance(payload, dict):
        candidates = _as_list(payload)
    else:
        candidates = []

    # Mirror _as_list: only dict records are usable. This also covers an
    # envelope whose value is null or otherwise not a list.
    if not isinstance(candidates, list):
        candidates = []
    items = [entry for entry in candidates if isinstance(entry, dict)]

    # The Garmin Takeout `summarizedActivities` export uses scaled integer units:
    #   distance   cm → m        (÷100)
    #   duration   ms → s        (÷1000)
    #   elevation  cm → m        (÷100)
    #   speed      m/s ÷ 10 → m/s (×10)
    # NOTE(review): the scaling below is applied to every record this handler
    # sees, whichever payload shape it came from — confirm that non-envelope
    # payloads really use the same scaled units.
    def _scale(v: Any, factor: float) -> Any:
        return None if v is None else v * factor

    n = 0
    for raw in items:
        aid = _first(raw, "activityId", "activityIdLocal")
        if aid is None:
            continue
        atype = raw.get("activityType")
        if isinstance(atype, dict):
            type_key = atype.get("typeKey")
        else:
            type_key = atype  # sometimes a plain string in exports
        # On conflict only activity_name, raw and fetched_at are refreshed;
        # the other columns of an existing row are left untouched.
        conn.execute(
            """
            INSERT INTO activities (
                activity_id, start_time_local, start_time_gmt, activity_type,
                activity_name, distance_m, duration_s, moving_duration_s,
                avg_speed_mps, max_speed_mps, avg_hr, max_hr, calories,
                elevation_gain_m, elevation_loss_m, training_load,
                aerobic_te, anaerobic_te, vo2_max, raw, fetched_at
            ) VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, datetime('now'))
            ON CONFLICT(activity_id) DO UPDATE SET
                activity_name=excluded.activity_name,
                raw=excluded.raw,
                fetched_at=excluded.fetched_at
            """,
            (
                aid,
                _first(raw, "startTimeLocal", "beginTimestamp"),
                _first(raw, "startTimeGmt", "startTimeGMT"),
                type_key,
                _first(raw, "activityName", "name"),
                _scale(_first(raw, "distance"), 0.01),
                _scale(_first(raw, "duration"), 0.001),
                _scale(_first(raw, "movingDuration"), 0.001),
                _scale(_first(raw, "averageSpeed", "avgSpeed"), 10.0),
                _scale(_first(raw, "maxSpeed"), 10.0),
                _first(raw, "averageHR", "avgHr"),
                _first(raw, "maxHR", "maxHr"),
                _first(raw, "calories"),
                _scale(_first(raw, "elevationGain"), 0.01),
                _scale(_first(raw, "elevationLoss"), 0.01),
                _first(raw, "activityTrainingLoad"),
                _first(raw, "aerobicTrainingEffect"),
                _first(raw, "anaerobicTrainingEffect"),
                _first(raw, "vO2MaxValue"),
                _dump(raw),
            ),
        )
        n += 1
    conn.commit()
    return n
|
||||
|
||||
|
||||
def handle_sleep(conn: sqlite3.Connection, path: Path) -> int:
    """Insert or replace one ``daily_sleep`` row per dated record in *path*.

    Records without a usable date key are skipped. Returns the number of
    rows written (0 when the file cannot be read).
    """
    data = _load_json(path)
    if data is None:
        return 0

    written = 0
    for record in _as_list(data):
        day = _date_key(record)
        if day:
            values = (
                day,
                _first(record, "sleepStartTimestampGMT", "sleepStartTimeGmt"),
                _first(record, "sleepEndTimestampGMT", "sleepEndTimeGmt"),
                _first(record, "deepSleepSeconds"),
                _first(record, "lightSleepSeconds"),
                _first(record, "remSleepSeconds"),
                _first(record, "awakeSleepSeconds", "awakeSeconds"),
                _first(record, "sleepScore", "overallSleepScore", "value"),
                _dump(record),
            )
            conn.execute(
                """INSERT OR REPLACE INTO daily_sleep
                (calendar_date, sleep_start_gmt, sleep_end_gmt, deep_s, light_s, rem_s, awake_s, sleep_score, raw, fetched_at)
                VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, datetime('now'))""",
                values,
            )
            written += 1
    conn.commit()
    return written
|
||||
|
||||
|
||||
def handle_steps(conn: sqlite3.Connection, path: Path) -> int:
    """Insert or replace one ``daily_steps`` row per dated record in *path*.

    Records without a usable date key are skipped. Returns the number of
    rows written (0 when the file cannot be read).
    """
    data = _load_json(path)
    if data is None:
        return 0

    written = 0
    for record in _as_list(data):
        day = _date_key(record)
        if day:
            values = (
                day,
                _first(record, "totalSteps", "steps"),
                _first(record, "stepGoal", "dailyStepGoal"),
                _first(record, "totalDistance", "distance"),
                _dump(record),
            )
            conn.execute(
                """INSERT OR REPLACE INTO daily_steps
                (calendar_date, total_steps, step_goal, distance_m, raw, fetched_at)
                VALUES (?, ?, ?, ?, ?, datetime('now'))""",
                values,
            )
            written += 1
    conn.commit()
    return written
|
||||
|
||||
|
||||
def handle_stress(conn: sqlite3.Connection, path: Path) -> int:
    """Insert or replace one ``daily_stress`` row per dated record in *path*.

    Records without a usable date key are skipped. Returns the number of
    rows written (0 when the file cannot be read).
    """
    data = _load_json(path)
    if data is None:
        return 0

    written = 0
    for record in _as_list(data):
        day = _date_key(record)
        if day:
            values = (
                day,
                _first(record, "overallStressLevel", "averageStressLevel", "avgStress"),
                _first(record, "maxStressLevel", "maxStress"),
                _dump(record),
            )
            conn.execute(
                """INSERT OR REPLACE INTO daily_stress
                (calendar_date, avg_stress, max_stress, raw, fetched_at)
                VALUES (?, ?, ?, ?, datetime('now'))""",
                values,
            )
            written += 1
    conn.commit()
    return written
|
||||
|
||||
|
||||
def handle_hrv(conn: sqlite3.Connection, path: Path) -> int:
    """Insert or replace one ``daily_hrv`` row per dated record in *path*.

    Records without a usable date key are skipped. Returns the number of
    rows written (0 when the file cannot be read).
    """
    data = _load_json(path)
    if data is None:
        return 0

    written = 0
    for record in _as_list(data):
        day = _date_key(record)
        if day:
            values = (
                day,
                _first(record, "weeklyAvg"),
                _first(record, "lastNightAvg"),
                _first(record, "lastNight5MinHigh"),
                _first(record, "status"),
                _dump(record),
            )
            conn.execute(
                """INSERT OR REPLACE INTO daily_hrv
                (calendar_date, weekly_avg, last_night_avg, last_night_5min, status, raw, fetched_at)
                VALUES (?, ?, ?, ?, ?, ?, datetime('now'))""",
                values,
            )
            written += 1
    conn.commit()
    return written
|
||||
|
||||
|
||||
def handle_resting_hr(conn: sqlite3.Connection, path: Path) -> int:
    """Insert or replace one ``daily_resting_hr`` row per usable record in *path*.

    A record is used only when it has both a date key and a resting heart
    rate (``restingHeartRate``, falling back to ``value``). Returns the
    number of rows written (0 when the file cannot be read).
    """
    data = _load_json(path)
    if data is None:
        return 0

    written = 0
    for record in _as_list(data):
        day = _date_key(record)
        if not day:
            continue
        resting = _first(record, "restingHeartRate", "value")
        if resting is not None:
            conn.execute(
                """INSERT OR REPLACE INTO daily_resting_hr
                (calendar_date, resting_hr, raw, fetched_at)
                VALUES (?, ?, ?, datetime('now'))""",
                (day, resting, _dump(record)),
            )
            written += 1
    conn.commit()
    return written
|
||||
|
||||
|
||||
def handle_intensity_minutes(conn: sqlite3.Connection, path: Path) -> int:
    """Insert or replace one ``daily_intensity_minutes`` row per dated record in *path*.

    Records without a usable date key are skipped. Returns the number of
    rows written (0 when the file cannot be read).
    """
    data = _load_json(path)
    if data is None:
        return 0

    written = 0
    for record in _as_list(data):
        day = _date_key(record)
        if day:
            values = (
                day,
                _first(record, "moderateIntensityMinutes", "moderateValue"),
                _first(record, "vigorousIntensityMinutes", "vigorousValue"),
                _dump(record),
            )
            conn.execute(
                """INSERT OR REPLACE INTO daily_intensity_minutes
                (calendar_date, moderate_minutes, vigorous_minutes, raw, fetched_at)
                VALUES (?, ?, ?, ?, datetime('now'))""",
                values,
            )
            written += 1
    conn.commit()
    return written
|
||||
|
||||
|
||||
def handle_body_battery(conn: sqlite3.Connection, path: Path) -> int:
    """Insert or replace one ``daily_body_battery`` row per dated record in *path*.

    Records without a usable date key are skipped. Returns the number of
    rows written (0 when the file cannot be read).
    """
    data = _load_json(path)
    if data is None:
        return 0

    written = 0
    for record in _as_list(data):
        day = _date_key(record)
        if day:
            values = (
                day,
                _first(record, "charged", "bodyBatteryChargedValue"),
                _first(record, "drained", "bodyBatteryDrainedValue"),
                _first(record, "highest", "highestBatteryLevel", "bodyBatteryHighestValue"),
                _first(record, "lowest", "lowestBatteryLevel", "bodyBatteryLowestValue"),
                _dump(record),
            )
            conn.execute(
                """INSERT OR REPLACE INTO daily_body_battery
                (calendar_date, charged, drained, highest, lowest, raw, fetched_at)
                VALUES (?, ?, ?, ?, ?, ?, datetime('now'))""",
                values,
            )
            written += 1
    conn.commit()
    return written
|
||||
|
||||
|
||||
def handle_fit(conn: sqlite3.Connection, path: Path, export_root: Path) -> int:
    """Record where an activity's .fit file lives; the binary is not parsed.

    The activity id is taken to be the first ``_``-separated piece of the
    file stem that is all digits and at least 8 characters long, which
    covers both ``<id>_<name>.fit`` and ``<email>_<id>.fit``.

    Only ids that already exist in ``activities`` are indexed. Takeout dumps
    put upload ids (a different id space) in FIT filenames, and inserting
    those blindly would create orphaned rows; per the original note,
    link_fit_files.py matches such files by content instead.

    Returns 1 when a row was written to ``activity_fit_files`` (path stored
    relative to *export_root*), otherwise 0.
    """
    activity_id = next(
        (
            int(piece)
            for piece in path.stem.split("_")
            if len(piece) >= 8 and piece.isdigit()
        ),
        None,
    )
    if activity_id is None:
        # Nothing in the filename looks like an activity id.
        return 0

    known = conn.execute(
        "SELECT 1 FROM activities WHERE activity_id = ? LIMIT 1", (activity_id,)
    ).fetchone()
    if known is None:
        return 0

    relative_path = str(path.relative_to(export_root))
    conn.execute(
        """INSERT OR REPLACE INTO activity_fit_files (activity_id, fit_path, indexed_at)
        VALUES (?, ?, datetime('now'))""",
        (activity_id, relative_path),
    )
    conn.commit()
    return 1
|
||||
|
||||
|
||||
# ---------------------------------------------------------------------------
|
||||
# dispatch — pattern → handler
|
||||
# ---------------------------------------------------------------------------
|
||||
|
||||
# A handler receives the open connection and one export file, and returns
# the number of rows it wrote.
Handler = Callable[[sqlite3.Connection, Path], int]

# Each entry is (filename substring, summary label, handler). `classify`
# compares substrings case-insensitively and the first matching entry wins,
# so order matters.
DISPATCH: list[tuple[str, str, Handler]] = [
    # activities
    ("summarizedActivities", "activities", handle_activities),
    # sleep
    ("sleepData", "sleep", handle_sleep),
    ("sleep", "sleep", handle_sleep),
    # steps (daily steps file naming varies)
    ("UDSFile", "steps", handle_steps),
    ("step", "steps", handle_steps),
    # stress
    ("stressLevel", "stress", handle_stress),
    ("stress", "stress", handle_stress),
    # heart-rate variability
    ("hrvStatus", "hrv", handle_hrv),
    ("hrv", "hrv", handle_hrv),
    # resting heart rate
    ("restingHeart", "resting_hr", handle_resting_hr),
    ("RHR", "resting_hr", handle_resting_hr),
    # intensity minutes
    ("intensityMinute", "intensity", handle_intensity_minutes),
    # body battery
    ("bodyBattery", "body_batt", handle_body_battery),
    ("BBS", "body_batt", handle_body_battery),
]
|
||||
|
||||
|
||||
def classify(name: str) -> tuple[str, Handler] | None:
    """Pick the handler for a file name.

    Returns ``(label, handler)`` for the first ``DISPATCH`` entry whose
    substring occurs in *name*, ignoring case, or ``None`` when no entry
    matches.
    """
    haystack = name.lower()
    return next(
        (
            (label, handler)
            for needle, label, handler in DISPATCH
            if needle.lower() in haystack
        ),
        None,
    )
|
||||
|
||||
|
||||
# ---------------------------------------------------------------------------
|
||||
# entry
|
||||
# ---------------------------------------------------------------------------
|
||||
|
||||
def iter_files(root: Path) -> Iterable[Path]:
    """Lazily yield every file found anywhere beneath *root* (directories are skipped)."""
    yield from (entry for entry in root.rglob("*") if entry.is_file())
|
||||
|
||||
|
||||
def main() -> None:
    """Command-line entry point: ingest a Garmin export into the database.

    ``source`` may be an export ``.zip`` (unpacked into a temporary
    directory) or an already unzipped directory. Each ``.json`` file is
    routed through ``classify``; unrecognized ones are listed in the
    summary. ``.fit`` files are indexed only after every JSON file has been
    handled, because ``handle_fit`` accepts only activity ids that already
    exist in ``activities``.

    With ``--dry-run`` nothing is written: the summary shows matched file
    counts instead of row counts.

    The database connection and the temporary directory are released even
    when ingestion fails part-way. Exits via ``sys.exit`` when the source
    path is missing or is neither a ``.zip`` file nor a directory.
    """
    # Local import: the module only imports ``datetime`` from ``datetime``.
    from datetime import timezone

    parser = argparse.ArgumentParser(description=__doc__)
    parser.add_argument("source", help="Path to export.zip or unzipped export directory")
    parser.add_argument("--dry-run", action="store_true",
                        help="Show what would be ingested without writing to the DB")
    args = parser.parse_args()

    src = Path(args.source).expanduser().resolve()
    if not src.exists():
        sys.exit(f"path does not exist: {src}")

    cleanup_dir: tempfile.TemporaryDirectory | None = None
    conn = None
    try:
        if src.is_file() and src.suffix.lower() == ".zip":
            cleanup_dir = tempfile.TemporaryDirectory(prefix="garmin_export_")
            export_root = Path(cleanup_dir.name)
            print(f"unzipping {src.name} → {export_root}")
            with zipfile.ZipFile(src) as zf:
                zf.extractall(export_root)
        elif src.is_dir():
            export_root = src
        else:
            sys.exit(f"unsupported source: {src}")

        conn = connect()
        counts: dict[str, int] = defaultdict(int)
        unknown: list[str] = []
        fit_paths: list[Path] = []

        for path in iter_files(export_root):
            suffix = path.suffix.lower()
            if suffix == ".fit":
                # Deferred: directory traversal order is arbitrary, and a FIT
                # file seen before its activity row exists would be skipped.
                fit_paths.append(path)
                continue
            if suffix != ".json":
                continue

            matched = classify(path.name)
            if not matched:
                unknown.append(str(path.relative_to(export_root)))
                continue

            label, fn = matched
            if args.dry_run:
                counts[label] += 1
                continue
            try:
                counts[label] += fn(conn, path)
            except Exception as exc:  # noqa: BLE001
                # Top-level boundary: one bad file must not abort the run.
                print(f" ! error in {label} handler for {path.name}: {exc}", file=sys.stderr)

        if args.dry_run:
            fit_count = len(fit_paths)
        else:
            fit_count = sum(handle_fit(conn, fit_path, export_root) for fit_path in fit_paths)

        print("\n=== ingest summary ===")
        for label, n in sorted(counts.items()):
            print(f" {label:20s} {n:>6} rows" + (" (file count, dry run)" if args.dry_run else ""))
        print(f" fit_files {fit_count:>6}")
        if unknown:
            print(f"\n unrecognized JSON files ({len(unknown)}):")
            for name in unknown[:25]:
                print(f" {name}")
            if len(unknown) > 25:
                print(f" ... and {len(unknown) - 25} more")

        if not args.dry_run:
            # Aware "now" with tzinfo stripped: same text as the deprecated
            # datetime.utcnow().isoformat(), so stored values keep their format.
            now_utc = datetime.now(timezone.utc).replace(tzinfo=None)
            set_state(conn, "last_ingest_utc", now_utc.isoformat(timespec="seconds"))
            set_state(conn, "last_ingest_source", str(src))
            conn.commit()
    finally:
        if conn is not None:
            conn.close()
        if cleanup_dir is not None:
            cleanup_dir.cleanup()

    print("\n✓ done")
|
||||
|
||||
"""Shim — see openrun.ingest.garmin_export."""
|
||||
from openrun.ingest.garmin_export import main
|
||||
|
||||
# Run the imported entry point only when this file is executed as a script.
if __name__ == "__main__":
    main()
|
||||
|
||||
Reference in New Issue
Block a user