""" Protocol Information Data Models This module defines data structures for representing protocol information, decoded fields, and protocol registry management. """ from dataclasses import dataclass, field from typing import Dict, List, Set, Optional, Any, Union from enum import Enum, IntEnum from abc import ABC, abstractmethod class ProtocolType(IntEnum): """Protocol type identifiers""" UNKNOWN = 0 # Standard protocols UDP = 10 TCP = 11 ICMP = 12 IGMP = 13 # Enhanced protocols CHAPTER10 = 100 CH10 = 100 # Alias for CHAPTER10 PTP = 101 IENA = 102 NTP = 103 class ProtocolCategory(Enum): """Protocol categories for organization""" TRANSPORT = "transport" # UDP, TCP, ICMP NETWORK = "network" # IP, IGMP ENHANCED = "enhanced" # CH10, PTP, IENA TIMING = "timing" # PTP, NTP TELEMETRY = "telemetry" # CH10, IENA class FieldType(Enum): """Types of decoded fields""" INTEGER = "integer" FLOAT = "float" STRING = "string" BOOLEAN = "boolean" TIMESTAMP = "timestamp" IP_ADDRESS = "ip_address" MAC_ADDRESS = "mac_address" BINARY = "binary" ENUM = "enum" @dataclass class DecodedField: """Represents a single decoded field from a protocol""" name: str value: Any field_type: FieldType description: Optional[str] = None unit: Optional[str] = None # e.g., "ms", "bytes", "ppm" confidence: float = 1.0 # 0.0 to 1.0 is_critical: bool = False # Critical field for protocol operation def __str__(self) -> str: unit_str = f" {self.unit}" if self.unit else "" return f"{self.name}: {self.value}{unit_str}" def format_value(self) -> str: """Format the value for display""" if self.field_type == FieldType.TIMESTAMP: import datetime if isinstance(self.value, (int, float)): dt = datetime.datetime.fromtimestamp(self.value) return dt.strftime("%H:%M:%S.%f")[:-3] elif self.field_type == FieldType.FLOAT: return f"{self.value:.3f}" elif self.field_type == FieldType.IP_ADDRESS: return str(self.value) elif self.field_type == FieldType.BINARY: if isinstance(self.value, bytes): return self.value.hex()[:16] + "..." if len(self.value) > 8 else self.value.hex() return str(self.value) @dataclass class ProtocolInfo: """Information about a detected protocol""" protocol_type: ProtocolType name: str category: ProtocolCategory version: Optional[str] = None confidence: float = 1.0 # Detection confidence 0.0 to 1.0 # Protocol-specific metadata port: Optional[int] = None subtype: Optional[str] = None # e.g., "CH10-Data", "PTP-Sync" vendor: Optional[str] = None def __str__(self) -> str: version_str = f" v{self.version}" if self.version else "" subtype_str = f"-{self.subtype}" if self.subtype else "" return f"{self.name}{subtype_str}{version_str}" @property def is_enhanced(self) -> bool: """Check if this is an enhanced protocol requiring special handling""" return self.category in [ProtocolCategory.ENHANCED, ProtocolCategory.TIMING, ProtocolCategory.TELEMETRY] class StandardProtocol: """Standard protocol definitions""" UDP = ProtocolInfo( protocol_type=ProtocolType.UDP, name="UDP", category=ProtocolCategory.TRANSPORT ) TCP = ProtocolInfo( protocol_type=ProtocolType.TCP, name="TCP", category=ProtocolCategory.TRANSPORT ) ICMP = ProtocolInfo( protocol_type=ProtocolType.ICMP, name="ICMP", category=ProtocolCategory.NETWORK ) IGMP = ProtocolInfo( protocol_type=ProtocolType.IGMP, name="IGMP", category=ProtocolCategory.NETWORK ) class EnhancedProtocol: """Enhanced protocol definitions""" CHAPTER10 = ProtocolInfo( protocol_type=ProtocolType.CHAPTER10, name="Chapter 10", category=ProtocolCategory.TELEMETRY ) PTP = ProtocolInfo( protocol_type=ProtocolType.PTP, name="PTP", category=ProtocolCategory.TIMING ) IENA = ProtocolInfo( protocol_type=ProtocolType.IENA, name="IENA", category=ProtocolCategory.TELEMETRY ) NTP = ProtocolInfo( protocol_type=ProtocolType.NTP, name="NTP", category=ProtocolCategory.TIMING ) @dataclass class ProtocolDecodeResult: """Result of protocol decoding""" protocol_info: ProtocolInfo fields: List[DecodedField] = field(default_factory=list) frame_type: Optional[str] = None # e.g., "CH10-Data", "PTP-Sync" payload_size: int = 0 errors: List[str] = field(default_factory=list) warnings: List[str] = field(default_factory=list) def get_field(self, name: str) -> Optional[DecodedField]: """Get a specific field by name""" for field in self.fields: if field.name == name: return field return None def get_critical_fields(self) -> List[DecodedField]: """Get all critical fields""" return [f for f in self.fields if f.is_critical] def has_errors(self) -> bool: """Check if decode result has any errors""" return len(self.errors) > 0 class ProtocolRegistry: """Registry for managing protocol information and detection""" def __init__(self): self._protocols: Dict[ProtocolType, ProtocolInfo] = {} self._register_standard_protocols() self._register_enhanced_protocols() def _register_standard_protocols(self): """Register standard protocols""" for attr_name in dir(StandardProtocol): if not attr_name.startswith('_'): protocol = getattr(StandardProtocol, attr_name) if isinstance(protocol, ProtocolInfo): self._protocols[protocol.protocol_type] = protocol def _register_enhanced_protocols(self): """Register enhanced protocols""" for attr_name in dir(EnhancedProtocol): if not attr_name.startswith('_'): protocol = getattr(EnhancedProtocol, attr_name) if isinstance(protocol, ProtocolInfo): self._protocols[protocol.protocol_type] = protocol def get_protocol(self, protocol_type: ProtocolType) -> Optional[ProtocolInfo]: """Get protocol info by type""" return self._protocols.get(protocol_type) def get_protocol_by_name(self, name: str) -> Optional[ProtocolInfo]: """Get protocol info by name""" for protocol in self._protocols.values(): if protocol.name.lower() == name.lower(): return protocol return None def get_enhanced_protocols(self) -> List[ProtocolInfo]: """Get all enhanced protocols""" return [p for p in self._protocols.values() if p.is_enhanced] def get_protocols_by_category(self, category: ProtocolCategory) -> List[ProtocolInfo]: """Get all protocols in a category""" return [p for p in self._protocols.values() if p.category == category] def register_protocol(self, protocol_info: ProtocolInfo): """Register a new protocol""" self._protocols[protocol_info.protocol_type] = protocol_info def is_enhanced_protocol(self, protocol_type: ProtocolType) -> bool: """Check if protocol type is enhanced""" protocol = self.get_protocol(protocol_type) return protocol.is_enhanced if protocol else False # Global protocol registry instance PROTOCOL_REGISTRY = ProtocolRegistry() def get_protocol_info(protocol_type: ProtocolType) -> Optional[ProtocolInfo]: """Convenience function to get protocol info""" return PROTOCOL_REGISTRY.get_protocol(protocol_type) def is_enhanced_protocol(protocol_type: ProtocolType) -> bool: """Convenience function to check if protocol is enhanced""" return PROTOCOL_REGISTRY.is_enhanced_protocol(protocol_type)