This commit is contained in:
2026-06-12 05:48:30 -04:00
parent 9d91ac8ebc
commit 64a5ab4b7f
37 changed files with 4530 additions and 407 deletions

View File

@@ -0,0 +1,92 @@
"""Integration: manual_activities flow through into the Banister PMC.
The contract is that `daily_training_load_series(..., include_manual=True)`
unions manual rows into the same Series the PMC consumes, so adding a
strength session lifts CTL on that day and the days following — same
recursion, no special case.
"""
from __future__ import annotations
import pandas as pd
from openrun.model import banister, daily_training_load_series
def _add_activity(conn, aid: int, when: str, atype: str, training_load: float) -> None:
conn.execute(
"""INSERT INTO activities
(activity_id, start_time_local, activity_type, training_load, raw, fetched_at)
VALUES (?, ?, ?, ?, '{}', 'now')""",
(aid, when, atype, training_load),
)
def _add_manual(conn, when: str, atype: str, training_load: float) -> None:
conn.execute(
"""INSERT INTO manual_activities
(activity_date, activity_type, training_load, source, imported_at)
VALUES (?, ?, ?, 'test', datetime('now'))""",
(when, atype, training_load),
)
def test_include_manual_increases_daily_sum(tmp_conn) -> None:
_add_activity(tmp_conn, aid=1, when="2026-05-01 06:00:00", atype="running", training_load=50.0)
_add_manual(tmp_conn, when="2026-05-01", atype="running", training_load=30.0)
tmp_conn.commit()
without = daily_training_load_series(tmp_conn)
with_manual = daily_training_load_series(tmp_conn, include_manual=True)
assert without.iloc[0] == 50.0
assert with_manual.iloc[0] == 80.0
def test_manual_only_day_appears_when_included(tmp_conn) -> None:
"""A day with only a manual row is invisible without the flag and appears with it."""
_add_manual(tmp_conn, when="2026-05-02", atype="running", training_load=25.0)
tmp_conn.commit()
without = daily_training_load_series(tmp_conn)
with_manual = daily_training_load_series(tmp_conn, include_manual=True)
assert without.empty
assert with_manual.loc[pd.Timestamp("2026-05-02")] == 25.0
def test_manual_load_lifts_banister_ctl_monotonically(tmp_conn) -> None:
"""For two otherwise-identical worlds, the one with extra manual TL must
show strictly higher CTL on the day the manual session was logged and on
every following day (the EWMA carries forward)."""
_add_activity(tmp_conn, aid=1, when="2026-05-01 06:00:00", atype="running", training_load=50.0)
_add_activity(tmp_conn, aid=2, when="2026-05-03 06:00:00", atype="running", training_load=60.0)
tmp_conn.commit()
base = banister(daily_training_load_series(tmp_conn))
_add_manual(tmp_conn, when="2026-05-02", atype="running", training_load=40.0)
tmp_conn.commit()
with_manual = banister(daily_training_load_series(tmp_conn, include_manual=True))
# 2026-05-01 is unchanged (manual is in the future at that point).
assert with_manual.loc["2026-05-01", "CTL"] == base.loc["2026-05-01", "CTL"]
# 2026-05-02 onward is strictly larger.
for d in ("2026-05-02", "2026-05-03"):
assert with_manual.loc[d, "CTL"] > base.loc[d, "CTL"]
def test_activity_type_filter_excludes_other_manual_rows(tmp_conn) -> None:
"""Manual rows whose type is not in activity_types stay out of the series."""
_add_manual(tmp_conn, when="2026-05-02", atype="strength", training_load=100.0)
tmp_conn.commit()
out = daily_training_load_series(tmp_conn, include_manual=True)
assert out.empty
def test_manual_same_day_as_garmin_row_sums(tmp_conn) -> None:
"""A manual row and a Garmin row on the same day must combine, not overwrite."""
_add_activity(tmp_conn, aid=1, when="2026-05-01 18:00:00", atype="running", training_load=70.0)
_add_manual(tmp_conn, when="2026-05-01", atype="running", training_load=30.0)
tmp_conn.commit()
s = daily_training_load_series(tmp_conn, include_manual=True)
assert s.loc[pd.Timestamp("2026-05-01")] == 100.0

97
tests/unit/test_auth.py Normal file
View File

@@ -0,0 +1,97 @@
"""Tests for `openrun.ingest.auth` — the web-friendly, MFA-resumable login flow.
We stub garth at the `sso.login` / `sso.resume_login` / `save` boundary so the
two-step (password → MFA code) state machine is testable without a real Garmin
account or network.
"""
from __future__ import annotations
from pathlib import Path
import openrun.ingest.auth as auth
class _FakeClient:
username = "athlete"
oauth1_token = None
oauth2_token = None
def _stub_garth(monkeypatch):
client = _FakeClient()
monkeypatch.setattr(auth.garth, "client", client)
monkeypatch.setattr(auth.garth, "save", lambda td: None)
return client
# --- token store helpers ---------------------------------------------------
def test_has_tokens(tmp_path: Path) -> None:
assert auth.has_tokens(tmp_path) is False
(tmp_path / "oauth1_token.json").write_text("{}")
assert auth.has_tokens(tmp_path) is True
def test_resume_no_tokens_is_false(tmp_path: Path, monkeypatch) -> None:
called = {"n": 0}
monkeypatch.setattr(auth.garth, "resume", lambda td: called.__setitem__("n", called["n"] + 1))
assert auth.resume(tmp_path) is False
assert called["n"] == 0 # short-circuits before touching garth
def test_resume_with_tokens(tmp_path: Path, monkeypatch) -> None:
(tmp_path / "oauth1_token.json").write_text("{}")
monkeypatch.setattr(auth.garth, "resume", lambda td: None)
assert auth.resume(tmp_path) is True
def test_resume_swallows_bad_tokens(tmp_path: Path, monkeypatch) -> None:
(tmp_path / "oauth1_token.json").write_text("garbage")
def _boom(td):
raise ValueError("corrupt")
monkeypatch.setattr(auth.garth, "resume", _boom)
assert auth.resume(tmp_path) is False
# --- login state machine ---------------------------------------------------
def test_begin_login_no_mfa(tmp_path: Path, monkeypatch) -> None:
client = _stub_garth(monkeypatch)
monkeypatch.setattr(auth.garth.sso, "login", lambda *a, **k: ("oauth1", "oauth2"))
kind, payload = auth.begin_login("e@x.com", "pw", tmp_path)
assert kind == "ok"
assert payload == "athlete"
assert client.oauth1_token == "oauth1" and client.oauth2_token == "oauth2"
def test_begin_login_needs_mfa(tmp_path: Path, monkeypatch) -> None:
_stub_garth(monkeypatch)
state = {"session": "opaque"}
monkeypatch.setattr(auth.garth.sso, "login", lambda *a, **k: ("needs_mfa", state))
kind, payload = auth.begin_login("e@x.com", "pw", tmp_path)
assert kind == "needs_mfa"
assert payload is state
def test_complete_mfa(tmp_path: Path, monkeypatch) -> None:
client = _stub_garth(monkeypatch)
seen = {}
def _resume(state, code):
seen["state"], seen["code"] = state, code
return ("o1", "o2")
monkeypatch.setattr(auth.garth.sso, "resume_login", _resume)
user = auth.complete_mfa({"s": 1}, " 123456 ", tmp_path)
assert user == "athlete"
assert seen["code"] == "123456" # trimmed
assert client.oauth1_token == "o1" and client.oauth2_token == "o2"

View File

@@ -0,0 +1,105 @@
"""Tests for `write_config` — TOML serialiser used by the first-run wizard.
The load-bearing invariant: write_config → load_config produces an equivalent
Config. If that holds, the wizard can write whatever the user picked and
trust the rest of the codebase to read it back the same way.
"""
from __future__ import annotations
from pathlib import Path
from openrun.config import (
BanisterParams,
Config,
HRZones,
UserProfile,
load_config,
write_config,
)
def _sample_config(tmp_path: Path) -> Config:
return Config(
user=UserProfile(
name="Test Runner",
height_cm=180.0,
weight_kg=72.5,
hr_max=200,
lthr=178,
resting_hr=48,
zones=HRZones(
z1=(100, 120), z2=(121, 140), z3=(141, 160),
z4=(161, 180), z5=(181, 200),
),
),
db_path=tmp_path / "data" / "garmin.db",
banister=BanisterParams(ctl_tau_days=42.0, atl_tau_days=7.0, race_day_tl_per_km=7.0),
races=(("wk 4 — 30K", "2026-06-13"), ("wk 17 — 50 mile", "2026-09-12")),
)
def test_write_then_load_roundtrip(tmp_path: Path) -> None:
cfg = _sample_config(tmp_path)
out = tmp_path / "openrun.toml"
write_config(cfg, out)
loaded = load_config(out)
assert loaded.user.name == cfg.user.name
assert loaded.user.hr_max == cfg.user.hr_max
assert loaded.user.lthr == cfg.user.lthr
assert loaded.user.resting_hr == cfg.user.resting_hr
assert loaded.user.weight_kg == cfg.user.weight_kg
assert loaded.user.height_cm == cfg.user.height_cm
assert loaded.user.zones == cfg.user.zones
assert loaded.banister == cfg.banister
assert loaded.races == cfg.races
def test_write_handles_missing_optional_fields(tmp_path: Path) -> None:
"""Height/weight/lthr/rhr are optional — omitted entirely from output."""
cfg = Config(
user=UserProfile(name="Minimal", hr_max=190),
db_path=tmp_path / "minimal.db",
)
out = tmp_path / "openrun.toml"
write_config(cfg, out)
body = out.read_text()
assert "height_cm" not in body
assert "weight_kg" not in body
assert "lthr" not in body
assert "resting_hr" not in body
loaded = load_config(out)
assert loaded.user.hr_max == 190
assert loaded.user.height_cm is None
def test_write_quotes_strings_with_special_chars(tmp_path: Path) -> None:
"""Race labels and athlete names commonly contain Unicode em-dashes and quotes;
the serialiser must escape these so the file parses back."""
cfg = Config(
user=UserProfile(name='O\'Brien — "Speedy"', hr_max=200),
db_path=tmp_path / "x.db",
races=(('wk 17 — Cascade Crest "50M"', "2026-09-12"),),
)
out = tmp_path / "openrun.toml"
write_config(cfg, out)
loaded = load_config(out)
assert loaded.user.name == 'O\'Brien — "Speedy"'
assert loaded.races == (('wk 17 — Cascade Crest "50M"', "2026-09-12"),)
def test_write_creates_parent_dir(tmp_path: Path) -> None:
cfg = Config(user=UserProfile(name="x", hr_max=200), db_path=tmp_path / "x.db")
nested = tmp_path / "a" / "b" / "openrun.toml"
write_config(cfg, nested)
assert nested.exists()
def test_no_races_omits_section(tmp_path: Path) -> None:
cfg = Config(user=UserProfile(name="x", hr_max=200), db_path=tmp_path / "x.db", races=())
out = tmp_path / "openrun.toml"
write_config(cfg, out)
assert "[[races]]" not in out.read_text()
assert load_config(out).races == ()

View File

@@ -0,0 +1,135 @@
"""Path B per-second: pulling original FITs from the API instead of a website export.
`download_fit` / `backfill_fits` are mocked at the `garth.download` boundary
(network-dependent, upstream-deprecated — see ROADMAP test conventions). The FIT
extractor is pure and tested directly.
"""
from __future__ import annotations
import io
import zipfile
import pytest
from openrun.ingest import garmin_api as g
# A FIT header carries the literal b".FIT" at bytes 8..12; payload after is opaque here.
FIT_BYTES = b"\x0e\x10\x00\x00\x20\x00\x00\x00.FITdata-goes-here"
def _zip(*members: tuple[str, bytes]) -> bytes:
buf = io.BytesIO()
with zipfile.ZipFile(buf, "w") as zf:
for name, data in members:
zf.writestr(name, data)
return buf.getvalue()
# --- _extract_fit_bytes ----------------------------------------------------
def test_extract_from_zip():
assert g._extract_fit_bytes(_zip(("9.fit", FIT_BYTES))) == FIT_BYTES
def test_extract_from_zip_extensionless_member_picks_largest():
blob = _zip(("small", b"x"), ("big", FIT_BYTES))
assert g._extract_fit_bytes(blob) == FIT_BYTES
def test_extract_from_bare_fit():
assert g._extract_fit_bytes(FIT_BYTES) == FIT_BYTES
@pytest.mark.parametrize("blob", [None, b"", b"not a fit at all"])
def test_extract_rejects_non_fit(blob):
assert g._extract_fit_bytes(blob) is None
# --- download_fit ----------------------------------------------------------
def _seed_activity(conn, aid: int, atype: str = "running") -> None:
conn.execute(
"INSERT INTO activities (activity_id, activity_type, start_time_gmt, raw, fetched_at) "
"VALUES (?, ?, ?, ?, datetime('now'))",
(aid, atype, "2026-05-01T10:00:00", "{}"),
)
def test_download_fit_writes_and_links(tmp_conn, tmp_path, monkeypatch):
_seed_activity(tmp_conn, 42)
monkeypatch.setattr(g.garth, "download", lambda path: _zip(("42.fit", FIT_BYTES)))
ok = g.download_fit(tmp_conn, 42, fit_dir=tmp_path)
assert ok is True
dest = tmp_path / "42.fit"
assert dest.read_bytes() == FIT_BYTES
row = tmp_conn.execute(
"SELECT fit_path FROM activity_fit_files WHERE activity_id = 42"
).fetchone()
assert row is not None and row[0].endswith("42.fit")
def test_download_fit_skips_when_already_linked(tmp_conn, tmp_path, monkeypatch):
_seed_activity(tmp_conn, 7)
(tmp_path / "7.fit").write_bytes(FIT_BYTES)
g.record_link(tmp_conn, 7, tmp_path / "7.fit")
calls = []
monkeypatch.setattr(g.garth, "download", lambda path: calls.append(path) or b"")
assert g.download_fit(tmp_conn, 7, fit_dir=tmp_path) is True
assert calls == [] # no network call when on disk + linked
def test_download_fit_force_redownloads(tmp_conn, tmp_path, monkeypatch):
_seed_activity(tmp_conn, 7)
(tmp_path / "7.fit").write_bytes(b"stale")
g.record_link(tmp_conn, 7, tmp_path / "7.fit")
monkeypatch.setattr(g.garth, "download", lambda path: _zip(("7.fit", FIT_BYTES)))
assert g.download_fit(tmp_conn, 7, fit_dir=tmp_path, force=True) is True
assert (tmp_path / "7.fit").read_bytes() == FIT_BYTES
def test_download_fit_returns_false_on_empty_response(tmp_conn, tmp_path, monkeypatch):
_seed_activity(tmp_conn, 99)
monkeypatch.setattr(g.garth, "download", lambda path: None)
assert g.download_fit(tmp_conn, 99, fit_dir=tmp_path) is False
assert tmp_conn.execute(
"SELECT 1 FROM activity_fit_files WHERE activity_id = 99"
).fetchone() is None
# --- backfill_fits ---------------------------------------------------------
def test_backfill_only_targets_unlinked(tmp_conn, tmp_path, monkeypatch):
_seed_activity(tmp_conn, 1)
_seed_activity(tmp_conn, 2)
(tmp_path / "1.fit").write_bytes(FIT_BYTES)
g.record_link(tmp_conn, 1, tmp_path / "1.fit") # 1 already has a FIT
pulled = []
monkeypatch.setattr(
g.garth, "download",
lambda path: pulled.append(path) or _zip(("x.fit", FIT_BYTES)),
)
got = g.backfill_fits(tmp_conn, fit_dir=tmp_path)
assert got == 1
assert pulled == ["/download-service/files/activity/2"]
def test_backfill_respects_type_filter(tmp_conn, tmp_path, monkeypatch):
_seed_activity(tmp_conn, 10, atype="running")
_seed_activity(tmp_conn, 11, atype="cycling")
monkeypatch.setattr(g.garth, "download", lambda path: _zip(("x.fit", FIT_BYTES)))
got = g.backfill_fits(tmp_conn, fit_dir=tmp_path, activity_type="running")
assert got == 1
assert (tmp_path / "10.fit").exists()
assert not (tmp_path / "11.fit").exists()

110
tests/unit/test_manual.py Normal file
View File

@@ -0,0 +1,110 @@
"""Tests for `openrun.ingest.manual` — CSV import + idempotency."""
from __future__ import annotations
from pathlib import Path
import pytest
from openrun.ingest.manual import import_csv
def _write_csv(tmp_path: Path, text: str) -> Path:
p = tmp_path / "manual.csv"
p.write_text(text)
return p
def test_import_csv_inserts_required_columns(tmp_conn, tmp_path: Path) -> None:
csv = _write_csv(tmp_path, """\
activity_date,activity_type,distance_km,duration_min,training_load,notes
2026-05-15,strength,,45,30,upper body
2026-05-16,hike,8.0,120,40,morning hike
""")
result = import_csv(tmp_conn, csv)
assert result.inserted == 2
assert result.updated == 0
assert result.skipped == 0
assert result.errors == []
rows = tmp_conn.execute(
"SELECT activity_date, activity_type, distance_km, duration_min, training_load, notes "
"FROM manual_activities ORDER BY activity_date"
).fetchall()
assert rows[0]["activity_date"] == "2026-05-15"
assert rows[0]["activity_type"] == "strength"
assert rows[0]["distance_km"] is None
assert rows[0]["duration_min"] == 45
assert rows[0]["training_load"] == 30
assert rows[0]["notes"] == "upper body"
assert rows[1]["distance_km"] == 8.0
def test_import_csv_requires_header_columns(tmp_conn, tmp_path: Path) -> None:
csv = _write_csv(tmp_path, "date,type\n2026-05-15,strength\n") # wrong column names
with pytest.raises(ValueError, match="missing required columns"):
import_csv(tmp_conn, csv)
def test_import_csv_external_id_makes_reimport_idempotent(tmp_conn, tmp_path: Path) -> None:
csv = _write_csv(tmp_path, """\
activity_date,activity_type,training_load,external_id
2026-05-15,strength,30,strava-12345
""")
import_csv(tmp_conn, csv)
second = import_csv(tmp_conn, csv)
assert second.inserted == 0
assert second.updated == 1
count = tmp_conn.execute("SELECT COUNT(*) FROM manual_activities").fetchone()[0]
assert count == 1
def test_import_csv_reimport_with_new_tl_updates_value(tmp_conn, tmp_path: Path) -> None:
"""External_id upsert should reflect changed training_load on re-import."""
csv1 = _write_csv(tmp_path, """\
activity_date,activity_type,training_load,external_id
2026-05-15,strength,30,k1
""")
import_csv(tmp_conn, csv1)
csv2 = _write_csv(tmp_path, """\
activity_date,activity_type,training_load,external_id
2026-05-15,strength,55,k1
""")
import_csv(tmp_conn, csv2)
tl = tmp_conn.execute("SELECT training_load FROM manual_activities WHERE external_id='k1'").fetchone()[0]
assert tl == 55
def test_import_csv_blank_external_id_allows_duplicates(tmp_conn, tmp_path: Path) -> None:
"""No external_id = caller is intentionally logging without dedupe key. Two imports = two rows."""
csv = _write_csv(tmp_path, """\
activity_date,activity_type,training_load
2026-05-15,strength,30
""")
import_csv(tmp_conn, csv)
import_csv(tmp_conn, csv)
assert tmp_conn.execute("SELECT COUNT(*) FROM manual_activities").fetchone()[0] == 2
def test_import_csv_skips_bad_rows_but_keeps_going(tmp_conn, tmp_path: Path) -> None:
csv = _write_csv(tmp_path, """\
activity_date,activity_type,training_load
2026-05-15,strength,30
bad-date,hike,40
2026-05-17,run,
""")
result = import_csv(tmp_conn, csv)
assert result.inserted == 2
assert result.skipped == 1
assert len(result.errors) == 1
assert "line 3" in result.errors[0]
def test_import_csv_accepts_iso_datetime_in_date_column(tmp_conn, tmp_path: Path) -> None:
csv = _write_csv(tmp_path, """\
activity_date,activity_type,training_load
2026-05-15T08:30:00,strength,30
""")
import_csv(tmp_conn, csv)
d = tmp_conn.execute("SELECT activity_date FROM manual_activities").fetchone()[0]
assert d == "2026-05-15"

View File

@@ -4,8 +4,9 @@ from __future__ import annotations
import numpy as np
import pandas as pd
import pytest
from openrun.model import calibrate_tl_per_km, personal_records
from openrun.model import calibrate_tl_per_km, personal_records, plan_to_daily_load
def _act(aid: int, dist_km: float, pace: float, when: str = "2026-01-01") -> dict:
@@ -114,3 +115,62 @@ def test_calibrate_tl_per_km_returns_nan_when_empty(tmp_conn) -> None:
result = calibrate_tl_per_km(tmp_conn, lookback_days=None)
assert result["n"] == 0
assert np.isnan(result["median"])
# ---------------------------------------------------------------------------
# plan_to_daily_load
# ---------------------------------------------------------------------------
def _plan_row(week_start: str, weekly_km: float, long_run_km: float) -> dict:
return {"week_start": pd.Timestamp(week_start), "weekly_km": weekly_km, "long_run_km": long_run_km}
def test_plan_distributes_weekly_km_across_7_days() -> None:
"""A single 70 km / 28 km long-run week should emit 7 daily entries summing to 70·tl_per_km."""
plan = pd.DataFrame([_plan_row("2026-05-04", 70.0, 28.0)]) # Mon 2026-05-04
series = plan_to_daily_load(plan, tl_per_km=10.0)
assert len(series) == 7
assert series.sum() == pytest.approx(700.0)
# Saturday (default 40 %) gets the biggest single load.
saturday = pd.Timestamp("2026-05-09")
assert series.idxmax() == saturday
assert series[saturday] == pytest.approx(700.0 * 0.40)
def test_plan_race_week_uses_race_day_tl_per_km() -> None:
"""If a `race_dates` entry is inside the week, race day gets long_run_km × race_tl,
and the remaining km distribute across Mon/Wed/Fri at training rate."""
plan = pd.DataFrame([_plan_row("2026-05-04", 30.0, 20.0)])
series = plan_to_daily_load(
plan,
tl_per_km=11.0,
race_day_tl_per_km=7.0,
race_dates=("2026-05-09",), # Saturday
)
sat = pd.Timestamp("2026-05-09")
assert series[sat] == pytest.approx(20.0 * 7.0) # race load
# Remaining km = 30 - 20 = 10. Spread across Mon/Wed/Fri @ 11.0 → 110 / 3 each.
for d in [pd.Timestamp("2026-05-04"), pd.Timestamp("2026-05-06"), pd.Timestamp("2026-05-08")]:
assert series[d] == pytest.approx(110.0 / 3, rel=1e-6)
def test_plan_empty_returns_empty_series() -> None:
assert plan_to_daily_load(pd.DataFrame(columns=["week_start", "weekly_km", "long_run_km"]),
tl_per_km=10.0).empty
def test_plan_zero_weekly_km_yields_zeros() -> None:
"""A rest week (weekly_km=0) shouldn't crash and should produce 7 zero-load days."""
plan = pd.DataFrame([_plan_row("2026-05-04", 0.0, 0.0)])
series = plan_to_daily_load(plan, tl_per_km=10.0)
assert (series == 0).all()
assert len(series) == 7
def test_plan_custom_weekday_weights() -> None:
"""Pass a flat 1/7 split, expect equal load each day."""
plan = pd.DataFrame([_plan_row("2026-05-04", 70.0, 28.0)])
weights = {i: 1 / 7 for i in range(7)}
series = plan_to_daily_load(plan, tl_per_km=10.0, weekday_weights=weights)
assert series.std() == pytest.approx(0.0, abs=1e-9)
assert series.iloc[0] == pytest.approx(100.0)

View File

@@ -0,0 +1,65 @@
"""Tests for the `run_sync` orchestrator in `openrun.ingest.garmin_api`.
`run_sync` is the auth-free body shared by the CLI and the web "Sync now"
button. We stub every network-touching sub-sync so the test verifies only the
orchestration contract: which steps run, what the summary holds, that progress
is streamed, and that `last_sync_utc` is recorded.
"""
from __future__ import annotations
import openrun.ingest.garmin_api as g
from openrun.db import get_state
def _stub_subsyncs(monkeypatch, tmp_path):
monkeypatch.setattr(g, "sync_activities", lambda conn, **kw: 3)
monkeypatch.setattr(g, "backfill_fits", lambda conn, **kw: 2)
monkeypatch.setattr(g, "_fit_dir", lambda: tmp_path)
for name in (
"sync_steps", "sync_sleep", "sync_stress", "sync_hrv",
"sync_intensity_minutes", "sync_resting_hr", "sync_body_battery",
):
monkeypatch.setattr(g, name, lambda conn, end, period: 1)
def test_run_sync_default_pass(tmp_conn, tmp_path, monkeypatch) -> None:
_stub_subsyncs(monkeypatch, tmp_path)
msgs: list[str] = []
summary = g.run_sync(tmp_conn, progress=msgs.append)
assert summary["activities"] == 3
assert "fit_backfill" not in summary # off by default
assert summary["steps"] == 1 and summary["body battery"] == 1
assert any("activities" in m for m in msgs)
assert get_state(tmp_conn, "last_sync_utc") is not None
def test_run_sync_includes_backfill_when_requested(tmp_conn, tmp_path, monkeypatch) -> None:
_stub_subsyncs(monkeypatch, tmp_path)
summary = g.run_sync(tmp_conn, fit_backfill=True, progress=lambda _m: None)
assert summary["fit_backfill"] == 2
def test_run_sync_skip_activities(tmp_conn, tmp_path, monkeypatch) -> None:
_stub_subsyncs(monkeypatch, tmp_path)
# If activities ran, the stub would put a key in the summary — assert it didn't.
summary = g.run_sync(tmp_conn, skip_activities=True, progress=lambda _m: None)
assert "activities" not in summary
assert summary["HRV"] == 1 # wellness still runs
def test_run_sync_forwards_fit_flag(tmp_conn, tmp_path, monkeypatch) -> None:
seen: dict = {}
monkeypatch.setattr(g, "sync_activities", lambda conn, **kw: seen.update(kw) or 0)
monkeypatch.setattr(g, "_fit_dir", lambda: tmp_path)
for name in (
"sync_steps", "sync_sleep", "sync_stress", "sync_hrv",
"sync_intensity_minutes", "sync_resting_hr", "sync_body_battery",
):
monkeypatch.setattr(g, name, lambda conn, end, period: 0)
g.run_sync(tmp_conn, fetch_fit=False, full=True, progress=lambda _m: None)
assert seen["fetch_fit"] is False
assert seen["full"] is True

View File

@@ -20,6 +20,8 @@ EXPECTED_TABLES = frozenset({
"daily_body_battery",
"daily_intensity_minutes",
"daily_resting_hr",
"manual_activities",
"race_plan",
"sync_state",
})