""" arnold/config.py — YAML config loader, dataclasses, and validation. Schema: devices: list of EBC100 controllers logical_io: named signals mapped to device/slot/point sequences: ordered step lists with timing and actions Address spaces (EBC100 hardware): The T1H-EBC100 maintains TWO independent Modbus address spaces: coil space (1-bit) — digital modules (inputs + outputs + relays) FC01 read coils, FC02 read discrete inputs, FC05 write single coil, FC15 write multiple coils Flat sequential addressing by physical slot order. register space (16-bit) — analog + temperature modules FC03 read holding registers, FC04 read input registers, FC06 write single register, FC16 write multiple registers Flat sequential addressing by physical slot order, INDEPENDENT of coil space. A digital module in slot 1 advances coil_offset but not register_offset. An analog module in slot 2 advances register_offset but not coil_offset. """ from __future__ import annotations import yaml from dataclasses import dataclass, field from pathlib import Path from typing import Literal from .module_types import MODULE_REGISTRY, ModuleType, Category, get_module_type # --------------------------------------------------------------------------- # Dataclasses # --------------------------------------------------------------------------- @dataclass class ModuleConfig: slot: int # 1-based physical slot number type: str # part number, e.g. "T1H-08TDS" module_type: ModuleType # resolved from registry points: int # number of I/O points/channels category: Category # from ModuleType modbus_space: Literal["coil", "register"] # from ModuleType # Modbus base address (0-based) within the appropriate space, # computed at load time. modbus_address: int = 0 @dataclass class DeviceConfig: id: str host: str port: int unit_id: int poll_interval_ms: int modules: list[ModuleConfig] = field(default_factory=list) def module_for_slot(self, slot: int) -> ModuleConfig | None: return next((m for m in self.modules if m.slot == slot), None) # -- Digital helpers --------------------------------------------------- def digital_input_modules(self) -> list[ModuleConfig]: return [m for m in self.modules if m.category == "digital_input"] def digital_output_modules(self) -> list[ModuleConfig]: return [m for m in self.modules if m.category in ("digital_output", "relay_output")] def total_input_points(self) -> int: """Total discrete input bits (for FC02 bulk read).""" return sum(m.points for m in self.digital_input_modules()) def total_output_points(self) -> int: """Total discrete output bits.""" return sum(m.points for m in self.digital_output_modules()) # -- Analog helpers ---------------------------------------------------- def analog_input_modules(self) -> list[ModuleConfig]: return [m for m in self.modules if m.category in ("analog_input", "temperature_input")] def analog_output_modules(self) -> list[ModuleConfig]: return [m for m in self.modules if m.category == "analog_output"] def total_analog_input_channels(self) -> int: """Total 16-bit input registers (for FC04 bulk read).""" return sum(m.points for m in self.analog_input_modules()) def total_analog_output_channels(self) -> int: """Total 16-bit output registers.""" return sum(m.points for m in self.analog_output_modules()) # -- Generic helpers --------------------------------------------------- def input_modules(self) -> list[ModuleConfig]: """All modules that are inputs (digital + analog + temperature).""" return [m for m in self.modules if m.module_type.direction == "input"] def output_modules(self) -> list[ModuleConfig]: """All modules that are outputs (digital + relay + analog).""" return [m for m in self.modules if m.module_type.direction == "output"] @dataclass class LogicalIO: name: str device: str # device id slot: int # 1-based point: int # 1-based within slot direction: Literal["input", "output"] category: Category # full category from ModuleType modbus_space: Literal["coil", "register"] # which address space value_type: Literal["bool", "int"] # bool for digital, int for analog # For digital outputs: optional startup default default_state: bool | None = None # For analog outputs: optional startup default (raw register value) default_value: int | None = None # Resolved at load time: modbus_address: int = 0 # 0-based within the appropriate space device_ref: DeviceConfig | None = None module_ref: ModuleConfig | None = None @dataclass class SequenceStep: t_ms: int # absolute ms from sequence T=0 action: Literal["set_output", "check_input", "wait_input"] signal: str # logical_io name # For digital set_output: state: bool | None = None # For analog set_output (raw register value): value: int | None = None # For check_input / wait_input (digital): expected: bool | None = None # For check_input / wait_input (analog — raw register value): expected_value: int | None = None tolerance: int | None = None # analog: abs(actual - expected) <= tolerance # For wait_input only: timeout_ms: int | None = None @dataclass class Sequence: name: str description: str steps: list[SequenceStep] = field(default_factory=list) @dataclass class Config: devices: list[DeviceConfig] logical_io: list[LogicalIO] sequences: list[Sequence] def device(self, device_id: str) -> DeviceConfig | None: return next((d for d in self.devices if d.id == device_id), None) def signal(self, name: str) -> LogicalIO | None: return next((s for s in self.logical_io if s.name == name), None) def sequence(self, name: str) -> Sequence | None: return next((s for s in self.sequences if s.name == name), None) # --------------------------------------------------------------------------- # Loader # --------------------------------------------------------------------------- class ConfigError(Exception): pass def _parse_bool(val: object, context: str) -> bool: if isinstance(val, bool): return val if isinstance(val, str): if val.lower() in ("true", "yes", "on", "1"): return True if val.lower() in ("false", "no", "off", "0"): return False raise ConfigError(f"{context}: expected boolean, got {val!r}") def load(path: str | Path) -> Config: """Load and validate a YAML config file. Raises ConfigError on any problem.""" path = Path(path) if not path.exists(): raise ConfigError(f"Config file not found: {path}") with open(path) as f: raw = yaml.safe_load(f) if not isinstance(raw, dict): raise ConfigError("Config file must be a YAML mapping at the top level") devices = _parse_devices(raw.get("devices") or []) device_map = {d.id: d for d in devices} logical_io = _parse_logical_io(raw.get("logical_io") or [], device_map) signal_map = {s.name: s for s in logical_io} sequences = _parse_sequences(raw.get("sequences") or [], signal_map) return Config(devices=devices, logical_io=logical_io, sequences=sequences) # --------------------------------------------------------------------------- # Parsing helpers # --------------------------------------------------------------------------- def _parse_devices(raw_list: list) -> list[DeviceConfig]: if not isinstance(raw_list, list): raise ConfigError("'devices' must be a list") devices: list[DeviceConfig] = [] seen_ids: set[str] = set() for i, raw in enumerate(raw_list): ctx = f"devices[{i}]" if not isinstance(raw, dict): raise ConfigError(f"{ctx}: must be a mapping") dev_id = _require_str(raw, "id", ctx) if dev_id in seen_ids: raise ConfigError(f"{ctx}: duplicate device id {dev_id!r}") seen_ids.add(dev_id) host = _require_str(raw, "host", ctx) port = int(raw.get("port", 502)) unit_id = int(raw.get("unit_id", 1)) poll_interval = int(raw.get("poll_interval_ms", 50)) modules = _parse_modules(raw.get("modules") or [], ctx) # Assign Modbus addresses using TWO independent counters. # # The T1H-EBC100 has separate address spaces for coils (digital) # and registers (analog/temperature). Each space is flat and # sequential by physical slot order. A digital module advances # only the coil counter; an analog module advances only the # register counter. coil_offset = 0 register_offset = 0 for m in sorted(modules, key=lambda m: m.slot): if m.modbus_space == "coil": m.modbus_address = coil_offset coil_offset += m.points else: m.modbus_address = register_offset register_offset += m.points devices.append(DeviceConfig( id=dev_id, host=host, port=port, unit_id=unit_id, poll_interval_ms=poll_interval, modules=sorted(modules, key=lambda m: m.slot), )) return devices def _parse_modules(raw_list: list, parent_ctx: str) -> list[ModuleConfig]: if not isinstance(raw_list, list): raise ConfigError(f"{parent_ctx}.modules: must be a list") modules: list[ModuleConfig] = [] seen_slots: set[int] = set() for i, raw in enumerate(raw_list): ctx = f"{parent_ctx}.modules[{i}]" if not isinstance(raw, dict): raise ConfigError(f"{ctx}: must be a mapping") slot = int(_require(raw, "slot", ctx)) mod_type = _require_str(raw, "type", ctx) if slot < 1: raise ConfigError(f"{ctx}: slot must be >= 1, got {slot}") if slot in seen_slots: raise ConfigError(f"{ctx}: duplicate slot {slot}") seen_slots.add(slot) try: mt = get_module_type(mod_type) except KeyError as exc: raise ConfigError(f"{ctx}: {exc}") from None points = int(raw.get("points", mt.points)) modules.append(ModuleConfig( slot=slot, type=mod_type, module_type=mt, points=points, category=mt.category, modbus_space=mt.modbus_space, )) return modules def _parse_logical_io( raw_list: list, device_map: dict[str, DeviceConfig] ) -> list[LogicalIO]: if not isinstance(raw_list, list): raise ConfigError("'logical_io' must be a list") signals: list[LogicalIO] = [] seen_names: set[str] = set() for i, raw in enumerate(raw_list): ctx = f"logical_io[{i}]" if not isinstance(raw, dict): raise ConfigError(f"{ctx}: must be a mapping") name = _require_str(raw, "name", ctx) dev_id = _require_str(raw, "device", ctx) slot = int(_require(raw, "slot", ctx)) point = int(_require(raw, "point", ctx)) direction = _require_str(raw, "direction", ctx) if name in seen_names: raise ConfigError(f"{ctx}: duplicate signal name {name!r}") seen_names.add(name) if direction not in ("input", "output"): raise ConfigError( f"{ctx}: direction must be 'input' or 'output', got {direction!r}" ) dev = device_map.get(dev_id) if dev is None: raise ConfigError(f"{ctx}: references unknown device {dev_id!r}") mod = dev.module_for_slot(slot) if mod is None: slots = [m.slot for m in dev.modules] raise ConfigError( f"{ctx}: device {dev_id!r} has no module at slot {slot}. " f"Available slots: {slots}" ) if mod.module_type.direction != direction: raise ConfigError( f"{ctx}: signal direction {direction!r} does not match " f"module type {mod.type!r} which is {mod.module_type.direction!r}" ) if point < 1 or point > mod.points: raise ConfigError( f"{ctx}: point {point} out of range for module at slot {slot} " f"(module has {mod.points} points, 1-based)" ) # Parse default_state (digital outputs only) default_state: bool | None = None if "default_state" in raw: if not mod.module_type.is_digital or direction != "output": raise ConfigError( f"{ctx}: default_state is only valid for digital output signals" ) default_state = _parse_bool(raw["default_state"], ctx + ".default_state") # Parse default_value (analog outputs only) default_value: int | None = None if "default_value" in raw: if not mod.module_type.is_analog or direction != "output": raise ConfigError( f"{ctx}: default_value is only valid for analog output signals" ) default_value = int(raw["default_value"]) # Modbus address = module's base address + (point - 1) modbus_address = mod.modbus_address + (point - 1) signals.append(LogicalIO( name=name, device=dev_id, slot=slot, point=point, direction=direction, category=mod.category, modbus_space=mod.modbus_space, value_type=mod.module_type.value_type, default_state=default_state, default_value=default_value, modbus_address=modbus_address, device_ref=dev, module_ref=mod, )) return signals def _parse_sequences( raw_list: list, signal_map: dict[str, LogicalIO] ) -> list[Sequence]: if not isinstance(raw_list, list): raise ConfigError("'sequences' must be a list") sequences: list[Sequence] = [] seen_names: set[str] = set() for i, raw in enumerate(raw_list): ctx = f"sequences[{i}]" if not isinstance(raw, dict): raise ConfigError(f"{ctx}: must be a mapping") name = _require_str(raw, "name", ctx) description = raw.get("description", "") if name in seen_names: raise ConfigError(f"{ctx}: duplicate sequence name {name!r}") seen_names.add(name) steps = _parse_steps(raw.get("steps") or [], signal_map, ctx) sequences.append(Sequence(name=name, description=description, steps=steps)) return sequences def _parse_steps( raw_list: list, signal_map: dict[str, LogicalIO], parent_ctx: str ) -> list[SequenceStep]: if not isinstance(raw_list, list): raise ConfigError(f"{parent_ctx}.steps: must be a list") steps: list[SequenceStep] = [] for i, raw in enumerate(raw_list): ctx = f"{parent_ctx}.steps[{i}]" if not isinstance(raw, dict): raise ConfigError(f"{ctx}: must be a mapping") t_ms = int(_require(raw, "t_ms", ctx)) action = _require_str(raw, "action", ctx) signal = _require_str(raw, "signal", ctx) if t_ms < 0: raise ConfigError(f"{ctx}: t_ms must be >= 0, got {t_ms}") if action not in ("set_output", "check_input", "wait_input"): raise ConfigError( f"{ctx}: action must be 'set_output', 'check_input', or 'wait_input'," f" got {action!r}" ) sig = signal_map.get(signal) if sig is None: raise ConfigError(f"{ctx}: references unknown signal {signal!r}") step = SequenceStep(t_ms=t_ms, action=action, signal=signal) if action == "set_output": if sig.direction != "output": raise ConfigError( f"{ctx}: set_output action used on signal {signal!r} " f"which is an input — use check_input instead" ) if sig.value_type == "bool": if "state" not in raw: raise ConfigError( f"{ctx}: set_output step for digital signal requires 'state'" ) step.state = _parse_bool(raw["state"], ctx + ".state") else: if "value" not in raw: raise ConfigError( f"{ctx}: set_output step for analog signal requires 'value'" ) step.value = int(raw["value"]) elif action in ("check_input", "wait_input"): if sig.value_type == "bool": if "expected" not in raw: raise ConfigError( f"{ctx}: {action} step for digital signal requires 'expected'" ) step.expected = _parse_bool(raw["expected"], ctx + ".expected") else: if "expected_value" not in raw: raise ConfigError( f"{ctx}: {action} step for analog signal requires " f"'expected_value'" ) step.expected_value = int(raw["expected_value"]) if "tolerance" in raw: step.tolerance = int(raw["tolerance"]) else: step.tolerance = 0 if action == "wait_input": if "timeout_ms" not in raw: raise ConfigError( f"{ctx}: wait_input step requires 'timeout_ms' field" ) step.timeout_ms = int(raw["timeout_ms"]) if step.timeout_ms <= 0: raise ConfigError( f"{ctx}: timeout_ms must be > 0, got {step.timeout_ms}" ) steps.append(step) # Sort by t_ms so steps need not be in order in the YAML steps.sort(key=lambda s: s.t_ms) return steps # --------------------------------------------------------------------------- # Tiny helpers # --------------------------------------------------------------------------- def _require(d: dict, key: str, ctx: str) -> object: if key not in d: raise ConfigError(f"{ctx}: missing required field '{key}'") return d[key] def _require_str(d: dict, key: str, ctx: str) -> str: val = _require(d, key, ctx) if not isinstance(val, str): raise ConfigError(f"{ctx}.{key}: expected string, got {type(val).__name__}") return val