""" arnold/terminator_io.py — AutomationDirect Terminator I/O driver. Encapsulates everything that touches a physical T1H-EBC100 controller: - Modbus TCP connection management (pymodbus, auto-reconnect) - Signal state cache (thread-safe) - Background fast-poll thread (reads both coils and registers each cycle) Dual-connection architecture ---------------------------- Each TerminatorIO maintains TWO independent Modbus TCP connections to the same EBC100: _read_client — used exclusively by the poll thread (FC02, FC04) _write_client — used exclusively by write callers (FC05, FC06, FC15, FC16) Each connection has its own lock and connection state. This eliminates lock contention between the poll thread and output writes, reducing write latency from 5–35 ms (old shared-lock design) to 5–19 ms (just sleep jitter + Modbus round-trip, no lock wait). The EBC100 accepts multiple simultaneous TCP connections on port 502 and processes them independently. Key hardware quirks documented here: - The EBC100 uses a UNIFIED flat coil address space across all digital modules in physical slot order. FC02 (read discrete inputs) and FC01/FC05/FC15 (read/write coils) share the same sequential offsets. If slot 1 and slot 2 are 8-pt input modules (addresses 0-7, 8-15), a 16-pt output module in slot 3 starts at coil address 16 — NOT 0. - The EBC100 maintains TWO independent flat address spaces: coil space (1-bit) — digital modules: FC01/FC02/FC05/FC15 register space (16-bit) — analog + temperature: FC03/FC04/FC06/FC16 A digital module advances only the coil offset; an analog module advances only the register offset. They do not interfere. - FC02 (read discrete inputs) returns input bits starting at address 0. Because input modules always appear first in the unified coil scheme, the FC02 bit index equals modbus_address for every digital input signal. - FC04 (read input registers) returns 16-bit values for analog/temperature input modules, starting at register address 0 in the register space. - The EBC100 never raises Modbus exception code 2 (illegal address) for out-of-range reads — it silently returns zeros. Module presence cannot be auto-detected via protocol errors; use the config 'modules' list. - The EBC100 responds to any Modbus unit/slave ID over TCP — the unit_id field is echoed back but not used for routing. Set it to 1 (default). - FC05 write_coil echoes back True for any address, even unmapped ones. There is no write-error feedback for out-of-range output addresses. - The device has no unsolicited push capability. Polling is mandatory. Public API ---------- TerminatorIO(device: DeviceConfig) .connect() -> bool # connects both read and write clients .connect_reader() -> bool # connect read client only (poll thread) .connect_writer() -> bool # connect write client only .disconnect() .read_inputs() -> list[bool] | None # bulk FC02, read client .read_registers(address, count) -> list[int] | None # bulk FC04, read client .write_output(address, value) -> bool # FC05, write client .write_outputs(address, values) -> bool # FC15, write client .write_register(address, value) -> bool # FC06, write client .write_registers(address, values) -> bool # FC16, write client .connected: bool .status() -> dict SignalState dataclass: name, value (bool|int), updated_at, stale IORegistry(config) multi-device coordinator .start() connect + start all poll threads .stop() stop all poll threads + disconnect .get(signal) -> SignalState | None .get_value(signal) -> bool | int | None .snapshot() -> dict[str, SignalState] .poll_stats() -> list[dict] .driver_status() -> list[dict] """ from __future__ import annotations import logging import threading import time from dataclasses import dataclass from typing import TYPE_CHECKING from pymodbus.client import ModbusTcpClient from pymodbus.exceptions import ModbusException from pymodbus.pdu import ExceptionResponse if TYPE_CHECKING: from .config import Config, DeviceConfig, LogicalIO log = logging.getLogger(__name__) # --------------------------------------------------------------------------- # Signal state # --------------------------------------------------------------------------- @dataclass class SignalState: name: str value: bool | int updated_at: float # time.monotonic() stale: bool = False # True when the last poll for this device failed # --------------------------------------------------------------------------- # _ModbusConn — one TCP connection with its own lock and reconnect logic # --------------------------------------------------------------------------- class _ModbusConn: """ A single Modbus TCP connection with independent lock and state. TerminatorIO creates two of these: one for reads, one for writes. Each can connect, reconnect, and operate without blocking the other. """ def __init__(self, device: "DeviceConfig", role: str) -> None: self._device = device self._role = role # "reader" or "writer" — for log messages self.lock = threading.Lock() self._client: ModbusTcpClient | None = None self.connected = False self.connect_attempts = 0 self.last_error = "" def connect(self) -> bool: """Open (or reopen) the TCP connection. Call with lock held.""" if self._client is not None: try: self._client.close() except Exception: pass self._client = ModbusTcpClient( host=self._device.host, port=self._device.port, timeout=2, retries=1, ) self.connect_attempts += 1 ok = self._client.connect() self.connected = ok if ok: log.info("%s %s connected to %s:%d", self._device.id, self._role, self._device.host, self._device.port) else: self.last_error = ( f"TCP connect failed to {self._device.host}:{self._device.port}" ) log.warning("%s %s connect failed: %s", self._device.id, self._role, self.last_error) return ok def close(self) -> None: if self._client: try: self._client.close() except Exception: pass self.connected = False self._client = None @property def client(self) -> ModbusTcpClient | None: return self._client # --------------------------------------------------------------------------- # TerminatorIO — one instance per physical EBC100 controller # --------------------------------------------------------------------------- class TerminatorIO: """ Modbus TCP driver for a single T1H-EBC100 controller. Uses two independent TCP connections: - _reader: for poll thread reads (FC02, FC04). Lock held only during reads. - _writer: for output writes (FC05, FC06, FC15, FC16). Lock held only during writes. Since each connection has its own lock, writes never block behind reads and vice versa. """ def __init__(self, device: "DeviceConfig") -> None: self.device = device self._reader = _ModbusConn(device, "reader") self._writer = _ModbusConn(device, "writer") # ------------------------------------------------------------------ # Connection # ------------------------------------------------------------------ def connect(self) -> bool: """Open both read and write connections. Returns True if both succeed.""" r = self.connect_reader() w = self.connect_writer() return r and w def connect_reader(self) -> bool: """Open the read connection (used by poll thread).""" with self._reader.lock: return self._reader.connect() def connect_writer(self) -> bool: """Open the write connection (used by sequencer/API/TUI).""" with self._writer.lock: return self._writer.connect() def disconnect(self) -> None: with self._reader.lock: self._reader.close() with self._writer.lock: self._writer.close() @property def connected(self) -> bool: return self._reader.connected or self._writer.connected # ------------------------------------------------------------------ # Read inputs — single bulk FC02 request (uses read connection) # ------------------------------------------------------------------ def read_inputs(self) -> list[bool] | None: """ Read all discrete input points in one FC02 request. Returns a flat list of bool ordered by slot then point (matching the unified address scheme), or None on comms error. Uses the read connection — never blocks write callers. """ total = self.device.total_input_points() if total == 0: return [] with self._reader.lock: return self._fc02(self._reader, address=0, count=total) def _fc02(self, conn: _ModbusConn, address: int, count: int) -> list[bool] | None: for attempt in range(2): if not conn.connected: if not conn.connect(): return None try: rr = conn.client.read_discrete_inputs( address=address, count=count, device_id=self.device.unit_id, ) if rr.isError() or isinstance(rr, ExceptionResponse): log.warning("%s FC02 error: %s", self.device.id, rr) conn.connected = False continue return list(rr.bits[:count]) except (ModbusException, ConnectionError, OSError) as exc: log.warning("%s FC02 read error (attempt %d): %s", self.device.id, attempt + 1, exc) conn.connected = False time.sleep(0.05) return None # ------------------------------------------------------------------ # Read analog input registers — bulk FC04 (uses read connection) # ------------------------------------------------------------------ def read_registers(self, address: int, count: int) -> list[int] | None: """ Read contiguous 16-bit input registers via FC04. Uses the read connection — never blocks write callers. """ if count == 0: return [] with self._reader.lock: return self._fc04(self._reader, address, count) def _fc04(self, conn: _ModbusConn, address: int, count: int) -> list[int] | None: for attempt in range(2): if not conn.connected: if not conn.connect(): return None try: rr = conn.client.read_input_registers( address=address, count=count, device_id=self.device.unit_id, ) if rr.isError() or isinstance(rr, ExceptionResponse): log.warning("%s FC04 error: %s", self.device.id, rr) conn.connected = False continue return list(rr.registers[:count]) except (ModbusException, ConnectionError, OSError) as exc: log.warning("%s FC04 read error (attempt %d): %s", self.device.id, attempt + 1, exc) conn.connected = False time.sleep(0.05) return None # ------------------------------------------------------------------ # Write digital outputs (uses write connection) # ------------------------------------------------------------------ def write_output(self, address: int, value: bool) -> bool: """ Write a single coil via FC05. Uses the write connection — never blocked by poll thread reads. """ with self._writer.lock: return self._fc05(self._writer, address, value) def _fc05(self, conn: _ModbusConn, address: int, value: bool) -> bool: for attempt in range(2): if not conn.connected: if not conn.connect(): return False try: rr = conn.client.write_coil( address=address, value=value, device_id=self.device.unit_id, ) if rr.isError() or isinstance(rr, ExceptionResponse): log.warning("%s FC05 error addr=%d: %s", self.device.id, address, rr) conn.connected = False continue log.debug("%s coil[%d] = %s", self.device.id, address, value) return True except (ModbusException, ConnectionError, OSError) as exc: log.warning("%s FC05 write error (attempt %d): %s", self.device.id, attempt + 1, exc) conn.connected = False time.sleep(0.05) return False def write_outputs(self, address: int, values: list[bool]) -> bool: """Write multiple contiguous coils via FC15. Uses write connection.""" with self._writer.lock: return self._fc15(self._writer, address, values) def _fc15(self, conn: _ModbusConn, address: int, values: list[bool]) -> bool: for attempt in range(2): if not conn.connected: if not conn.connect(): return False try: rr = conn.client.write_coils( address=address, values=values, device_id=self.device.unit_id, ) if rr.isError() or isinstance(rr, ExceptionResponse): log.warning("%s FC15 error addr=%d: %s", self.device.id, address, rr) conn.connected = False continue return True except (ModbusException, ConnectionError, OSError) as exc: log.warning("%s FC15 write error (attempt %d): %s", self.device.id, attempt + 1, exc) conn.connected = False time.sleep(0.05) return False # ------------------------------------------------------------------ # Write analog outputs (uses write connection) # ------------------------------------------------------------------ def write_register(self, address: int, value: int) -> bool: """ Write a single 16-bit holding register via FC06. Uses the write connection — never blocked by poll thread reads. """ with self._writer.lock: return self._fc06(self._writer, address, value) def _fc06(self, conn: _ModbusConn, address: int, value: int) -> bool: for attempt in range(2): if not conn.connected: if not conn.connect(): return False try: rr = conn.client.write_register( address=address, value=value, device_id=self.device.unit_id, ) if rr.isError() or isinstance(rr, ExceptionResponse): log.warning("%s FC06 error addr=%d: %s", self.device.id, address, rr) conn.connected = False continue log.debug("%s reg[%d] = %d", self.device.id, address, value) return True except (ModbusException, ConnectionError, OSError) as exc: log.warning("%s FC06 write error (attempt %d): %s", self.device.id, attempt + 1, exc) conn.connected = False time.sleep(0.05) return False def write_registers(self, address: int, values: list[int]) -> bool: """Write multiple contiguous 16-bit holding registers via FC16. Uses write connection.""" with self._writer.lock: return self._fc16(self._writer, address, values) def _fc16(self, conn: _ModbusConn, address: int, values: list[int]) -> bool: for attempt in range(2): if not conn.connected: if not conn.connect(): return False try: rr = conn.client.write_registers( address=address, values=values, device_id=self.device.unit_id, ) if rr.isError() or isinstance(rr, ExceptionResponse): log.warning("%s FC16 error addr=%d: %s", self.device.id, address, rr) conn.connected = False continue return True except (ModbusException, ConnectionError, OSError) as exc: log.warning("%s FC16 write error (attempt %d): %s", self.device.id, attempt + 1, exc) conn.connected = False time.sleep(0.05) return False # ------------------------------------------------------------------ # Status # ------------------------------------------------------------------ def status(self) -> dict: return { "device_id": self.device.id, "host": self.device.host, "port": self.device.port, "connected": self.connected, "reader_connected": self._reader.connected, "writer_connected": self._writer.connected, "reader_connect_attempts": self._reader.connect_attempts, "writer_connect_attempts": self._writer.connect_attempts, "last_reader_error": self._reader.last_error or None, "last_writer_error": self._writer.last_error or None, } # --------------------------------------------------------------------------- # _PollThread — internal; one per TerminatorIO instance # --------------------------------------------------------------------------- class _PollThread(threading.Thread): """ Reads all input points from one EBC100 at poll_interval_ms, updates the shared signal cache. Daemon thread — exits when the process does. Each poll cycle reads BOTH address spaces via the driver's read connection: - FC02 (coil space): digital input signals -> list[bool] - FC04 (register space): analog/temperature input signals -> list[int] The read connection has its own lock, so poll reads never block output writes (which use the separate write connection). """ def __init__( self, driver: TerminatorIO, digital_signals: list["LogicalIO"], analog_signals: list["LogicalIO"], cache: dict[str, SignalState], lock: threading.Lock, ) -> None: super().__init__(name=f"poll-{driver.device.id}", daemon=True) self._driver = driver self._digital_signals = digital_signals self._analog_signals = analog_signals self._cache = cache self._lock = lock self._stop = threading.Event() self.poll_count = 0 self.error_count = 0 self._achieved_hz: float = 0.0 self._last_poll_ts: float | None = None @property def _total_signals(self) -> int: return len(self._digital_signals) + len(self._analog_signals) def stop(self) -> None: self._stop.set() def run(self) -> None: interval = self._driver.device.poll_interval_ms / 1000.0 log.info("Poll thread started: %s %.0f ms interval %d digital + %d analog signals", self._driver.device.id, self._driver.device.poll_interval_ms, len(self._digital_signals), len(self._analog_signals)) # Only connect the read client; the write client connects on first use self._driver.connect_reader() rate_t0 = time.monotonic() rate_polls = 0 while not self._stop.is_set(): t0 = time.monotonic() self._cycle() rate_polls += 1 self.poll_count += 1 elapsed = time.monotonic() - t0 # Update achieved rate every 5 s window = time.monotonic() - rate_t0 if window >= 5.0: self._achieved_hz = rate_polls / window log.debug("%s %.1f polls/s errors=%d", self._driver.device.id, self._achieved_hz, self.error_count) rate_t0 = time.monotonic() rate_polls = 0 wait = interval - elapsed if wait > 0: self._stop.wait(wait) log.info("Poll thread stopped: %s", self._driver.device.id) self._driver.disconnect() def _cycle(self) -> None: if not self._digital_signals and not self._analog_signals: return had_error = False updates: dict[str, SignalState] = {} now = time.monotonic() # -- Digital inputs (FC02, coil space) ------------------------- if self._digital_signals: bits = self._driver.read_inputs() if bits is None: had_error = True for sig in self._digital_signals: existing = self._cache.get(sig.name) updates[sig.name] = SignalState( name=sig.name, value=existing.value if existing else False, updated_at=existing.updated_at if existing else now, stale=True, ) else: for sig in self._digital_signals: if sig.modbus_address < len(bits): updates[sig.name] = SignalState( name=sig.name, value=bool(bits[sig.modbus_address]), updated_at=now, stale=False, ) else: log.warning("%s signal %r addr %d out of range (%d bits)", self._driver.device.id, sig.name, sig.modbus_address, len(bits)) # -- Analog / temperature inputs (FC04, register space) -------- if self._analog_signals: total_regs = self._driver.device.total_analog_input_channels() regs = self._driver.read_registers(address=0, count=total_regs) if regs is None: had_error = True for sig in self._analog_signals: existing = self._cache.get(sig.name) updates[sig.name] = SignalState( name=sig.name, value=existing.value if existing else 0, updated_at=existing.updated_at if existing else now, stale=True, ) else: for sig in self._analog_signals: if sig.modbus_address < len(regs): updates[sig.name] = SignalState( name=sig.name, value=int(regs[sig.modbus_address]), updated_at=now, stale=False, ) else: log.warning("%s signal %r reg addr %d out of range (%d regs)", self._driver.device.id, sig.name, sig.modbus_address, len(regs)) if had_error: self.error_count += 1 self._last_poll_ts = now with self._lock: self._cache.update(updates) def stats(self) -> dict: return { "device_id": self._driver.device.id, "poll_count": self.poll_count, "error_count": self.error_count, "achieved_hz": round(self._achieved_hz, 1), "target_hz": round(1000 / self._driver.device.poll_interval_ms, 1), "last_poll_ts": self._last_poll_ts, "running": self.is_alive(), } # --------------------------------------------------------------------------- # IORegistry — multi-device coordinator # --------------------------------------------------------------------------- class IORegistry: """ Owns all TerminatorIO drivers and poll threads for the full config. Usage: registry = IORegistry(config) registry.start() # connect + begin polling ... val = registry.get_value("my_signal") registry.stop() """ def __init__(self, config: "Config") -> None: self._config = config self._cache: dict[str, SignalState] = {} self._lock = threading.Lock() # Build one TerminatorIO + one _PollThread per device self._drivers: dict[str, TerminatorIO] = {} self._pollers: list[_PollThread] = [] for device in config.devices: driver = TerminatorIO(device) self._drivers[device.id] = driver # Partition input signals by address space digital_inputs = sorted( (s for s in config.logical_io if s.device == device.id and s.direction == "input" and s.modbus_space == "coil"), key=lambda s: s.modbus_address, ) analog_inputs = sorted( (s for s in config.logical_io if s.device == device.id and s.direction == "input" and s.modbus_space == "register"), key=lambda s: s.modbus_address, ) poller = _PollThread( driver, digital_inputs, analog_inputs, self._cache, self._lock, ) self._pollers.append(poller) # ------------------------------------------------------------------ # Lifecycle # ------------------------------------------------------------------ def start(self) -> None: """Start all poll threads (each connects its read client on first cycle).""" for p in self._pollers: p.start() def stop(self) -> None: """Stop all poll threads and disconnect all drivers.""" for p in self._pollers: p.stop() for p in self._pollers: p.join(timeout=3) # ------------------------------------------------------------------ # Signal reads (used by sequencer + API) # ------------------------------------------------------------------ def get(self, signal_name: str) -> SignalState | None: with self._lock: return self._cache.get(signal_name) def get_value(self, signal_name: str) -> bool | int | None: with self._lock: s = self._cache.get(signal_name) return s.value if s is not None else None def is_stale(self, signal_name: str) -> bool: with self._lock: s = self._cache.get(signal_name) return s.stale if s is not None else True def snapshot(self) -> dict[str, SignalState]: """Shallow copy of the full signal cache.""" with self._lock: return dict(self._cache) # ------------------------------------------------------------------ # Output writes (used by sequencer) # ------------------------------------------------------------------ def driver(self, device_id: str) -> TerminatorIO | None: return self._drivers.get(device_id) # ------------------------------------------------------------------ # Status / stats # ------------------------------------------------------------------ def driver_status(self) -> list[dict]: return [d.status() for d in self._drivers.values()] def poll_stats(self) -> list[dict]: return [p.stats() for p in self._pollers]