Files
SequencerIO/arnold/terminator_io.py

728 lines
28 KiB
Python
Raw Normal View History

2026-03-02 17:48:55 -05:00
"""
arnold/terminator_io.py AutomationDirect Terminator I/O driver.
Encapsulates everything that touches a physical T1H-EBC100 controller:
- Modbus TCP connection management (pymodbus, auto-reconnect)
- Signal state cache (thread-safe)
- Background fast-poll thread (reads both coils and registers each cycle)
2026-03-03 17:25:38 -05:00
Dual-connection architecture
----------------------------
Each TerminatorIO maintains TWO independent Modbus TCP connections to
the same EBC100:
_read_client used exclusively by the poll thread (FC02, FC04)
_write_client used exclusively by write callers (FC05, FC06, FC15, FC16)
Each connection has its own lock and connection state. This eliminates
lock contention between the poll thread and output writes, reducing
write latency from 535 ms (old shared-lock design) to 519 ms
(just sleep jitter + Modbus round-trip, no lock wait).
The EBC100 accepts multiple simultaneous TCP connections on port 502
and processes them independently.
2026-03-02 17:48:55 -05:00
Key hardware quirks documented here:
- The EBC100 uses a UNIFIED flat coil address space across all digital
modules in physical slot order. FC02 (read discrete inputs) and
FC01/FC05/FC15 (read/write coils) share the same sequential offsets.
If slot 1 and slot 2 are 8-pt input modules (addresses 0-7, 8-15),
a 16-pt output module in slot 3 starts at coil address 16 NOT 0.
- The EBC100 maintains TWO independent flat address spaces:
coil space (1-bit) digital modules: FC01/FC02/FC05/FC15
register space (16-bit) analog + temperature: FC03/FC04/FC06/FC16
A digital module advances only the coil offset; an analog module
advances only the register offset. They do not interfere.
- FC02 (read discrete inputs) returns input bits starting at address 0.
Because input modules always appear first in the unified coil scheme,
the FC02 bit index equals modbus_address for every digital input signal.
- FC04 (read input registers) returns 16-bit values for analog/temperature
input modules, starting at register address 0 in the register space.
- The EBC100 never raises Modbus exception code 2 (illegal address) for
out-of-range reads it silently returns zeros. Module presence cannot
be auto-detected via protocol errors; use the config 'modules' list.
- The EBC100 responds to any Modbus unit/slave ID over TCP the unit_id
field is echoed back but not used for routing. Set it to 1 (default).
- FC05 write_coil echoes back True for any address, even unmapped ones.
There is no write-error feedback for out-of-range output addresses.
- The device has no unsolicited push capability. Polling is mandatory.
Public API
----------
TerminatorIO(device: DeviceConfig)
2026-03-03 17:25:38 -05:00
.connect() -> bool # connects both read and write clients
.connect_reader() -> bool # connect read client only (poll thread)
.connect_writer() -> bool # connect write client only
2026-03-02 17:48:55 -05:00
.disconnect()
2026-03-03 17:25:38 -05:00
.read_inputs() -> list[bool] | None # bulk FC02, read client
.read_registers(address, count) -> list[int] | None # bulk FC04, read client
.write_output(address, value) -> bool # FC05, write client
.write_outputs(address, values) -> bool # FC15, write client
.write_register(address, value) -> bool # FC06, write client
.write_registers(address, values) -> bool # FC16, write client
2026-03-02 17:48:55 -05:00
.connected: bool
.status() -> dict
SignalState dataclass: name, value (bool|int), updated_at, stale
IORegistry(config) multi-device coordinator
.start() connect + start all poll threads
.stop() stop all poll threads + disconnect
.get(signal) -> SignalState | None
.get_value(signal) -> bool | int | None
.snapshot() -> dict[str, SignalState]
.poll_stats() -> list[dict]
.driver_status() -> list[dict]
"""
from __future__ import annotations
import logging
import threading
import time
from dataclasses import dataclass
from typing import TYPE_CHECKING
from pymodbus.client import ModbusTcpClient
from pymodbus.exceptions import ModbusException
from pymodbus.pdu import ExceptionResponse
if TYPE_CHECKING:
from .config import Config, DeviceConfig, LogicalIO
log = logging.getLogger(__name__)
# ---------------------------------------------------------------------------
# Signal state
# ---------------------------------------------------------------------------
@dataclass
class SignalState:
name: str
value: bool | int
updated_at: float # time.monotonic()
stale: bool = False # True when the last poll for this device failed
# ---------------------------------------------------------------------------
2026-03-03 17:25:38 -05:00
# _ModbusConn — one TCP connection with its own lock and reconnect logic
2026-03-02 17:48:55 -05:00
# ---------------------------------------------------------------------------
2026-03-03 17:25:38 -05:00
class _ModbusConn:
2026-03-02 17:48:55 -05:00
"""
2026-03-03 17:25:38 -05:00
A single Modbus TCP connection with independent lock and state.
2026-03-02 17:48:55 -05:00
2026-03-03 17:25:38 -05:00
TerminatorIO creates two of these: one for reads, one for writes.
Each can connect, reconnect, and operate without blocking the other.
2026-03-02 17:48:55 -05:00
"""
2026-03-03 17:25:38 -05:00
def __init__(self, device: "DeviceConfig", role: str) -> None:
self._device = device
self._role = role # "reader" or "writer" — for log messages
self.lock = threading.Lock()
2026-03-02 17:48:55 -05:00
self._client: ModbusTcpClient | None = None
2026-03-03 17:25:38 -05:00
self.connected = False
self.connect_attempts = 0
self.last_error = ""
2026-03-02 17:48:55 -05:00
def connect(self) -> bool:
2026-03-03 17:25:38 -05:00
"""Open (or reopen) the TCP connection. Call with lock held."""
2026-03-02 17:48:55 -05:00
if self._client is not None:
try:
self._client.close()
except Exception:
pass
self._client = ModbusTcpClient(
2026-03-03 17:25:38 -05:00
host=self._device.host,
port=self._device.port,
2026-03-02 17:48:55 -05:00
timeout=2,
retries=1,
)
2026-03-03 17:25:38 -05:00
self.connect_attempts += 1
2026-03-02 17:48:55 -05:00
ok = self._client.connect()
2026-03-03 17:25:38 -05:00
self.connected = ok
2026-03-02 17:48:55 -05:00
if ok:
2026-03-03 17:25:38 -05:00
log.info("%s %s connected to %s:%d",
self._device.id, self._role,
self._device.host, self._device.port)
2026-03-02 17:48:55 -05:00
else:
2026-03-03 17:25:38 -05:00
self.last_error = (
f"TCP connect failed to {self._device.host}:{self._device.port}"
2026-03-02 17:48:55 -05:00
)
2026-03-03 17:25:38 -05:00
log.warning("%s %s connect failed: %s",
self._device.id, self._role, self.last_error)
2026-03-02 17:48:55 -05:00
return ok
2026-03-03 17:25:38 -05:00
def close(self) -> None:
if self._client:
try:
self._client.close()
except Exception:
pass
self.connected = False
self._client = None
@property
def client(self) -> ModbusTcpClient | None:
return self._client
# ---------------------------------------------------------------------------
# TerminatorIO — one instance per physical EBC100 controller
# ---------------------------------------------------------------------------
class TerminatorIO:
"""
Modbus TCP driver for a single T1H-EBC100 controller.
Uses two independent TCP connections:
- _reader: for poll thread reads (FC02, FC04). Lock held only during reads.
- _writer: for output writes (FC05, FC06, FC15, FC16). Lock held only during writes.
Since each connection has its own lock, writes never block behind reads
and vice versa.
"""
def __init__(self, device: "DeviceConfig") -> None:
self.device = device
self._reader = _ModbusConn(device, "reader")
self._writer = _ModbusConn(device, "writer")
# ------------------------------------------------------------------
# Connection
# ------------------------------------------------------------------
def connect(self) -> bool:
"""Open both read and write connections. Returns True if both succeed."""
r = self.connect_reader()
w = self.connect_writer()
return r and w
def connect_reader(self) -> bool:
"""Open the read connection (used by poll thread)."""
with self._reader.lock:
return self._reader.connect()
def connect_writer(self) -> bool:
"""Open the write connection (used by sequencer/API/TUI)."""
with self._writer.lock:
return self._writer.connect()
2026-03-02 17:48:55 -05:00
def disconnect(self) -> None:
2026-03-03 17:25:38 -05:00
with self._reader.lock:
self._reader.close()
with self._writer.lock:
self._writer.close()
2026-03-02 17:48:55 -05:00
@property
def connected(self) -> bool:
2026-03-03 17:25:38 -05:00
return self._reader.connected or self._writer.connected
2026-03-02 17:48:55 -05:00
# ------------------------------------------------------------------
2026-03-03 17:25:38 -05:00
# Read inputs — single bulk FC02 request (uses read connection)
2026-03-02 17:48:55 -05:00
# ------------------------------------------------------------------
def read_inputs(self) -> list[bool] | None:
"""
Read all discrete input points in one FC02 request.
Returns a flat list of bool ordered by slot then point (matching
the unified address scheme), or None on comms error.
2026-03-03 17:25:38 -05:00
Uses the read connection never blocks write callers.
2026-03-02 17:48:55 -05:00
"""
total = self.device.total_input_points()
if total == 0:
return []
2026-03-03 17:25:38 -05:00
with self._reader.lock:
return self._fc02(self._reader, address=0, count=total)
2026-03-02 17:48:55 -05:00
2026-03-03 17:25:38 -05:00
def _fc02(self, conn: _ModbusConn, address: int, count: int) -> list[bool] | None:
2026-03-02 17:48:55 -05:00
for attempt in range(2):
2026-03-03 17:25:38 -05:00
if not conn.connected:
if not conn.connect():
2026-03-02 17:48:55 -05:00
return None
try:
2026-03-03 17:25:38 -05:00
rr = conn.client.read_discrete_inputs(
2026-03-02 17:48:55 -05:00
address=address, count=count,
device_id=self.device.unit_id,
)
if rr.isError() or isinstance(rr, ExceptionResponse):
log.warning("%s FC02 error: %s", self.device.id, rr)
2026-03-03 17:25:38 -05:00
conn.connected = False
2026-03-02 17:48:55 -05:00
continue
return list(rr.bits[:count])
except (ModbusException, ConnectionError, OSError) as exc:
2026-03-03 17:25:38 -05:00
log.warning("%s FC02 read error (attempt %d): %s",
2026-03-02 17:48:55 -05:00
self.device.id, attempt + 1, exc)
2026-03-03 17:25:38 -05:00
conn.connected = False
2026-03-02 17:48:55 -05:00
time.sleep(0.05)
return None
# ------------------------------------------------------------------
2026-03-03 17:25:38 -05:00
# Read analog input registers — bulk FC04 (uses read connection)
2026-03-02 17:48:55 -05:00
# ------------------------------------------------------------------
def read_registers(self, address: int, count: int) -> list[int] | None:
"""
Read contiguous 16-bit input registers via FC04.
2026-03-03 17:25:38 -05:00
Uses the read connection never blocks write callers.
2026-03-02 17:48:55 -05:00
"""
if count == 0:
return []
2026-03-03 17:25:38 -05:00
with self._reader.lock:
return self._fc04(self._reader, address, count)
2026-03-02 17:48:55 -05:00
2026-03-03 17:25:38 -05:00
def _fc04(self, conn: _ModbusConn, address: int, count: int) -> list[int] | None:
2026-03-02 17:48:55 -05:00
for attempt in range(2):
2026-03-03 17:25:38 -05:00
if not conn.connected:
if not conn.connect():
2026-03-02 17:48:55 -05:00
return None
try:
2026-03-03 17:25:38 -05:00
rr = conn.client.read_input_registers(
2026-03-02 17:48:55 -05:00
address=address, count=count,
device_id=self.device.unit_id,
)
if rr.isError() or isinstance(rr, ExceptionResponse):
log.warning("%s FC04 error: %s", self.device.id, rr)
2026-03-03 17:25:38 -05:00
conn.connected = False
2026-03-02 17:48:55 -05:00
continue
return list(rr.registers[:count])
except (ModbusException, ConnectionError, OSError) as exc:
log.warning("%s FC04 read error (attempt %d): %s",
self.device.id, attempt + 1, exc)
2026-03-03 17:25:38 -05:00
conn.connected = False
2026-03-02 17:48:55 -05:00
time.sleep(0.05)
return None
# ------------------------------------------------------------------
2026-03-03 17:25:38 -05:00
# Write digital outputs (uses write connection)
2026-03-02 17:48:55 -05:00
# ------------------------------------------------------------------
def write_output(self, address: int, value: bool) -> bool:
"""
Write a single coil via FC05.
2026-03-03 17:25:38 -05:00
Uses the write connection never blocked by poll thread reads.
2026-03-02 17:48:55 -05:00
"""
2026-03-03 17:25:38 -05:00
with self._writer.lock:
return self._fc05(self._writer, address, value)
2026-03-02 17:48:55 -05:00
2026-03-03 17:25:38 -05:00
def _fc05(self, conn: _ModbusConn, address: int, value: bool) -> bool:
2026-03-02 17:48:55 -05:00
for attempt in range(2):
2026-03-03 17:25:38 -05:00
if not conn.connected:
if not conn.connect():
2026-03-02 17:48:55 -05:00
return False
try:
2026-03-03 17:25:38 -05:00
rr = conn.client.write_coil(
2026-03-02 17:48:55 -05:00
address=address, value=value,
device_id=self.device.unit_id,
)
if rr.isError() or isinstance(rr, ExceptionResponse):
log.warning("%s FC05 error addr=%d: %s",
self.device.id, address, rr)
2026-03-03 17:25:38 -05:00
conn.connected = False
2026-03-02 17:48:55 -05:00
continue
log.debug("%s coil[%d] = %s", self.device.id, address, value)
return True
except (ModbusException, ConnectionError, OSError) as exc:
2026-03-03 17:25:38 -05:00
log.warning("%s FC05 write error (attempt %d): %s",
2026-03-02 17:48:55 -05:00
self.device.id, attempt + 1, exc)
2026-03-03 17:25:38 -05:00
conn.connected = False
2026-03-02 17:48:55 -05:00
time.sleep(0.05)
return False
def write_outputs(self, address: int, values: list[bool]) -> bool:
2026-03-03 17:25:38 -05:00
"""Write multiple contiguous coils via FC15. Uses write connection."""
with self._writer.lock:
return self._fc15(self._writer, address, values)
def _fc15(self, conn: _ModbusConn, address: int, values: list[bool]) -> bool:
for attempt in range(2):
if not conn.connected:
if not conn.connect():
return False
try:
rr = conn.client.write_coils(
address=address, values=values,
device_id=self.device.unit_id,
)
if rr.isError() or isinstance(rr, ExceptionResponse):
log.warning("%s FC15 error addr=%d: %s",
self.device.id, address, rr)
conn.connected = False
continue
return True
except (ModbusException, ConnectionError, OSError) as exc:
log.warning("%s FC15 write error (attempt %d): %s",
self.device.id, attempt + 1, exc)
conn.connected = False
time.sleep(0.05)
2026-03-02 17:48:55 -05:00
return False
# ------------------------------------------------------------------
2026-03-03 17:25:38 -05:00
# Write analog outputs (uses write connection)
2026-03-02 17:48:55 -05:00
# ------------------------------------------------------------------
def write_register(self, address: int, value: int) -> bool:
"""
Write a single 16-bit holding register via FC06.
2026-03-03 17:25:38 -05:00
Uses the write connection never blocked by poll thread reads.
2026-03-02 17:48:55 -05:00
"""
2026-03-03 17:25:38 -05:00
with self._writer.lock:
return self._fc06(self._writer, address, value)
2026-03-02 17:48:55 -05:00
2026-03-03 17:25:38 -05:00
def _fc06(self, conn: _ModbusConn, address: int, value: int) -> bool:
2026-03-02 17:48:55 -05:00
for attempt in range(2):
2026-03-03 17:25:38 -05:00
if not conn.connected:
if not conn.connect():
2026-03-02 17:48:55 -05:00
return False
try:
2026-03-03 17:25:38 -05:00
rr = conn.client.write_register(
2026-03-02 17:48:55 -05:00
address=address, value=value,
device_id=self.device.unit_id,
)
if rr.isError() or isinstance(rr, ExceptionResponse):
log.warning("%s FC06 error addr=%d: %s",
self.device.id, address, rr)
2026-03-03 17:25:38 -05:00
conn.connected = False
2026-03-02 17:48:55 -05:00
continue
log.debug("%s reg[%d] = %d", self.device.id, address, value)
return True
except (ModbusException, ConnectionError, OSError) as exc:
log.warning("%s FC06 write error (attempt %d): %s",
self.device.id, attempt + 1, exc)
2026-03-03 17:25:38 -05:00
conn.connected = False
2026-03-02 17:48:55 -05:00
time.sleep(0.05)
return False
def write_registers(self, address: int, values: list[int]) -> bool:
2026-03-03 17:25:38 -05:00
"""Write multiple contiguous 16-bit holding registers via FC16.
Uses write connection."""
with self._writer.lock:
return self._fc16(self._writer, address, values)
def _fc16(self, conn: _ModbusConn, address: int, values: list[int]) -> bool:
for attempt in range(2):
if not conn.connected:
if not conn.connect():
return False
try:
rr = conn.client.write_registers(
address=address, values=values,
device_id=self.device.unit_id,
)
if rr.isError() or isinstance(rr, ExceptionResponse):
log.warning("%s FC16 error addr=%d: %s",
self.device.id, address, rr)
conn.connected = False
continue
return True
except (ModbusException, ConnectionError, OSError) as exc:
log.warning("%s FC16 write error (attempt %d): %s",
self.device.id, attempt + 1, exc)
conn.connected = False
time.sleep(0.05)
2026-03-02 17:48:55 -05:00
return False
# ------------------------------------------------------------------
# Status
# ------------------------------------------------------------------
def status(self) -> dict:
return {
"device_id": self.device.id,
"host": self.device.host,
"port": self.device.port,
2026-03-03 17:25:38 -05:00
"connected": self.connected,
"reader_connected": self._reader.connected,
"writer_connected": self._writer.connected,
"reader_connect_attempts": self._reader.connect_attempts,
"writer_connect_attempts": self._writer.connect_attempts,
"last_reader_error": self._reader.last_error or None,
"last_writer_error": self._writer.last_error or None,
2026-03-02 17:48:55 -05:00
}
# ---------------------------------------------------------------------------
# _PollThread — internal; one per TerminatorIO instance
# ---------------------------------------------------------------------------
class _PollThread(threading.Thread):
"""
Reads all input points from one EBC100 at poll_interval_ms, updates the
shared signal cache. Daemon thread exits when the process does.
2026-03-03 17:25:38 -05:00
Each poll cycle reads BOTH address spaces via the driver's read connection:
- FC02 (coil space): digital input signals -> list[bool]
- FC04 (register space): analog/temperature input signals -> list[int]
The read connection has its own lock, so poll reads never block output
writes (which use the separate write connection).
2026-03-02 17:48:55 -05:00
"""
def __init__(
self,
driver: TerminatorIO,
2026-03-03 17:25:38 -05:00
digital_signals: list["LogicalIO"],
analog_signals: list["LogicalIO"],
2026-03-02 17:48:55 -05:00
cache: dict[str, SignalState],
lock: threading.Lock,
) -> None:
super().__init__(name=f"poll-{driver.device.id}", daemon=True)
self._driver = driver
self._digital_signals = digital_signals
self._analog_signals = analog_signals
self._cache = cache
self._lock = lock
self._stop = threading.Event()
self.poll_count = 0
self.error_count = 0
self._achieved_hz: float = 0.0
self._last_poll_ts: float | None = None
@property
def _total_signals(self) -> int:
return len(self._digital_signals) + len(self._analog_signals)
def stop(self) -> None:
self._stop.set()
def run(self) -> None:
interval = self._driver.device.poll_interval_ms / 1000.0
log.info("Poll thread started: %s %.0f ms interval %d digital + %d analog signals",
self._driver.device.id,
self._driver.device.poll_interval_ms,
len(self._digital_signals),
len(self._analog_signals))
2026-03-03 17:25:38 -05:00
# Only connect the read client; the write client connects on first use
self._driver.connect_reader()
2026-03-02 17:48:55 -05:00
rate_t0 = time.monotonic()
rate_polls = 0
while not self._stop.is_set():
t0 = time.monotonic()
self._cycle()
rate_polls += 1
self.poll_count += 1
elapsed = time.monotonic() - t0
# Update achieved rate every 5 s
window = time.monotonic() - rate_t0
if window >= 5.0:
self._achieved_hz = rate_polls / window
log.debug("%s %.1f polls/s errors=%d",
self._driver.device.id,
self._achieved_hz, self.error_count)
rate_t0 = time.monotonic()
rate_polls = 0
wait = interval - elapsed
if wait > 0:
self._stop.wait(wait)
log.info("Poll thread stopped: %s", self._driver.device.id)
self._driver.disconnect()
def _cycle(self) -> None:
if not self._digital_signals and not self._analog_signals:
return
had_error = False
updates: dict[str, SignalState] = {}
now = time.monotonic()
2026-03-03 17:25:38 -05:00
# -- Digital inputs (FC02, coil space) -------------------------
2026-03-02 17:48:55 -05:00
if self._digital_signals:
bits = self._driver.read_inputs()
if bits is None:
had_error = True
for sig in self._digital_signals:
existing = self._cache.get(sig.name)
updates[sig.name] = SignalState(
name=sig.name,
value=existing.value if existing else False,
updated_at=existing.updated_at if existing else now,
stale=True,
)
else:
for sig in self._digital_signals:
if sig.modbus_address < len(bits):
updates[sig.name] = SignalState(
name=sig.name,
value=bool(bits[sig.modbus_address]),
updated_at=now,
stale=False,
)
else:
log.warning("%s signal %r addr %d out of range (%d bits)",
self._driver.device.id, sig.name,
sig.modbus_address, len(bits))
2026-03-03 17:25:38 -05:00
# -- Analog / temperature inputs (FC04, register space) --------
2026-03-02 17:48:55 -05:00
if self._analog_signals:
total_regs = self._driver.device.total_analog_input_channels()
regs = self._driver.read_registers(address=0, count=total_regs)
if regs is None:
had_error = True
for sig in self._analog_signals:
existing = self._cache.get(sig.name)
updates[sig.name] = SignalState(
name=sig.name,
value=existing.value if existing else 0,
updated_at=existing.updated_at if existing else now,
stale=True,
)
else:
for sig in self._analog_signals:
if sig.modbus_address < len(regs):
updates[sig.name] = SignalState(
name=sig.name,
value=int(regs[sig.modbus_address]),
updated_at=now,
stale=False,
)
else:
log.warning("%s signal %r reg addr %d out of range (%d regs)",
self._driver.device.id, sig.name,
sig.modbus_address, len(regs))
if had_error:
self.error_count += 1
self._last_poll_ts = now
with self._lock:
self._cache.update(updates)
def stats(self) -> dict:
return {
"device_id": self._driver.device.id,
"poll_count": self.poll_count,
"error_count": self.error_count,
"achieved_hz": round(self._achieved_hz, 1),
"target_hz": round(1000 / self._driver.device.poll_interval_ms, 1),
"last_poll_ts": self._last_poll_ts,
"running": self.is_alive(),
}
# ---------------------------------------------------------------------------
2026-03-03 17:25:38 -05:00
# IORegistry — multi-device coordinator
2026-03-02 17:48:55 -05:00
# ---------------------------------------------------------------------------
class IORegistry:
"""
Owns all TerminatorIO drivers and poll threads for the full config.
Usage:
registry = IORegistry(config)
registry.start() # connect + begin polling
...
val = registry.get_value("my_signal")
registry.stop()
"""
def __init__(self, config: "Config") -> None:
self._config = config
self._cache: dict[str, SignalState] = {}
self._lock = threading.Lock()
# Build one TerminatorIO + one _PollThread per device
self._drivers: dict[str, TerminatorIO] = {}
self._pollers: list[_PollThread] = []
for device in config.devices:
driver = TerminatorIO(device)
self._drivers[device.id] = driver
# Partition input signals by address space
digital_inputs = sorted(
(s for s in config.logical_io
if s.device == device.id
and s.direction == "input"
and s.modbus_space == "coil"),
key=lambda s: s.modbus_address,
)
analog_inputs = sorted(
(s for s in config.logical_io
if s.device == device.id
and s.direction == "input"
and s.modbus_space == "register"),
key=lambda s: s.modbus_address,
)
poller = _PollThread(
driver, digital_inputs, analog_inputs,
self._cache, self._lock,
)
self._pollers.append(poller)
# ------------------------------------------------------------------
# Lifecycle
# ------------------------------------------------------------------
def start(self) -> None:
2026-03-03 17:25:38 -05:00
"""Start all poll threads (each connects its read client on first cycle)."""
2026-03-02 17:48:55 -05:00
for p in self._pollers:
p.start()
def stop(self) -> None:
"""Stop all poll threads and disconnect all drivers."""
for p in self._pollers:
p.stop()
for p in self._pollers:
p.join(timeout=3)
# ------------------------------------------------------------------
# Signal reads (used by sequencer + API)
# ------------------------------------------------------------------
def get(self, signal_name: str) -> SignalState | None:
with self._lock:
return self._cache.get(signal_name)
def get_value(self, signal_name: str) -> bool | int | None:
with self._lock:
s = self._cache.get(signal_name)
return s.value if s is not None else None
def is_stale(self, signal_name: str) -> bool:
with self._lock:
s = self._cache.get(signal_name)
return s.stale if s is not None else True
def snapshot(self) -> dict[str, SignalState]:
"""Shallow copy of the full signal cache."""
with self._lock:
return dict(self._cache)
# ------------------------------------------------------------------
# Output writes (used by sequencer)
# ------------------------------------------------------------------
def driver(self, device_id: str) -> TerminatorIO | None:
return self._drivers.get(device_id)
# ------------------------------------------------------------------
# Status / stats
# ------------------------------------------------------------------
def driver_status(self) -> list[dict]:
return [d.status() for d in self._drivers.values()]
def poll_stats(self) -> list[dict]:
return [p.stats() for p in self._pollers]