# Modular Decoder Framework with Extensible Analysis

## Core Concept

Create a framework where decoders expose structured frame data, and separate analysis plugins can process this data for domain-specific insights without modifying the core software.

## Architecture Overview

```
Packet → Decoder → StructuredFrameData → Analysis Plugins → Enhanced FlowStats
```

## 1. Enhanced Decoder Interface

### Base Decoder with Data Exposure

```python
from abc import ABC, abstractmethod
from dataclasses import dataclass, field
from typing import Any, Callable, Dict, List, Optional
import struct


@dataclass
class FieldDefinition:
    """Defines a field that can be extracted from decoded data"""
    name: str
    description: str
    data_type: type                       # int, float, str, bytes, etc.
    unit: Optional[str] = None            # "seconds", "nanoseconds", "MHz", etc.
    validator: Optional[Callable] = None  # Function to validate the field value


@dataclass
class StructuredFrameData:
    """Container for decoded frame data with metadata"""
    decoder_name: str
    packet_timestamp: float  # Original packet (capture) timestamp
    raw_data: bytes
    fields: Dict[str, Any] = field(default_factory=dict)
    metadata: Dict[str, Any] = field(default_factory=dict)

    def get_field(self, name: str, default=None):
        """Get a field value with an optional default"""
        return self.fields.get(name, default)

    def has_field(self, name: str) -> bool:
        """Check whether a field exists"""
        return name in self.fields

    def get_timestamp_fields(self) -> List[str]:
        """Get the names of fields that represent timestamps"""
        return [value for name, value in self.metadata.items()
                if name.endswith('_timestamp_field')]


class EnhancedDecoder(ABC):
    """Base class for decoders that expose structured data"""

    @property
    @abstractmethod
    def decoder_name(self) -> str:
        pass

    @property
    @abstractmethod
    def supported_fields(self) -> List[FieldDefinition]:
        """Return the list of fields this decoder can extract"""
        pass

    @abstractmethod
    def can_decode(self, packet, transport_info: Dict) -> float:
        """Return a confidence score between 0.0 and 1.0"""
        pass

    @abstractmethod
    def decode_frame(self, packet, transport_info: Dict) -> Optional[StructuredFrameData]:
        """Decode the packet and return structured data"""
        pass

    def validate_fields(self, frame_data: StructuredFrameData) -> List[str]:
        """Validate extracted fields; return a list of validation errors"""
        errors = []
        for field_def in self.supported_fields:
            if field_def.name in frame_data.fields:
                value = frame_data.fields[field_def.name]
                if field_def.validator and not field_def.validator(value):
                    errors.append(f"Field {field_def.name} failed validation")
        return errors
```
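To make the data model concrete, here is a minimal hand-built example (all values are illustrative) of a field definition with a validator and a populated `StructuredFrameData` container:

```python
# Illustrative only: a hand-built field definition and frame container
sync_field = FieldDefinition(
    name="sync_pattern",
    description="Sync pattern (0xEB25)",
    data_type=int,
    validator=lambda v: v == 0xEB25,  # reject frames without the magic value
)

frame = StructuredFrameData(
    decoder_name="Chapter10",
    packet_timestamp=1_700_000_000.000123,  # capture timestamp from the sniffer
    raw_data=b"\x25\xEB" + b"\x00" * 22,
    fields={"sync_pattern": 0xEB25},
)

print(frame.get_field("sync_pattern"))    # 60197 (0xEB25)
print(frame.has_field("channel_id"))      # False
print(sync_field.validator(frame.get_field("sync_pattern")))  # True
```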
## 2. Chapter 10 Enhanced Decoder Example

```python
class Chapter10EnhancedDecoder(EnhancedDecoder):
    """Enhanced Chapter 10 decoder with structured data exposure"""

    @property
    def decoder_name(self) -> str:
        return "Chapter10"

    @property
    def supported_fields(self) -> List[FieldDefinition]:
        return [
            FieldDefinition("sync_pattern", "Sync pattern (0xEB25)", int),
            FieldDefinition("channel_id", "Channel identifier", int),
            FieldDefinition("packet_length", "Total packet length", int, "bytes"),
            FieldDefinition("data_length", "Data payload length", int, "bytes"),
            FieldDefinition("header_version", "Header version", int),
            FieldDefinition("sequence_number", "Packet sequence number", int),
            FieldDefinition("packet_flags", "Packet flags", int),
            FieldDefinition("data_type", "Data type identifier", int),
            FieldDefinition("relative_time_counter", "RTC value", int, "counts"),
            FieldDefinition("checksum", "Header checksum", int),
            # Timing fields
            FieldDefinition("internal_seconds", "CH10 internal seconds", int, "seconds"),
            FieldDefinition("internal_nanoseconds", "CH10 internal nanoseconds", int, "nanoseconds"),
            FieldDefinition("internal_timestamp", "Combined internal timestamp", float, "seconds"),
            # Data type specific fields
            FieldDefinition("analog_channels", "Number of analog channels", int),
            FieldDefinition("sample_rate", "Sampling rate", float, "Hz"),
            FieldDefinition("pcm_format", "PCM format identifier", int),
        ]

    def can_decode(self, packet, transport_info: Dict) -> float:
        """Check for the Chapter 10 sync pattern"""
        if not hasattr(packet, 'haslayer') or not packet.haslayer('Raw'):
            return 0.0

        from scapy.all import Raw
        raw_data = bytes(packet[Raw])

        # Look for the sync pattern (0xEB25, little-endian on the wire)
        if len(raw_data) >= 2:
            sync = struct.unpack('<H', raw_data[:2])[0]
            if sync == 0xEB25:
                return 0.9  # strong but not definitive indicator
        return 0.0

    def decode_frame(self, packet, transport_info: Dict) -> Optional[StructuredFrameData]:
        """Decode a Chapter 10 frame with full field extraction"""
        if not hasattr(packet, 'haslayer') or not packet.haslayer('Raw'):
            return None

        from scapy.all import Raw
        raw_data = bytes(packet[Raw])

        if len(raw_data) < 24:  # Minimum header size
            return None

        try:
            # Extract the 24-byte primary header: sync, channel ID,
            # packet/data lengths, version, sequence number, flags,
            # data type, 48-bit relative time counter, header checksum
            (sync, channel_id, packet_length, data_length,
             header_version, sequence_number, packet_flags,
             data_type) = struct.unpack('<HHIIBBBB', raw_data[:16])
            relative_time_counter = int.from_bytes(raw_data[16:22], 'little')
            checksum = struct.unpack('<H', raw_data[22:24])[0]

            fields = {
                'sync_pattern': sync,
                'channel_id': channel_id,
                'packet_length': packet_length,
                'data_length': data_length,
                'header_version': header_version,
                'sequence_number': sequence_number,
                'packet_flags': packet_flags,
                'data_type': data_type,
                'relative_time_counter': relative_time_counter,
                'checksum': checksum,
            }

            if len(raw_data) >= 32:  # Extended header with timing
                # Simplified seconds/nanoseconds pair; real secondary
                # headers carry time formats selected by the packet flags
                timing_data = struct.unpack('<II', raw_data[24:32])
                fields['internal_seconds'] = timing_data[0]
                fields['internal_nanoseconds'] = timing_data[1]
                fields['internal_timestamp'] = timing_data[0] + timing_data[1] / 1e9

            # Data-type specific parsing (0x21 analog, 0x09 PCM per IRIG 106)
            body = raw_data[32:] if len(raw_data) >= 32 else raw_data[24:]
            if data_type == 0x21:
                fields.update(self._parse_analog_header(body))
            elif data_type == 0x09:
                fields.update(self._parse_pcm_header(body))

            frame_data = StructuredFrameData(
                decoder_name=self.decoder_name,
                packet_timestamp=float(packet.time),
                raw_data=raw_data,
                fields=fields,
            )
            # Mark which field carries the decoder's internal timestamp
            frame_data.metadata['internal_timestamp_field'] = 'internal_timestamp'
            return frame_data
        except struct.error:
            return None

    def _parse_analog_header(self, data: bytes) -> Dict[str, Any]:
        """Parse the analog format specific header (simplified layout)"""
        if len(data) < 8:
            return {}
        try:
            analog_header = struct.unpack('<II', data[:8])
            return {
                'analog_channels': analog_header[0],
                'sample_rate': float(analog_header[1]),
            }
        except struct.error:
            return {}

    def _parse_pcm_header(self, data: bytes) -> Dict[str, Any]:
        """Parse the PCM format specific header (simplified layout)"""
        if len(data) < 4:
            return {}
        try:
            pcm_header = struct.unpack('<I', data[:4])
            return {'pcm_format': pcm_header[0]}
        except struct.error:
            return {}
```
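A quick smoke test helps confirm the header layout assumed above. The snippet hand-packs a 32-byte payload matching the simplified parsing in `decode_frame` (the field values and UDP port are arbitrary) and runs it through the decoder; scapy is required:

```python
import struct
from scapy.all import Ether, IP, UDP, Raw

payload = struct.pack('<HHIIBBBB', 0xEB25, 1, 32, 0, 0x01, 7, 0x44, 0x09)
payload += (12345).to_bytes(6, 'little')   # 48-bit relative time counter
payload += struct.pack('<H', 0)            # header checksum (not verified here)
payload += struct.pack('<II', 1_700_000_000, 250_000_000)  # secondary time words

pkt = Ether() / IP() / UDP(dport=5004) / Raw(load=payload)
pkt.time = 1_700_000_000.251  # capture timestamp, ~1 ms after internal time

decoder = Chapter10EnhancedDecoder()
if decoder.can_decode(pkt, {}) > 0.5:
    frame = decoder.decode_frame(pkt, {})
    print(frame.get_field('channel_id'))          # 1
    print(frame.get_field('internal_timestamp'))  # 1700000000.25
```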
## 3. Analysis Plugin Framework

```python
class AnalysisPlugin(ABC):
    """Base class for analysis plugins"""

    @property
    @abstractmethod
    def plugin_name(self) -> str:
        pass

    @property
    @abstractmethod
    def required_decoders(self) -> List[str]:
        """List of decoder names this plugin requires"""
        pass

    @property
    @abstractmethod
    def required_fields(self) -> Dict[str, List[str]]:
        """Dict of decoder_name -> list of required fields"""
        pass

    @abstractmethod
    def analyze_frame(self, frame_data: StructuredFrameData, flow_context: Dict) -> Dict[str, Any]:
        """Analyze a single frame; return analysis results"""
        pass

    @abstractmethod
    def analyze_flow(self, frame_data_list: List[StructuredFrameData]) -> Dict[str, Any]:
        """Analyze a complete flow; return aggregated results"""
        pass

    def can_analyze(self, frame_data: StructuredFrameData) -> bool:
        """Check whether this plugin can analyze the given frame data"""
        if frame_data.decoder_name not in self.required_decoders:
            return False
        required_fields = self.required_fields.get(frame_data.decoder_name, [])
        return all(frame_data.has_field(field) for field in required_fields)


class Chapter10TimingAnalysisPlugin(AnalysisPlugin):
    """Analysis plugin for Chapter 10 timing comparisons"""

    @property
    def plugin_name(self) -> str:
        return "CH10_Timing_Analysis"

    @property
    def required_decoders(self) -> List[str]:
        return ["Chapter10"]

    @property
    def required_fields(self) -> Dict[str, List[str]]:
        return {
            "Chapter10": ["internal_timestamp", "internal_seconds", "internal_nanoseconds"]
        }

    def analyze_frame(self, frame_data: StructuredFrameData, flow_context: Dict) -> Dict[str, Any]:
        """Compare the packet timestamp against the internal Chapter 10 timestamp"""
        packet_time = frame_data.packet_timestamp
        internal_time = frame_data.get_field('internal_timestamp')

        if internal_time is None:
            return {'error': 'No internal timestamp available'}

        time_delta = packet_time - internal_time

        # Calculate clock drift metrics
        analysis = {
            'packet_timestamp': packet_time,
            'internal_timestamp': internal_time,
            'time_delta_seconds': time_delta,
            'time_delta_microseconds': time_delta * 1_000_000,
            'clock_drift_ppm': self._calculate_drift_ppm(time_delta, flow_context),
            'timestamp_source': 'CH10_internal',
        }

        # Flag significant timing discrepancies
        if abs(time_delta) > 0.001:  # 1 ms threshold
            analysis['timing_anomaly'] = True
            analysis['anomaly_severity'] = 'high' if abs(time_delta) > 0.1 else 'medium'

        return analysis

    def analyze_flow(self, frame_data_list: List[StructuredFrameData]) -> Dict[str, Any]:
        """Analyze timing patterns across an entire flow"""
        if not frame_data_list:
            return {}

        timing_deltas = []
        internal_intervals = []
        packet_intervals = []
        prev_frame = None

        for frame_data in frame_data_list:
            if not self.can_analyze(frame_data):
                continue

            # Calculate the timing delta for this frame
            packet_time = frame_data.packet_timestamp
            internal_time = frame_data.get_field('internal_timestamp')

            if internal_time is not None:
                timing_deltas.append(packet_time - internal_time)

                # Calculate intervals if we have a previous frame
                if prev_frame:
                    packet_intervals.append(packet_time - prev_frame.packet_timestamp)
                    internal_intervals.append(
                        internal_time - prev_frame.get_field('internal_timestamp', 0))

            prev_frame = frame_data

        if not timing_deltas:
            return {'error': 'No valid timing data found'}

        # Statistical analysis
        import statistics
        flow_analysis = {
            'frame_count': len(timing_deltas),
            'avg_timing_delta': statistics.mean(timing_deltas),
            'std_timing_delta': statistics.stdev(timing_deltas) if len(timing_deltas) > 1 else 0,
            'min_timing_delta': min(timing_deltas),
            'max_timing_delta': max(timing_deltas),
            'timing_drift_trend': self._calculate_drift_trend(timing_deltas),
        }

        if packet_intervals and internal_intervals:
            flow_analysis.update({
                'avg_packet_interval': statistics.mean(packet_intervals),
                'avg_internal_interval': statistics.mean(internal_intervals),
                'interval_correlation': self._calculate_correlation(packet_intervals,
                                                                    internal_intervals),
            })

        return flow_analysis

    def _calculate_drift_ppm(self, time_delta: float, flow_context: Dict) -> float:
        """Calculate clock drift in parts per million"""
        # Simplified drift calculation; a production version would fit
        # offset against elapsed time rather than divide a single delta
        return (time_delta * 1_000_000) / flow_context.get('flow_duration', 1.0)

    def _calculate_drift_trend(self, timing_deltas: List[float]) -> str:
        """Determine whether timing is drifting in a particular direction"""
        if len(timing_deltas) < 3:
            return 'insufficient_data'

        # Simple trend analysis: compare first-half and second-half averages
        first_half = timing_deltas[:len(timing_deltas) // 2]
        second_half = timing_deltas[len(timing_deltas) // 2:]

        first_avg = sum(first_half) / len(first_half)
        second_avg = sum(second_half) / len(second_half)

        if second_avg > first_avg + 0.0001:
            return 'increasing_drift'
        elif second_avg < first_avg - 0.0001:
            return 'decreasing_drift'
        else:
            return 'stable'

    def _calculate_correlation(self, list1: List[float], list2: List[float]) -> float:
        """Pearson correlation coefficient between two timing sequences
        (plain-Python; numpy would be preferable for large flows)"""
        if len(list1) != len(list2) or len(list1) < 2:
            return 0.0

        import statistics
        mean1 = statistics.mean(list1)
        mean2 = statistics.mean(list2)

        numerator = sum((x - mean1) * (y - mean2) for x, y in zip(list1, list2))
        denominator = (sum((x - mean1) ** 2 for x in list1)
                       * sum((y - mean2) ** 2 for y in list2)) ** 0.5
        return numerator / denominator if denominator != 0 else 0.0
```
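A single-frame run shows the shape of the per-frame results (the timestamps are synthetic, and the 2.5 ms offset is chosen to trip the 1 ms anomaly threshold):

```python
plugin = Chapter10TimingAnalysisPlugin()

frame = StructuredFrameData(
    decoder_name="Chapter10",
    packet_timestamp=100.0025,  # capture time: 2.5 ms after internal time
    raw_data=b"",
    fields={
        "internal_timestamp": 100.0,
        "internal_seconds": 100,
        "internal_nanoseconds": 0,
    },
)

result = plugin.analyze_frame(frame, {"flow_duration": 10.0})
print(f"{result['time_delta_microseconds']:.0f} µs")  # 2500 µs
print(result.get("timing_anomaly"), result.get("anomaly_severity"))  # True medium
```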
## 4. Plugin Registry and Integration

```python
from typing import Tuple  # in addition to the typing imports in section 1


class AnalysisPluginRegistry:
    """Registry for managing analysis plugins"""

    def __init__(self):
        self.plugins: Dict[str, AnalysisPlugin] = {}
        self.decoder_plugin_map: Dict[str, List[AnalysisPlugin]] = {}

    def register_plugin(self, plugin: AnalysisPlugin):
        """Register an analysis plugin"""
        self.plugins[plugin.plugin_name] = plugin
        # Map decoders to plugins
        for decoder_name in plugin.required_decoders:
            if decoder_name not in self.decoder_plugin_map:
                self.decoder_plugin_map[decoder_name] = []
            self.decoder_plugin_map[decoder_name].append(plugin)

    def get_plugins_for_decoder(self, decoder_name: str) -> List[AnalysisPlugin]:
        """Get all plugins that can analyze data from a specific decoder"""
        return self.decoder_plugin_map.get(decoder_name, [])

    def get_plugin(self, plugin_name: str) -> Optional[AnalysisPlugin]:
        """Get a specific plugin by name"""
        return self.plugins.get(plugin_name)


@dataclass
class EnhancedFlowStats:
    """Minimal placeholder for the flow-level container named in the
    architecture diagram; a full version would also track packet/byte
    counters, rates, and so on."""
    frames: List[StructuredFrameData] = field(default_factory=list)
    plugin_results: Dict[str, Dict[str, Any]] = field(default_factory=dict)

    def get_plugin_results(self, plugin_name: str) -> Optional[Dict[str, Any]]:
        """Flow-level results produced by a plugin's analyze_flow()"""
        return self.plugin_results.get(plugin_name)


class EnhancedFlowManager:
    """Enhanced flow manager with plugin support"""

    def __init__(self):
        self.decoders: List[EnhancedDecoder] = []
        self.plugin_registry = AnalysisPluginRegistry()
        self.flows: Dict[Tuple[str, str], EnhancedFlowStats] = {}

    def register_decoder(self, decoder: EnhancedDecoder):
        """Register a decoder"""
        self.decoders.append(decoder)

    def register_plugin(self, plugin: AnalysisPlugin):
        """Register an analysis plugin"""
        self.plugin_registry.register_plugin(plugin)

    def process_packet(self, packet, frame_num: int):
        """Process a packet with enhanced decoding and plugin analysis"""
        # Extract transport info
        transport_info = self._extract_transport_info(packet)

        # Try decoders in registration order
        for decoder in self.decoders:
            confidence = decoder.can_decode(packet, transport_info)
            if confidence > 0.5:  # Threshold for acceptance
                frame_data = decoder.decode_frame(packet, transport_info)
                if frame_data:
                    # Apply analysis plugins
                    self._apply_plugins(frame_data, transport_info)
                    # Update the flow with the enhanced data
                    self._update_enhanced_flow(frame_data, transport_info)
                break

    def _apply_plugins(self, frame_data: StructuredFrameData, transport_info: Dict):
        """Apply relevant analysis plugins to frame data"""
        plugins = self.plugin_registry.get_plugins_for_decoder(frame_data.decoder_name)

        for plugin in plugins:
            if plugin.can_analyze(frame_data):
                analysis_result = plugin.analyze_frame(
                    frame_data, {'transport_info': transport_info})
                # Store plugin results in the frame metadata
                frame_data.metadata[f'plugin_{plugin.plugin_name}'] = analysis_result

    # Minimal sketches of the helpers used by process_packet()

    def _extract_transport_info(self, packet) -> Dict:
        """Pull basic addressing out of the packet (extend per transport)"""
        info = {}
        if hasattr(packet, 'haslayer'):
            if packet.haslayer('IP'):
                info['src'] = packet['IP'].src
                info['dst'] = packet['IP'].dst
            if packet.haslayer('UDP'):
                info['sport'] = packet['UDP'].sport
                info['dport'] = packet['UDP'].dport
        return info

    def _update_enhanced_flow(self, frame_data: StructuredFrameData, transport_info: Dict):
        """Fold the frame into its flow and refresh flow-level plugin results.
        Re-running analyze_flow() per packet is simple but quadratic over a
        flow's lifetime; a real system would defer it to flow expiry."""
        flow_key = (transport_info.get('src', '?'), transport_info.get('dst', '?'))
        flow = self.flows.setdefault(flow_key, EnhancedFlowStats())
        flow.frames.append(frame_data)
        for plugin in self.plugin_registry.get_plugins_for_decoder(frame_data.decoder_name):
            flow.plugin_results[plugin.plugin_name] = plugin.analyze_flow(flow.frames)
```
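A quick registry round-trip confirms the decoder-to-plugin routing (the "PTP" lookup is just an unregistered name used to show the empty case):

```python
registry = AnalysisPluginRegistry()
registry.register_plugin(Chapter10TimingAnalysisPlugin())

for plugin in registry.get_plugins_for_decoder("Chapter10"):
    print(plugin.plugin_name)                     # CH10_Timing_Analysis

print(registry.get_plugins_for_decoder("PTP"))    # [] (nothing registered)
print(registry.get_plugin("CH10_Timing_Analysis") is not None)  # True
```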
## 5. Usage Example

```python
# Set up the enhanced system
flow_manager = EnhancedFlowManager()

# Register enhanced decoders
ch10_decoder = Chapter10EnhancedDecoder()
flow_manager.register_decoder(ch10_decoder)

# Register analysis plugins
timing_plugin = Chapter10TimingAnalysisPlugin()
flow_manager.register_plugin(timing_plugin)

# Process packets (the file name is an example)
from scapy.all import rdpcap
packets = rdpcap("capture.pcap")
for frame_num, packet in enumerate(packets, start=1):
    flow_manager.process_packet(packet, frame_num)

# Access enhanced analysis results
for flow_key, flow_stats in flow_manager.flows.items():
    timing_analysis = flow_stats.get_plugin_results('CH10_Timing_Analysis')
    if timing_analysis and 'avg_timing_delta' in timing_analysis:
        print(f"Flow {flow_key}: avg packet-vs-internal offset = "
              f"{timing_analysis['avg_timing_delta']:.6f}s")
```

This modular framework allows you to:

1. **Extract any field** from decoded frames without modifying core code
2. **Create custom analysis plugins** for domain-specific insights
3. **Compare internal vs. external timestamps** for clock drift analysis
4. **Extend analysis** without touching the main analyzer code
5. **Chain multiple plugins** for complex analysis workflows (see the closing sketch below)

**Questions for implementation:**

- Which specific Chapter 10 timing analysis would be most valuable first?
- Should we create plugins for PTP timestamp analysis as well?
- What other modular analysis capabilities would be useful?
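To make point 5 concrete, here is a sketch of a second, hypothetical plugin (`SequenceGapPlugin` is not part of the framework above) that runs alongside the timing plugin. Each plugin writes results under its own metadata and flow keys, so they compose without coordination:

```python
class SequenceGapPlugin(AnalysisPlugin):
    """Hypothetical plugin: flags gaps in the 8-bit CH10 sequence counter."""

    @property
    def plugin_name(self) -> str:
        return "CH10_Sequence_Gaps"

    @property
    def required_decoders(self) -> List[str]:
        return ["Chapter10"]

    @property
    def required_fields(self) -> Dict[str, List[str]]:
        return {"Chapter10": ["sequence_number"]}

    def analyze_frame(self, frame_data: StructuredFrameData,
                      flow_context: Dict) -> Dict[str, Any]:
        # Per-frame results just record the counter; gap detection
        # needs the whole flow, so it lives in analyze_flow()
        return {"sequence_number": frame_data.get_field("sequence_number")}

    def analyze_flow(self, frame_data_list: List[StructuredFrameData]) -> Dict[str, Any]:
        gaps, prev = 0, None
        for fd in frame_data_list:
            seq = fd.get_field("sequence_number")
            # The sequence counter wraps at 256, so compare modulo 256
            if prev is not None and seq != (prev + 1) % 256:
                gaps += 1
            prev = seq
        return {"sequence_gaps": gaps, "frames_checked": len(frame_data_list)}


# Chained alongside the timing plugin; both run on every Chapter 10 frame
flow_manager.register_plugin(SequenceGapPlugin())
```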