Files
openrun/analysis.py
2026-05-18 12:53:24 -04:00

598 lines
23 KiB
Python
Raw Blame History

This file contains ambiguous Unicode characters
This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.
"""Shared loaders + derived columns for Garmin analysis notebooks.
Usage in a notebook:
from analysis import open_conn, load_activities, load_wellness
conn = open_conn()
runs = load_activities(conn, type='running')
wellness = load_wellness(conn)
"""
from __future__ import annotations
import json
import sqlite3
from pathlib import Path
import pandas as pd
DB_PATH = Path(__file__).parent / "data" / "garmin.db"
def open_conn() -> sqlite3.Connection:
return sqlite3.connect(DB_PATH)
# ---------------------------------------------------------------------------
# activities
# ---------------------------------------------------------------------------
def load_activities(conn: sqlite3.Connection, *, type: str | None = None) -> pd.DataFrame:
"""Activities with derived: distance_km, duration_min, pace_min_per_km, week, month, year."""
sql = """
SELECT activity_id, start_time_local, activity_type, activity_name,
distance_m, duration_s, moving_duration_s,
avg_speed_mps, max_speed_mps, avg_hr, max_hr, calories,
elevation_gain_m, elevation_loss_m,
training_load, aerobic_te, anaerobic_te, vo2_max
FROM activities
"""
if type:
sql += " WHERE activity_type = ?"
df = pd.read_sql(sql, conn, params=[type], parse_dates=["start_time_local"])
else:
df = pd.read_sql(sql, conn, parse_dates=["start_time_local"])
df["distance_km"] = df["distance_m"] / 1000
df["duration_min"] = df["duration_s"] / 60
df["moving_min"] = df["moving_duration_s"] / 60
df["pace_min_per_km"] = df["moving_min"] / df["distance_km"]
# Filter physically-impossible paces (sub-3 min/km is faster than world-record marathon pace)
# and walks-with-stops (>30 min/km).
impossible = (df["distance_km"] < 0.2) | (df["pace_min_per_km"] < 3) | (df["pace_min_per_km"] > 30)
df.loc[impossible, "pace_min_per_km"] = pd.NA
df["date"] = df["start_time_local"].dt.normalize()
df["week"] = df["start_time_local"].dt.to_period("W").dt.start_time
df["month"] = df["start_time_local"].dt.to_period("M").dt.to_timestamp()
df["year"] = df["start_time_local"].dt.year
return df.sort_values("start_time_local").reset_index(drop=True)
# ---------------------------------------------------------------------------
# wellness
# ---------------------------------------------------------------------------
def load_wellness(conn: sqlite3.Connection) -> pd.DataFrame:
"""Joined daily wellness frame indexed by calendar_date (datetime)."""
df = pd.read_sql(
"""
SELECT s.calendar_date,
s.total_steps,
sl.sleep_score,
sl.deep_s, sl.light_s, sl.rem_s, sl.awake_s,
st.avg_stress,
h.last_night_avg AS hrv_last_night,
h.weekly_avg AS hrv_weekly,
h.status AS hrv_status,
im.moderate_minutes,
im.vigorous_minutes,
rh.resting_hr,
bb.charged AS bb_charged,
bb.drained AS bb_drained,
bb.highest AS bb_highest,
bb.lowest AS bb_lowest
FROM daily_steps s
LEFT JOIN daily_sleep sl ON sl.calendar_date = s.calendar_date
LEFT JOIN daily_stress st ON st.calendar_date = s.calendar_date
LEFT JOIN daily_hrv h ON h.calendar_date = s.calendar_date
LEFT JOIN daily_intensity_minutes im ON im.calendar_date = s.calendar_date
LEFT JOIN daily_resting_hr rh ON rh.calendar_date = s.calendar_date
LEFT JOIN daily_body_battery bb ON bb.calendar_date = s.calendar_date
ORDER BY s.calendar_date
""",
conn,
parse_dates=["calendar_date"],
).set_index("calendar_date")
df["sleep_total_s"] = df[["deep_s", "light_s", "rem_s"]].sum(axis=1, min_count=1)
df["sleep_hours"] = df["sleep_total_s"] / 3600
df["deep_pct"] = df["deep_s"] / df["sleep_total_s"]
df["rem_pct"] = df["rem_s"] / df["sleep_total_s"]
return df
# ---------------------------------------------------------------------------
# combine: training load by day, joined with next-day wellness
# ---------------------------------------------------------------------------
def daily_training_load(conn: sqlite3.Connection) -> pd.DataFrame:
"""Sum training load + distance per calendar date (any activity type)."""
acts = load_activities(conn)
daily = (
acts.groupby("date")
.agg(
training_load=("training_load", "sum"),
distance_km=("distance_km", "sum"),
duration_min=("duration_min", "sum"),
n_activities=("activity_id", "count"),
avg_hr_weighted=("avg_hr", "mean"), # simple unweighted; refine if needed
)
)
daily.index = pd.to_datetime(daily.index)
return daily
def joined(conn: sqlite3.Connection) -> pd.DataFrame:
"""Wellness joined with same-day and previous-day training load."""
wellness = load_wellness(conn)
tl = daily_training_load(conn)
df = wellness.join(tl, how="left")
df[["training_load", "distance_km", "duration_min", "n_activities"]] = (
df[["training_load", "distance_km", "duration_min", "n_activities"]].fillna(0)
)
# previous day training load (commonly correlated with overnight HRV / next-morning RHR)
df["training_load_prev"] = df["training_load"].shift(1)
df["distance_km_prev"] = df["distance_km"].shift(1)
return df
# ---------------------------------------------------------------------------
# expand the raw JSON of a table when you want fields the schema doesn't surface
# ---------------------------------------------------------------------------
def expand_raw(df: pd.DataFrame, raw_col: str = "raw") -> pd.DataFrame:
"""For a frame with a `raw` JSON column, return a normalized companion frame."""
if raw_col not in df.columns:
raise KeyError(f"no '{raw_col}' column in frame")
return pd.json_normalize([json.loads(r) for r in df[raw_col]])
# ---------------------------------------------------------------------------
# splits — per-lap data with cadence, stride, GPS, etc. extracted from raw JSON
# ---------------------------------------------------------------------------
_SPLIT_RAW_FIELDS = (
"averageRunCadence",
"maxRunCadence",
"strideLength",
"verticalOscillation",
"verticalRatio",
"groundContactTime",
"averagePower",
"normalizedPower",
"startLatitude",
"startLongitude",
"endLatitude",
"endLongitude",
"avgGradeAdjustedSpeed",
"maxHR",
"elevationGain",
"elevationLoss",
)
def load_splits(conn: sqlite3.Connection, *, activity_type: str | None = "running") -> pd.DataFrame:
"""Per-split frame with rich fields expanded from raw JSON, joined to activity start time.
Derived columns:
pace_min_per_km, pace_min_per_mile, speed_kmh, split_seq (0-based position in run),
n_splits (total in that run), frac_through (0..1), year, month.
Splits with implausible values (no HR, distance < 200m, pace > 30 min/km) are dropped.
"""
sql = """
SELECT s.activity_id, s.split_index, s.distance_m, s.duration_s,
s.avg_hr, s.avg_speed_mps, s.elevation_gain_m AS split_elev_gain_m,
s.raw, a.start_time_local, a.activity_type
FROM activity_splits s
JOIN activities a ON a.activity_id = s.activity_id
"""
params: list = []
if activity_type:
sql += " WHERE a.activity_type = ?"
params.append(activity_type)
sql += " ORDER BY s.activity_id, s.split_index"
df = pd.read_sql(sql, conn, params=params, parse_dates=["start_time_local"])
raws = [json.loads(r) if r else {} for r in df["raw"]]
for k in _SPLIT_RAW_FIELDS:
df[k] = [r.get(k) for r in raws]
df = df.drop(columns=["raw"])
df["pace_min_per_km"] = (df["duration_s"] / 60) / (df["distance_m"] / 1000)
df["pace_min_per_mile"] = (df["duration_s"] / 60) / (df["distance_m"] / 1609.344)
df["speed_kmh"] = df["avg_speed_mps"] * 3.6
bad = (
df["distance_m"].lt(200)
| df["avg_hr"].isna()
| df["avg_hr"].lt(60)
| df["pace_min_per_km"].gt(30)
| df["pace_min_per_km"].lt(2.5)
)
df = df.loc[~bad].copy()
df["split_seq"] = df.groupby("activity_id").cumcount()
df["n_splits"] = df.groupby("activity_id")["activity_id"].transform("count")
denom = (df["n_splits"] - 1).replace(0, pd.NA)
df["frac_through"] = df["split_seq"] / denom
df["year"] = df["start_time_local"].dt.year
df["month"] = df["start_time_local"].dt.to_period("M").dt.to_timestamp()
return df.reset_index(drop=True)
def decoupling(splits: pd.DataFrame, min_splits: int = 6) -> pd.DataFrame:
"""Per-activity Pa:Hr decoupling using duration-weighted halves.
`efficiency` per half = mean(speed_mps weighted by duration) / mean(HR weighted by duration).
`decoupling_pct` = (first_half_eff / second_half_eff - 1) * 100.
Positive = pace/HR dropped in 2nd half (the textbook 'cardiac drift' direction).
Negative = ran faster per beat in 2nd half (often: negative split, conservative start).
Endurance benchmark: <5% on a steady aerobic run is 'aerobically developed'.
"""
valid = splits[splits["n_splits"] >= min_splits].copy()
valid["half"] = (valid["frac_through"] >= 0.5).map({False: "first", True: "second"})
def _half_eff(d: pd.DataFrame) -> float:
w = d["duration_s"].to_numpy()
speed = (d["avg_speed_mps"].to_numpy() * w).sum() / w.sum()
hr = (d["avg_hr"].to_numpy() * w).sum() / w.sum()
return speed / hr if hr else float("nan")
eff = (
valid.groupby(["activity_id", "half"])[["avg_speed_mps", "avg_hr", "duration_s"]]
.apply(_half_eff)
.unstack("half")
)
eff["decoupling_pct"] = (eff["first"] / eff["second"] - 1) * 100
eff = eff.dropna(subset=["decoupling_pct"])
meta = valid.groupby("activity_id").agg(
start_time_local=("start_time_local", "first"),
distance_km=("distance_m", lambda s: s.sum() / 1000),
duration_min=("duration_s", lambda s: s.sum() / 60),
avg_hr=("avg_hr", "mean"),
avg_pace_min_per_km=("pace_min_per_km", "mean"),
n_splits=("n_splits", "first"),
)
out = eff.join(meta).reset_index()
out["year"] = out["start_time_local"].dt.year
return out
_HR_ZONE_BOUNDS = (0.50, 0.60, 0.70, 0.80, 0.90, 1.01)
_HR_ZONE_LABELS = ("Z1", "Z2", "Z3", "Z4", "Z5")
def assign_hr_zone(hr: float, hr_max: float) -> str | None:
if hr is None or pd.isna(hr) or not hr_max:
return None
frac = hr / hr_max
for lo, hi, lab in zip(_HR_ZONE_BOUNDS[:-1], _HR_ZONE_BOUNDS[1:], _HR_ZONE_LABELS):
if lo <= frac < hi:
return lab
return "Z5" if frac >= _HR_ZONE_BOUNDS[-2] else "Z1"
# Garmin-configured HR zones for this user (source: DI_CONNECT/.../heartRateZones.json).
# trainingMethod=HR_MAX, maxHeartRateUsed=209, lactateThresholdHR=182, RHR=52.
HR_ZONES_USER: tuple[tuple[str, int, int], ...] = (
("Z1", 102, 122), # recovery
("Z2", 123, 143), # easy aerobic — long-run target
("Z3", 144, 164), # tempo / "junk-miles middle"
("Z4", 165, 185), # threshold (LTHR sits inside Z4 at 182)
("Z5", 186, 209), # VO2 max
)
def hr_to_user_zone(hr: float, zones: tuple[tuple[str, int, int], ...] = HR_ZONES_USER) -> str | None:
"""Map a single HR reading to its configured zone label (Z1..Z5).
Below Z1 floor → None (warmup / walking).
Above Z5 ceiling → still Z5 (rare, edge of effort).
"""
if hr is None or pd.isna(hr):
return None
if hr < zones[0][1]: # below Z1 lower bound
return None
for label, lo, hi in zones:
if lo <= hr <= hi:
return label
return zones[-1][0] # above Z5 ceiling
def time_in_zone_from_fit(records: pd.DataFrame,
zones: tuple[tuple[str, int, int], ...] = HR_ZONES_USER) -> dict[str, float]:
"""Per-second time-in-zone (seconds) from a FIT records frame.
Each record contributes `elapsed_s - prev_elapsed_s` to whichever zone its HR
falls in. Large gaps (>30 s, e.g. a paused recording) are clipped to 30 s
so a stopped watch doesn't dump hours into one zone.
"""
if records is None or records.empty or "heart_rate" not in records:
return {}
r = records.dropna(subset=["heart_rate", "elapsed_s"]).copy()
if r.empty:
return {}
r["dt"] = r["elapsed_s"].diff().fillna(1.0).clip(lower=0, upper=30.0)
r["zone"] = r["heart_rate"].apply(lambda h: hr_to_user_zone(h, zones))
return r.dropna(subset=["zone"]).groupby("zone")["dt"].sum().to_dict()
def time_in_zone_from_splits(splits_df: pd.DataFrame,
zones: tuple[tuple[str, int, int], ...] = HR_ZONES_USER) -> dict[str, float]:
"""Fallback when there's no FIT — assign each split's avg HR to one zone.
Coarser than the per-second method: a split with avg HR 155 contributes its
entire duration to Z3, even if it was actually 1 min Z2 + 3 min Z3 + 1 min Z4.
"""
if splits_df is None or splits_df.empty:
return {}
s = splits_df.dropna(subset=["avg_hr", "duration_s"]).copy()
s["zone"] = s["avg_hr"].apply(lambda h: hr_to_user_zone(h, zones))
return s.dropna(subset=["zone"]).groupby("zone")["duration_s"].sum().to_dict()
def haversine_km(lat1, lon1, lat2, lon2):
"""Vectorised great-circle distance, kilometres. Inputs in degrees."""
import numpy as np
r = 6371.0
lat1, lon1, lat2, lon2 = map(np.radians, (lat1, lon1, lat2, lon2))
dlat = lat2 - lat1
dlon = lon2 - lon1
a = np.sin(dlat / 2) ** 2 + np.cos(lat1) * np.cos(lat2) * np.sin(dlon / 2) ** 2
return 2 * r * np.arcsin(np.sqrt(a))
def banister(
daily_load: pd.Series,
*,
ctl_tau: float = 42.0,
atl_tau: float = 7.0,
start_date: str | pd.Timestamp | None = None,
end_date: str | pd.Timestamp | None = None,
) -> pd.DataFrame:
"""Banister fitness/fatigue/form (CTL/ATL/TSB) from a daily training-load series.
`daily_load` should be a Series indexed by date (one value per day, 0 for rest days).
Missing dates inside the range are filled with 0 — a rest day still updates both
EWMAs (CTL drifts down slowly, ATL drifts down fast → TSB recovers).
Returns a frame indexed by date with columns CTL, ATL, TSB.
Conventions (per Coggan / TrainingPeaks):
CTL_today = CTL_yesterday · exp(1/τ_CTL) + load_today · (1 exp(1/τ_CTL))
ATL_today = ATL_yesterday · exp(1/τ_ATL) + load_today · (1 exp(1/τ_ATL))
TSB_today = CTL_yesterday ATL_yesterday # *yesterday's* values
TSB interpretation:
< 30 severely fatigued (injury risk)
10 to 30 productive overload, the heart of a build block
10 to 0 balanced building
0 to +10 sharpening
+10 to +25 fresh / peaked ← race-day target
> +25 detrained (taper too long)
"""
import numpy as np
if daily_load.empty:
return pd.DataFrame(columns=["CTL", "ATL", "TSB"])
idx = pd.to_datetime(daily_load.index).normalize()
s = pd.Series(daily_load.values, index=idx).groupby(level=0).sum()
lo = pd.Timestamp(start_date) if start_date else s.index.min()
hi = pd.Timestamp(end_date) if end_date else s.index.max()
full = s.reindex(pd.date_range(lo, hi, freq="D"), fill_value=0.0)
decay_ctl, decay_atl = np.exp(-1 / ctl_tau), np.exp(-1 / atl_tau)
w_ctl, w_atl = 1 - decay_ctl, 1 - decay_atl
n = len(full)
ctl = np.zeros(n)
atl = np.zeros(n)
loads = full.to_numpy()
for i in range(n):
prev_ctl = ctl[i - 1] if i else 0.0
prev_atl = atl[i - 1] if i else 0.0
ctl[i] = prev_ctl * decay_ctl + loads[i] * w_ctl
atl[i] = prev_atl * decay_atl + loads[i] * w_atl
out = pd.DataFrame({"CTL": ctl, "ATL": atl}, index=full.index)
out["TSB"] = out["CTL"].shift(1) - out["ATL"].shift(1)
return out
def daily_training_load_series(
conn: sqlite3.Connection,
*,
activity_types: tuple[str, ...] = ("running", "trail_running"),
) -> pd.Series:
"""Daily-summed training_load across the given activity types, in ascending date order."""
placeholders = ",".join(["?"] * len(activity_types))
df = pd.read_sql(
f"""SELECT date(start_time_local) AS d, SUM(training_load) AS tl
FROM activities
WHERE activity_type IN ({placeholders}) AND training_load IS NOT NULL
GROUP BY d ORDER BY d""",
conn,
params=list(activity_types),
parse_dates=["d"],
)
return df.set_index("d")["tl"]
def _resolve_fit_path(rel_path: str) -> Path:
"""Find a FIT file on disk. `fit_path` in the DB is stored relative to the
export root that was passed to `link_fit_files.py`. We don't know which
top-level folder under the project that was, so try each."""
project_root = Path(__file__).parent
for entry in project_root.iterdir():
if entry.is_dir():
candidate = entry / rel_path
if candidate.exists():
return candidate
# Maybe the path is already absolute or relative to cwd
p = Path(rel_path)
if p.exists():
return p
raise FileNotFoundError(f"could not locate FIT file: {rel_path}")
def load_fit_records(conn: sqlite3.Connection, activity_id: int) -> pd.DataFrame:
"""Per-second FIT `record` messages for one activity as a DataFrame.
Columns (subset of what's present):
timestamp (UTC, tz-aware), elapsed_s, heart_rate, speed_mps, distance_m,
cadence_spm (both legs), altitude_m, power_w, position_lat_deg,
position_long_deg, vertical_oscillation_mm, step_length_mm.
Raises if no FIT is linked for the activity.
"""
import fitparse # heavy-ish import; keep lazy
row = conn.execute(
"SELECT fit_path FROM activity_fit_files WHERE activity_id = ?", (activity_id,)
).fetchone()
if row is None:
raise ValueError(f"no FIT linked for activity {activity_id}")
fit_file = _resolve_fit_path(row[0])
fit = fitparse.FitFile(str(fit_file))
rows: list[dict] = []
for msg in fit.get_messages("record"):
rows.append(msg.get_values())
if not rows:
return pd.DataFrame()
df = pd.DataFrame(rows)
# Normalise & rename
out = pd.DataFrame()
if "timestamp" in df.columns:
out["timestamp"] = pd.to_datetime(df["timestamp"], utc=True)
out["elapsed_s"] = (out["timestamp"] - out["timestamp"].iloc[0]).dt.total_seconds()
out["heart_rate"] = df.get("heart_rate")
# Prefer enhanced_speed (always m/s) over the legacy `speed` field
out["speed_mps"] = df.get("enhanced_speed", df.get("speed"))
out["distance_m"] = df.get("distance")
# `cadence` is already both-legs SPM in this account's exports;
# fractional_cadence is a 01 fractional adjustment, ignored.
out["cadence_spm"] = df.get("cadence")
out["altitude_m"] = df.get("enhanced_altitude", df.get("altitude"))
out["power_w"] = df.get("power")
out["vertical_oscillation_mm"] = df.get("vertical_oscillation")
out["step_length_mm"] = df.get("step_length")
# Position: semicircles → degrees
SEMI = 180.0 / (2 ** 31)
if "position_lat" in df.columns:
out["position_lat_deg"] = df["position_lat"] * SEMI
if "position_long" in df.columns:
out["position_long_deg"] = df["position_long"] * SEMI
return out
def fit_decoupling(
records: pd.DataFrame,
*,
segments: int = 2,
warmup_min: float = 5.0,
cooldown_min: float = 2.0,
min_speed_mps: float = 0.5,
) -> pd.DataFrame:
"""Per-second Pa:Hr decoupling — Friel's method, faithful to the literature.
Steps:
1. Drop the first `warmup_min` and last `cooldown_min` of the run.
2. Drop "stopped" records (speed below `min_speed_mps`) so aid-station
pauses don't drag mean speed down.
3. Slice the remaining moving time into `segments` equal-time chunks.
4. For each chunk: `efficiency = mean(speed_mps) / mean(heart_rate)`.
5. decoupling % = (segment_i / segment_0 1) × 100. Negative ⇒ pace/HR
improved (negative split). Positive ⇒ cardiac drift.
Returns one row per segment.
"""
r = records.dropna(subset=["heart_rate", "speed_mps", "elapsed_s"]).copy()
if r.empty:
return pd.DataFrame()
total = r["elapsed_s"].iloc[-1]
r = r[(r["elapsed_s"] >= warmup_min * 60) & (r["elapsed_s"] <= total - cooldown_min * 60)]
moving = r[r["speed_mps"] >= min_speed_mps].copy()
if moving.empty:
return pd.DataFrame()
moving = moving.reset_index(drop=True)
seg_size = len(moving) // segments
out_rows: list[dict] = []
for i in range(segments):
s = i * seg_size
e = (i + 1) * seg_size if i < segments - 1 else len(moving)
chunk = moving.iloc[s:e]
speed = chunk["speed_mps"].mean()
hr = chunk["heart_rate"].mean()
out_rows.append(
{
"segment": i + 1,
"from_min": chunk["elapsed_s"].iloc[0] / 60,
"to_min": chunk["elapsed_s"].iloc[-1] / 60,
"mean_speed_mps": speed,
"mean_pace_min_per_km": (1 / speed) * 1000 / 60 if speed else float("nan"),
"mean_hr": hr,
"efficiency": speed / hr if hr else float("nan"),
}
)
out = pd.DataFrame(out_rows)
base = out["efficiency"].iloc[0]
out["decoupling_pct"] = (base / out["efficiency"] - 1) * 100
return out
def fit_rolling_efficiency(records: pd.DataFrame, window_s: int = 300) -> pd.DataFrame:
"""Rolling mean speed/HR (efficiency) and its derived rolling pace + HR.
Useful for plotting when efficiency declines through a race. `window_s`
defaults to 5 minutes — long enough to smooth GPS/HR jitter but short
enough to see drift in the second half.
"""
r = records.dropna(subset=["heart_rate", "speed_mps", "elapsed_s"]).copy()
if r.empty:
return r
r = r.set_index("elapsed_s")
win = f"{window_s}s"
# Rolling needs a DatetimeIndex; build a synthetic one from elapsed_s.
r["_ts"] = pd.to_datetime(r.index, unit="s")
r = r.set_index("_ts")
rolled = pd.DataFrame(index=r.index)
rolled["rolling_speed_mps"] = r["speed_mps"].rolling(win).mean()
rolled["rolling_hr"] = r["heart_rate"].rolling(win).mean()
rolled["rolling_efficiency"] = rolled["rolling_speed_mps"] / rolled["rolling_hr"]
rolled["elapsed_min"] = (rolled.index - rolled.index[0]).total_seconds() / 60
return rolled.reset_index(drop=True)
def cluster_routes(lats, lons, radius_km: float = 0.25):
"""Greedy haversine-radius clustering of run start points.
Assigns each point to the cluster of the first unassigned point within `radius_km`.
Returns an integer label array; -1 means unclustered (no neighbours).
Good enough for a few hundred runs; for thousands, switch to sklearn DBSCAN with metric='haversine'.
"""
import numpy as np
lats = np.asarray(lats, dtype=float)
lons = np.asarray(lons, dtype=float)
n = len(lats)
labels = np.full(n, -1, dtype=int)
next_label = 0
for i in range(n):
if labels[i] != -1:
continue
d = haversine_km(lats[i], lons[i], lats, lons)
neigh = np.where((d <= radius_km) & (labels == -1))[0]
# Require at least 2 runs to count as a "route"; singletons stay -1.
if len(neigh) >= 2:
labels[neigh] = next_label
next_label += 1
return labels