Files
SequencerIO/arnold/config.py
2026-03-02 17:48:55 -05:00

538 lines
19 KiB
Python

"""
arnold/config.py — YAML config loader, dataclasses, and validation.
Schema:
devices: list of EBC100 controllers
logical_io: named signals mapped to device/slot/point
sequences: ordered step lists with timing and actions
Address spaces (EBC100 hardware):
The T1H-EBC100 maintains TWO independent Modbus address spaces:
coil space (1-bit) — digital modules (inputs + outputs + relays)
FC01 read coils, FC02 read discrete inputs,
FC05 write single coil, FC15 write multiple coils
Flat sequential addressing by physical slot order.
register space (16-bit) — analog + temperature modules
FC03 read holding registers, FC04 read input registers,
FC06 write single register, FC16 write multiple registers
Flat sequential addressing by physical slot order,
INDEPENDENT of coil space.
A digital module in slot 1 advances coil_offset but not register_offset.
An analog module in slot 2 advances register_offset but not coil_offset.
"""
from __future__ import annotations
import yaml
from dataclasses import dataclass, field
from pathlib import Path
from typing import Literal
from .module_types import MODULE_REGISTRY, ModuleType, Category, get_module_type
# ---------------------------------------------------------------------------
# Dataclasses
# ---------------------------------------------------------------------------
@dataclass
class ModuleConfig:
    """One I/O module installed in a device, as listed in the config file.

    ``category`` and ``modbus_space`` mirror the resolved ``module_type``.
    ``modbus_address`` defaults to 0 and is overwritten at load time once
    the base address of the module is known.
    """

    slot: int  # physical slot number, counted from 1
    type: str  # part number string, e.g. "T1H-08TDS"
    module_type: ModuleType  # registry entry resolved for ``type``
    points: int  # how many I/O points / channels the module has
    category: Category  # copied from the ModuleType
    modbus_space: Literal["coil", "register"]  # copied from the ModuleType

    # 0-based base address inside the module's Modbus space; filled in at
    # load time.
    modbus_address: int = 0
@dataclass
class DeviceConfig:
    """Connection settings for one controller plus the modules it carries.

    The helper methods below only filter or total ``self.modules``; they
    keep the order in which the modules are stored.
    """

    id: str
    host: str
    port: int
    unit_id: int
    poll_interval_ms: int
    modules: list[ModuleConfig] = field(default_factory=list)

    def _where(self, keep) -> list[ModuleConfig]:
        """Return the modules for which ``keep(module)`` is true."""
        chosen: list[ModuleConfig] = []
        for mod in self.modules:
            if keep(mod):
                chosen.append(mod)
        return chosen

    def module_for_slot(self, slot: int) -> ModuleConfig | None:
        """Return the first module whose slot equals ``slot``, else None."""
        for mod in self.modules:
            if mod.slot == slot:
                return mod
        return None

    # -- Digital helpers ---------------------------------------------------

    def digital_input_modules(self) -> list[ModuleConfig]:
        """Modules whose category is ``digital_input``."""
        return self._where(lambda mod: mod.category == "digital_input")

    def digital_output_modules(self) -> list[ModuleConfig]:
        """Modules whose category is ``digital_output`` or ``relay_output``."""
        return self._where(
            lambda mod: mod.category in ("digital_output", "relay_output")
        )

    def total_input_points(self) -> int:
        """Total discrete input bits (for FC02 bulk read)."""
        total = 0
        for mod in self.digital_input_modules():
            total += mod.points
        return total

    def total_output_points(self) -> int:
        """Total discrete output bits."""
        total = 0
        for mod in self.digital_output_modules():
            total += mod.points
        return total

    # -- Analog helpers ----------------------------------------------------

    def analog_input_modules(self) -> list[ModuleConfig]:
        """Modules whose category is ``analog_input`` or ``temperature_input``."""
        return self._where(
            lambda mod: mod.category in ("analog_input", "temperature_input")
        )

    def analog_output_modules(self) -> list[ModuleConfig]:
        """Modules whose category is ``analog_output``."""
        return self._where(lambda mod: mod.category == "analog_output")

    def total_analog_input_channels(self) -> int:
        """Total 16-bit input registers (for FC04 bulk read)."""
        total = 0
        for mod in self.analog_input_modules():
            total += mod.points
        return total

    def total_analog_output_channels(self) -> int:
        """Total 16-bit output registers."""
        total = 0
        for mod in self.analog_output_modules():
            total += mod.points
        return total

    # -- Generic helpers ---------------------------------------------------

    def input_modules(self) -> list[ModuleConfig]:
        """All modules that are inputs (digital + analog + temperature)."""
        return self._where(lambda mod: mod.module_type.direction == "input")

    def output_modules(self) -> list[ModuleConfig]:
        """All modules that are outputs (digital + relay + analog)."""
        return self._where(lambda mod: mod.module_type.direction == "output")
@dataclass
class LogicalIO:
    """A named signal bound to one point of one module on one device.

    The trailing fields (``modbus_address``, ``device_ref``, ``module_ref``)
    carry defaults and are resolved at load time.
    """

    name: str
    device: str  # id of the owning device
    slot: int  # slot number, 1-based
    point: int  # point index inside the slot, 1-based
    direction: Literal["input", "output"]
    category: Category  # full category, taken from the ModuleType
    modbus_space: Literal["coil", "register"]  # address space of the point
    value_type: Literal["bool", "int"]  # "bool" = digital, "int" = analog

    # Optional startup default, digital outputs only.
    default_state: bool | None = None
    # Optional startup default, analog outputs only (raw register value).
    default_value: int | None = None

    # Resolved at load time:
    modbus_address: int = 0  # 0-based within ``modbus_space``
    device_ref: DeviceConfig | None = None
    module_ref: ModuleConfig | None = None
@dataclass
class SequenceStep:
    """One timed action inside a sequence.

    Only the optional fields that apply to the step's action and to the
    signal's value type are expected to be set; the others stay ``None``.
    """

    t_ms: int  # milliseconds from sequence T=0 (absolute, not relative)
    action: Literal["set_output", "check_input", "wait_input"]
    signal: str  # name of a logical_io entry

    # set_output on a digital signal:
    state: bool | None = None
    # set_output on an analog signal (raw register value):
    value: int | None = None

    # check_input / wait_input on a digital signal:
    expected: bool | None = None
    # check_input / wait_input on an analog signal (raw register value):
    expected_value: int | None = None
    tolerance: int | None = None  # analog: abs(actual - expected) <= tolerance

    # wait_input only:
    timeout_ms: int | None = None
@dataclass
class Sequence:
    """A named list of steps together with a free-text description."""

    name: str
    description: str
    # Each instance gets its own fresh list when no steps are supplied.
    steps: list[SequenceStep] = field(default_factory=list)
@dataclass
class Config:
    """Top-level result of loading a config file.

    The lookup methods scan the corresponding list in order and hand back
    the first match, or ``None`` when nothing matches.
    """

    devices: list[DeviceConfig]
    logical_io: list[LogicalIO]
    sequences: list[Sequence]

    def device(self, device_id: str) -> DeviceConfig | None:
        """Find a device by its ``id``."""
        for dev in self.devices:
            if dev.id == device_id:
                return dev
        return None

    def signal(self, name: str) -> LogicalIO | None:
        """Find a logical I/O signal by its ``name``."""
        for sig in self.logical_io:
            if sig.name == name:
                return sig
        return None

    def sequence(self, name: str) -> Sequence | None:
        """Find a sequence by its ``name``."""
        for seq in self.sequences:
            if seq.name == name:
                return seq
        return None
# ---------------------------------------------------------------------------
# Loader
# ---------------------------------------------------------------------------
class ConfigError(Exception):
    """Raised when a config file cannot be loaded or fails validation."""
def _parse_bool(val: object, context: str) -> bool:
if isinstance(val, bool):
return val
if isinstance(val, str):
if val.lower() in ("true", "yes", "on", "1"):
return True
if val.lower() in ("false", "no", "off", "0"):
return False
raise ConfigError(f"{context}: expected boolean, got {val!r}")
def load(path: str | Path) -> Config:
    """Load and validate a YAML config file.

    Devices are parsed first, then logical I/O (which refers to devices),
    then sequences (which refer to logical I/O signals).

    Args:
        path: Location of the YAML file.

    Returns:
        The parsed ``Config``.

    Raises:
        ConfigError: if the file is missing or unreadable, is not valid
            YAML, is not a mapping at the top level, or fails validation.
    """
    path = Path(path)
    if not path.exists():
        raise ConfigError(f"Config file not found: {path}")

    try:
        # Binary mode: PyYAML detects the stream encoding itself instead of
        # relying on the platform's locale default.
        with open(path, "rb") as f:
            raw = yaml.safe_load(f)
    except OSError as exc:
        # e.g. the path is a directory, or permission is denied.
        raise ConfigError(f"Cannot read config file {path}: {exc}") from exc
    except yaml.YAMLError as exc:
        raise ConfigError(f"Invalid YAML in {path}: {exc}") from exc

    if not isinstance(raw, dict):
        raise ConfigError("Config file must be a YAML mapping at the top level")

    try:
        devices = _parse_devices(raw.get("devices") or [])
        device_map = {d.id: d for d in devices}
        logical_io = _parse_logical_io(raw.get("logical_io") or [], device_map)
        signal_map = {s.name: s for s in logical_io}
        sequences = _parse_sequences(raw.get("sequences") or [], signal_map)
    except (TypeError, ValueError) as exc:
        # Safety net so a malformed scalar (e.g. a non-numeric port) reaches
        # the caller as ConfigError; the original error stays chained.
        raise ConfigError(f"Invalid value in config file {path}: {exc}") from exc

    return Config(devices=devices, logical_io=logical_io, sequences=sequences)
# ---------------------------------------------------------------------------
# Parsing helpers
# ---------------------------------------------------------------------------
def _parse_devices(raw_list: list) -> list[DeviceConfig]:
if not isinstance(raw_list, list):
raise ConfigError("'devices' must be a list")
devices: list[DeviceConfig] = []
seen_ids: set[str] = set()
for i, raw in enumerate(raw_list):
ctx = f"devices[{i}]"
if not isinstance(raw, dict):
raise ConfigError(f"{ctx}: must be a mapping")
dev_id = _require_str(raw, "id", ctx)
if dev_id in seen_ids:
raise ConfigError(f"{ctx}: duplicate device id {dev_id!r}")
seen_ids.add(dev_id)
host = _require_str(raw, "host", ctx)
port = int(raw.get("port", 502))
unit_id = int(raw.get("unit_id", 1))
poll_interval = int(raw.get("poll_interval_ms", 50))
modules = _parse_modules(raw.get("modules") or [], ctx)
# Assign Modbus addresses using TWO independent counters.
#
# The T1H-EBC100 has separate address spaces for coils (digital)
# and registers (analog/temperature). Each space is flat and
# sequential by physical slot order. A digital module advances
# only the coil counter; an analog module advances only the
# register counter.
coil_offset = 0
register_offset = 0
for m in sorted(modules, key=lambda m: m.slot):
if m.modbus_space == "coil":
m.modbus_address = coil_offset
coil_offset += m.points
else:
m.modbus_address = register_offset
register_offset += m.points
devices.append(DeviceConfig(
id=dev_id,
host=host,
port=port,
unit_id=unit_id,
poll_interval_ms=poll_interval,
modules=sorted(modules, key=lambda m: m.slot),
))
return devices
def _parse_modules(raw_list: list, parent_ctx: str) -> list[ModuleConfig]:
if not isinstance(raw_list, list):
raise ConfigError(f"{parent_ctx}.modules: must be a list")
modules: list[ModuleConfig] = []
seen_slots: set[int] = set()
for i, raw in enumerate(raw_list):
ctx = f"{parent_ctx}.modules[{i}]"
if not isinstance(raw, dict):
raise ConfigError(f"{ctx}: must be a mapping")
slot = int(_require(raw, "slot", ctx))
mod_type = _require_str(raw, "type", ctx)
if slot < 1:
raise ConfigError(f"{ctx}: slot must be >= 1, got {slot}")
if slot in seen_slots:
raise ConfigError(f"{ctx}: duplicate slot {slot}")
seen_slots.add(slot)
try:
mt = get_module_type(mod_type)
except KeyError as exc:
raise ConfigError(f"{ctx}: {exc}") from None
points = int(raw.get("points", mt.points))
modules.append(ModuleConfig(
slot=slot,
type=mod_type,
module_type=mt,
points=points,
category=mt.category,
modbus_space=mt.modbus_space,
))
return modules
def _parse_logical_io(
raw_list: list, device_map: dict[str, DeviceConfig]
) -> list[LogicalIO]:
if not isinstance(raw_list, list):
raise ConfigError("'logical_io' must be a list")
signals: list[LogicalIO] = []
seen_names: set[str] = set()
for i, raw in enumerate(raw_list):
ctx = f"logical_io[{i}]"
if not isinstance(raw, dict):
raise ConfigError(f"{ctx}: must be a mapping")
name = _require_str(raw, "name", ctx)
dev_id = _require_str(raw, "device", ctx)
slot = int(_require(raw, "slot", ctx))
point = int(_require(raw, "point", ctx))
direction = _require_str(raw, "direction", ctx)
if name in seen_names:
raise ConfigError(f"{ctx}: duplicate signal name {name!r}")
seen_names.add(name)
if direction not in ("input", "output"):
raise ConfigError(
f"{ctx}: direction must be 'input' or 'output', got {direction!r}"
)
dev = device_map.get(dev_id)
if dev is None:
raise ConfigError(f"{ctx}: references unknown device {dev_id!r}")
mod = dev.module_for_slot(slot)
if mod is None:
slots = [m.slot for m in dev.modules]
raise ConfigError(
f"{ctx}: device {dev_id!r} has no module at slot {slot}. "
f"Available slots: {slots}"
)
if mod.module_type.direction != direction:
raise ConfigError(
f"{ctx}: signal direction {direction!r} does not match "
f"module type {mod.type!r} which is {mod.module_type.direction!r}"
)
if point < 1 or point > mod.points:
raise ConfigError(
f"{ctx}: point {point} out of range for module at slot {slot} "
f"(module has {mod.points} points, 1-based)"
)
# Parse default_state (digital outputs only)
default_state: bool | None = None
if "default_state" in raw:
if not mod.module_type.is_digital or direction != "output":
raise ConfigError(
f"{ctx}: default_state is only valid for digital output signals"
)
default_state = _parse_bool(raw["default_state"], ctx + ".default_state")
# Parse default_value (analog outputs only)
default_value: int | None = None
if "default_value" in raw:
if not mod.module_type.is_analog or direction != "output":
raise ConfigError(
f"{ctx}: default_value is only valid for analog output signals"
)
default_value = int(raw["default_value"])
# Modbus address = module's base address + (point - 1)
modbus_address = mod.modbus_address + (point - 1)
signals.append(LogicalIO(
name=name,
device=dev_id,
slot=slot,
point=point,
direction=direction,
category=mod.category,
modbus_space=mod.modbus_space,
value_type=mod.module_type.value_type,
default_state=default_state,
default_value=default_value,
modbus_address=modbus_address,
device_ref=dev,
module_ref=mod,
))
return signals
def _parse_sequences(
raw_list: list, signal_map: dict[str, LogicalIO]
) -> list[Sequence]:
if not isinstance(raw_list, list):
raise ConfigError("'sequences' must be a list")
sequences: list[Sequence] = []
seen_names: set[str] = set()
for i, raw in enumerate(raw_list):
ctx = f"sequences[{i}]"
if not isinstance(raw, dict):
raise ConfigError(f"{ctx}: must be a mapping")
name = _require_str(raw, "name", ctx)
description = raw.get("description", "")
if name in seen_names:
raise ConfigError(f"{ctx}: duplicate sequence name {name!r}")
seen_names.add(name)
steps = _parse_steps(raw.get("steps") or [], signal_map, ctx)
sequences.append(Sequence(name=name, description=description, steps=steps))
return sequences
def _parse_steps(
raw_list: list, signal_map: dict[str, LogicalIO], parent_ctx: str
) -> list[SequenceStep]:
if not isinstance(raw_list, list):
raise ConfigError(f"{parent_ctx}.steps: must be a list")
steps: list[SequenceStep] = []
for i, raw in enumerate(raw_list):
ctx = f"{parent_ctx}.steps[{i}]"
if not isinstance(raw, dict):
raise ConfigError(f"{ctx}: must be a mapping")
t_ms = int(_require(raw, "t_ms", ctx))
action = _require_str(raw, "action", ctx)
signal = _require_str(raw, "signal", ctx)
if t_ms < 0:
raise ConfigError(f"{ctx}: t_ms must be >= 0, got {t_ms}")
if action not in ("set_output", "check_input", "wait_input"):
raise ConfigError(
f"{ctx}: action must be 'set_output', 'check_input', or 'wait_input',"
f" got {action!r}"
)
sig = signal_map.get(signal)
if sig is None:
raise ConfigError(f"{ctx}: references unknown signal {signal!r}")
step = SequenceStep(t_ms=t_ms, action=action, signal=signal)
if action == "set_output":
if sig.direction != "output":
raise ConfigError(
f"{ctx}: set_output action used on signal {signal!r} "
f"which is an input — use check_input instead"
)
if sig.value_type == "bool":
if "state" not in raw:
raise ConfigError(
f"{ctx}: set_output step for digital signal requires 'state'"
)
step.state = _parse_bool(raw["state"], ctx + ".state")
else:
if "value" not in raw:
raise ConfigError(
f"{ctx}: set_output step for analog signal requires 'value'"
)
step.value = int(raw["value"])
elif action in ("check_input", "wait_input"):
if sig.value_type == "bool":
if "expected" not in raw:
raise ConfigError(
f"{ctx}: {action} step for digital signal requires 'expected'"
)
step.expected = _parse_bool(raw["expected"], ctx + ".expected")
else:
if "expected_value" not in raw:
raise ConfigError(
f"{ctx}: {action} step for analog signal requires "
f"'expected_value'"
)
step.expected_value = int(raw["expected_value"])
if "tolerance" in raw:
step.tolerance = int(raw["tolerance"])
else:
step.tolerance = 0
if action == "wait_input":
if "timeout_ms" not in raw:
raise ConfigError(
f"{ctx}: wait_input step requires 'timeout_ms' field"
)
step.timeout_ms = int(raw["timeout_ms"])
if step.timeout_ms <= 0:
raise ConfigError(
f"{ctx}: timeout_ms must be > 0, got {step.timeout_ms}"
)
steps.append(step)
# Sort by t_ms so steps need not be in order in the YAML
steps.sort(key=lambda s: s.t_ms)
return steps
# ---------------------------------------------------------------------------
# Tiny helpers
# ---------------------------------------------------------------------------
def _require(d: dict, key: str, ctx: str) -> object:
if key not in d:
raise ConfigError(f"{ctx}: missing required field '{key}'")
return d[key]
def _require_str(d: dict, key: str, ctx: str) -> str:
    """Return ``d[key]`` when it is present and a string.

    Raises:
        ConfigError: if the key is missing, or its value is not a ``str``.
    """
    value = _require(d, key, ctx)
    if isinstance(value, str):
        return value
    raise ConfigError(f"{ctx}.{key}: expected string, got {type(value).__name__}")