884 lines
36 KiB
Python
884 lines
36 KiB
Python
|
|
#!/usr/bin/env python3
|
||
|
|
"""
|
||
|
|
tui.py — Interactive TUI debugger for the Terminator I/O system.
|
||
|
|
|
||
|
|
Usage:
|
||
|
|
python3 tui.py [config.yaml]
|
||
|
|
|
||
|
|
Default config path: config.yaml (same directory as this script).
|
||
|
|
|
||
|
|
Layout:
|
||
|
|
┌─ Status bar (device health, poll rate) ────────────────────────────────┐
|
||
|
|
│ Inputs (live) │ Outputs (selectable) │ Sequences (runnable) │
|
||
|
|
└─ Footer (keybindings) ─────────────────────────────────────────────────┘
|
||
|
|
|
||
|
|
Keybindings:
|
||
|
|
Tab Cycle focus between Outputs and Sequences panels
|
||
|
|
↑ / ↓ Navigate the focused panel
|
||
|
|
Space/Enter Outputs: toggle selected digital output | Sequences: run selected
+ / - Outputs: adjust selected analog output (Enter writes the pending value)
|
||
|
|
0 All digital outputs OFF (from Outputs panel)
|
||
|
|
1 All digital outputs ON (from Outputs panel)
|
||
|
|
r Force reconnect all devices
|
||
|
|
q Quit
|
||
|
|
"""
|
||
|
|
|
||
|
|
from __future__ import annotations
|
||
|
|
|
||
|
|
import sys
|
||
|
|
import time
|
||
|
|
import logging
|
||
|
|
import argparse
|
||
|
|
import threading
|
||
|
|
from pathlib import Path
|
||
|
|
from typing import Any, Callable
|
||
|
|
|
||
|
|
# ── Textual ──────────────────────────────────────────────────────────────────
|
||
|
|
from textual.app import App, ComposeResult
|
||
|
|
from textual.binding import Binding
|
||
|
|
from textual.containers import Horizontal
|
||
|
|
from textual.reactive import reactive
|
||
|
|
from textual.timer import Timer
|
||
|
|
from textual.widgets import Footer, Static
|
||
|
|
from textual import on
|
||
|
|
from textual.message import Message
|
||
|
|
|
||
|
|
# ── Arnold internals ─────────────────────────────────────────────────────────
|
||
|
|
sys.path.insert(0, str(Path(__file__).parent))
|
||
|
|
from arnold.config import load as load_config, Config, ConfigError, Sequence
|
||
|
|
from arnold.terminator_io import IORegistry, SignalState
|
||
|
|
from arnold.sequencer import Sequencer, RunResult
|
||
|
|
|
||
|
|
# Suppress pymodbus noise in TUI mode
|
||
|
|
logging.getLogger("pymodbus").setLevel(logging.CRITICAL)
|
||
|
|
logging.getLogger("arnold").setLevel(logging.WARNING)
|
||
|
|
|
||
|
|
|
||
|
|
# ─────────────────────────────────────────────────────────────────────────────
|
||
|
|
# Input panel
|
||
|
|
# ─────────────────────────────────────────────────────────────────────────────
|
||
|
|
|
||
|
|
class InputPanel(Static):
    """Read-only panel showing the latest value of every input signal.

    Signals whose value type is ``"bool"`` are listed as digital indicators
    and signals whose value type is ``"int"`` as numeric readings.  When both
    kinds are present each group gets its own sub-heading.  A signal that is
    missing from the current snapshot, or whose state is flagged stale, is
    rendered as ``?``.
    """

    DEFAULT_CSS = """
    InputPanel {
        border: solid $primary;
        padding: 0 1;
        height: 100%;
    }
    """

    def __init__(
        self,
        input_signals: list[tuple[str, str]],  # [(name, value_type), ...]
        **kwargs: Any,
    ) -> None:
        """Store the signal list and split the names by value type.

        Args:
            input_signals: ``(name, value_type)`` pairs for every input.
            **kwargs: Forwarded unchanged to ``Static``.
        """
        super().__init__(**kwargs)
        self._input_signals = input_signals

        digital_names: list[str] = []
        analog_names: list[str] = []
        for signal_name, value_type in input_signals:
            if value_type == "bool":
                digital_names.append(signal_name)
            elif value_type == "int":
                analog_names.append(signal_name)
        self._digital = digital_names
        self._analog = analog_names

        # Most recent states handed over by update_snapshot(), keyed by name.
        self._snapshot: dict[str, SignalState] = {}

    def update_snapshot(self, snapshot: dict[str, SignalState]) -> None:
        """Replace the displayed snapshot and request a repaint."""
        self._snapshot = snapshot
        self.refresh()

    def _digital_row(self, name: str) -> str:
        """Return the markup line for one digital input."""
        state = self._snapshot.get(name)
        if state is None or state.stale:
            marker, tint = "[dim]?[/dim]", "dim"
        elif state.value:
            marker, tint = "[bold green]●[/bold green]", "green"
        else:
            marker, tint = "[dim]○[/dim]", "white"
        return f" {marker} [{tint}]{name}[/{tint}]"

    def _analog_row(self, name: str) -> str:
        """Return the markup line for one analog input."""
        state = self._snapshot.get(name)
        if state is None or state.stale:
            reading, tint = "[dim]?[/dim]", "dim"
        else:
            reading, tint = f"[bold cyan]{state.value:>5}[/bold cyan]", "cyan"
        return f" [{tint}]{name:<20}[/{tint}] {reading}"

    def render(self) -> str:
        """Build the panel text: a title followed by the digital and analog groups."""
        out: list[str] = ["[bold]Inputs[/bold]"]
        have_digital = bool(self._digital)
        have_analog = bool(self._analog)

        if not (have_digital or have_analog):
            out.append(" [dim](none)[/dim]")
            return "\n".join(out)

        # Sub-headings are only worth showing when both groups exist.
        show_headings = have_digital and have_analog

        if have_digital:
            out.append("")
            if show_headings:
                out.append(" [dim underline]Digital[/dim underline]")
            out.extend(self._digital_row(name) for name in self._digital)

        if have_analog:
            out.append("")
            if show_headings:
                out.append(" [dim underline]Analog[/dim underline]")
            out.extend(self._analog_row(name) for name in self._analog)

        return "\n".join(out)
|
||
|
|
|
||
|
|
|
||
|
|
# ─────────────────────────────────────────────────────────────────────────────
|
||
|
|
# Output panel
|
||
|
|
# ─────────────────────────────────────────────────────────────────────────────
|
||
|
|
|
||
|
|
class OutputPanel(Static, can_focus=True):
    """
    Selectable list of output signals.

    Displays shadow state (what we last wrote) — the EBC100 has no readback.

    Digital outputs: Space/Enter to toggle ON/OFF, 0/1 for all off/on.
    Analog outputs: +/- (or Space) to adjust the pending value by
    ``ANALOG_STEP``, Enter to write it.

    Rows are rendered grouped by type (digital first, then analog), and the
    cursor walks the rows in exactly that on-screen order, regardless of how
    digital and analog signals are interleaved in the configuration.
    """

    DEFAULT_CSS = """
    OutputPanel {
        border: solid $accent;
        padding: 0 1;
        height: 100%;
    }
    OutputPanel:focus {
        border: solid $accent-lighten-2;
    }
    """

    BINDINGS = [
        Binding("up", "cursor_up", "Up", show=False),
        Binding("down", "cursor_down", "Down", show=False),
        Binding("space", "do_toggle", "Toggle", show=False),
        Binding("enter", "do_write", "Write", show=False),
        Binding("0", "all_off", "All OFF", show=False),
        Binding("1", "all_on", "All ON", show=False),
        Binding("plus_sign", "analog_up", "+", show=False),
        Binding("hyphen_minus", "analog_down", "-", show=False),
    ]

    # Index into self._names of the highlighted row; reactive so that a
    # change triggers a repaint.
    cursor: reactive[int] = reactive(0)

    # Step size for analog +/- adjustment
    ANALOG_STEP = 100
    # Inclusive bounds applied to a pending analog value.
    ANALOG_MIN = 0
    ANALOG_MAX = 65535

    # ── Messages ─────────────────────────────────────────────────────────────

    class ToggleOutput(Message):
        """Digital output toggle request."""

        def __init__(self, signal: str, current_value: bool) -> None:
            super().__init__()
            self.signal = signal
            self.current_value = current_value

    class WriteAnalog(Message):
        """Analog output write request (user pressed Enter on an analog output)."""

        def __init__(self, signal: str, value: int) -> None:
            super().__init__()
            self.signal = signal
            self.value = value

    class AllOutputs(Message):
        """Set all digital outputs to a given state."""

        def __init__(self, value: bool) -> None:
            super().__init__()
            self.value = value

    # ── Init / update ─────────────────────────────────────────────────────────

    def __init__(
        self,
        output_signals: list[tuple[str, str]],  # [(name, value_type), ...]
        **kwargs: Any,
    ) -> None:
        """Set up the row order, type lookup and initial shadow state.

        Args:
            output_signals: ``(name, value_type)`` pairs for every output.
            **kwargs: Forwarded unchanged to ``Static``.
        """
        super().__init__(**kwargs)
        self._signals: list[tuple[str, str]] = output_signals

        self._digital: list[str] = [n for n, vt in output_signals if vt == "bool"]
        self._analog: list[str] = [n for n, vt in output_signals if vt == "int"]
        # Signals of any other value type are not rendered; they are kept at
        # the tail so they stay addressable without disturbing visible rows.
        unlisted = [n for n, vt in output_signals if vt not in ("bool", "int")]

        # Cursor order == display order (digital block, then analog block).
        self._names: list[str] = self._digital + self._analog + unlisted
        self._type_map: dict[str, str] = dict(output_signals)
        self._state: dict[str, bool | int] = {
            name: (0 if vt == "int" else False) for name, vt in output_signals
        }

        # Pending analog value edits (before Enter commits)
        self._analog_pending: dict[str, int] = {}

    def update_output_state(self, state: dict[str, bool | int]) -> None:
        """Adopt *state* as the shadow state to display and request a repaint."""
        self._state = state
        self.refresh()

    # ── Render ────────────────────────────────────────────────────────────────

    def render(self) -> str:
        """Build the panel text: title, digital rows, then analog rows."""
        lines: list[str] = ["[bold]Outputs[/bold]"]
        if not self._names:
            lines.append(" [dim](no outputs configured)[/dim]")
            return "\n".join(lines)

        digital, analog = self._digital, self._analog

        # Digital outputs occupy cursor positions 0 .. len(digital) - 1.
        if digital:
            lines.append("")
            if analog:
                lines.append(" [dim underline]Digital[/dim underline] [dim](Space toggle · 0/1 all)[/dim]")
            else:
                lines.append(" [dim](↑↓ navigate · Space toggle · 0/1 all)[/dim]")
            for row, name in enumerate(digital):
                val = self._state.get(name, False)
                indicator = "[bold green]●[/bold green]" if val else "[dim]○[/dim]"
                val_str = "[bold green]ON [/bold green]" if val else "OFF"
                if row == self.cursor:
                    lines.append(
                        f"[reverse] ► [/reverse] {indicator} [reverse]{name}[/reverse] {val_str}"
                    )
                else:
                    lines.append(f" {indicator} {name} {val_str}")

        # Analog outputs follow directly after the digital block.
        if analog:
            lines.append("")
            if digital:
                lines.append(" [dim underline]Analog[/dim underline] [dim](+/- adjust · Enter write)[/dim]")
            else:
                lines.append(" [dim](↑↓ navigate · +/- adjust · Enter write)[/dim]")
            for row, name in enumerate(analog, start=len(digital)):
                committed = self._state.get(name, 0)
                pending = self._analog_pending.get(name)
                if pending is not None and pending != committed:
                    val_str = f"[bold yellow]{pending:>5}[/bold yellow] [dim](pending)[/dim]"
                else:
                    val_str = f"[bold cyan]{committed:>5}[/bold cyan]"
                if row == self.cursor:
                    lines.append(f"[reverse] ► [/reverse] [reverse]{name}[/reverse] {val_str}")
                else:
                    lines.append(f" {name} {val_str}")

        return "\n".join(lines)

    # ── Actions ───────────────────────────────────────────────────────────────

    def _selected_name(self) -> str | None:
        """Return the name under the cursor, or ``None`` when there are no outputs."""
        if not self._names:
            return None
        return self._names[self.cursor]

    def _current_type(self) -> str | None:
        """Return the value type of the selected output, or ``None`` if there is none."""
        name = self._selected_name()
        if name is None:
            return None
        return self._type_map.get(name)

    def _request_toggle(self, name: str) -> None:
        """Post a ToggleOutput carrying the currently displayed state of *name*."""
        current = bool(self._state.get(name, False))
        self.post_message(self.ToggleOutput(signal=name, current_value=current))

    def action_cursor_up(self) -> None:
        """Move the highlight one row up, wrapping at the top."""
        if self._names:
            self.cursor = (self.cursor - 1) % len(self._names)

    def action_cursor_down(self) -> None:
        """Move the highlight one row down, wrapping at the bottom."""
        if self._names:
            self.cursor = (self.cursor + 1) % len(self._names)

    def action_do_toggle(self) -> None:
        """Space: toggle digital output, or increment analog by step."""
        name = self._selected_name()
        if name is None:
            return
        if self._type_map.get(name) == "int":
            self._adjust_analog(name, self.ANALOG_STEP)
        else:
            self._request_toggle(name)

    def action_do_write(self) -> None:
        """Enter: toggle digital output, or commit pending analog value."""
        name = self._selected_name()
        if name is None:
            return
        if self._type_map.get(name) != "int":
            self._request_toggle(name)
            return
        pending = self._analog_pending.get(name)
        if pending is not None:
            self.post_message(self.WriteAnalog(signal=name, value=pending))
            # Clear pending — will be updated from state after write
            self._analog_pending.pop(name, None)
        # If no pending change, Enter is a no-op for analog

    def action_analog_up(self) -> None:
        """+: raise the pending value of the selected analog output by one step."""
        name = self._selected_name()
        if name is not None and self._type_map.get(name) == "int":
            self._adjust_analog(name, self.ANALOG_STEP)

    def action_analog_down(self) -> None:
        """-: lower the pending value of the selected analog output by one step."""
        name = self._selected_name()
        if name is not None and self._type_map.get(name) == "int":
            self._adjust_analog(name, -self.ANALOG_STEP)

    def _adjust_analog(self, name: str, delta: int) -> None:
        """Shift the pending value of *name* by *delta*, clamped to the allowed range.

        The first adjustment starts from the displayed (shadow) value; later
        ones accumulate on the pending value until Enter commits it.
        """
        base = self._analog_pending.get(name)
        if base is None:
            base = int(self._state.get(name, 0))
        self._analog_pending[name] = max(
            self.ANALOG_MIN, min(self.ANALOG_MAX, base + delta)
        )
        self.refresh()

    def action_all_off(self) -> None:
        """0: request that all outputs be switched off."""
        self.post_message(self.AllOutputs(value=False))

    def action_all_on(self) -> None:
        """1: request that all outputs be switched on."""
        self.post_message(self.AllOutputs(value=True))
|
||
|
|
|
||
|
|
|
||
|
|
# ─────────────────────────────────────────────────────────────────────────────
|
||
|
|
# Sequence panel
|
||
|
|
# ─────────────────────────────────────────────────────────────────────────────
|
||
|
|
|
||
|
|
class SequencePanel(Static, can_focus=True):
    """
    Panel with two faces, chosen on every render.

    Idle: a navigable list of the configured sequences (Enter/Space posts a
    run request) followed by a summary of the last finished run.
    Running: the step list of the active sequence, with a progress bar in
    the header and the step currently executing highlighted.
    """

    DEFAULT_CSS = """
    SequencePanel {
        border: solid $warning-darken-2;
        padding: 0 1;
        height: 100%;
    }
    SequencePanel:focus {
        border: solid $warning;
    }
    """

    BINDINGS = [
        Binding("up", "cursor_up", "Up", show=False),
        Binding("down", "cursor_down", "Down", show=False),
        Binding("space", "do_run", "Run", show=False),
        Binding("enter", "do_run", "Run", show=False),
    ]

    # Index of the highlighted sequence in the idle list.
    cursor: reactive[int] = reactive(0)

    # ── Messages ─────────────────────────────────────────────────────────────

    class RunSequence(Message):
        """Request to start the sequence called *name*."""

        def __init__(self, name: str) -> None:
            super().__init__()
            self.name = name

    # ── Init / update ─────────────────────────────────────────────────────────

    def __init__(self, sequences: list[Sequence], **kwargs: Any) -> None:
        """Remember the sequences (as a list and by name) and start with no run info."""
        super().__init__(**kwargs)
        self._sequences: list[Sequence] = sequences
        self._seq_by_name: dict[str, Sequence] = {}
        for sequence in sequences:
            self._seq_by_name[sequence.name] = sequence

        self._active_run: RunResult | None = None
        self._last_result: RunResult | None = None

    def update_run_state(
        self,
        active_run: RunResult | None,
        last_result: RunResult | None,
    ) -> None:
        """Store the live run and the last finished run, then request a repaint."""
        self._last_result = last_result
        self._active_run = active_run
        self.refresh()

    # ── Render: dispatch ──────────────────────────────────────────────────────

    def render(self) -> str:
        """Show the running view while a run is pending/running, else the idle view."""
        run = self._active_run
        in_progress = bool(run) and run.status in ("pending", "running")
        if not in_progress:
            return self._render_idle()
        return self._render_running()

    # ── Render: idle (sequence list) ──────────────────────────────────────────

    def _render_idle(self) -> str:
        """Render the selectable sequence list plus the last-run summary."""
        out: list[str] = [
            "[bold]Sequences[/bold] [dim](↑↓ navigate · Enter run)[/dim]\n"
        ]
        if not self._sequences:
            out.append(" [dim](no sequences configured)[/dim]")
            return "\n".join(out)

        for position, seq in enumerate(self._sequences):
            step_count = len(seq.steps)
            suffix = f" [dim]{step_count} steps[/dim]"
            if position == self.cursor:
                out.append(f"[reverse] ► [/reverse] [reverse]{seq.name}[/reverse]" + suffix)
            else:
                out.append(f" {seq.name}" + suffix)
            if seq.description:
                # Only the first line of the description, capped at 60 chars.
                first_line = seq.description.strip().partition("\n")[0]
                out.append(f" [dim]{first_line[:60]}[/dim]")

        # Last result summary (shown below the list when idle)
        last = self._last_result
        if last:
            if last.status == "success":
                tint = "green"
            elif last.status in ("failed", "error"):
                tint = "red"
            else:
                tint = "dim"
            out.append(
                f"\n[dim]Last run:[/dim] [{tint}]{last.sequence_name} "
                f"→ {last.status.upper()}[/{tint}]"
                f" [dim]{last.steps_completed}/{last.total_steps} steps"
                f" {last.duration_ms} ms[/dim]"
            )
            failure = last.failed_step
            if failure:
                out.append(
                    f" [red]✗ step {failure.step_index} ({failure.t_ms} ms):"
                    f" {failure.detail}[/red]"
                )

        return "\n".join(out)

    # ── Render: running (step list) ───────────────────────────────────────────

    @staticmethod
    def _describe_step(step: Any) -> str:
        """Return the one-line action text shown for *step* in the running view."""
        kind = step.action

        if kind == "set_output":
            if step.value is not None:
                # Analog output
                return f"set {step.signal} → {step.value}"
            label = "ON " if step.state else "OFF"
            return f"set {step.signal} → {label}"

        if kind == "check_input":
            if step.expected_value is not None:
                tol = f"±{step.tolerance}" if step.tolerance else ""
                return f"chk {step.signal} == {step.expected_value}{tol}"
            label = "ON " if step.expected else "OFF"
            return f"chk {step.signal} == {label}"

        if kind == "wait_input":
            limit = f"{step.timeout_ms} ms" if step.timeout_ms else "?"
            if step.expected_value is not None:
                tol = f"±{step.tolerance}" if step.tolerance else ""
                return f"wait {step.signal} == {step.expected_value}{tol} (timeout {limit})"
            label = "ON " if step.expected else "OFF"
            return f"wait {step.signal} == {label} (timeout {limit})"

        return f"{step.action} {step.signal}"

    def _render_running(self) -> str:
        """Render the active sequence's steps with progress header and highlight."""
        run = self._active_run
        assert run is not None
        seq = self._seq_by_name.get(run.sequence_name)
        if seq is None:
            return f"[yellow]Running: {run.sequence_name}[/yellow]\n[dim](steps unknown)[/dim]"

        step_count = len(seq.steps)
        finished = run.steps_completed
        executing = run.current_step_index
        percent = int(finished / step_count * 100) if step_count else 0
        bar = _progress_bar(percent, width=16)

        out: list[str] = [
            f"[bold yellow]▶ {seq.name}[/bold yellow]"
            f" {bar} [yellow]{finished}/{step_count}[/yellow]\n"
        ]

        for position, step in enumerate(seq.steps):
            stamp = f"{step.t_ms / 1000:6.1f}s"
            text = self._describe_step(step)

            if position == executing:
                # Currently executing — bright highlight
                out.append(f"[reverse][bold yellow] ► {stamp} {text} [/bold yellow][/reverse]")
            elif position < finished:
                # Already completed
                out.append(f" [dim green]✓ {stamp} {text}[/dim green]")
            else:
                # Pending
                out.append(f" [dim] {stamp} {text}[/dim]")

        return "\n".join(out)

    # ── Actions ───────────────────────────────────────────────────────────────

    def action_cursor_up(self) -> None:
        """Move the highlight to the previous sequence, wrapping at the top."""
        count = len(self._sequences)
        if count:
            self.cursor = (self.cursor - 1) % count

    def action_cursor_down(self) -> None:
        """Move the highlight to the next sequence, wrapping at the bottom."""
        count = len(self._sequences)
        if count:
            self.cursor = (self.cursor + 1) % count

    def action_do_run(self) -> None:
        """Post a RunSequence request for the highlighted sequence, if any exist."""
        if self._sequences:
            chosen = self._sequences[self.cursor]
            self.post_message(self.RunSequence(name=chosen.name))
|
||
|
|
|
||
|
|
|
||
|
|
def _progress_bar(pct: int, width: int = 16) -> str:
|
||
|
|
filled = int(width * pct / 100)
|
||
|
|
return "[" + "█" * filled + "░" * (width - filled) + "]"
|
||
|
|
|
||
|
|
|
||
|
|
# ─────────────────────────────────────────────────────────────────────────────
|
||
|
|
# Status bar
|
||
|
|
# ─────────────────────────────────────────────────────────────────────────────
|
||
|
|
|
||
|
|
class StatusBar(Static):
    """One-line bar docked at the top: per-device health plus a transient message."""

    DEFAULT_CSS = """
    StatusBar {
        dock: top;
        height: 1;
        padding: 0 1;
    }
    """

    def __init__(self, **kwargs: Any) -> None:
        """Start with empty content, no device stats and no message."""
        super().__init__("", **kwargs)
        self._msg: str = ""
        self._stats: list[dict] = []

    def update_stats(self, stats: list[dict]) -> None:
        """Replace the per-device stats and redraw the bar."""
        self._stats = stats
        self._rebuild()

    def set_message(self, msg: str) -> None:
        """Replace the transient message (empty string hides it) and redraw."""
        self._msg = msg
        self._rebuild()

    @staticmethod
    def _device_summary(entry: dict) -> str:
        """Format one device's entry: connection dot, id, poll rate, error count."""
        if entry.get("connected"):
            dot = "[green]●[/green]"
        else:
            dot = "[red]✗[/red]"
        rate = entry.get("achieved_hz", 0)
        errors = entry.get("error_count", 0)
        return f"{dot} {entry['device_id']} {rate:.0f} Hz err={errors}"

    def _rebuild(self) -> None:
        """Recompose the bar text from the stored stats and message."""
        if self._stats:
            status = " ".join(self._device_summary(entry) for entry in self._stats)
        else:
            status = "[dim]no devices[/dim]"
        if self._msg:
            status += f" [yellow]{self._msg}[/yellow]"
        self.update(status)
|
||
|
|
|
||
|
|
|
||
|
|
# ─────────────────────────────────────────────────────────────────────────────
|
||
|
|
# Main application
|
||
|
|
# ─────────────────────────────────────────────────────────────────────────────
|
||
|
|
|
||
|
|
class TerminatorTUI(App):
    """Arnold Terminator I/O debug TUI.

    Composes the status bar and the input / output / sequence panels,
    refreshes them from the I/O registry and sequencer on a timer, and runs
    output writes and reconnects on daemon threads so the UI thread does not
    wait on device I/O.  Anything those threads want to show in the UI is
    marshalled back to the app thread via ``call_from_thread``.
    """

    TITLE = "Arnold — Terminator I/O Debugger"

    CSS = """
    Screen {
        layout: vertical;
    }
    #main-area {
        height: 1fr;
        layout: horizontal;
    }
    #input-panel {
        width: 1fr;
        height: 100%;
        overflow-y: auto;
    }
    #output-panel {
        width: 1fr;
        height: 100%;
        overflow-y: auto;
    }
    #sequence-panel {
        width: 2fr;
        height: 100%;
        overflow-y: auto;
    }
    """

    BINDINGS = [
        Binding("q", "quit", "Quit"),
        Binding("r", "reconnect", "Reconnect"),
        Binding("tab", "cycle_focus", "Tab", show=False),
    ]

    def __init__(self, config: Config, registry: IORegistry,
                 sequencer: Sequencer) -> None:
        """Keep references to the collaborators and derive the signal lists.

        Args:
            config: Loaded configuration (signals, devices, sequences).
            registry: I/O registry used for snapshots, stats and drivers.
            sequencer: Sequencer used to start runs and query their results.
        """
        super().__init__()
        self._cfg = config
        self._io = registry
        self._seq = sequencer

        # Build typed signal lists: [(name, value_type), ...]
        self._input_signals: list[tuple[str, str]] = [
            (s.name, s.value_type)
            for s in config.logical_io if s.direction == "input"
        ]
        self._output_signals: list[tuple[str, str]] = [
            (s.name, s.value_type)
            for s in config.logical_io if s.direction == "output"
        ]
        self._input_names = [n for n, _ in self._input_signals]
        # Set copy for the membership test done on every refresh tick.
        self._input_name_set: frozenset[str] = frozenset(self._input_names)
        self._output_names = [n for n, _ in self._output_signals]
        self._output_type_map: dict[str, str] = dict(self._output_signals)

        # Shadow output state — updated on write (EBC100 has no output readback)
        self._output_state: dict[str, bool | int] = {
            name: (0 if vt == "int" else False)
            for name, vt in self._output_signals
        }

        # Last completed run (for the sequence panel summary)
        self._last_run_id: str | None = None
        self._last_result: RunResult | None = None

        self._refresh_timer: Timer | None = None
        self._status_clear_timer: Timer | None = None

    # ── Layout ───────────────────────────────────────────────────────────────

    def compose(self) -> ComposeResult:
        """Yield the status bar, the three side-by-side panels and the footer."""
        yield StatusBar(id="status-bar")
        with Horizontal(id="main-area"):
            yield InputPanel(input_signals=self._input_signals, id="input-panel")
            yield OutputPanel(output_signals=self._output_signals, id="output-panel")
            yield SequencePanel(
                sequences=self._cfg.sequences,
                id="sequence-panel",
            )
        yield Footer()

    def on_mount(self) -> None:
        """Focus the output panel, start the registry and the 20 Hz UI refresh."""
        self.query_one(OutputPanel).focus()
        self._io.start()
        self._refresh_timer = self.set_interval(1 / 20, self._refresh_ui)
        # Apply output defaults in background (needs connection to settle first)
        threading.Thread(target=self._apply_defaults, daemon=True).start()

    def _apply_defaults(self) -> None:
        """Write each output's configured default and mirror it in the shadow state.

        Runs on a background thread.  Outputs without a driver are skipped,
        and the shadow state is only updated when the write reports success.
        """
        time.sleep(0.5)  # give Modbus connection a moment to establish
        for sig in self._cfg.logical_io:
            if sig.direction != "output":
                continue
            driver = self._io.driver(sig.device)
            if driver is None:
                continue
            # Digital defaults
            if sig.default_state is not None and sig.value_type == "bool":
                if driver.write_output(sig.modbus_address, sig.default_state):
                    self._output_state[sig.name] = sig.default_state
            # Analog defaults
            if sig.default_value is not None and sig.value_type == "int":
                if driver.write_register(sig.modbus_address, sig.default_value):
                    self._output_state[sig.name] = sig.default_value

    # ── Periodic refresh ─────────────────────────────────────────────────────

    def _refresh_ui(self) -> None:
        """Timer callback: push fresh data into every panel and the status bar."""
        snapshot = self._io.snapshot()

        # Inputs
        input_snap = {
            name: state
            for name, state in snapshot.items()
            if name in self._input_name_set
        }
        self.query_one(InputPanel).update_snapshot(input_snap)

        # Outputs (shadow state)
        self.query_one(OutputPanel).update_output_state(self._output_state)

        # Sequences — get live run result
        active_id = self._seq.active_run_id()
        active_run: RunResult | None = None
        if active_id:
            active_run = self._seq.get_result(active_id)

        # Detect a newly-completed run and remember it
        if not active_id and self._last_run_id:
            result = self._seq.get_result(self._last_run_id)
            if result and result.status not in ("pending", "running"):
                self._last_result = result
                self._last_run_id = None

        self.query_one(SequencePanel).update_run_state(
            active_run=active_run,
            last_result=self._last_result,
        )

        # Status bar: poll stats merged with each driver's connection flag
        driver_map = {d["device_id"]: d for d in self._io.driver_status()}
        combined: list[dict] = []
        for ps in self._io.poll_stats():
            did = ps["device_id"]
            combined.append({**ps, "connected": driver_map.get(did, {}).get("connected", False)})
        self.query_one(StatusBar).update_stats(combined)

    # ── Output events ─────────────────────────────────────────────────────────

    @on(OutputPanel.ToggleOutput)
    def handle_toggle(self, event: OutputPanel.ToggleOutput) -> None:
        """Write the inverse of the state the panel was showing."""
        self._write_digital_output(event.signal, not event.current_value)

    @on(OutputPanel.WriteAnalog)
    def handle_write_analog(self, event: OutputPanel.WriteAnalog) -> None:
        """Write the committed analog value."""
        self._write_analog_output(event.signal, event.value)

    @on(OutputPanel.AllOutputs)
    def handle_all_outputs(self, event: OutputPanel.AllOutputs) -> None:
        """Drive every digital ("bool") output to the requested state."""
        for name in self._output_names:
            if self._output_type_map.get(name) == "bool":
                self._write_digital_output(name, event.value)

    def _resolve_output(self, signal_name: str) -> tuple[Any, Any] | None:
        """Look up the signal and its driver; flash an error and return None if missing."""
        sig = self._cfg.signal(signal_name)
        if sig is None:
            self._flash(f"Unknown signal: {signal_name}")
            return None
        driver = self._io.driver(sig.device)
        if driver is None:
            self._flash(f"No driver for {sig.device}")
            return None
        return sig, driver

    def _write_in_background(
        self,
        write: Callable[[], Any],
        signal_name: str,
        value: bool | int,
        label: str,
    ) -> None:
        """Run *write* on a daemon thread and report the outcome in the status bar.

        On a truthy result the shadow state for *signal_name* is set to
        *value*.  The status message is routed through
        ``_flash_from_thread`` because the worker is not the app thread.
        """
        def do_write() -> None:
            if write():
                self._output_state[signal_name] = value
                self._flash_from_thread(f"{signal_name} → {label}")
            else:
                self._flash_from_thread(f"WRITE FAILED: {signal_name} → {label}")

        threading.Thread(target=do_write, daemon=True).start()

    def _write_digital_output(self, signal_name: str, value: bool) -> None:
        """Write a digital output in the background; must be called on the app thread."""
        target = self._resolve_output(signal_name)
        if target is None:
            return
        sig, driver = target
        self._write_in_background(
            lambda: driver.write_output(sig.modbus_address, value),
            signal_name,
            value,
            "ON" if value else "OFF",
        )

    def _write_analog_output(self, signal_name: str, value: int) -> None:
        """Write an analog output in the background; must be called on the app thread."""
        target = self._resolve_output(signal_name)
        if target is None:
            return
        sig, driver = target
        self._write_in_background(
            lambda: driver.write_register(sig.modbus_address, value),
            signal_name,
            value,
            f"{value}",
        )

    # ── Sequence events ───────────────────────────────────────────────────────

    @on(SequencePanel.RunSequence)
    def handle_run_sequence(self, event: SequencePanel.RunSequence) -> None:
        """Start the requested sequence unless one is already running."""
        if self._seq.active_run_id():
            self._flash("Busy: sequence already running")
            return

        try:
            run_id, started = self._seq.start(event.name)
        except ValueError as e:
            self._flash(str(e))
            return

        # start() can still decline even though the check above passed.
        if not started:
            self._flash("Busy: sequence already running")
            return

        self._last_run_id = run_id
        self._flash(f"Started: {event.name}")

    # ── Status flash ──────────────────────────────────────────────────────────

    def _flash(self, msg: str, duration: float = 4.0) -> None:
        """Show *msg* in the status bar and clear it after *duration* seconds.

        Must run on the app thread (it touches a widget and a timer); worker
        threads use ``_flash_from_thread`` instead.
        """
        self.query_one(StatusBar).set_message(msg)
        if self._status_clear_timer:
            # A newer message replaces the pending clear of the older one.
            self._status_clear_timer.stop()
        self._status_clear_timer = self.set_timer(
            duration, lambda: self.query_one(StatusBar).set_message("")
        )

    def _flash_from_thread(self, msg: str, duration: float = 4.0) -> None:
        """Thread-safe variant of ``_flash`` for use from worker threads."""
        try:
            self.call_from_thread(self._flash, msg, duration)
        except RuntimeError as exc:
            # NOTE(review): expected when the app loop is already gone during
            # shutdown; the message is logged rather than crashing the worker.
            logging.getLogger(__name__).debug(
                "status message dropped (%s): %s", exc, msg
            )

    # ── Actions ───────────────────────────────────────────────────────────────

    def action_cycle_focus(self) -> None:
        """Tab: move focus Outputs → Sequences → Outputs; default to Outputs."""
        panels = [self.query_one(OutputPanel), self.query_one(SequencePanel)]
        focused = self.focused
        try:
            idx = panels.index(focused)  # type: ignore[arg-type]
            panels[(idx + 1) % len(panels)].focus()
        except ValueError:
            # Focus is on neither panel.
            panels[0].focus()

    def action_reconnect(self) -> None:
        """r: call connect() on every configured device's driver in the background."""
        self._flash("Reconnecting…")

        def do_reconnect() -> None:
            for dev in self._cfg.devices:
                driver = self._io.driver(dev.id)
                if driver:
                    driver.connect()
            self._flash_from_thread("Reconnect done")

        threading.Thread(target=do_reconnect, daemon=True).start()

    async def on_unmount(self) -> None:
        """Stop the I/O registry when the app is torn down."""
        self._io.stop()
|
||
|
|
|
||
|
|
|
||
|
|
# ─────────────────────────────────────────────────────────────────────────────
|
||
|
|
# Entry point
|
||
|
|
# ─────────────────────────────────────────────────────────────────────────────
|
||
|
|
|
||
|
|
def main() -> None:
    """Parse the command line, load the config, wire everything up and run the TUI.

    Prints the error to stderr and exits with status 1 when the config
    cannot be loaded (``ConfigError``).
    """
    script_dir = Path(__file__).parent

    parser = argparse.ArgumentParser(description="Arnold Terminator I/O debug TUI")
    parser.add_argument(
        "config",
        nargs="?",
        default=str(script_dir / "config.yaml"),
        help="Path to YAML config file (default: config.yaml)",
    )
    cli = parser.parse_args()

    try:
        config = load_config(cli.config)
    except ConfigError as err:
        print(f"Config error: {err}", file=sys.stderr)
        raise SystemExit(1)

    registry = IORegistry(config)

    # Shadow state shared between the sequencer callback and the TUI: the
    # callback (invoked from the sequencer's worker thread) assigns keys on
    # this very dict, which the app then displays.  Plain key assignment on
    # a dict is GIL-safe.
    shadow: dict[str, bool | int] = {
        sig.name: (0 if sig.value_type == "int" else False)
        for sig in config.logical_io
        if sig.direction == "output"
    }

    def on_output_write(signal_name: str, value: bool | int) -> None:
        """Record the value a running sequence just wrote to an output."""
        shadow[signal_name] = value

    sequencer = Sequencer(
        config=config,
        registry=registry,
        log_path=script_dir / "runs.log",
        on_output_write=on_output_write,
    )

    app = TerminatorTUI(config=config, registry=registry, sequencer=sequencer)
    # Share the same dict object so the callback and the TUI mutate the same state
    app._output_state = shadow

    app.run()
|
||
|
|
|
||
|
|
|
||
|
|
# Launch the TUI only when this file is executed as a script, not on import.
if __name__ == "__main__":
    main()
|