538 lines
19 KiB
Python
538 lines
19 KiB
Python
|
|
"""
arnold/config.py — YAML config loader, dataclasses, and validation.

Schema:
  devices: list of EBC100 controllers
  logical_io: named signals mapped to device/slot/point
  sequences: ordered step lists with timing and actions

Address spaces (EBC100 hardware):
  The T1H-EBC100 maintains TWO independent Modbus address spaces:

  coil space (1-bit) — digital modules (inputs + outputs + relays)
    FC01 read coils, FC02 read discrete inputs,
    FC05 write single coil, FC15 write multiple coils
    Flat sequential addressing by physical slot order.

  register space (16-bit) — analog + temperature modules
    FC03 read holding registers, FC04 read input registers,
    FC06 write single register, FC16 write multiple registers
    Flat sequential addressing by physical slot order,
    INDEPENDENT of coil space.

  A digital module in slot 1 advances coil_offset but not register_offset.
  An analog module in slot 2 advances register_offset but not coil_offset.
"""
|
||
|
|
|
||
|
|
from __future__ import annotations
|
||
|
|
|
||
|
|
import yaml
|
||
|
|
from dataclasses import dataclass, field
|
||
|
|
from pathlib import Path
|
||
|
|
from typing import Literal
|
||
|
|
|
||
|
|
from .module_types import MODULE_REGISTRY, ModuleType, Category, get_module_type
|
||
|
|
|
||
|
|
|
||
|
|
# ---------------------------------------------------------------------------
|
||
|
|
# Dataclasses
|
||
|
|
# ---------------------------------------------------------------------------
|
||
|
|
|
||
|
|
@dataclass
class ModuleConfig:
    """One I/O module occupying a physical slot of a device.

    Attributes:
        slot: Physical slot number, counted from 1.
        type: Part number of the module, e.g. "T1H-08TDS".
        module_type: Registry entry the part number resolved to.
        points: How many I/O points/channels the module exposes.
        category: Category copied from the module type.
        modbus_space: Address space ("coil" or "register") copied from the
            module type.
        modbus_address: 0-based base address inside ``modbus_space``;
            filled in at load time, 0 until then.
    """

    slot: int
    type: str
    module_type: ModuleType
    points: int
    category: Category
    modbus_space: Literal["coil", "register"]
    modbus_address: int = 0
|
||
|
|
|
||
|
|
|
||
|
|
@dataclass
class DeviceConfig:
    """Connection settings of one device plus the modules installed in it.

    Attributes:
        id: Identifier used to look the device up.
        host: Host the device is reached at.
        port: Port number.
        unit_id: Unit identifier.
        poll_interval_ms: Polling interval in milliseconds.
        modules: Modules installed in the device (empty by default).
    """

    id: str
    host: str
    port: int
    unit_id: int
    poll_interval_ms: int
    modules: list[ModuleConfig] = field(default_factory=list)

    def module_for_slot(self, slot: int) -> ModuleConfig | None:
        """Return the first module whose slot equals ``slot``, else None."""
        for candidate in self.modules:
            if candidate.slot == slot:
                return candidate
        return None

    # -- Private selectors -------------------------------------------------

    def _with_category(self, *wanted: str) -> list[ModuleConfig]:
        """Modules whose category is one of ``wanted``, in list order."""
        selected = []
        for module in self.modules:
            if module.category in wanted:
                selected.append(module)
        return selected

    def _with_direction(self, wanted: str) -> list[ModuleConfig]:
        """Modules whose module type has direction ``wanted``, in list order."""
        selected = []
        for module in self.modules:
            if module.module_type.direction == wanted:
                selected.append(module)
        return selected

    @staticmethod
    def _point_total(selection: list[ModuleConfig]) -> int:
        """Add up the ``points`` of every module in ``selection``."""
        total = 0
        for module in selection:
            total += module.points
        return total

    # -- Digital helpers ---------------------------------------------------

    def digital_input_modules(self) -> list[ModuleConfig]:
        """Modules in the "digital_input" category."""
        return self._with_category("digital_input")

    def digital_output_modules(self) -> list[ModuleConfig]:
        """Modules in the "digital_output" or "relay_output" category."""
        return self._with_category("digital_output", "relay_output")

    def total_input_points(self) -> int:
        """Total discrete input bits (for FC02 bulk read)."""
        return self._point_total(self.digital_input_modules())

    def total_output_points(self) -> int:
        """Total discrete output bits."""
        return self._point_total(self.digital_output_modules())

    # -- Analog helpers ----------------------------------------------------

    def analog_input_modules(self) -> list[ModuleConfig]:
        """Modules in the "analog_input" or "temperature_input" category."""
        return self._with_category("analog_input", "temperature_input")

    def analog_output_modules(self) -> list[ModuleConfig]:
        """Modules in the "analog_output" category."""
        return self._with_category("analog_output")

    def total_analog_input_channels(self) -> int:
        """Total 16-bit input registers (for FC04 bulk read)."""
        return self._point_total(self.analog_input_modules())

    def total_analog_output_channels(self) -> int:
        """Total 16-bit output registers."""
        return self._point_total(self.analog_output_modules())

    # -- Generic helpers ---------------------------------------------------

    def input_modules(self) -> list[ModuleConfig]:
        """All modules that are inputs (digital + analog + temperature)."""
        return self._with_direction("input")

    def output_modules(self) -> list[ModuleConfig]:
        """All modules that are outputs (digital + relay + analog)."""
        return self._with_direction("output")
|
||
|
|
|
||
|
|
|
||
|
|
@dataclass
class LogicalIO:
    """A named signal bound to one point of one module on one device.

    Attributes:
        name: Signal name.
        device: Id of the device the signal lives on.
        slot: Slot number of the module, counted from 1.
        point: Point number within the slot, counted from 1.
        direction: "input" or "output".
        category: Full category taken from the module type.
        modbus_space: Address space the signal belongs to.
        value_type: "bool" for digital signals, "int" for analog ones.
        default_state: Optional startup default, digital outputs only.
        default_value: Optional startup default (raw register value),
            analog outputs only.
        modbus_address: 0-based address within ``modbus_space``; resolved
            at load time.
        device_ref: Owning device object; resolved at load time.
        module_ref: Owning module object; resolved at load time.
    """

    name: str
    device: str
    slot: int
    point: int
    direction: Literal["input", "output"]
    category: Category
    modbus_space: Literal["coil", "register"]
    value_type: Literal["bool", "int"]
    default_state: bool | None = None
    default_value: int | None = None
    modbus_address: int = 0
    device_ref: DeviceConfig | None = None
    module_ref: ModuleConfig | None = None
|
||
|
|
|
||
|
|
|
||
|
|
@dataclass
class SequenceStep:
    """A single timed action inside a sequence.

    Attributes:
        t_ms: Absolute time in milliseconds from sequence T=0.
        action: "set_output", "check_input" or "wait_input".
        signal: Name of the logical_io signal the step acts on.
        state: Target state for a digital set_output.
        value: Raw register value for an analog set_output.
        expected: Expected state for a digital check_input / wait_input.
        expected_value: Expected raw register value for an analog
            check_input / wait_input.
        tolerance: Analog comparison window:
            abs(actual - expected) <= tolerance.
        timeout_ms: Timeout in milliseconds, wait_input only.
    """

    t_ms: int
    action: Literal["set_output", "check_input", "wait_input"]
    signal: str
    state: bool | None = None
    value: int | None = None
    expected: bool | None = None
    expected_value: int | None = None
    tolerance: int | None = None
    timeout_ms: int | None = None
|
||
|
|
|
||
|
|
|
||
|
|
@dataclass
class Sequence:
    """A named list of steps.

    Attributes:
        name: Sequence name.
        description: Free-text description.
        steps: The steps of the sequence (a fresh empty list by default).
    """

    name: str
    description: str
    steps: list[SequenceStep] = field(default_factory=list)
|
||
|
|
|
||
|
|
|
||
|
|
@dataclass
class Config:
    """Top-level container for devices, logical signals, and sequences.

    Each lookup method returns the first entry that matches, or None when
    nothing matches.
    """

    devices: list[DeviceConfig]
    logical_io: list[LogicalIO]
    sequences: list[Sequence]

    def device(self, device_id: str) -> DeviceConfig | None:
        """Find a device by its ``id``."""
        for entry in self.devices:
            if entry.id == device_id:
                return entry
        return None

    def signal(self, name: str) -> LogicalIO | None:
        """Find a logical signal by its ``name``."""
        for entry in self.logical_io:
            if entry.name == name:
                return entry
        return None

    def sequence(self, name: str) -> Sequence | None:
        """Find a sequence by its ``name``."""
        for entry in self.sequences:
            if entry.name == name:
                return entry
        return None
|
||
|
|
|
||
|
|
|
||
|
|
# ---------------------------------------------------------------------------
|
||
|
|
# Loader
|
||
|
|
# ---------------------------------------------------------------------------
|
||
|
|
|
||
|
|
class ConfigError(Exception):
    """Raised by the loader when a config file is missing or fails validation."""
|
||
|
|
|
||
|
|
|
||
|
|
def _parse_bool(val: object, context: str) -> bool:
|
||
|
|
if isinstance(val, bool):
|
||
|
|
return val
|
||
|
|
if isinstance(val, str):
|
||
|
|
if val.lower() in ("true", "yes", "on", "1"):
|
||
|
|
return True
|
||
|
|
if val.lower() in ("false", "no", "off", "0"):
|
||
|
|
return False
|
||
|
|
raise ConfigError(f"{context}: expected boolean, got {val!r}")
|
||
|
|
|
||
|
|
|
||
|
|
def load(path: str | Path) -> Config:
    """Load and validate a YAML config file.

    The file is parsed with ``yaml.safe_load`` and its ``devices``,
    ``logical_io`` and ``sequences`` sections are handed to the section
    parsers in that order, because signals refer to devices and sequences
    refer to signals. A missing or empty section is treated as an empty list.

    Args:
        path: Location of the YAML file.

    Returns:
        The parsed configuration.

    Raises:
        ConfigError: If the file does not exist, cannot be read, is not
            valid YAML, is not a mapping at the top level, or a section
            parser rejects its content.
    """
    path = Path(path)

    try:
        # Binary mode lets PyYAML detect the encoding (UTF-8 / UTF-16) itself
        # instead of decoding with the platform's locale default.
        with open(path, "rb") as f:
            raw = yaml.safe_load(f)
    except FileNotFoundError:
        raise ConfigError(f"Config file not found: {path}") from None
    except OSError as exc:
        # e.g. the path is a directory or is not readable
        raise ConfigError(f"Cannot read config file {path}: {exc}") from exc
    except yaml.YAMLError as exc:
        raise ConfigError(f"Invalid YAML in {path}: {exc}") from exc

    if not isinstance(raw, dict):
        raise ConfigError("Config file must be a YAML mapping at the top level")

    devices = _parse_devices(raw.get("devices") or [])
    device_map = {d.id: d for d in devices}

    logical_io = _parse_logical_io(raw.get("logical_io") or [], device_map)
    signal_map = {s.name: s for s in logical_io}

    sequences = _parse_sequences(raw.get("sequences") or [], signal_map)

    return Config(devices=devices, logical_io=logical_io, sequences=sequences)
|
||
|
|
|
||
|
|
|
||
|
|
# ---------------------------------------------------------------------------
|
||
|
|
# Parsing helpers
|
||
|
|
# ---------------------------------------------------------------------------
|
||
|
|
|
||
|
|
def _parse_devices(raw_list: list) -> list[DeviceConfig]:
|
||
|
|
if not isinstance(raw_list, list):
|
||
|
|
raise ConfigError("'devices' must be a list")
|
||
|
|
|
||
|
|
devices: list[DeviceConfig] = []
|
||
|
|
seen_ids: set[str] = set()
|
||
|
|
|
||
|
|
for i, raw in enumerate(raw_list):
|
||
|
|
ctx = f"devices[{i}]"
|
||
|
|
if not isinstance(raw, dict):
|
||
|
|
raise ConfigError(f"{ctx}: must be a mapping")
|
||
|
|
|
||
|
|
dev_id = _require_str(raw, "id", ctx)
|
||
|
|
if dev_id in seen_ids:
|
||
|
|
raise ConfigError(f"{ctx}: duplicate device id {dev_id!r}")
|
||
|
|
seen_ids.add(dev_id)
|
||
|
|
|
||
|
|
host = _require_str(raw, "host", ctx)
|
||
|
|
port = int(raw.get("port", 502))
|
||
|
|
unit_id = int(raw.get("unit_id", 1))
|
||
|
|
poll_interval = int(raw.get("poll_interval_ms", 50))
|
||
|
|
|
||
|
|
modules = _parse_modules(raw.get("modules") or [], ctx)
|
||
|
|
|
||
|
|
# Assign Modbus addresses using TWO independent counters.
|
||
|
|
#
|
||
|
|
# The T1H-EBC100 has separate address spaces for coils (digital)
|
||
|
|
# and registers (analog/temperature). Each space is flat and
|
||
|
|
# sequential by physical slot order. A digital module advances
|
||
|
|
# only the coil counter; an analog module advances only the
|
||
|
|
# register counter.
|
||
|
|
coil_offset = 0
|
||
|
|
register_offset = 0
|
||
|
|
for m in sorted(modules, key=lambda m: m.slot):
|
||
|
|
if m.modbus_space == "coil":
|
||
|
|
m.modbus_address = coil_offset
|
||
|
|
coil_offset += m.points
|
||
|
|
else:
|
||
|
|
m.modbus_address = register_offset
|
||
|
|
register_offset += m.points
|
||
|
|
|
||
|
|
devices.append(DeviceConfig(
|
||
|
|
id=dev_id,
|
||
|
|
host=host,
|
||
|
|
port=port,
|
||
|
|
unit_id=unit_id,
|
||
|
|
poll_interval_ms=poll_interval,
|
||
|
|
modules=sorted(modules, key=lambda m: m.slot),
|
||
|
|
))
|
||
|
|
|
||
|
|
return devices
|
||
|
|
|
||
|
|
|
||
|
|
def _parse_modules(raw_list: list, parent_ctx: str) -> list[ModuleConfig]:
|
||
|
|
if not isinstance(raw_list, list):
|
||
|
|
raise ConfigError(f"{parent_ctx}.modules: must be a list")
|
||
|
|
|
||
|
|
modules: list[ModuleConfig] = []
|
||
|
|
seen_slots: set[int] = set()
|
||
|
|
|
||
|
|
for i, raw in enumerate(raw_list):
|
||
|
|
ctx = f"{parent_ctx}.modules[{i}]"
|
||
|
|
if not isinstance(raw, dict):
|
||
|
|
raise ConfigError(f"{ctx}: must be a mapping")
|
||
|
|
|
||
|
|
slot = int(_require(raw, "slot", ctx))
|
||
|
|
mod_type = _require_str(raw, "type", ctx)
|
||
|
|
|
||
|
|
if slot < 1:
|
||
|
|
raise ConfigError(f"{ctx}: slot must be >= 1, got {slot}")
|
||
|
|
if slot in seen_slots:
|
||
|
|
raise ConfigError(f"{ctx}: duplicate slot {slot}")
|
||
|
|
seen_slots.add(slot)
|
||
|
|
|
||
|
|
try:
|
||
|
|
mt = get_module_type(mod_type)
|
||
|
|
except KeyError as exc:
|
||
|
|
raise ConfigError(f"{ctx}: {exc}") from None
|
||
|
|
|
||
|
|
points = int(raw.get("points", mt.points))
|
||
|
|
|
||
|
|
modules.append(ModuleConfig(
|
||
|
|
slot=slot,
|
||
|
|
type=mod_type,
|
||
|
|
module_type=mt,
|
||
|
|
points=points,
|
||
|
|
category=mt.category,
|
||
|
|
modbus_space=mt.modbus_space,
|
||
|
|
))
|
||
|
|
|
||
|
|
return modules
|
||
|
|
|
||
|
|
|
||
|
|
def _parse_logical_io(
|
||
|
|
raw_list: list, device_map: dict[str, DeviceConfig]
|
||
|
|
) -> list[LogicalIO]:
|
||
|
|
if not isinstance(raw_list, list):
|
||
|
|
raise ConfigError("'logical_io' must be a list")
|
||
|
|
|
||
|
|
signals: list[LogicalIO] = []
|
||
|
|
seen_names: set[str] = set()
|
||
|
|
|
||
|
|
for i, raw in enumerate(raw_list):
|
||
|
|
ctx = f"logical_io[{i}]"
|
||
|
|
if not isinstance(raw, dict):
|
||
|
|
raise ConfigError(f"{ctx}: must be a mapping")
|
||
|
|
|
||
|
|
name = _require_str(raw, "name", ctx)
|
||
|
|
dev_id = _require_str(raw, "device", ctx)
|
||
|
|
slot = int(_require(raw, "slot", ctx))
|
||
|
|
point = int(_require(raw, "point", ctx))
|
||
|
|
direction = _require_str(raw, "direction", ctx)
|
||
|
|
|
||
|
|
if name in seen_names:
|
||
|
|
raise ConfigError(f"{ctx}: duplicate signal name {name!r}")
|
||
|
|
seen_names.add(name)
|
||
|
|
|
||
|
|
if direction not in ("input", "output"):
|
||
|
|
raise ConfigError(
|
||
|
|
f"{ctx}: direction must be 'input' or 'output', got {direction!r}"
|
||
|
|
)
|
||
|
|
|
||
|
|
dev = device_map.get(dev_id)
|
||
|
|
if dev is None:
|
||
|
|
raise ConfigError(f"{ctx}: references unknown device {dev_id!r}")
|
||
|
|
|
||
|
|
mod = dev.module_for_slot(slot)
|
||
|
|
if mod is None:
|
||
|
|
slots = [m.slot for m in dev.modules]
|
||
|
|
raise ConfigError(
|
||
|
|
f"{ctx}: device {dev_id!r} has no module at slot {slot}. "
|
||
|
|
f"Available slots: {slots}"
|
||
|
|
)
|
||
|
|
|
||
|
|
if mod.module_type.direction != direction:
|
||
|
|
raise ConfigError(
|
||
|
|
f"{ctx}: signal direction {direction!r} does not match "
|
||
|
|
f"module type {mod.type!r} which is {mod.module_type.direction!r}"
|
||
|
|
)
|
||
|
|
|
||
|
|
if point < 1 or point > mod.points:
|
||
|
|
raise ConfigError(
|
||
|
|
f"{ctx}: point {point} out of range for module at slot {slot} "
|
||
|
|
f"(module has {mod.points} points, 1-based)"
|
||
|
|
)
|
||
|
|
|
||
|
|
# Parse default_state (digital outputs only)
|
||
|
|
default_state: bool | None = None
|
||
|
|
if "default_state" in raw:
|
||
|
|
if not mod.module_type.is_digital or direction != "output":
|
||
|
|
raise ConfigError(
|
||
|
|
f"{ctx}: default_state is only valid for digital output signals"
|
||
|
|
)
|
||
|
|
default_state = _parse_bool(raw["default_state"], ctx + ".default_state")
|
||
|
|
|
||
|
|
# Parse default_value (analog outputs only)
|
||
|
|
default_value: int | None = None
|
||
|
|
if "default_value" in raw:
|
||
|
|
if not mod.module_type.is_analog or direction != "output":
|
||
|
|
raise ConfigError(
|
||
|
|
f"{ctx}: default_value is only valid for analog output signals"
|
||
|
|
)
|
||
|
|
default_value = int(raw["default_value"])
|
||
|
|
|
||
|
|
# Modbus address = module's base address + (point - 1)
|
||
|
|
modbus_address = mod.modbus_address + (point - 1)
|
||
|
|
|
||
|
|
signals.append(LogicalIO(
|
||
|
|
name=name,
|
||
|
|
device=dev_id,
|
||
|
|
slot=slot,
|
||
|
|
point=point,
|
||
|
|
direction=direction,
|
||
|
|
category=mod.category,
|
||
|
|
modbus_space=mod.modbus_space,
|
||
|
|
value_type=mod.module_type.value_type,
|
||
|
|
default_state=default_state,
|
||
|
|
default_value=default_value,
|
||
|
|
modbus_address=modbus_address,
|
||
|
|
device_ref=dev,
|
||
|
|
module_ref=mod,
|
||
|
|
))
|
||
|
|
|
||
|
|
return signals
|
||
|
|
|
||
|
|
|
||
|
|
def _parse_sequences(
|
||
|
|
raw_list: list, signal_map: dict[str, LogicalIO]
|
||
|
|
) -> list[Sequence]:
|
||
|
|
if not isinstance(raw_list, list):
|
||
|
|
raise ConfigError("'sequences' must be a list")
|
||
|
|
|
||
|
|
sequences: list[Sequence] = []
|
||
|
|
seen_names: set[str] = set()
|
||
|
|
|
||
|
|
for i, raw in enumerate(raw_list):
|
||
|
|
ctx = f"sequences[{i}]"
|
||
|
|
if not isinstance(raw, dict):
|
||
|
|
raise ConfigError(f"{ctx}: must be a mapping")
|
||
|
|
|
||
|
|
name = _require_str(raw, "name", ctx)
|
||
|
|
description = raw.get("description", "")
|
||
|
|
|
||
|
|
if name in seen_names:
|
||
|
|
raise ConfigError(f"{ctx}: duplicate sequence name {name!r}")
|
||
|
|
seen_names.add(name)
|
||
|
|
|
||
|
|
steps = _parse_steps(raw.get("steps") or [], signal_map, ctx)
|
||
|
|
sequences.append(Sequence(name=name, description=description, steps=steps))
|
||
|
|
|
||
|
|
return sequences
|
||
|
|
|
||
|
|
|
||
|
|
def _parse_steps(
|
||
|
|
raw_list: list, signal_map: dict[str, LogicalIO], parent_ctx: str
|
||
|
|
) -> list[SequenceStep]:
|
||
|
|
if not isinstance(raw_list, list):
|
||
|
|
raise ConfigError(f"{parent_ctx}.steps: must be a list")
|
||
|
|
|
||
|
|
steps: list[SequenceStep] = []
|
||
|
|
|
||
|
|
for i, raw in enumerate(raw_list):
|
||
|
|
ctx = f"{parent_ctx}.steps[{i}]"
|
||
|
|
if not isinstance(raw, dict):
|
||
|
|
raise ConfigError(f"{ctx}: must be a mapping")
|
||
|
|
|
||
|
|
t_ms = int(_require(raw, "t_ms", ctx))
|
||
|
|
action = _require_str(raw, "action", ctx)
|
||
|
|
signal = _require_str(raw, "signal", ctx)
|
||
|
|
|
||
|
|
if t_ms < 0:
|
||
|
|
raise ConfigError(f"{ctx}: t_ms must be >= 0, got {t_ms}")
|
||
|
|
|
||
|
|
if action not in ("set_output", "check_input", "wait_input"):
|
||
|
|
raise ConfigError(
|
||
|
|
f"{ctx}: action must be 'set_output', 'check_input', or 'wait_input',"
|
||
|
|
f" got {action!r}"
|
||
|
|
)
|
||
|
|
|
||
|
|
sig = signal_map.get(signal)
|
||
|
|
if sig is None:
|
||
|
|
raise ConfigError(f"{ctx}: references unknown signal {signal!r}")
|
||
|
|
|
||
|
|
step = SequenceStep(t_ms=t_ms, action=action, signal=signal)
|
||
|
|
|
||
|
|
if action == "set_output":
|
||
|
|
if sig.direction != "output":
|
||
|
|
raise ConfigError(
|
||
|
|
f"{ctx}: set_output action used on signal {signal!r} "
|
||
|
|
f"which is an input — use check_input instead"
|
||
|
|
)
|
||
|
|
if sig.value_type == "bool":
|
||
|
|
if "state" not in raw:
|
||
|
|
raise ConfigError(
|
||
|
|
f"{ctx}: set_output step for digital signal requires 'state'"
|
||
|
|
)
|
||
|
|
step.state = _parse_bool(raw["state"], ctx + ".state")
|
||
|
|
else:
|
||
|
|
if "value" not in raw:
|
||
|
|
raise ConfigError(
|
||
|
|
f"{ctx}: set_output step for analog signal requires 'value'"
|
||
|
|
)
|
||
|
|
step.value = int(raw["value"])
|
||
|
|
|
||
|
|
elif action in ("check_input", "wait_input"):
|
||
|
|
if sig.value_type == "bool":
|
||
|
|
if "expected" not in raw:
|
||
|
|
raise ConfigError(
|
||
|
|
f"{ctx}: {action} step for digital signal requires 'expected'"
|
||
|
|
)
|
||
|
|
step.expected = _parse_bool(raw["expected"], ctx + ".expected")
|
||
|
|
else:
|
||
|
|
if "expected_value" not in raw:
|
||
|
|
raise ConfigError(
|
||
|
|
f"{ctx}: {action} step for analog signal requires "
|
||
|
|
f"'expected_value'"
|
||
|
|
)
|
||
|
|
step.expected_value = int(raw["expected_value"])
|
||
|
|
if "tolerance" in raw:
|
||
|
|
step.tolerance = int(raw["tolerance"])
|
||
|
|
else:
|
||
|
|
step.tolerance = 0
|
||
|
|
|
||
|
|
if action == "wait_input":
|
||
|
|
if "timeout_ms" not in raw:
|
||
|
|
raise ConfigError(
|
||
|
|
f"{ctx}: wait_input step requires 'timeout_ms' field"
|
||
|
|
)
|
||
|
|
step.timeout_ms = int(raw["timeout_ms"])
|
||
|
|
if step.timeout_ms <= 0:
|
||
|
|
raise ConfigError(
|
||
|
|
f"{ctx}: timeout_ms must be > 0, got {step.timeout_ms}"
|
||
|
|
)
|
||
|
|
|
||
|
|
steps.append(step)
|
||
|
|
|
||
|
|
# Sort by t_ms so steps need not be in order in the YAML
|
||
|
|
steps.sort(key=lambda s: s.t_ms)
|
||
|
|
return steps
|
||
|
|
|
||
|
|
|
||
|
|
# ---------------------------------------------------------------------------
|
||
|
|
# Tiny helpers
|
||
|
|
# ---------------------------------------------------------------------------
|
||
|
|
|
||
|
|
def _require(d: dict, key: str, ctx: str) -> object:
|
||
|
|
if key not in d:
|
||
|
|
raise ConfigError(f"{ctx}: missing required field '{key}'")
|
||
|
|
return d[key]
|
||
|
|
|
||
|
|
|
||
|
|
def _require_str(d: dict, key: str, ctx: str) -> str:
    """Return ``d[key]`` after checking that it is a string.

    Raises ConfigError when the key is missing (via ``_require``) or when
    the value is not a ``str``; the message names the actual type.
    """
    value = _require(d, key, ctx)
    if isinstance(value, str):
        return value
    raise ConfigError(f"{ctx}.{key}: expected string, got {type(value).__name__}")
|