"""
arnold/sequencer.py — Sequence execution engine.

Design:
  - One sequence can run at a time (enforced by a mutex).
  - Runs asynchronously in a worker thread; returns a run_id immediately.
  - Caller polls GET /runs/{run_id} for result.
  - Absolute timing: each step fires at t_start + step.t_ms (monotonic clock).
  - check_input: reads the cached value via IORegistry.get_value(), instant check.
  - wait_input:  polls the same cache every 50 ms until the expected value is
    seen or step.timeout_ms expires.
  - set_output:  calls the device driver directly — write_output() for digital
    signals, write_register() for signals whose value_type is "int".
  - On step failure: remaining steps are skipped, outputs are NOT auto-reset
    (caller is responsible for safety; a future "on_abort" hook could be added).
"""

from __future__ import annotations

import json
import logging
import threading
import time
import uuid
from dataclasses import dataclass, field, asdict
from datetime import datetime, timezone
from pathlib import Path
from typing import TYPE_CHECKING, Any, Callable, Literal

if TYPE_CHECKING:
    from .config import Config, Sequence, SequenceStep
    from .terminator_io import IORegistry

log = logging.getLogger(__name__)


# ---------------------------------------------------------------------------
# Run result dataclasses
# ---------------------------------------------------------------------------

@dataclass
class StepResult:
    """Outcome of a single executed sequence step."""

    step_index: int
    t_ms: int
    action: str
    signal: str
    success: bool
    detail: str = ""                       # human-readable description
    actual: bool | int | None = None       # for check_input / wait_input
    expected: bool | int | None = None


@dataclass
class RunResult:
    """Mutable status record of one sequence run, updated by the worker thread."""

    run_id: str
    sequence_name: str
    status: Literal["pending", "running", "success", "failed", "error"]
    started_at: str = ""                   # ISO8601
    finished_at: str = ""
    duration_ms: int = 0
    steps_completed: int = 0
    total_steps: int = 0
    current_step_index: int = -1           # index of the step currently executing (-1 = none)
    failed_step: StepResult | None = None
    error_message: str = ""

    def to_dict(self) -> dict:
        """Return a plain-dict copy (nested StepResult included) for JSON output."""
        return asdict(self)


# ---------------------------------------------------------------------------
# Run log (JSON-lines file)
# ---------------------------------------------------------------------------

class RunLog:
    """Append-only JSON-lines file of finished runs, guarded by a lock."""

    def __init__(self, path: Path) -> None:
        self._path = path
        self._lock = threading.Lock()

    def append(self, result: RunResult) -> None:
        """Serialize *result* and append it as one line to the log file.

        Raises:
            OSError: if the file cannot be opened or written.
        """
        line = json.dumps(result.to_dict()) + "\n"
        with self._lock:
            with open(self._path, "a", encoding="utf-8") as f:
                f.write(line)

    def tail(self, n: int = 50) -> list[dict]:
        """Return the entries found in the last *n* lines, most recent first.

        Blank and undecodable lines are skipped, so fewer than *n* entries may
        be returned. A missing log file or a non-positive *n* yields ``[]``.
        """
        # Guard explicitly: lines[-0:] would be the *whole* file, not nothing.
        if n <= 0:
            return []

        with self._lock:
            try:
                lines = self._path.read_text(encoding="utf-8").splitlines()
            except FileNotFoundError:
                # Nothing has been logged yet.
                return []

        entries: list[dict] = []
        for raw in lines[-n:]:
            raw = raw.strip()
            if not raw:
                continue
            try:
                entries.append(json.loads(raw))
            except json.JSONDecodeError:
                # Tolerate a corrupt/partial line rather than failing the query.
                continue
        entries.reverse()  # most recent first
        return entries


# ---------------------------------------------------------------------------
# Sequencer
# ---------------------------------------------------------------------------

class Sequencer:
    """Runs configured sequences one at a time in a background thread."""

    # Sleep between cache polls in wait_input steps (50 ms).
    _WAIT_POLL_INTERVAL_S = 0.05

    def __init__(
        self,
        config: "Config",
        registry: "IORegistry",
        log_path: Path,
        on_output_write: Callable[[str, bool | int], None] | None = None,
    ) -> None:
        self._config = config
        self._registry = registry
        self._run_log = RunLog(log_path)

        # Optional callback fired after every successful set_output step.
        # Signature: on_output_write(signal_name: str, value: bool | int)
        self._on_output_write = on_output_write

        # One-at-a-time enforcement
        self._run_lock = threading.Lock()
        self._active_id: str | None = None

        # Result store: run_id -> RunResult
        # NOTE: entries are never evicted; the dict grows with every run.
        self._results_lock = threading.Lock()
        self._results: dict[str, RunResult] = {}

    # ------------------------------------------------------------------
    # Public API
    # ------------------------------------------------------------------

    def start(self, sequence_name: str) -> tuple[str, bool]:
        """
        Launch sequence_name in a background thread.

        Returns (run_id, started).
        started=False means another sequence is already running
        (caller should return HTTP 409).

        Raises:
            ValueError: if sequence_name is not known to the config.
        """
        seq = self._config.sequence(sequence_name)
        if seq is None:
            raise ValueError(f"Unknown sequence: {sequence_name!r}")

        run_id = str(uuid.uuid4())

        # Try to acquire run lock non-blocking
        if not self._run_lock.acquire(blocking=False):
            return ("", False)

        try:
            result = RunResult(
                run_id=run_id,
                sequence_name=sequence_name,
                status="pending",
                total_steps=len(seq.steps),
            )
            with self._results_lock:
                self._results[run_id] = result
                self._active_id = run_id

            worker = threading.Thread(
                target=self._run_thread,
                args=(seq, run_id),
                name=f"seq-{sequence_name}-{run_id[:8]}",
                daemon=True,
            )
            worker.start()
        except BaseException:
            # The worker never took ownership of the run lock, so undo the
            # bookkeeping here; otherwise no sequence could ever start again.
            with self._results_lock:
                self._results.pop(run_id, None)
                if self._active_id == run_id:
                    self._active_id = None
            self._run_lock.release()
            raise

        return (run_id, True)

    def get_result(self, run_id: str) -> RunResult | None:
        """Return the live RunResult for *run_id*, or None if unknown."""
        with self._results_lock:
            return self._results.get(run_id)

    def active_run_id(self) -> str | None:
        """Return the run_id of the sequence currently running, if any."""
        with self._results_lock:
            return self._active_id

    def recent_runs(self, n: int = 50) -> list[dict]:
        """Return up to *n* finished runs from the run log, most recent first."""
        return self._run_log.tail(n)

    # ------------------------------------------------------------------
    # Execution thread
    # ------------------------------------------------------------------

    def _run_thread(self, seq: "Sequence", run_id: str) -> None:
        """Worker-thread body: execute every step of *seq*, then publish the result.

        Expects the run lock to be held on entry (taken by start()) and always
        releases it on exit, whatever happens in between.
        """
        started_at = datetime.now(timezone.utc)
        t_start = time.monotonic()

        status = "success"
        failed_step: StepResult | None = None
        steps_completed = 0
        error_message = ""

        try:
            try:
                self._update_result(
                    run_id, status="running", started_at=started_at.isoformat()
                )
                log.info(
                    "Sequence %r started  run_id=%s  steps=%d",
                    seq.name, run_id, len(seq.steps),
                )

                for i, step in enumerate(seq.steps):
                    # Mark which step we're about to execute
                    self._update_result(run_id, current_step_index=i)

                    # Wait until absolute time t_ms from start
                    delay = t_start + step.t_ms / 1000.0 - time.monotonic()
                    if delay > 0:
                        time.sleep(delay)

                    ok, step_result = self._execute_step(i, step)

                    if not ok:
                        steps_completed = i
                        failed_step = step_result
                        status = "failed"
                        log.warning(
                            "Sequence %r FAILED at step %d (%s %s): %s",
                            seq.name, i, step.action, step.signal,
                            step_result.detail,
                        )
                        break

                    steps_completed = i + 1
                    self._update_result(run_id, steps_completed=steps_completed)
                    log.debug("Step %d OK: %s %s", i, step.action, step.signal)
            except Exception as exc:
                log.exception("Sequence %r raised exception: %s", seq.name, exc)
                status = "error"
                error_message = str(exc)

            finished_at = datetime.now(timezone.utc)
            duration_ms = int((time.monotonic() - t_start) * 1000)

            if status == "success":
                steps_completed = len(seq.steps)

            final_fields: dict[str, Any] = {
                "status": status,
                "finished_at": finished_at.isoformat(),
                "duration_ms": duration_ms,
                "steps_completed": steps_completed,
                "current_step_index": -1,
            }
            if status == "error":
                final_fields["error_message"] = error_message
            else:
                final_fields["failed_step"] = failed_step
            result = self._update_result(run_id, **final_fields)

            try:
                self._run_log.append(result)
            except Exception:
                # The in-memory result is already final; losing the log line
                # must not take the whole sequencer down with it.
                log.exception(
                    "Sequence %r: could not write run log entry  run_id=%s",
                    seq.name, run_id,
                )

            if status != "error":
                log.info(
                    "Sequence %r %s  run_id=%s  duration=%dms  steps=%d/%d",
                    seq.name, status.upper(), run_id, duration_ms,
                    steps_completed, len(seq.steps),
                )
        finally:
            # Clear the active id *before* releasing the run lock, and only if
            # it is still ours, so a start() that wins the lock right away
            # cannot have its freshly set id wiped.
            with self._results_lock:
                if self._active_id == run_id:
                    self._active_id = None
            self._run_lock.release()

    # ------------------------------------------------------------------
    # Step execution
    # ------------------------------------------------------------------

    def _execute_step(self, index: int, step: "SequenceStep") -> tuple[bool, StepResult]:
        """Dispatch *step* to the handler for its action; return (ok, StepResult)."""
        sig = self._config.signal(step.signal)
        if sig is None:
            # Should never happen — validated at load time
            sr = StepResult(index, step.t_ms, step.action, step.signal,
                            False, f"Unknown signal {step.signal!r}")
            return False, sr

        if step.action == "set_output":
            return self._set_output(index, step, sig)
        if step.action == "check_input":
            return self._check_input(index, step, sig)
        if step.action == "wait_input":
            return self._wait_input(index, step, sig)

        sr = StepResult(index, step.t_ms, step.action, step.signal,
                        False, f"Unknown action {step.action!r}")
        return False, sr

    # -- set_output ----------------------------------------------------

    def _set_output(
        self, index: int, step: "SequenceStep", sig: Any
    ) -> tuple[bool, StepResult]:
        """Write the step's value to the signal's device driver.

        Signals with value_type "int" are written with write_register(); all
        others with write_output(). The on_output_write callback, if set, is
        invoked only after a successful write.
        """
        driver = self._registry.driver(sig.device)
        if driver is None:
            sr = StepResult(index, step.t_ms, step.action, step.signal,
                            False, f"No driver for device {sig.device!r}")
            return False, sr

        if sig.value_type == "int":
            # Analog output — FC06 (single register write)
            write_val: bool | int = int(step.value or 0)
            ok = driver.write_register(sig.modbus_address, write_val)
            detail_ok = f"Set {step.signal}={write_val}"
            detail_err = f"Register write failed for {step.signal}"
        else:
            # Digital output — FC05 (single coil write)
            write_val = bool(step.state)
            ok = driver.write_output(sig.modbus_address, write_val)
            detail_ok = f"Set {step.signal}={'ON' if write_val else 'OFF'}"
            detail_err = f"Coil write failed for {step.signal}"

        if ok and self._on_output_write:
            try:
                self._on_output_write(step.signal, write_val)
            except Exception:
                # Best effort: a broken observer must not fail the step, but
                # it should not disappear silently either.
                log.exception(
                    "on_output_write callback failed for %s", step.signal
                )

        sr = StepResult(
            step_index=index,
            t_ms=step.t_ms,
            action=step.action,
            signal=step.signal,
            success=ok,
            detail=detail_ok if ok else detail_err,
        )
        return ok, sr

    # -- check_input ---------------------------------------------------

    def _check_input(
        self, index: int, step: "SequenceStep", sig: Any
    ) -> tuple[bool, StepResult]:
        """Compare the cached signal value against the step's expectation once.

        Fails immediately if the registry reports the signal as stale or has
        no value for it.
        """
        actual = self._registry.get_value(step.signal)
        stale = self._registry.is_stale(step.signal)
        expected = self._expected_for_step(step, sig)

        if stale or actual is None:
            sr = StepResult(
                step_index=index,
                t_ms=step.t_ms,
                action=step.action,
                signal=step.signal,
                success=False,
                detail=f"Signal {step.signal!r} is stale or not yet read",
                actual=actual,
                expected=expected,
            )
            return False, sr

        ok, expected_display = self._compare(actual, step, sig)

        if sig.value_type == "int":
            actual_display = str(actual)
        else:
            actual_display = "ON" if actual else "OFF"
        detail = (
            f"Check {step.signal}: expected={expected_display} "
            f"actual={actual_display}"
        )

        sr = StepResult(
            step_index=index,
            t_ms=step.t_ms,
            action=step.action,
            signal=step.signal,
            success=ok,
            detail=detail,
            actual=actual,
            expected=expected,
        )
        return ok, sr

    # -- wait_input ----------------------------------------------------

    def _wait_input(
        self, index: int, step: "SequenceStep", sig: Any
    ) -> tuple[bool, StepResult]:
        """
        Poll the signal cache until the expected value is seen or timeout expires.
        Polls every 50 ms (less for the final poll, so the timeout is not overshot).
        Supports both digital (exact bool match) and analog
        (abs(actual - expected_value) <= tolerance) comparisons.

        A missing or zero timeout_ms means exactly one check is performed.
        """
        timeout_s = (step.timeout_ms or 0) / 1000.0
        deadline = time.monotonic() + timeout_s

        exp_display = self._expected_display(step, sig)
        expected = self._expected_for_step(step, sig)

        while True:
            actual = self._registry.get_value(step.signal)
            stale = self._registry.is_stale(step.signal)
            unusable = stale or actual is None

            if not unusable:
                matched, _ = self._compare(actual, step, sig)
                if matched:
                    sr = StepResult(
                        step_index=index,
                        t_ms=step.t_ms,
                        action=step.action,
                        signal=step.signal,
                        success=True,
                        detail=f"Wait {step.signal}=={exp_display}: condition met",
                        actual=actual,
                        expected=expected,
                    )
                    return True, sr

            remaining = deadline - time.monotonic()
            if remaining <= 0:
                if unusable:
                    act_str = "stale"
                elif sig.value_type == "int":
                    act_str = str(actual)
                else:
                    act_str = "ON" if actual else "OFF"
                sr = StepResult(
                    step_index=index,
                    t_ms=step.t_ms,
                    action=step.action,
                    signal=step.signal,
                    success=False,
                    detail=(
                        f"Wait {step.signal}=={exp_display}: "
                        f"timeout after {step.timeout_ms} ms (actual={act_str})"
                    ),
                    actual=actual,
                    expected=expected,
                )
                return False, sr

            time.sleep(min(self._WAIT_POLL_INTERVAL_S, remaining))

    # -- Comparison helpers --------------------------------------------

    @staticmethod
    def _compare(
        actual: bool | int,
        step: "SequenceStep",
        sig: Any,
    ) -> tuple[bool, str]:
        """
        Compare actual value against step expectation.
        Returns (match: bool, expected_display: str).
        """
        display = Sequencer._expected_display(step, sig)

        if sig.value_type == "int":
            # Analog comparison with tolerance; unset fields default to 0.
            expected = step.expected_value if step.expected_value is not None else 0
            tolerance = step.tolerance if step.tolerance is not None else 0
            return abs(int(actual) - expected) <= tolerance, display

        # Digital: exact bool match
        return actual == step.expected, display

    @staticmethod
    def _expected_for_step(step: "SequenceStep", sig: Any) -> bool | int | None:
        """Return the expected value in the appropriate type for StepResult."""
        if sig.value_type == "int":
            return step.expected_value
        return step.expected

    @staticmethod
    def _expected_display(step: "SequenceStep", sig: Any) -> str:
        """Human-readable expected value string ("ON"/"OFF", "N" or "N±tol")."""
        if sig.value_type != "int":
            return "ON" if step.expected else "OFF"

        expected = step.expected_value if step.expected_value is not None else 0
        tolerance = step.tolerance if step.tolerance is not None else 0
        if tolerance > 0:
            return f"{expected}±{tolerance}"
        return str(expected)

    # ------------------------------------------------------------------
    # Internal result update
    # ------------------------------------------------------------------

    def _update_result(self, run_id: str, **kwargs) -> RunResult:
        """Set the given fields on the stored RunResult under the results lock.

        Raises:
            KeyError: if *run_id* is not in the result store.
        """
        with self._results_lock:
            result = self._results[run_id]
            for name, value in kwargs.items():
                setattr(result, name, value)
            return result