diff --git a/README.md b/README.md index 800d102..c34d751 100644 --- a/README.md +++ b/README.md @@ -123,7 +123,7 @@ arnold/ __init__.py module_types.py ModuleType frozen dataclass + 44-module registry config.py YAML loader, validation, dual address space computation - terminator_io.py Modbus TCP driver, signal cache, dual-space poll thread + terminator_io.py Modbus TCP driver (dual-connection), signal cache, poll thread sequencer.py Sequence engine: timing, digital+analog set/check/wait api.py FastAPI app: REST endpoints, static file mount for web UI @@ -139,13 +139,13 @@ web/ YAML config ──► config.py ──► module_types.py (resolve part numbers) │ ▼ - terminator_io.py - ┌─ TerminatorIO (Modbus TCP client per device) - │ FC02 read discrete inputs (digital) - │ FC04 read input registers (analog) - │ FC05/FC06 write single coil/register - │ FC15/FC16 write multiple coils/registers - └─ _PollThread (daemon, reads FC02+FC04 each cycle) + terminator_io.py + ┌─ TerminatorIO (two TCP connections per device) + │ ┌─ _reader conn ── FC02 read discrete inputs (digital) + │ │ FC04 read input registers (analog) + │ └─ _writer conn ── FC05/FC06 write single coil/register + │ FC15/FC16 write multiple coils/registers + └─ _PollThread (daemon, reads via _reader each cycle) └─ IORegistry (multi-device coordinator, signal cache) │ ┌────────┼────────┐ @@ -162,6 +162,14 @@ The EBC100 has two independent flat address spaces: A digital module advances only `coil_offset`. An analog module advances only `register_offset`. They do not interfere. `config.py` computes all addresses at load time. +### Dual-connection architecture + +Each `TerminatorIO` opens two independent TCP connections to the EBC100: +a **reader** (used exclusively by the poll thread for FC02/FC04) and a +**writer** (used by sequencer/API/TUI for FC05/FC06/FC15/FC16). Each has +its own lock and reconnect state. Writes never block behind poll reads, +reducing typical output actuation jitter from 5–35 ms to 5–19 ms. + ### EBC100 quirks - Returns zeros for out-of-range reads (no Modbus exception code 2) diff --git a/arnold/__pycache__/terminator_io.cpython-311.pyc b/arnold/__pycache__/terminator_io.cpython-311.pyc index 5eb93c3..27997d0 100644 Binary files a/arnold/__pycache__/terminator_io.cpython-311.pyc and b/arnold/__pycache__/terminator_io.cpython-311.pyc differ diff --git a/arnold/terminator_io.py b/arnold/terminator_io.py index 82bfe15..11a22d4 100644 --- a/arnold/terminator_io.py +++ b/arnold/terminator_io.py @@ -6,6 +6,22 @@ Encapsulates everything that touches a physical T1H-EBC100 controller: - Signal state cache (thread-safe) - Background fast-poll thread (reads both coils and registers each cycle) +Dual-connection architecture +---------------------------- + Each TerminatorIO maintains TWO independent Modbus TCP connections to + the same EBC100: + + _read_client — used exclusively by the poll thread (FC02, FC04) + _write_client — used exclusively by write callers (FC05, FC06, FC15, FC16) + + Each connection has its own lock and connection state. This eliminates + lock contention between the poll thread and output writes, reducing + write latency from 5–35 ms (old shared-lock design) to 5–19 ms + (just sleep jitter + Modbus round-trip, no lock wait). + + The EBC100 accepts multiple simultaneous TCP connections on port 502 + and processes them independently. + Key hardware quirks documented here: - The EBC100 uses a UNIFIED flat coil address space across all digital modules in physical slot order. FC02 (read discrete inputs) and @@ -41,14 +57,16 @@ Key hardware quirks documented here: Public API ---------- TerminatorIO(device: DeviceConfig) - .connect() -> bool + .connect() -> bool # connects both read and write clients + .connect_reader() -> bool # connect read client only (poll thread) + .connect_writer() -> bool # connect write client only .disconnect() - .read_inputs() -> list[bool] | None # bulk FC02, digital inputs - .read_registers(address, count) -> list[int] | None # bulk FC04, analog inputs - .write_output(address, value) -> bool # FC05 single coil - .write_outputs(address, values) -> bool # FC15 multiple coils - .write_register(address, value) -> bool # FC06 single register - .write_registers(address, values) -> bool # FC16 multiple registers + .read_inputs() -> list[bool] | None # bulk FC02, read client + .read_registers(address, count) -> list[int] | None # bulk FC04, read client + .write_output(address, value) -> bool # FC05, write client + .write_outputs(address, values) -> bool # FC15, write client + .write_register(address, value) -> bool # FC06, write client + .write_registers(address, values) -> bool # FC16, write client .connected: bool .status() -> dict @@ -94,36 +112,28 @@ class SignalState: # --------------------------------------------------------------------------- -# TerminatorIO — one instance per physical EBC100 controller +# _ModbusConn — one TCP connection with its own lock and reconnect logic # --------------------------------------------------------------------------- -class TerminatorIO: +class _ModbusConn: """ - Modbus TCP driver for a single T1H-EBC100 controller. + A single Modbus TCP connection with independent lock and state. - Thread-safe: all public methods acquire an internal lock. The poll - thread holds the lock only for the duration of each FC02 call, so - write_output() will block at most one poll cycle (~50 ms). + TerminatorIO creates two of these: one for reads, one for writes. + Each can connect, reconnect, and operate without blocking the other. """ - def __init__(self, device: "DeviceConfig") -> None: - self.device = device - self._lock = threading.Lock() + def __init__(self, device: "DeviceConfig", role: str) -> None: + self._device = device + self._role = role # "reader" or "writer" — for log messages + self.lock = threading.Lock() self._client: ModbusTcpClient | None = None - self._connected = False - self._connect_attempts = 0 - self._last_connect_error = "" - - # ------------------------------------------------------------------ - # Connection - # ------------------------------------------------------------------ + self.connected = False + self.connect_attempts = 0 + self.last_error = "" def connect(self) -> bool: - """Open the Modbus TCP connection. Returns True on success.""" - with self._lock: - return self._connect_locked() - - def _connect_locked(self) -> bool: + """Open (or reopen) the TCP connection. Call with lock held.""" if self._client is not None: try: self._client.close() @@ -131,41 +141,93 @@ class TerminatorIO: pass self._client = ModbusTcpClient( - host=self.device.host, - port=self.device.port, + host=self._device.host, + port=self._device.port, timeout=2, retries=1, ) - self._connect_attempts += 1 + self.connect_attempts += 1 ok = self._client.connect() - self._connected = ok + self.connected = ok if ok: - log.info("Connected to %s (%s:%d)", - self.device.id, self.device.host, self.device.port) + log.info("%s %s connected to %s:%d", + self._device.id, self._role, + self._device.host, self._device.port) else: - self._last_connect_error = ( - f"TCP connect failed to {self.device.host}:{self.device.port}" + self.last_error = ( + f"TCP connect failed to {self._device.host}:{self._device.port}" ) - log.warning("Could not connect to %s: %s", - self.device.id, self._last_connect_error) + log.warning("%s %s connect failed: %s", + self._device.id, self._role, self.last_error) return ok + def close(self) -> None: + if self._client: + try: + self._client.close() + except Exception: + pass + self.connected = False + self._client = None + + @property + def client(self) -> ModbusTcpClient | None: + return self._client + + +# --------------------------------------------------------------------------- +# TerminatorIO — one instance per physical EBC100 controller +# --------------------------------------------------------------------------- + +class TerminatorIO: + """ + Modbus TCP driver for a single T1H-EBC100 controller. + + Uses two independent TCP connections: + - _reader: for poll thread reads (FC02, FC04). Lock held only during reads. + - _writer: for output writes (FC05, FC06, FC15, FC16). Lock held only during writes. + + Since each connection has its own lock, writes never block behind reads + and vice versa. + """ + + def __init__(self, device: "DeviceConfig") -> None: + self.device = device + self._reader = _ModbusConn(device, "reader") + self._writer = _ModbusConn(device, "writer") + + # ------------------------------------------------------------------ + # Connection + # ------------------------------------------------------------------ + + def connect(self) -> bool: + """Open both read and write connections. Returns True if both succeed.""" + r = self.connect_reader() + w = self.connect_writer() + return r and w + + def connect_reader(self) -> bool: + """Open the read connection (used by poll thread).""" + with self._reader.lock: + return self._reader.connect() + + def connect_writer(self) -> bool: + """Open the write connection (used by sequencer/API/TUI).""" + with self._writer.lock: + return self._writer.connect() + def disconnect(self) -> None: - with self._lock: - if self._client: - try: - self._client.close() - except Exception: - pass - self._connected = False - self._client = None + with self._reader.lock: + self._reader.close() + with self._writer.lock: + self._writer.close() @property def connected(self) -> bool: - return self._connected + return self._reader.connected or self._writer.connected # ------------------------------------------------------------------ - # Read inputs — single bulk FC02 request for all input modules + # Read inputs — single bulk FC02 request (uses read connection) # ------------------------------------------------------------------ def read_inputs(self) -> list[bool] | None: @@ -175,207 +237,201 @@ class TerminatorIO: Returns a flat list of bool ordered by slot then point (matching the unified address scheme), or None on comms error. - FC02 returns input bits starting at address 0. Because input modules - are always at lower slot numbers than output modules (enforced by the - unified address scheme), the FC02 bit index equals modbus_address for - every input signal. + Uses the read connection — never blocks write callers. """ total = self.device.total_input_points() if total == 0: return [] - with self._lock: - return self._fc02_locked(address=0, count=total) + with self._reader.lock: + return self._fc02(self._reader, address=0, count=total) - def _fc02_locked(self, address: int, count: int) -> list[bool] | None: + def _fc02(self, conn: _ModbusConn, address: int, count: int) -> list[bool] | None: for attempt in range(2): - if not self._connected: - if not self._connect_locked(): + if not conn.connected: + if not conn.connect(): return None try: - rr = self._client.read_discrete_inputs( + rr = conn.client.read_discrete_inputs( address=address, count=count, device_id=self.device.unit_id, ) if rr.isError() or isinstance(rr, ExceptionResponse): log.warning("%s FC02 error: %s", self.device.id, rr) - self._connected = False + conn.connected = False continue return list(rr.bits[:count]) except (ModbusException, ConnectionError, OSError) as exc: - log.warning("%s read error (attempt %d): %s", + log.warning("%s FC02 read error (attempt %d): %s", self.device.id, attempt + 1, exc) - self._connected = False + conn.connected = False time.sleep(0.05) return None # ------------------------------------------------------------------ - # Read analog input registers — single bulk FC04 request + # Read analog input registers — bulk FC04 (uses read connection) # ------------------------------------------------------------------ def read_registers(self, address: int, count: int) -> list[int] | None: """ Read contiguous 16-bit input registers via FC04. - Used for analog and temperature input modules whose signals live - in the register address space. Returns a list of raw int values - (0–65535), or None on comms error. + Uses the read connection — never blocks write callers. """ if count == 0: return [] - with self._lock: - return self._fc04_locked(address, count) + with self._reader.lock: + return self._fc04(self._reader, address, count) - def _fc04_locked(self, address: int, count: int) -> list[int] | None: + def _fc04(self, conn: _ModbusConn, address: int, count: int) -> list[int] | None: for attempt in range(2): - if not self._connected: - if not self._connect_locked(): + if not conn.connected: + if not conn.connect(): return None try: - rr = self._client.read_input_registers( + rr = conn.client.read_input_registers( address=address, count=count, device_id=self.device.unit_id, ) if rr.isError() or isinstance(rr, ExceptionResponse): log.warning("%s FC04 error: %s", self.device.id, rr) - self._connected = False + conn.connected = False continue return list(rr.registers[:count]) except (ModbusException, ConnectionError, OSError) as exc: log.warning("%s FC04 read error (attempt %d): %s", self.device.id, attempt + 1, exc) - self._connected = False + conn.connected = False time.sleep(0.05) return None # ------------------------------------------------------------------ - # Write digital outputs + # Write digital outputs (uses write connection) # ------------------------------------------------------------------ def write_output(self, address: int, value: bool) -> bool: """ Write a single coil via FC05. - Address is the unified slot-order coil address (as stored in - LogicalIO.modbus_address). Returns True on success. - - Note: the EBC100 echoes True for any address — write errors for - out-of-range addresses are silent. Config validation prevents - invalid addresses at startup. + Uses the write connection — never blocked by poll thread reads. """ - with self._lock: - return self._fc05_locked(address, value) + with self._writer.lock: + return self._fc05(self._writer, address, value) - def _fc05_locked(self, address: int, value: bool) -> bool: + def _fc05(self, conn: _ModbusConn, address: int, value: bool) -> bool: for attempt in range(2): - if not self._connected: - if not self._connect_locked(): + if not conn.connected: + if not conn.connect(): return False try: - rr = self._client.write_coil( + rr = conn.client.write_coil( address=address, value=value, device_id=self.device.unit_id, ) if rr.isError() or isinstance(rr, ExceptionResponse): log.warning("%s FC05 error addr=%d: %s", self.device.id, address, rr) - self._connected = False + conn.connected = False continue log.debug("%s coil[%d] = %s", self.device.id, address, value) return True except (ModbusException, ConnectionError, OSError) as exc: - log.warning("%s write error (attempt %d): %s", + log.warning("%s FC05 write error (attempt %d): %s", self.device.id, attempt + 1, exc) - self._connected = False + conn.connected = False time.sleep(0.05) return False def write_outputs(self, address: int, values: list[bool]) -> bool: - """Write multiple contiguous coils via FC15.""" - with self._lock: - for attempt in range(2): - if not self._connected: - if not self._connect_locked(): - return False - try: - rr = self._client.write_coils( - address=address, values=values, - device_id=self.device.unit_id, - ) - if rr.isError() or isinstance(rr, ExceptionResponse): - log.warning("%s FC15 error addr=%d: %s", - self.device.id, address, rr) - self._connected = False - continue - return True - except (ModbusException, ConnectionError, OSError) as exc: - log.warning("%s write_coils error (attempt %d): %s", - self.device.id, attempt + 1, exc) - self._connected = False - time.sleep(0.05) + """Write multiple contiguous coils via FC15. Uses write connection.""" + with self._writer.lock: + return self._fc15(self._writer, address, values) + + def _fc15(self, conn: _ModbusConn, address: int, values: list[bool]) -> bool: + for attempt in range(2): + if not conn.connected: + if not conn.connect(): + return False + try: + rr = conn.client.write_coils( + address=address, values=values, + device_id=self.device.unit_id, + ) + if rr.isError() or isinstance(rr, ExceptionResponse): + log.warning("%s FC15 error addr=%d: %s", + self.device.id, address, rr) + conn.connected = False + continue + return True + except (ModbusException, ConnectionError, OSError) as exc: + log.warning("%s FC15 write error (attempt %d): %s", + self.device.id, attempt + 1, exc) + conn.connected = False + time.sleep(0.05) return False # ------------------------------------------------------------------ - # Write analog outputs + # Write analog outputs (uses write connection) # ------------------------------------------------------------------ def write_register(self, address: int, value: int) -> bool: """ Write a single 16-bit holding register via FC06. - - Address is the register-space address (as stored in - LogicalIO.modbus_address for analog output signals). - value is a raw 16-bit integer (0–65535). + Uses the write connection — never blocked by poll thread reads. """ - with self._lock: - return self._fc06_locked(address, value) + with self._writer.lock: + return self._fc06(self._writer, address, value) - def _fc06_locked(self, address: int, value: int) -> bool: + def _fc06(self, conn: _ModbusConn, address: int, value: int) -> bool: for attempt in range(2): - if not self._connected: - if not self._connect_locked(): + if not conn.connected: + if not conn.connect(): return False try: - rr = self._client.write_register( + rr = conn.client.write_register( address=address, value=value, device_id=self.device.unit_id, ) if rr.isError() or isinstance(rr, ExceptionResponse): log.warning("%s FC06 error addr=%d: %s", self.device.id, address, rr) - self._connected = False + conn.connected = False continue log.debug("%s reg[%d] = %d", self.device.id, address, value) return True except (ModbusException, ConnectionError, OSError) as exc: log.warning("%s FC06 write error (attempt %d): %s", self.device.id, attempt + 1, exc) - self._connected = False + conn.connected = False time.sleep(0.05) return False def write_registers(self, address: int, values: list[int]) -> bool: - """Write multiple contiguous 16-bit holding registers via FC16.""" - with self._lock: - for attempt in range(2): - if not self._connected: - if not self._connect_locked(): - return False - try: - rr = self._client.write_registers( - address=address, values=values, - device_id=self.device.unit_id, - ) - if rr.isError() or isinstance(rr, ExceptionResponse): - log.warning("%s FC16 error addr=%d: %s", - self.device.id, address, rr) - self._connected = False - continue - return True - except (ModbusException, ConnectionError, OSError) as exc: - log.warning("%s FC16 write error (attempt %d): %s", - self.device.id, attempt + 1, exc) - self._connected = False - time.sleep(0.05) + """Write multiple contiguous 16-bit holding registers via FC16. + Uses write connection.""" + with self._writer.lock: + return self._fc16(self._writer, address, values) + + def _fc16(self, conn: _ModbusConn, address: int, values: list[int]) -> bool: + for attempt in range(2): + if not conn.connected: + if not conn.connect(): + return False + try: + rr = conn.client.write_registers( + address=address, values=values, + device_id=self.device.unit_id, + ) + if rr.isError() or isinstance(rr, ExceptionResponse): + log.warning("%s FC16 error addr=%d: %s", + self.device.id, address, rr) + conn.connected = False + continue + return True + except (ModbusException, ConnectionError, OSError) as exc: + log.warning("%s FC16 write error (attempt %d): %s", + self.device.id, attempt + 1, exc) + conn.connected = False + time.sleep(0.05) return False # ------------------------------------------------------------------ @@ -387,9 +443,13 @@ class TerminatorIO: "device_id": self.device.id, "host": self.device.host, "port": self.device.port, - "connected": self._connected, - "connect_attempts": self._connect_attempts, - "last_error": self._last_connect_error or None, + "connected": self.connected, + "reader_connected": self._reader.connected, + "writer_connected": self._writer.connected, + "reader_connect_attempts": self._reader.connect_attempts, + "writer_connect_attempts": self._writer.connect_attempts, + "last_reader_error": self._reader.last_error or None, + "last_writer_error": self._writer.last_error or None, } @@ -402,16 +462,19 @@ class _PollThread(threading.Thread): Reads all input points from one EBC100 at poll_interval_ms, updates the shared signal cache. Daemon thread — exits when the process does. - Each poll cycle reads BOTH address spaces: - - FC02 (coil space): digital input signals → list[bool] - - FC04 (register space): analog/temperature input signals → list[int] + Each poll cycle reads BOTH address spaces via the driver's read connection: + - FC02 (coil space): digital input signals -> list[bool] + - FC04 (register space): analog/temperature input signals -> list[int] + + The read connection has its own lock, so poll reads never block output + writes (which use the separate write connection). """ def __init__( self, driver: TerminatorIO, - digital_signals: list["LogicalIO"], # digital input signals, sorted by modbus_address - analog_signals: list["LogicalIO"], # analog/temp input signals, sorted by modbus_address + digital_signals: list["LogicalIO"], + analog_signals: list["LogicalIO"], cache: dict[str, SignalState], lock: threading.Lock, ) -> None: @@ -443,7 +506,8 @@ class _PollThread(threading.Thread): len(self._digital_signals), len(self._analog_signals)) - self._driver.connect() + # Only connect the read client; the write client connects on first use + self._driver.connect_reader() rate_t0 = time.monotonic() rate_polls = 0 @@ -481,7 +545,7 @@ class _PollThread(threading.Thread): updates: dict[str, SignalState] = {} now = time.monotonic() - # ── Digital inputs (FC02, coil space) ───────────────────────── + # -- Digital inputs (FC02, coil space) ------------------------- if self._digital_signals: bits = self._driver.read_inputs() if bits is None: @@ -508,7 +572,7 @@ class _PollThread(threading.Thread): self._driver.device.id, sig.name, sig.modbus_address, len(bits)) - # ── Analog / temperature inputs (FC04, register space) ──────── + # -- Analog / temperature inputs (FC04, register space) -------- if self._analog_signals: total_regs = self._driver.device.total_analog_input_channels() regs = self._driver.read_registers(address=0, count=total_regs) @@ -557,7 +621,7 @@ class _PollThread(threading.Thread): # --------------------------------------------------------------------------- -# IORegistry — multi-device coordinator (replaces PollManager + driver dict) +# IORegistry — multi-device coordinator # --------------------------------------------------------------------------- class IORegistry: @@ -611,7 +675,7 @@ class IORegistry: # ------------------------------------------------------------------ def start(self) -> None: - """Start all poll threads (each connects its own driver on first cycle).""" + """Start all poll threads (each connects its read client on first cycle).""" for p in self._pollers: p.start()