Files
SequencerIO/tui.py

884 lines
36 KiB
Python
Raw Normal View History

2026-03-02 17:48:55 -05:00
#!/usr/bin/env python3
"""
tui.py — Interactive TUI debugger for the Terminator I/O system.
Usage:
python3 tui.py [config.yaml]
Default config path: config.yaml (same directory as this script).
Layout:
Status bar (device health, poll rate)
Inputs (live) Outputs (selectable) Sequences (runnable)
Footer (keybindings)
Keybindings:
Tab Cycle focus between Outputs and Sequences panels
↑/↓ Navigate the focused panel
Space/Enter Outputs: toggle selected | Sequences: run selected
0 All outputs OFF (from Outputs panel)
1 All outputs ON (from Outputs panel)
+/- Outputs: adjust the pending value of the selected analog output (Enter writes it)
r Force reconnect all devices
q Quit
"""
from __future__ import annotations
import sys
import time
import logging
import argparse
import threading
from pathlib import Path
from typing import Any, Callable
# ── Textual ──────────────────────────────────────────────────────────────────
from textual.app import App, ComposeResult
from textual.binding import Binding
from textual.containers import Horizontal
from textual.reactive import reactive
from textual.timer import Timer
from textual.widgets import Footer, Static
from textual import on
from textual.message import Message
# ── Arnold internals ─────────────────────────────────────────────────────────
# Put this script's own directory first on sys.path so the `arnold` package
# imported below resolves relative to the script, whatever the current
# working directory is.
sys.path.insert(0, str(Path(__file__).parent))
from arnold.config import load as load_config, Config, ConfigError, Sequence
from arnold.terminator_io import IORegistry, SignalState
from arnold.sequencer import Sequencer, RunResult
# Suppress pymodbus noise in TUI mode: only CRITICAL records from pymodbus and
# WARNING-or-above records from the arnold package are let through.
# NOTE(review): presumably because log output would be drawn over the
# full-screen UI — confirm where these loggers' handlers write.
logging.getLogger("pymodbus").setLevel(logging.CRITICAL)
logging.getLogger("arnold").setLevel(logging.WARNING)
# ─────────────────────────────────────────────────────────────────────────────
# Input panel
# ─────────────────────────────────────────────────────────────────────────────
class InputPanel(Static):
    """Read-only panel listing every input signal with its most recent state.

    Signals are partitioned by ``value_type``: ``"bool"`` entries are drawn as
    on/off indicators and ``"int"`` entries as right-aligned numbers. A group
    sub-heading is only emitted when both groups are present.
    """

    DEFAULT_CSS = """
    InputPanel {
        border: solid $primary;
        padding: 0 1;
        height: 100%;
    }
    """

    def __init__(
        self,
        input_signals: list[tuple[str, str]],  # [(name, value_type), ...]
        **kwargs: Any,
    ) -> None:
        """Remember the configured inputs, split into digital and analog names."""
        super().__init__(**kwargs)
        self._input_signals = input_signals
        self._digital = [name for name, kind in input_signals if kind == "bool"]
        self._analog = [name for name, kind in input_signals if kind == "int"]
        # Latest snapshot handed over by the app; empty until the first update.
        self._snapshot: dict[str, SignalState] = {}

    def update_snapshot(self, snapshot: dict[str, SignalState]) -> None:
        """Store *snapshot* as the data to draw and request a repaint."""
        self._snapshot = snapshot
        self.refresh()

    def render(self) -> str:
        """Build the panel's markup text from the stored snapshot."""
        out: list[str] = ["[bold]Inputs[/bold]"]
        have_digital = bool(self._digital)
        have_analog = bool(self._analog)

        if not (have_digital or have_analog):
            out.append(" [dim](none)[/dim]")
            return "\n".join(out)

        if have_digital:
            out.append("")
            # The sub-heading is only useful when there is a second group.
            if have_analog:
                out.append(" [dim underline]Digital[/dim underline]")
            out.extend(self._digital_row(name) for name in self._digital)

        if have_analog:
            out.append("")
            if have_digital:
                out.append(" [dim underline]Analog[/dim underline]")
            out.extend(self._analog_row(name) for name in self._analog)

        return "\n".join(out)

    def _digital_row(self, name: str) -> str:
        """Return the markup line for one digital input."""
        state = self._snapshot.get(name)
        if state is None or state.stale:
            # No reading yet, or the reading is flagged stale: show a placeholder.
            indicator, color = "[dim]?[/dim]", "dim"
        elif state.value:
            indicator, color = "[bold green]●[/bold green]", "green"
        else:
            indicator, color = "[dim]○[/dim]", "white"
        return f" {indicator} [{color}]{name}[/{color}]"

    def _analog_row(self, name: str) -> str:
        """Return the markup line for one analog input."""
        state = self._snapshot.get(name)
        if state is None or state.stale:
            val_str, color = "[dim]?[/dim]", "dim"
        else:
            val_str, color = f"[bold cyan]{state.value:>5}[/bold cyan]", "cyan"
        return f" [{color}]{name:<20}[/{color}] {val_str}"
# ─────────────────────────────────────────────────────────────────────────────
# Output panel
# ─────────────────────────────────────────────────────────────────────────────
class OutputPanel(Static, can_focus=True):
    """Focusable, cursor-driven list of output signals.

    The values shown are the shadow state handed in through
    ``update_output_state`` (what was last written), not a device readback.

    Digital outputs: Space/Enter posts a toggle request; 0/1 posts an
    all-off/all-on request.
    Analog outputs: +/- (and Space) change a pending value by ``ANALOG_STEP``;
    Enter posts a write request for the pending value.

    The panel never writes to hardware itself; it only posts the nested
    message classes below.
    """

    DEFAULT_CSS = """
    OutputPanel {
        border: solid $accent;
        padding: 0 1;
        height: 100%;
    }
    OutputPanel:focus {
        border: solid $accent-lighten-2;
    }
    """

    BINDINGS = [
        Binding("up", "cursor_up", "Up", show=False),
        Binding("down", "cursor_down", "Down", show=False),
        Binding("space", "do_toggle", "Toggle", show=False),
        Binding("enter", "do_write", "Write", show=False),
        Binding("0", "all_off", "All OFF", show=False),
        Binding("1", "all_on", "All ON", show=False),
        Binding("plus_sign", "analog_up", "+", show=False),
        Binding("hyphen_minus", "analog_down", "-", show=False),
    ]

    # Index into ``self._names`` of the highlighted row.
    cursor: reactive[int] = reactive(0)

    # Step size for analog +/- adjustment
    ANALOG_STEP = 100

    # ── Messages ─────────────────────────────────────────────────────────────
    class ToggleOutput(Message):
        """Digital output toggle request; carries the value shown at request time."""

        def __init__(self, signal: str, current_value: bool) -> None:
            super().__init__()
            self.signal = signal
            self.current_value = current_value

    class WriteAnalog(Message):
        """Analog output write request (user pressed Enter on an analog output)."""

        def __init__(self, signal: str, value: int) -> None:
            super().__init__()
            self.signal = signal
            self.value = value

    class AllOutputs(Message):
        """Request to set all digital outputs to a given state."""

        def __init__(self, value: bool) -> None:
            super().__init__()
            self.value = value

    # ── Init / update ─────────────────────────────────────────────────────────
    def __init__(
        self,
        output_signals: list[tuple[str, str]],  # [(name, value_type), ...]
        **kwargs: Any,
    ) -> None:
        """Index the configured outputs and start every value at 0 / False."""
        super().__init__(**kwargs)
        self._signals: list[tuple[str, str]] = output_signals
        self._names: list[str] = [name for name, _ in output_signals]
        self._type_map: dict[str, str] = dict(output_signals)
        self._state: dict[str, bool | int] = {
            name: (0 if kind == "int" else False) for name, kind in output_signals
        }
        # Pending analog value edits (before Enter commits)
        self._analog_pending: dict[str, int] = {}

    def update_output_state(self, state: dict[str, bool | int]) -> None:
        """Adopt *state* as the values to display and request a repaint."""
        self._state = state
        self.refresh()

    # ── Render ────────────────────────────────────────────────────────────────
    def render(self) -> str:
        """Build the panel's markup text: digital rows first, then analog rows."""
        out: list[str] = ["[bold]Outputs[/bold]"]
        if not self._names:
            out.append(" [dim](no outputs configured)[/dim]")
            return "\n".join(out)

        digital_names = [name for name, kind in self._signals if kind == "bool"]
        analog_names = [name for name, kind in self._signals if kind == "int"]

        if digital_names:
            out.append("")
            # With a single group the heading doubles as the navigation hint.
            out.append(
                " [dim underline]Digital[/dim underline] [dim](Space toggle · 0/1 all)[/dim]"
                if analog_names
                else " [dim](↑↓ navigate · Space toggle · 0/1 all)[/dim]"
            )
            out.extend(self._digital_row(name) for name in digital_names)

        if analog_names:
            out.append("")
            out.append(
                " [dim underline]Analog[/dim underline] [dim](+/- adjust · Enter write)[/dim]"
                if digital_names
                else " [dim](↑↓ navigate · +/- adjust · Enter write)[/dim]"
            )
            out.extend(self._analog_row(name) for name in analog_names)

        return "\n".join(out)

    def _digital_row(self, name: str) -> str:
        """Return the markup line for one digital output."""
        is_on = self._state.get(name, False)
        indicator = "[bold green]●[/bold green]" if is_on else "[dim]○[/dim]"
        val_str = "[bold green]ON [/bold green]" if is_on else "OFF"
        if self._names.index(name) == self.cursor:
            return f"[reverse] ► [/reverse] {indicator} [reverse]{name}[/reverse] {val_str}"
        return f" {indicator} {name} {val_str}"

    def _analog_row(self, name: str) -> str:
        """Return the markup line for one analog output."""
        committed = self._state.get(name, 0)
        pending = self._analog_pending.get(name)
        # A pending edit is only highlighted while it differs from the shown value.
        if pending is None or pending == committed:
            val_str = f"[bold cyan]{committed:>5}[/bold cyan]"
        else:
            val_str = f"[bold yellow]{pending:>5}[/bold yellow] [dim](pending)[/dim]"
        if self._names.index(name) == self.cursor:
            return f"[reverse] ► [/reverse] [reverse]{name}[/reverse] {val_str}"
        return f" {name} {val_str}"

    # ── Actions ───────────────────────────────────────────────────────────────
    def _selected_name(self) -> str | None:
        """Return the name under the cursor, or None when no outputs exist."""
        return self._names[self.cursor] if self._names else None

    def _current_type(self) -> str | None:
        """Return the value type of the row under the cursor, or None if empty."""
        name = self._selected_name()
        return None if name is None else self._type_map.get(name)

    def _move_cursor(self, step: int) -> None:
        """Move the cursor by *step* rows, wrapping around at both ends."""
        if self._names:
            self.cursor = (self.cursor + step) % len(self._names)

    def action_cursor_up(self) -> None:
        """Move the highlight one row up (wraps to the last row)."""
        self._move_cursor(-1)

    def action_cursor_down(self) -> None:
        """Move the highlight one row down (wraps to the first row)."""
        self._move_cursor(1)

    def _request_toggle(self, name: str) -> None:
        """Post a ToggleOutput carrying the currently displayed value of *name*."""
        shown = bool(self._state.get(name, False))
        self.post_message(self.ToggleOutput(signal=name, current_value=shown))

    def action_do_toggle(self) -> None:
        """Space: toggle digital output, or increment analog by step."""
        name = self._selected_name()
        if name is None:
            return
        if self._type_map.get(name) == "int":
            self._adjust_analog(name, self.ANALOG_STEP)
            return
        self._request_toggle(name)

    def action_do_write(self) -> None:
        """Enter: toggle digital output, or commit pending analog value."""
        name = self._selected_name()
        if name is None:
            return
        if self._type_map.get(name) != "int":
            self._request_toggle(name)
            return
        # Taking the entry out clears the pending edit; with nothing pending,
        # Enter is a no-op for an analog row.
        pending = self._analog_pending.get(name)
        if pending is not None:
            self.post_message(self.WriteAnalog(signal=name, value=pending))
            self._analog_pending.pop(name, None)

    def _nudge_selected(self, delta: int) -> None:
        """Adjust the selected row by *delta* if it is an analog output."""
        name = self._selected_name()
        if name is not None and self._type_map.get(name) == "int":
            self._adjust_analog(name, delta)

    def action_analog_up(self) -> None:
        """+: raise the pending value of the selected analog output."""
        self._nudge_selected(self.ANALOG_STEP)

    def action_analog_down(self) -> None:
        """-: lower the pending value of the selected analog output."""
        self._nudge_selected(-self.ANALOG_STEP)

    def _adjust_analog(self, name: str, delta: int) -> None:
        """Add *delta* to the pending value of *name*, clamped to 0..65535.

        The first adjustment starts from the currently displayed value.
        """
        base = self._analog_pending.get(name)
        if base is None:
            base = int(self._state.get(name, 0))
        self._analog_pending[name] = min(65535, max(0, base + delta))
        self.refresh()

    def action_all_off(self) -> None:
        """0: request that every digital output be switched off."""
        self.post_message(self.AllOutputs(value=False))

    def action_all_on(self) -> None:
        """1: request that every digital output be switched on."""
        self.post_message(self.AllOutputs(value=True))
# ─────────────────────────────────────────────────────────────────────────────
# Sequence panel
# ─────────────────────────────────────────────────────────────────────────────
class SequencePanel(Static, can_focus=True):
    """Focusable panel that lists sequences and follows the active run.

    Idle mode: navigable list of all sequences; Enter/Space posts a
    ``RunSequence`` request for the highlighted one, and a summary of the
    last finished run is shown below the list.
    Running mode: full step list for the active sequence with the current
    step highlighted and a progress bar header.

    The panel never starts a sequence itself; it only posts ``RunSequence``.
    """

    DEFAULT_CSS = """
    SequencePanel {
        border: solid $warning-darken-2;
        padding: 0 1;
        height: 100%;
    }
    SequencePanel:focus {
        border: solid $warning;
    }
    """

    BINDINGS = [
        Binding("up", "cursor_up", "Up", show=False),
        Binding("down", "cursor_down", "Down", show=False),
        Binding("space", "do_run", "Run", show=False),
        Binding("enter", "do_run", "Run", show=False),
    ]

    # Index into ``self._sequences`` of the highlighted row (idle mode).
    cursor: reactive[int] = reactive(0)

    # ── Messages ─────────────────────────────────────────────────────────────
    class RunSequence(Message):
        """Request to start the sequence called *name*."""

        def __init__(self, name: str) -> None:
            super().__init__()
            self.name = name

    # ── Init / update ─────────────────────────────────────────────────────────
    def __init__(self, sequences: list[Sequence], **kwargs: Any) -> None:
        """Store the configured sequences and index them by name."""
        super().__init__(**kwargs)
        self._sequences: list[Sequence] = sequences
        self._seq_by_name: dict[str, Sequence] = {s.name: s for s in sequences}
        self._active_run: RunResult | None = None
        self._last_result: RunResult | None = None

    def update_run_state(
        self,
        active_run: RunResult | None,
        last_result: RunResult | None,
    ) -> None:
        """Adopt the latest run information and request a repaint.

        Args:
            active_run: Result object of the run in progress, or None.
            last_result: Result object of the last finished run, or None.
        """
        self._active_run = active_run
        self._last_result = last_result
        self.refresh()

    # ── Render: dispatch ──────────────────────────────────────────────────────
    def render(self) -> str:
        """Return the running view while a run is pending/running, else the list."""
        if self._active_run and self._active_run.status in ("pending", "running"):
            return self._render_running()
        return self._render_idle()

    # ── Render: idle (sequence list) ──────────────────────────────────────────
    def _render_idle(self) -> str:
        """Build the sequence list plus the last-run summary."""
        lines: list[str] = [
            "[bold]Sequences[/bold] [dim](↑↓ navigate · Enter run)[/dim]\n"
        ]
        if not self._sequences:
            lines.append(" [dim](no sequences configured)[/dim]")
            return "\n".join(lines)

        for i, seq in enumerate(self._sequences):
            total = len(seq.steps)
            if i == self.cursor:
                lines.append(
                    f"[reverse] ► [/reverse] [reverse]{seq.name}[/reverse]"
                    f" [dim]{total} steps[/dim]"
                )
            else:
                lines.append(f" {seq.name} [dim]{total} steps[/dim]")
            if seq.description:
                # Only the first line of the description, cut to 60 characters.
                short = seq.description.strip().split("\n")[0][:60]
                lines.append(f" [dim]{short}[/dim]")

        # Last result summary (shown below the list when idle)
        if self._last_result:
            r = self._last_result
            color = {"success": "green", "failed": "red", "error": "red"}.get(
                r.status, "dim"
            )
            lines.append(
                f"\n[dim]Last run:[/dim] [{color}]{r.sequence_name} "
                f"{r.status.upper()}[/{color}]"
                f" [dim]{r.steps_completed}/{r.total_steps} steps"
                f" {r.duration_ms} ms[/dim]"
            )
            if r.failed_step:
                fs = r.failed_step
                lines.append(
                    f" [red]✗ step {fs.step_index} ({fs.t_ms} ms):"
                    f" {fs.detail}[/red]"
                )
        return "\n".join(lines)

    # ── Render: running (step list) ───────────────────────────────────────────
    @staticmethod
    def _describe_step(step: Any) -> str:
        """Return the short human-readable label for one sequence step."""
        if step.action == "set_output":
            if step.value is not None:
                # Analog output: show the numeric value being written.
                return f"set {step.signal} → {step.value}"
            val = "ON " if step.state else "OFF"
            return f"set {step.signal} → {val}"

        if step.action == "check_input":
            if step.expected_value is not None:
                tol = f"±{step.tolerance}" if step.tolerance else ""
                return f"chk {step.signal} == {step.expected_value}{tol}"
            exp = "ON " if step.expected else "OFF"
            return f"chk {step.signal} == {exp}"

        if step.action == "wait_input":
            t_out = f"{step.timeout_ms} ms" if step.timeout_ms else "?"
            if step.expected_value is not None:
                tol = f"±{step.tolerance}" if step.tolerance else ""
                return (
                    f"wait {step.signal} == {step.expected_value}{tol}"
                    f" (timeout {t_out})"
                )
            exp = "ON " if step.expected else "OFF"
            return f"wait {step.signal} == {exp} (timeout {t_out})"

        # Unknown action: fall back to the raw action name.
        return f"{step.action} {step.signal}"

    def _render_running(self) -> str:
        """Build the step-by-step view of the active run."""
        run = self._active_run
        if run is None:
            # Nothing is active, so the list view is the only sensible output.
            return self._render_idle()

        seq = self._seq_by_name.get(run.sequence_name)
        if seq is None:
            return f"[yellow]Running: {run.sequence_name}[/yellow]\n[dim](steps unknown)[/dim]"

        total = len(seq.steps)
        done = run.steps_completed
        current = run.current_step_index
        # Capped at 100 so an over-reported step count cannot overfill the bar.
        pct = min(100, int(done / total * 100)) if total else 0
        bar = _progress_bar(pct, width=16)

        lines: list[str] = [
            f"[bold yellow]▶ {seq.name}[/bold yellow]"
            f" {bar} [yellow]{done}/{total}[/yellow]\n"
        ]
        for i, step in enumerate(seq.steps):
            t_str = f"{step.t_ms / 1000:6.1f}s"
            action = self._describe_step(step)
            if i == current:
                # Currently executing — bright highlight
                line = f"[reverse][bold yellow] ► {t_str} {action} [/bold yellow][/reverse]"
            elif i < done:
                # Already completed
                line = f" [dim green]✓ {t_str} {action}[/dim green]"
            else:
                # Pending
                line = f" [dim] {t_str} {action}[/dim]"
            lines.append(line)
        return "\n".join(lines)

    # ── Actions ───────────────────────────────────────────────────────────────
    def action_cursor_up(self) -> None:
        """Move the highlight one row up (wraps to the last row)."""
        if self._sequences:
            self.cursor = (self.cursor - 1) % len(self._sequences)

    def action_cursor_down(self) -> None:
        """Move the highlight one row down (wraps to the first row)."""
        if self._sequences:
            self.cursor = (self.cursor + 1) % len(self._sequences)

    def action_do_run(self) -> None:
        """Post a RunSequence request for the highlighted sequence."""
        if not self._sequences:
            return
        self.post_message(self.RunSequence(name=self._sequences[self.cursor].name))
def _progress_bar(pct: int, width: int = 16) -> str:
filled = int(width * pct / 100)
return "[" + "" * filled + "" * (width - filled) + "]"
# ─────────────────────────────────────────────────────────────────────────────
# Status bar
# ─────────────────────────────────────────────────────────────────────────────
class StatusBar(Static):
    """One-line bar docked at the top: per-device stats plus a transient message."""

    DEFAULT_CSS = """
    StatusBar {
        dock: top;
        height: 1;
        padding: 0 1;
    }
    """

    def __init__(self, **kwargs: Any) -> None:
        """Start with no device stats and no message."""
        super().__init__("", **kwargs)
        self._stats: list[dict] = []
        self._msg: str = ""

    def update_stats(self, stats: list[dict]) -> None:
        """Replace the per-device stats and redraw the bar."""
        self._stats = stats
        self._rebuild()

    def set_message(self, msg: str) -> None:
        """Replace the message (an empty string hides it) and redraw the bar."""
        self._msg = msg
        self._rebuild()

    @staticmethod
    def _device_segment(entry: dict) -> str:
        """Format one device's stats; ``device_id`` is the only required key."""
        if entry.get("connected"):
            dot = "[green]●[/green]"
        else:
            dot = "[red]✗[/red]"
        rate = entry.get("achieved_hz", 0)
        errors = entry.get("error_count", 0)
        return f"{dot} {entry['device_id']} {rate:.0f} Hz err={errors}"

    def _rebuild(self) -> None:
        """Recompose the bar text from the stored stats and message."""
        segments = [self._device_segment(entry) for entry in self._stats]
        text = " ".join(segments) if segments else "[dim]no devices[/dim]"
        if self._msg:
            text += f" [yellow]{self._msg}[/yellow]"
        self.update(text)
# ─────────────────────────────────────────────────────────────────────────────
# Main application
# ─────────────────────────────────────────────────────────────────────────────
class TerminatorTUI(App):
    """Arnold Terminator I/O debug TUI.

    Hosts the status bar and the input, output and sequence panels, refreshes
    them 20 times per second from the I/O registry and the sequencer, and
    turns panel messages into device writes and sequence starts.

    Device writes and reconnects run on short-lived daemon threads. Those
    threads never touch widgets directly: every status message they produce
    is marshalled onto the UI thread with ``call_from_thread``.
    """

    TITLE = "Arnold — Terminator I/O Debugger"

    CSS = """
    Screen {
        layout: vertical;
    }
    #main-area {
        height: 1fr;
        layout: horizontal;
    }
    #input-panel {
        width: 1fr;
        height: 100%;
        overflow-y: auto;
    }
    #output-panel {
        width: 1fr;
        height: 100%;
        overflow-y: auto;
    }
    #sequence-panel {
        width: 2fr;
        height: 100%;
        overflow-y: auto;
    }
    """

    BINDINGS = [
        Binding("q", "quit", "Quit"),
        Binding("r", "reconnect", "Reconnect"),
        Binding("tab", "cycle_focus", "Tab", show=False),
    ]

    def __init__(self, config: Config, registry: IORegistry,
                 sequencer: Sequencer) -> None:
        """Capture collaborators and derive the signal lists from *config*.

        Args:
            config: Loaded configuration (signals, devices, sequences).
            registry: I/O registry providing snapshots, stats and drivers.
            sequencer: Sequencer used to start runs and query their results.
        """
        super().__init__()
        self._cfg = config
        self._io = registry
        self._seq = sequencer

        # Build typed signal lists: [(name, value_type), ...]
        self._input_signals: list[tuple[str, str]] = [
            (s.name, s.value_type)
            for s in config.logical_io if s.direction == "input"
        ]
        self._output_signals: list[tuple[str, str]] = [
            (s.name, s.value_type)
            for s in config.logical_io if s.direction == "output"
        ]
        self._input_names = [n for n, _ in self._input_signals]
        self._output_names = [n for n, _ in self._output_signals]
        # Set form of the input names: the refresh tick filters the whole
        # snapshot against it 20 times per second.
        self._input_name_set = frozenset(self._input_names)
        self._output_type_map: dict[str, str] = dict(self._output_signals)

        # Shadow output state — updated on write (EBC100 has no output readback)
        self._output_state: dict[str, bool | int] = {
            name: (0 if vt == "int" else False)
            for name, vt in self._output_signals
        }

        # Last completed run (for the sequence panel summary)
        self._last_run_id: str | None = None
        self._last_result: RunResult | None = None

        self._refresh_timer: Timer | None = None
        self._status_clear_timer: Timer | None = None

    # ── Layout ───────────────────────────────────────────────────────────────
    def compose(self) -> ComposeResult:
        """Yield the status bar, the three panels side by side, and the footer."""
        yield StatusBar(id="status-bar")
        with Horizontal(id="main-area"):
            yield InputPanel(input_signals=self._input_signals, id="input-panel")
            yield OutputPanel(output_signals=self._output_signals, id="output-panel")
            yield SequencePanel(
                sequences=self._cfg.sequences,
                id="sequence-panel",
            )
        yield Footer()

    def on_mount(self) -> None:
        """Focus the output panel, start the registry and the refresh timer."""
        self.query_one(OutputPanel).focus()
        self._io.start()
        self._refresh_timer = self.set_interval(1 / 20, self._refresh_ui)
        # Apply output defaults in background (needs connection to settle first)
        threading.Thread(target=self._apply_defaults, daemon=True).start()

    def _apply_defaults(self) -> None:
        """Write each output's configured default; runs on a worker thread.

        Only touches drivers and the shadow-state dict, never widgets. The
        shadow state is updated only for writes the driver reports as
        successful.
        """
        time.sleep(0.5)  # give Modbus connection a moment to establish
        for sig in self._cfg.logical_io:
            if sig.direction != "output":
                continue
            driver = self._io.driver(sig.device)
            if driver is None:
                continue
            # Digital defaults
            if sig.default_state is not None and sig.value_type == "bool":
                if driver.write_output(sig.modbus_address, sig.default_state):
                    self._output_state[sig.name] = sig.default_state
            # Analog defaults
            if sig.default_value is not None and sig.value_type == "int":
                if driver.write_register(sig.modbus_address, sig.default_value):
                    self._output_state[sig.name] = sig.default_value

    # ── Periodic refresh ─────────────────────────────────────────────────────
    def _refresh_ui(self) -> None:
        """Timer callback: push fresh data into every panel and the status bar."""
        snapshot = self._io.snapshot()

        # Inputs
        input_snap = {
            n: s for n, s in snapshot.items() if n in self._input_name_set
        }
        self.query_one(InputPanel).update_snapshot(input_snap)

        # Outputs (shadow state)
        self.query_one(OutputPanel).update_output_state(self._output_state)

        # Sequences — get live run result
        active_id = self._seq.active_run_id()
        active_run: RunResult | None = None
        if active_id:
            active_run = self._seq.get_result(active_id)

        # Detect a newly-completed run and remember it. The run id is kept
        # until a finished result is actually available.
        if not active_id and self._last_run_id:
            result = self._seq.get_result(self._last_run_id)
            if result and result.status not in ("pending", "running"):
                self._last_result = result
                self._last_run_id = None

        self.query_one(SequencePanel).update_run_state(
            active_run=active_run,
            last_result=self._last_result,
        )

        # Status bar: poll statistics merged with each driver's connected flag.
        driver_map = {d["device_id"]: d for d in self._io.driver_status()}
        combined: list[dict] = [
            {**ps, "connected": driver_map.get(ps["device_id"], {}).get("connected", False)}
            for ps in self._io.poll_stats()
        ]
        self.query_one(StatusBar).update_stats(combined)

    # ── Output events ─────────────────────────────────────────────────────────
    @on(OutputPanel.ToggleOutput)
    def handle_toggle(self, event: OutputPanel.ToggleOutput) -> None:
        """Write the opposite of the value the panel was showing."""
        self._write_digital_output(event.signal, not event.current_value)

    @on(OutputPanel.WriteAnalog)
    def handle_write_analog(self, event: OutputPanel.WriteAnalog) -> None:
        """Write the requested analog value."""
        self._write_analog_output(event.signal, event.value)

    @on(OutputPanel.AllOutputs)
    def handle_all_outputs(self, event: OutputPanel.AllOutputs) -> None:
        """Write the requested state to every digital output."""
        for name in self._output_names:
            if self._output_type_map.get(name) == "bool":
                self._write_digital_output(name, event.value)

    def _write_output(
        self,
        signal_name: str,
        value: bool | int,
        label: str,
        use_register: bool,
    ) -> None:
        """Resolve *signal_name* and write *value* on a background thread.

        Must be called on the UI thread. Lookup failures are reported
        immediately; the outcome of the write is reported from the worker via
        ``call_from_thread``.

        Args:
            signal_name: Logical output name from the configuration.
            value: Value to write.
            label: Text used for the value in status messages.
            use_register: True to use ``write_register`` (analog), False to
                use ``write_output`` (digital).
        """
        sig = self._cfg.signal(signal_name)
        if sig is None:
            self._flash(f"Unknown signal: {signal_name}")
            return
        driver = self._io.driver(sig.device)
        if driver is None:
            self._flash(f"No driver for {sig.device}")
            return
        write = driver.write_register if use_register else driver.write_output

        def do_write() -> None:
            if write(sig.modbus_address, value):
                self._output_state[signal_name] = value
                message = f"{signal_name} → {label}"
            else:
                message = f"WRITE FAILED: {signal_name} → {label}"
            # _flash touches widgets and timers, so it must run on the UI thread.
            self.call_from_thread(self._flash, message)

        threading.Thread(target=do_write, daemon=True).start()

    def _write_digital_output(self, signal_name: str, value: bool) -> None:
        """Write a digital output in the background and report the outcome."""
        self._write_output(
            signal_name, value, "ON" if value else "OFF", use_register=False
        )

    def _write_analog_output(self, signal_name: str, value: int) -> None:
        """Write an analog output in the background and report the outcome."""
        self._write_output(signal_name, value, str(value), use_register=True)

    # ── Sequence events ───────────────────────────────────────────────────────
    @on(SequencePanel.RunSequence)
    def handle_run_sequence(self, event: SequencePanel.RunSequence) -> None:
        """Start the requested sequence unless one is already running."""
        if self._seq.active_run_id():
            self._flash("Busy: sequence already running")
            return
        try:
            run_id, started = self._seq.start(event.name)
        except ValueError as e:
            self._flash(str(e))
            return
        if not started:
            self._flash("Busy: sequence already running")
            return
        # Remembered so the refresh tick can pick up the result once it finishes.
        self._last_run_id = run_id
        self._flash(f"Started: {event.name}")

    # ── Status flash ──────────────────────────────────────────────────────────
    def _flash(self, msg: str, duration: float = 4.0) -> None:
        """Show *msg* in the status bar and clear it after *duration* seconds.

        Must run on the UI thread; worker threads go through
        ``self.call_from_thread(self._flash, msg)``. A newer message cancels
        the pending clear of the previous one.
        """
        self.query_one(StatusBar).set_message(msg)
        if self._status_clear_timer:
            self._status_clear_timer.stop()
        self._status_clear_timer = self.set_timer(
            duration, lambda: self.query_one(StatusBar).set_message("")
        )

    # ── Actions ───────────────────────────────────────────────────────────────
    def action_cycle_focus(self) -> None:
        """Tab: move focus between the output and sequence panels."""
        panels = [self.query_one(OutputPanel), self.query_one(SequencePanel)]
        try:
            idx = panels.index(self.focused)  # type: ignore[arg-type]
        except ValueError:
            # Focus is somewhere else (or nowhere): start at the first panel.
            panels[0].focus()
        else:
            panels[(idx + 1) % len(panels)].focus()

    def action_reconnect(self) -> None:
        """r: call ``connect()`` on every configured device's driver, in background."""
        self._flash("Reconnecting…")

        def do_reconnect() -> None:
            for dev in self._cfg.devices:
                driver = self._io.driver(dev.id)
                if driver:
                    driver.connect()
            # Report completion on the UI thread.
            self.call_from_thread(self._flash, "Reconnect done")

        threading.Thread(target=do_reconnect, daemon=True).start()

    async def on_unmount(self) -> None:
        """Stop the I/O registry when the app is torn down."""
        self._io.stop()
# ─────────────────────────────────────────────────────────────────────────────
# Entry point
# ─────────────────────────────────────────────────────────────────────────────
def main() -> None:
    """Parse the command line, wire config, registry and sequencer, run the TUI.

    Prints the problem to stderr and exits with status 1 when loading the
    configuration raises ``ConfigError``.
    """
    script_dir = Path(__file__).parent

    cli = argparse.ArgumentParser(description="Arnold Terminator I/O debug TUI")
    cli.add_argument(
        "config",
        nargs="?",
        default=str(script_dir / "config.yaml"),
        help="Path to YAML config file (default: config.yaml)",
    )
    options = cli.parse_args()

    try:
        config = load_config(options.config)
    except ConfigError as exc:
        print(f"Config error: {exc}", file=sys.stderr)
        sys.exit(1)

    registry = IORegistry(config)

    # Shadow output state shared between the sequencer callback and the TUI.
    # The callback mutates this dict in place; per the original note it is
    # invoked from the sequencer's worker thread, and a plain key assignment
    # is relied upon to be GIL-safe.
    shadow_state: dict[str, bool | int] = {
        sig.name: (0 if sig.value_type == "int" else False)
        for sig in config.logical_io
        if sig.direction == "output"
    }

    def on_output_write(signal_name: str, value: bool | int) -> None:
        shadow_state[signal_name] = value

    sequencer = Sequencer(
        config=config,
        registry=registry,
        log_path=script_dir / "runs.log",
        on_output_write=on_output_write,
    )

    app = TerminatorTUI(config=config, registry=registry, sequencer=sequencer)
    # Swap in the shared dict so the callback and the TUI mutate the same object.
    app._output_state = shadow_state
    app.run()


if __name__ == "__main__":
    main()