Files
SequencerIO/probe_terminator.py
2026-03-02 17:48:55 -05:00

282 lines
14 KiB
Python
Raw Permalink Blame History

This file contains ambiguous Unicode characters
This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.
#!/usr/bin/env python3
"""
probe_terminator.py — AutomationDirect Terminator I/O Modbus TCP prober
Target: T1H-EBC100 Ethernet controller with T1H-08TDS digital input modules
Protocol: Modbus TCP, FC02 (Read Discrete Inputs), port 502
Usage:
python3 probe_terminator.py [--host HOST] [--port PORT] [--unit UNIT]
[--watch] [--interval SEC] [--max-modules N]
--host IP address of the Terminator I/O controller (default: 192.168.3.202)
--port Modbus TCP port (default: 502)
--unit Modbus unit/slave ID (default: 1; 0 = auto-discover)
--watch Continuously poll and display live state changes
--interval Poll interval in seconds when using --watch (default: 0.5)
--max-modules Maximum modules to probe during discovery (default: 3)
Note: The T1H-EBC100 returns zeros for unmapped addresses rather than a Modbus
exception, so module count cannot be auto-detected from protocol errors alone.
Use --max-modules to match the physically installed module count.
"""
import argparse
import sys
import time
import os
try:
from pymodbus.client import ModbusTcpClient
from pymodbus.exceptions import ModbusException
from pymodbus.pdu import ExceptionResponse
except ImportError:
print("ERROR: pymodbus is not installed.")
print(" Install with: pip3 install pymodbus --break-system-packages")
sys.exit(1)
# ─────────────────────────────────────────────────────────────────────────────
# Constants
# ─────────────────────────────────────────────────────────────────────────────
POINTS_PER_MODULE = 8 # T1H-08TDS has 8 input points
MAX_UNIT_ID_SCAN = 10 # how many unit IDs to try during auto-discovery
MODBUS_DI_START = 0 # pymodbus uses 0-based addressing; maps to 10001
MODULE_NAMES = {8: "T1H-08TDS (8-pt DC input)"}
# ANSI colours (disabled automatically if not a TTY)
_COLOUR = sys.stdout.isatty()
C_RESET = "\033[0m" if _COLOUR else ""
C_GREEN = "\033[32m" if _COLOUR else ""
C_RED = "\033[31m" if _COLOUR else ""
C_YELLOW = "\033[33m" if _COLOUR else ""
C_CYAN = "\033[36m" if _COLOUR else ""
C_BOLD = "\033[1m" if _COLOUR else ""
C_DIM = "\033[2m" if _COLOUR else ""
# ─────────────────────────────────────────────────────────────────────────────
# Helper: single FC02 read, returns list[bool] or None on error
# ─────────────────────────────────────────────────────────────────────────────
def read_discrete_inputs(client, address, count, unit):
"""Read `count` discrete inputs starting at 0-based `address`. Returns list[bool] or None."""
try:
# pymodbus >= 3.x uses device_id=; older versions use slave=
try:
rr = client.read_discrete_inputs(address=address, count=count, device_id=unit)
except TypeError:
rr = client.read_discrete_inputs(address=address, count=count, slave=unit)
if rr.isError() or isinstance(rr, ExceptionResponse):
return None
return rr.bits[:count]
except ModbusException:
return None
except Exception:
return None
# ─────────────────────────────────────────────────────────────────────────────
# Step 1: Discover unit ID
# ─────────────────────────────────────────────────────────────────────────────
def discover_unit_id(client):
"""
Try unit IDs 1..MAX_UNIT_ID_SCAN and return the first one that responds to
a FC02 read. The T1H-EBC100 usually responds to any unit ID over Modbus TCP
but we still honour the configured ID.
Returns the first responding unit ID, or 1 as a fallback.
"""
print(f"{C_CYAN}Scanning unit IDs 1{MAX_UNIT_ID_SCAN}...{C_RESET}")
for uid in range(1, MAX_UNIT_ID_SCAN + 1):
result = read_discrete_inputs(client, MODBUS_DI_START, 1, uid)
if result is not None:
print(f" Unit ID {C_BOLD}{uid}{C_RESET} responded.")
return uid
else:
print(f" Unit ID {uid} — no response")
print(f"{C_YELLOW}No unit ID responded; defaulting to 1{C_RESET}")
return 1
# ─────────────────────────────────────────────────────────────────────────────
# Step 2: Discover how many 8-point modules are present
# ─────────────────────────────────────────────────────────────────────────────
def discover_modules(client, unit, max_modules):
"""
Read blocks of POINTS_PER_MODULE discrete inputs for each slot up to max_modules.
The T1H-EBC100 returns zeros for unmapped slots rather than Modbus exceptions,
so we rely on max_modules to define the installed hardware count.
Returns the number of responsive slots (capped at max_modules).
"""
print(f"\n{C_CYAN}Probing {max_modules} slot(s) × {POINTS_PER_MODULE}-pt...{C_RESET}")
print(f" {C_DIM}(T1H-EBC100 returns 0 for empty slots — set --max-modules to match physical count){C_RESET}")
num_modules = 0
for slot in range(max_modules):
addr = slot * POINTS_PER_MODULE
result = read_discrete_inputs(client, addr, POINTS_PER_MODULE, unit)
if result is None:
print(f" Slot {slot + 1}: no response — check connection")
break
print(f" Slot {slot + 1}: OK (Modbus DI {addr}{addr + POINTS_PER_MODULE - 1})")
num_modules += 1
return num_modules
# ─────────────────────────────────────────────────────────────────────────────
# Step 3: Read all discovered inputs and format output
# ─────────────────────────────────────────────────────────────────────────────
def read_all_inputs(client, unit, num_modules):
"""Read all inputs for discovered modules. Returns list[bool] or None on error."""
total = num_modules * POINTS_PER_MODULE
return read_discrete_inputs(client, MODBUS_DI_START, total, unit)
def format_inputs(bits, num_modules, prev_bits=None):
"""
Format input states as a module-by-module table.
Highlights changed bits if prev_bits is provided.
Returns a list of strings (lines).
"""
lines = []
lines.append(f"{C_BOLD}{'Module':<10} {'Points':^72}{C_RESET}")
lines.append(f"{'':10} " + " ".join(f"{'pt'+str(i+1):^5}" for i in range(POINTS_PER_MODULE)))
lines.append("" * 80)
for m in range(num_modules):
pts = []
for p in range(POINTS_PER_MODULE):
idx = m * POINTS_PER_MODULE + p
val = bits[idx]
changed = (prev_bits is not None) and (val != prev_bits[idx])
if val:
label = f"{C_GREEN}{'ON':^5}{C_RESET}"
else:
label = f"{C_DIM}{'off':^5}{C_RESET}"
if changed:
label = f"{C_YELLOW}{'*':1}{C_RESET}" + label
else:
label = " " + label
pts.append(label)
addr_start = m * POINTS_PER_MODULE + 1 # 1-based Modbus reference
addr_end = addr_start + POINTS_PER_MODULE - 1
mod_label = f"Mod {m+1:>2} ({addr_start:05d}{addr_end:05d})"
lines.append(f"{mod_label:<30} " + " ".join(pts))
active = sum(1 for b in bits if b)
lines.append("" * 80)
lines.append(f" {active} of {len(bits)} inputs active")
return lines
# ─────────────────────────────────────────────────────────────────────────────
# Main
# ─────────────────────────────────────────────────────────────────────────────
def main():
parser = argparse.ArgumentParser(
description="Probe AutomationDirect Terminator I/O via Modbus TCP"
)
parser.add_argument("--host", default="192.168.3.202",
help="Controller IP address (default: 192.168.3.202)")
parser.add_argument("--port", type=int, default=502,
help="Modbus TCP port (default: 502)")
parser.add_argument("--unit", type=int, default=0,
help="Modbus unit/slave ID (default: 0 = auto-discover)")
parser.add_argument("--watch", action="store_true",
help="Continuously poll and display live state changes")
parser.add_argument("--interval", type=float, default=0.5,
help="Poll interval in seconds for --watch (default: 0.5)")
parser.add_argument("--max-modules", type=int, default=3,
help="Number of modules installed (default: 3 for 3x T1H-08TDS)")
args = parser.parse_args()
print(f"\n{C_BOLD}=== Terminator I/O Modbus TCP Prober ==={C_RESET}")
print(f" Host : {args.host}:{args.port}")
print(f" Unit : {'auto-discover' if args.unit == 0 else args.unit}")
print()
# ── Connect ──────────────────────────────────────────────────────────────
client = ModbusTcpClient(host=args.host, port=args.port, timeout=2)
if not client.connect():
print(f"{C_RED}ERROR: Could not connect to {args.host}:{args.port}{C_RESET}")
sys.exit(1)
print(f"{C_GREEN}Connected to {args.host}:{args.port}{C_RESET}")
# ── Discover unit ID ─────────────────────────────────────────────────────
unit = args.unit if args.unit != 0 else discover_unit_id(client)
# ── Discover modules ──────────────────────────────────────────────────────
num_modules = discover_modules(client, unit, args.max_modules)
if num_modules == 0:
print(f"{C_RED}ERROR: No modules found. Check wiring and power.{C_RESET}")
client.close()
sys.exit(1)
total_pts = num_modules * POINTS_PER_MODULE
print(f"\n{C_GREEN}Found {num_modules} module(s), {total_pts} input points total.{C_RESET}")
print(f" Module type : {MODULE_NAMES.get(POINTS_PER_MODULE, f'{POINTS_PER_MODULE}-pt module')}")
print(f" Modbus address : 10001 {10000 + total_pts} (FC02)")
print(f" Unit ID : {unit}")
# ── Single-shot read ──────────────────────────────────────────────────────
print()
bits = read_all_inputs(client, unit, num_modules)
if bits is None:
print(f"{C_RED}ERROR: Failed to read inputs.{C_RESET}")
client.close()
sys.exit(1)
for line in format_inputs(bits, num_modules):
print(line)
if not args.watch:
client.close()
print(f"\n{C_DIM}Tip: run with --watch to monitor live state changes{C_RESET}")
return
# ── Watch mode ────────────────────────────────────────────────────────────
print(f"\n{C_CYAN}Watch mode: polling every {args.interval}s — press Ctrl+C to stop{C_RESET}\n")
prev_bits = bits
poll_count = 0
try:
while True:
time.sleep(args.interval)
new_bits = read_all_inputs(client, unit, num_modules)
if new_bits is None:
print(f"{C_RED}[{time.strftime('%H:%M:%S')}] Read error — retrying...{C_RESET}")
# attempt reconnect
client.close()
time.sleep(1)
client.connect()
continue
poll_count += 1
changed = any(a != b for a, b in zip(new_bits, prev_bits))
if changed or poll_count == 1:
# Clear screen and redraw
if _COLOUR:
print("\033[H\033[J", end="") # clear terminal
print(f"{C_BOLD}=== Terminator I/O Live Monitor ==={C_RESET} "
f"{C_DIM}[{time.strftime('%H:%M:%S')}] poll #{poll_count}{C_RESET}")
print(f" {args.host}:{args.port} unit={unit} "
f"interval={args.interval}s Ctrl+C to stop\n")
for line in format_inputs(new_bits, num_modules, prev_bits if changed else None):
print(line)
if changed:
print(f"\n {C_YELLOW}* = changed since last poll{C_RESET}")
prev_bits = new_bits
except KeyboardInterrupt:
print(f"\n{C_DIM}Stopped.{C_RESET}")
finally:
client.close()
if __name__ == "__main__":
main()