258 lines
8.1 KiB
Python
258 lines
8.1 KiB
Python
"""
Protocol Information Data Models

This module defines data structures for representing protocol information,
decoded fields, and protocol registry management.
"""
|
|
|
|
from dataclasses import dataclass, field
|
|
from typing import Dict, List, Set, Optional, Any, Union
|
|
from enum import Enum, IntEnum
|
|
from abc import ABC, abstractmethod
|
|
|
|
|
|
class ProtocolType(IntEnum):
    """Integer identifiers for the protocols this module knows about.

    ``UNKNOWN`` is 0, the standard protocols occupy 10-13 and the enhanced
    protocols occupy 100-103.
    """

    UNKNOWN = 0

    # Standard protocols
    UDP = 10
    TCP = 11
    ICMP = 12
    IGMP = 13

    # Enhanced protocols
    CHAPTER10 = 100
    CH10 = CHAPTER10  # same value, so the enum treats CH10 as an alias of CHAPTER10
    PTP = 101
    IENA = 102
    NTP = 103
|
|
|
|
|
|
class ProtocolCategory(Enum):
    """Coarse grouping used to organise protocols.

    The protocol names beside each member are examples only; the category a
    protocol actually uses is whatever its ProtocolInfo carries.
    """

    TRANSPORT = "transport"  # UDP, TCP
    NETWORK = "network"  # IP, ICMP, IGMP
    ENHANCED = "enhanced"  # CH10, PTP, IENA
    TIMING = "timing"  # PTP, NTP
    TELEMETRY = "telemetry"  # CH10, IENA
|
|
|
|
|
|
class FieldType(Enum):
    """Kinds of value a decoded field can hold.

    Each member's value is the lower-case string form of its name.
    """

    # Scalar kinds
    INTEGER = "integer"
    FLOAT = "float"
    STRING = "string"
    BOOLEAN = "boolean"

    # Time and addressing kinds
    TIMESTAMP = "timestamp"
    IP_ADDRESS = "ip_address"
    MAC_ADDRESS = "mac_address"

    # Raw and enumerated kinds
    BINARY = "binary"
    ENUM = "enum"
|
|
|
|
|
|
@dataclass
class DecodedField:
    """Represents a single decoded field from a protocol."""

    name: str
    value: Any
    field_type: FieldType  # selects the formatting rule used by format_value()
    description: Optional[str] = None
    unit: Optional[str] = None  # e.g., "ms", "bytes", "ppm"
    confidence: float = 1.0  # 0.0 to 1.0
    is_critical: bool = False  # Critical field for protocol operation

    def __str__(self) -> str:
        """Return ``"name: value"``, followed by a space and the unit if set."""
        unit_str = f" {self.unit}" if self.unit else ""
        return f"{self.name}: {self.value}{unit_str}"

    def format_value(self) -> str:
        """Format the value for display according to ``field_type``.

        * TIMESTAMP with an int/float value: ``HH:MM:SS.mmm`` as produced by
          ``datetime.fromtimestamp`` (local time).
        * FLOAT: three decimal places.
        * BINARY with a bytes/bytearray value: hex digits, cut to the first
          8 bytes and suffixed with ``...`` when longer.
        * Anything else, or a value that cannot be formatted under its
          declared type: ``str(value)``.

        This method is meant for display, so a value that does not fit its
        declared type falls back to ``str(value)`` rather than raising.
        """
        import datetime

        value = self.value
        kind = self.field_type

        if kind == FieldType.TIMESTAMP and isinstance(value, (int, float)):
            try:
                moment = datetime.datetime.fromtimestamp(value)
            except (OverflowError, OSError, ValueError):
                # NaN, infinity, or a value outside the platform's time range.
                return str(value)
            # %f yields microseconds; drop the last three digits for milliseconds.
            return moment.strftime("%H:%M:%S.%f")[:-3]

        if kind == FieldType.FLOAT:
            try:
                return f"{value:.3f}"
            except (TypeError, ValueError):
                # Value is not numeric (e.g. None or a str).
                return str(value)

        if kind == FieldType.BINARY and isinstance(value, (bytes, bytearray)):
            hex_text = value.hex()
            # 8 bytes == 16 hex digits.
            return hex_text[:16] + "..." if len(value) > 8 else hex_text

        return str(value)
|
|
|
|
|
|
@dataclass
class ProtocolInfo:
    """Describes a detected protocol: its type, name, category and metadata."""

    protocol_type: ProtocolType
    name: str
    category: ProtocolCategory
    version: Optional[str] = None
    confidence: float = 1.0  # Detection confidence 0.0 to 1.0

    # Protocol-specific metadata
    port: Optional[int] = None
    subtype: Optional[str] = None  # e.g., "CH10-Data", "PTP-Sync"
    vendor: Optional[str] = None

    def __str__(self) -> str:
        """Render as ``name[-subtype][ v<version>]``; unset parts are omitted."""
        label = f"{self.name}"
        if self.subtype:
            label += f"-{self.subtype}"
        if self.version:
            label += f" v{self.version}"
        return label

    @property
    def is_enhanced(self) -> bool:
        """True when the category is ENHANCED, TIMING or TELEMETRY."""
        special_categories = (
            ProtocolCategory.ENHANCED,
            ProtocolCategory.TIMING,
            ProtocolCategory.TELEMETRY,
        )
        return self.category in special_categories
|
|
|
|
|
|
class StandardProtocol:
    """Namespace of predefined ProtocolInfo entries for the standard protocols."""

    # Transport category
    UDP = ProtocolInfo(ProtocolType.UDP, "UDP", category=ProtocolCategory.TRANSPORT)
    TCP = ProtocolInfo(ProtocolType.TCP, "TCP", category=ProtocolCategory.TRANSPORT)

    # Network category
    ICMP = ProtocolInfo(ProtocolType.ICMP, "ICMP", category=ProtocolCategory.NETWORK)
    IGMP = ProtocolInfo(ProtocolType.IGMP, "IGMP", category=ProtocolCategory.NETWORK)
|
|
|
|
|
|
class EnhancedProtocol:
    """Namespace of predefined ProtocolInfo entries for the enhanced protocols."""

    # Telemetry category
    CHAPTER10 = ProtocolInfo(
        ProtocolType.CHAPTER10, "Chapter 10", category=ProtocolCategory.TELEMETRY
    )
    IENA = ProtocolInfo(ProtocolType.IENA, "IENA", category=ProtocolCategory.TELEMETRY)

    # Timing category
    PTP = ProtocolInfo(ProtocolType.PTP, "PTP", category=ProtocolCategory.TIMING)
    NTP = ProtocolInfo(ProtocolType.NTP, "NTP", category=ProtocolCategory.TIMING)
|
|
|
|
|
|
@dataclass
class ProtocolDecodeResult:
    """Result of a protocol decode: protocol info, decoded fields and diagnostics."""

    protocol_info: ProtocolInfo
    fields: List[DecodedField] = field(default_factory=list)
    frame_type: Optional[str] = None  # e.g., "CH10-Data", "PTP-Sync"
    payload_size: int = 0
    errors: List[str] = field(default_factory=list)
    warnings: List[str] = field(default_factory=list)

    def get_field(self, name: str) -> Optional[DecodedField]:
        """Return the first decoded field whose name equals ``name``, else None."""
        return next((item for item in self.fields if item.name == name), None)

    def get_critical_fields(self) -> List[DecodedField]:
        """Return the decoded fields flagged ``is_critical``, in original order."""
        critical: List[DecodedField] = []
        for item in self.fields:
            if item.is_critical:
                critical.append(item)
        return critical

    def has_errors(self) -> bool:
        """Return True when at least one error was recorded."""
        return bool(self.errors)
|
|
|
|
|
|
class ProtocolRegistry:
    """Registry for managing protocol information and detection.

    Maps each ProtocolType to one ProtocolInfo. A new registry starts with
    every ProtocolInfo attribute of StandardProtocol and EnhancedProtocol;
    more can be added (or existing ones replaced) with register_protocol().
    """

    def __init__(self) -> None:
        self._protocols: Dict[ProtocolType, ProtocolInfo] = {}
        self._register_standard_protocols()
        self._register_enhanced_protocols()

    def _register_from(self, namespace) -> None:
        """Register every public ProtocolInfo attribute found on ``namespace``.

        Attributes whose name starts with an underscore, and attributes that
        are not ProtocolInfo instances, are ignored.
        """
        for attr_name in dir(namespace):
            if attr_name.startswith('_'):
                continue
            candidate = getattr(namespace, attr_name)
            if isinstance(candidate, ProtocolInfo):
                self._protocols[candidate.protocol_type] = candidate

    def _register_standard_protocols(self) -> None:
        """Register standard protocols"""
        self._register_from(StandardProtocol)

    def _register_enhanced_protocols(self) -> None:
        """Register enhanced protocols"""
        self._register_from(EnhancedProtocol)

    def get_protocol(self, protocol_type: ProtocolType) -> Optional[ProtocolInfo]:
        """Return the ProtocolInfo registered for ``protocol_type``, or None."""
        return self._protocols.get(protocol_type)

    def get_protocol_by_name(self, name: str) -> Optional[ProtocolInfo]:
        """Return the first registered protocol whose name matches ``name``.

        The comparison ignores case. Returns None when nothing matches.
        """
        wanted = name.lower()  # lower-case once instead of once per entry
        for protocol in self._protocols.values():
            if protocol.name.lower() == wanted:
                return protocol
        return None

    def get_enhanced_protocols(self) -> List[ProtocolInfo]:
        """Return all registered protocols whose ``is_enhanced`` is true."""
        return [p for p in self._protocols.values() if p.is_enhanced]

    def get_protocols_by_category(self, category: ProtocolCategory) -> List[ProtocolInfo]:
        """Return all registered protocols whose category equals ``category``."""
        return [p for p in self._protocols.values() if p.category == category]

    def register_protocol(self, protocol_info: ProtocolInfo) -> None:
        """Register a protocol, replacing any entry with the same protocol_type."""
        self._protocols[protocol_info.protocol_type] = protocol_info

    def is_enhanced_protocol(self, protocol_type: ProtocolType) -> bool:
        """Return True when ``protocol_type`` is registered and is enhanced.

        Unregistered types yield False.
        """
        protocol = self.get_protocol(protocol_type)
        return protocol.is_enhanced if protocol else False
|
|
|
|
|
|
# Module-level registry instance, created at import time and used by the
# convenience functions get_protocol_info() and is_enhanced_protocol().
PROTOCOL_REGISTRY: ProtocolRegistry = ProtocolRegistry()
|
|
|
|
|
|
def get_protocol_info(protocol_type: ProtocolType) -> Optional[ProtocolInfo]:
    """Look up ``protocol_type`` in the module-level PROTOCOL_REGISTRY.

    Returns whatever ``PROTOCOL_REGISTRY.get_protocol`` returns for it.
    """
    registry = PROTOCOL_REGISTRY
    return registry.get_protocol(protocol_type)
|
|
|
|
|
|
def is_enhanced_protocol(protocol_type: ProtocolType) -> bool:
    """Ask the module-level PROTOCOL_REGISTRY whether ``protocol_type`` is enhanced.

    Returns whatever ``PROTOCOL_REGISTRY.is_enhanced_protocol`` returns for it.
    """
    registry = PROTOCOL_REGISTRY
    return registry.is_enhanced_protocol(protocol_type)