
Modular Decoder Framework with Extensible Analysis

Core Concept

Create a framework where decoders expose structured frame data, and separate analysis plugins can process this data for domain-specific insights without modifying the core software.

Architecture Overview

Packet → Decoder → StructuredFrameData → Analysis Plugins → Enhanced FlowStats

1. Enhanced Decoder Interface

Base Decoder with Data Exposure

from abc import ABC, abstractmethod
from dataclasses import dataclass, field
from typing import Any, Callable, Dict, List, Optional, Tuple, Union
import struct

@dataclass
class FieldDefinition:
    """Defines a field that can be extracted from decoded data"""
    name: str
    description: str
    data_type: type  # int, float, str, bytes, etc.
    unit: Optional[str] = None  # "seconds", "nanoseconds", "MHz", etc.
    validator: Optional[Callable[[Any], bool]] = None  # Returns True if the field value is valid

@dataclass 
class StructuredFrameData:
    """Container for decoded frame data with metadata"""
    decoder_name: str
    packet_timestamp: float  # Original packet timestamp
    raw_data: bytes
    fields: Dict[str, Any] = field(default_factory=dict)
    metadata: Dict[str, Any] = field(default_factory=dict)
    
    def get_field(self, name: str, default=None):
        """Get a field value with optional default"""
        return self.fields.get(name, default)
    
    def has_field(self, name: str) -> bool:
        """Check if field exists"""
        return name in self.fields
    
    def get_timestamp_fields(self) -> List[str]:
        """Get the names of fields that carry timestamps (metadata maps
        roles such as 'primary_timestamp_field' to field names)"""
        return [value for name, value in self.metadata.items()
                if name.endswith('_timestamp_field')]

class EnhancedDecoder(ABC):
    """Base class for decoders that expose structured data"""
    
    @property
    @abstractmethod
    def decoder_name(self) -> str:
        pass
    
    @property
    @abstractmethod
    def supported_fields(self) -> List[FieldDefinition]:
        """Return list of fields this decoder can extract"""
        pass
    
    @abstractmethod
    def can_decode(self, packet, transport_info: Dict) -> float:
        """Return confidence score 0.0-1.0"""
        pass
    
    @abstractmethod
    def decode_frame(self, packet, transport_info: Dict) -> Optional[StructuredFrameData]:
        """Decode packet and return structured data"""
        pass
    
    def validate_fields(self, frame_data: StructuredFrameData) -> List[str]:
        """Validate extracted fields, return list of validation errors"""
        errors = []
        for field_def in self.supported_fields:
            if field_def.name in frame_data.fields:
                value = frame_data.fields[field_def.name]
                if field_def.validator and not field_def.validator(value):
                    errors.append(f"Field {field_def.name} failed validation")
        return errors
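
A quick sketch of how the base-class machinery fits together; DemoDecoder and its udp_port field are invented purely for illustration:

class DemoDecoder(EnhancedDecoder):
    @property
    def decoder_name(self) -> str:
        return "Demo"

    @property
    def supported_fields(self) -> List[FieldDefinition]:
        return [FieldDefinition("udp_port", "UDP destination port", int,
                                validator=lambda v: 0 <= v <= 65535)]

    def can_decode(self, packet, transport_info: Dict) -> float:
        return 1.0

    def decode_frame(self, packet, transport_info: Dict) -> Optional[StructuredFrameData]:
        return StructuredFrameData(
            decoder_name=self.decoder_name,
            packet_timestamp=0.0,
            raw_data=b"",
            fields={"udp_port": 70000})  # Deliberately out of range

decoder = DemoDecoder()
frame = decoder.decode_frame(None, {})
print(decoder.validate_fields(frame))  # ['Field udp_port failed validation']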

2. Chapter 10 Enhanced Decoder Example

class Chapter10EnhancedDecoder(EnhancedDecoder):
    """Enhanced Chapter 10 decoder with structured data exposure"""
    
    @property
    def decoder_name(self) -> str:
        return "Chapter10"
    
    @property
    def supported_fields(self) -> List[FieldDefinition]:
        return [
            FieldDefinition("sync_pattern", "Sync pattern (0xEB25)", int),
            FieldDefinition("channel_id", "Channel identifier", int),
            FieldDefinition("packet_length", "Total packet length", int, "bytes"),
            FieldDefinition("data_length", "Data payload length", int, "bytes"),
            FieldDefinition("header_version", "Header version", int),
            FieldDefinition("sequence_number", "Packet sequence number", int),
            FieldDefinition("packet_flags", "Packet flags", int),
            FieldDefinition("data_type", "Data type identifier", int),
            FieldDefinition("relative_time_counter", "RTC value", int, "counts"),
            FieldDefinition("checksum", "Header checksum", int),
            
            # Timing fields
            FieldDefinition("internal_seconds", "CH10 internal seconds", int, "seconds"),
            FieldDefinition("internal_nanoseconds", "CH10 internal nanoseconds", int, "nanoseconds"),
            FieldDefinition("internal_timestamp", "Combined internal timestamp", float, "seconds"),
            
            # Data type specific fields
            FieldDefinition("analog_channels", "Number of analog channels", int),
            FieldDefinition("sample_rate", "Sampling rate", float, "Hz"),
            FieldDefinition("pcm_format", "PCM format identifier", int),
        ]
    
    def can_decode(self, packet, transport_info: Dict) -> float:
        """Check for Chapter 10 sync pattern"""
        if not hasattr(packet, 'haslayer') or not packet.haslayer('Raw'):
            return 0.0
            
        from scapy.all import Raw
        raw_data = bytes(packet[Raw])
        
        # Look for sync pattern
        if len(raw_data) >= 2:
            sync = struct.unpack('<H', raw_data[:2])[0]
            if sync == 0xEB25:
                return 1.0
        return 0.0
    
    def decode_frame(self, packet, transport_info: Dict) -> Optional[StructuredFrameData]:
        """Decode Chapter 10 frame with full field extraction"""
        if not hasattr(packet, 'haslayer') or not packet.haslayer('Raw'):
            return None
            
        from scapy.all import Raw
        raw_data = bytes(packet[Raw])
        
        if len(raw_data) < 24:  # Minimum header size
            return None
            
        try:
            # IRIG 106 Chapter 10 primary header (24 bytes, little-endian):
            # sync (2), channel ID (2), packet length (4), data length (4),
            # data type version (1), sequence number (1), packet flags (1),
            # data type (1), relative time counter (6), header checksum (2)
            (sync, channel_id, packet_length, data_length, header_version,
             sequence_number, packet_flags, data_type) = struct.unpack(
                '<HHIIBBBB', raw_data[:16])
            rtc = int.from_bytes(raw_data[16:22], 'little')
            checksum = struct.unpack('<H', raw_data[22:24])[0]
            
            frame_data = StructuredFrameData(
                decoder_name=self.decoder_name,
                packet_timestamp=float(packet.time),
                raw_data=raw_data
            )
            
            # Basic header fields
            frame_data.fields.update({
                'sync_pattern': sync,
                'channel_id': channel_id,
                'packet_length': packet_length,
                'data_length': data_length,
                'header_version': header_version,
                'sequence_number': sequence_number,
                'packet_flags': packet_flags,
                'data_type': data_type,
                'relative_time_counter': rtc,
                'checksum': checksum
            })
            
            # Extract timing information if present (packet-flags bit 7
            # signals a secondary header; simplified here to an 8-byte
            # seconds/nanoseconds pair, whereas the real layout depends
            # on the time-format bits in the packet flags)
            if packet_flags & 0x80 and len(raw_data) >= 32:
                timing_data = struct.unpack('<II', raw_data[24:32])
                frame_data.fields.update({
                    'internal_seconds': timing_data[0],
                    'internal_nanoseconds': timing_data[1],
                    'internal_timestamp': timing_data[0] + (timing_data[1] / 1_000_000_000)
                })
                
                # Mark timestamp fields for analysis plugins
                frame_data.metadata.update({
                    'primary_timestamp_field': 'internal_timestamp',
                    'seconds_timestamp_field': 'internal_seconds',
                    'nanoseconds_timestamp_field': 'internal_nanoseconds'
                })
            
            # Data-type specific parsing; the type-specific header follows
            # the primary header (and the timing block when present).
            # IRIG 106 codes: 0x21 = Analog Format 1, 0x09 = PCM Format 1
            body = raw_data[32:] if packet_flags & 0x80 else raw_data[24:]
            if data_type == 0x21:
                frame_data.fields.update(self._parse_analog_header(body))
            elif data_type == 0x09:
                frame_data.fields.update(self._parse_pcm_header(body))
            
            return frame_data
            
        except (struct.error, IndexError):
            # Malformed or truncated header
            return None
    
    def _parse_analog_header(self, data: bytes) -> Dict[str, Any]:
        """Parse analog format specific header"""
        if len(data) < 8:
            return {}
        try:
            analog_header = struct.unpack('<HHI', data[:8])
            return {
                'analog_channels': analog_header[0],
                'sample_rate': analog_header[2] / 1000.0  # Assumed stored in mHz; convert to Hz
            }
        except struct.error:
            return {}
    
    def _parse_pcm_header(self, data: bytes) -> Dict[str, Any]:
        """Parse PCM format specific header"""
        if len(data) < 4:
            return {}
        try:
            pcm_header = struct.unpack('<HH', data[:4])
            return {
                'pcm_format': pcm_header[0]
            }
        except struct.error:
            return {}
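
A quick end-to-end check of the decoder (assuming scapy is available); the hand-packed payload below follows the 24-byte primary header plus the simplified 8-byte timing block described above:

import struct
from scapy.all import Ether, IP, UDP, Raw

# Minimal CH10 packet: sync 0xEB25, channel 7, PCM data type (0x09),
# packet flags 0x80 (secondary header present), then seconds/nanoseconds
header = struct.pack('<HHIIBBBB', 0xEB25, 7, 32, 8, 1, 0, 0x80, 0x09)
header += (12345).to_bytes(6, 'little')   # relative time counter
header += struct.pack('<H', 0)            # header checksum (unchecked here)
timing = struct.pack('<II', 1_700_000_000, 500_000_000)

pkt = Ether() / IP() / UDP(dport=5000) / Raw(load=header + timing)
pkt.time = 1_700_000_000.123

decoder = Chapter10EnhancedDecoder()
assert decoder.can_decode(pkt, {}) == 1.0
frame = decoder.decode_frame(pkt, {})
print(frame.get_field('channel_id'))          # 7
print(frame.get_field('internal_timestamp'))  # 1700000000.5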

3. Analysis Plugin Framework

class AnalysisPlugin(ABC):
    """Base class for analysis plugins that process decoded frame data"""
    
    @property
    @abstractmethod
    def plugin_name(self) -> str:
        pass
    
    @property
    @abstractmethod
    def required_decoders(self) -> List[str]:
        """List of decoder names this plugin requires"""
        pass
    
    @property
    @abstractmethod
    def required_fields(self) -> Dict[str, List[str]]:
        """Dict of decoder_name -> list of required fields"""
        pass
    
    @abstractmethod
    def analyze_frame(self, frame_data: StructuredFrameData, flow_context: Dict) -> Dict[str, Any]:
        """Analyze a single frame, return analysis results"""
        pass
    
    @abstractmethod
    def analyze_flow(self, frame_data_list: List[StructuredFrameData]) -> Dict[str, Any]:
        """Analyze a complete flow, return aggregated results"""
        pass
    
    def can_analyze(self, frame_data: StructuredFrameData) -> bool:
        """Check if this plugin can analyze the given frame data"""
        if frame_data.decoder_name not in self.required_decoders:
            return False
        
        required_fields = self.required_fields.get(frame_data.decoder_name, [])
        return all(frame_data.has_field(f) for f in required_fields)

class Chapter10TimingAnalysisPlugin(AnalysisPlugin):
    """Analysis plugin for Chapter 10 timing comparisons"""
    
    @property
    def plugin_name(self) -> str:
        return "CH10_Timing_Analysis"
    
    @property
    def required_decoders(self) -> List[str]:
        return ["Chapter10"]
    
    @property
    def required_fields(self) -> Dict[str, List[str]]:
        return {
            "Chapter10": ["internal_timestamp", "internal_seconds", "internal_nanoseconds"]
        }
    
    def analyze_frame(self, frame_data: StructuredFrameData, flow_context: Dict) -> Dict[str, Any]:
        """Compare packet timestamp vs internal Chapter 10 timestamp"""
        packet_time = frame_data.packet_timestamp
        internal_time = frame_data.get_field('internal_timestamp')
        
        if internal_time is None:
            return {'error': 'No internal timestamp available'}
        
        time_delta = packet_time - internal_time
        
        # Calculate clock drift metrics
        analysis = {
            'packet_timestamp': packet_time,
            'internal_timestamp': internal_time,
            'time_delta_seconds': time_delta,
            'time_delta_microseconds': time_delta * 1_000_000,
            'clock_drift_ppm': self._calculate_drift_ppm(time_delta, flow_context),
            'timestamp_source': 'CH10_internal'
        }
        
        # Flag significant timing discrepancies
        if abs(time_delta) > 0.001:  # 1ms threshold
            analysis['timing_anomaly'] = True
            analysis['anomaly_severity'] = 'high' if abs(time_delta) > 0.1 else 'medium'
        
        return analysis
    
    def analyze_flow(self, frame_data_list: List[StructuredFrameData]) -> Dict[str, Any]:
        """Analyze timing patterns across entire flow"""
        if not frame_data_list:
            return {}
        
        timing_deltas = []
        internal_intervals = []
        packet_intervals = []
        
        prev_frame = None
        for frame_data in frame_data_list:
            if not self.can_analyze(frame_data):
                continue
                
            # Calculate timing delta for this frame
            packet_time = frame_data.packet_timestamp
            internal_time = frame_data.get_field('internal_timestamp')
            if internal_time is not None:
                timing_deltas.append(packet_time - internal_time)
            
            # Calculate intervals if we have a previous frame with timing
            if prev_frame is not None and internal_time is not None:
                prev_internal = prev_frame.get_field('internal_timestamp')
                if prev_internal is not None:
                    packet_intervals.append(packet_time - prev_frame.packet_timestamp)
                    internal_intervals.append(internal_time - prev_internal)
            
            prev_frame = frame_data
        
        if not timing_deltas:
            return {'error': 'No valid timing data found'}
        
        # Statistical analysis
        import statistics
        
        flow_analysis = {
            'frame_count': len(timing_deltas),
            'avg_timing_delta': statistics.mean(timing_deltas),
            'std_timing_delta': statistics.stdev(timing_deltas) if len(timing_deltas) > 1 else 0,
            'min_timing_delta': min(timing_deltas),
            'max_timing_delta': max(timing_deltas),
            'timing_drift_trend': self._calculate_drift_trend(timing_deltas),
        }
        
        if packet_intervals and internal_intervals:
            flow_analysis.update({
                'avg_packet_interval': statistics.mean(packet_intervals),
                'avg_internal_interval': statistics.mean(internal_intervals),
                'interval_correlation': self._calculate_correlation(packet_intervals, internal_intervals)
            })
        
        return flow_analysis
    
    def _calculate_drift_ppm(self, time_delta: float, flow_context: Dict) -> float:
        """Calculate clock drift in parts per million"""
        # Simplified drift calculation - would be more sophisticated in practice
        return (time_delta * 1_000_000) / flow_context.get('flow_duration', 1.0)
    
    def _calculate_drift_trend(self, timing_deltas: List[float]) -> str:
        """Determine if timing is drifting in a particular direction"""
        if len(timing_deltas) < 3:
            return 'insufficient_data'
        
        # Simple trend analysis
        first_half = timing_deltas[:len(timing_deltas)//2]
        second_half = timing_deltas[len(timing_deltas)//2:]
        
        first_avg = sum(first_half) / len(first_half)
        second_avg = sum(second_half) / len(second_half)
        
        if second_avg > first_avg + 0.0001:
            return 'increasing_drift'
        elif second_avg < first_avg - 0.0001:
            return 'decreasing_drift'
        else:
            return 'stable'
    
    def _calculate_correlation(self, list1: List[float], list2: List[float]) -> float:
        """Calculate correlation coefficient between two timing sequences"""
        # Simplified correlation - would use proper statistical methods
        if len(list1) != len(list2) or len(list1) < 2:
            return 0.0
        
        # Basic correlation calculation
        import statistics
        mean1 = statistics.mean(list1)
        mean2 = statistics.mean(list2)
        
        numerator = sum((x - mean1) * (y - mean2) for x, y in zip(list1, list2))
        denominator = (sum((x - mean1)**2 for x in list1) * sum((y - mean2)**2 for y in list2))**0.5
        
        return numerator / denominator if denominator != 0 else 0.0
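
If NumPy is an acceptable dependency, the hand-rolled Pearson calculation above could defer to numpy.corrcoef instead:

import numpy as np

def pearson(list1: List[float], list2: List[float]) -> float:
    # Same guard as above; np.corrcoef returns the 2x2 correlation matrix
    # (note it yields nan for zero-variance inputs, unlike the 0.0 above)
    if len(list1) != len(list2) or len(list1) < 2:
        return 0.0
    return float(np.corrcoef(list1, list2)[0, 1])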

4. Plugin Registry and Integration

class AnalysisPluginRegistry:
    """Registry for managing analysis plugins"""
    
    def __init__(self):
        self.plugins: Dict[str, AnalysisPlugin] = {}
        self.decoder_plugin_map: Dict[str, List[AnalysisPlugin]] = {}
    
    def register_plugin(self, plugin: AnalysisPlugin):
        """Register an analysis plugin"""
        self.plugins[plugin.plugin_name] = plugin
        
        # Map decoders to plugins
        for decoder_name in plugin.required_decoders:
            self.decoder_plugin_map.setdefault(decoder_name, []).append(plugin)
    
    def get_plugins_for_decoder(self, decoder_name: str) -> List[AnalysisPlugin]:
        """Get all plugins that can analyze data from a specific decoder"""
        return self.decoder_plugin_map.get(decoder_name, [])
    
    def get_plugin(self, plugin_name: str) -> Optional[AnalysisPlugin]:
        """Get a specific plugin by name"""
        return self.plugins.get(plugin_name)
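
EnhancedFlowManager below keeps per-flow state in an EnhancedFlowStats container that this document never defines; a minimal sketch, assuming it only needs to hold decoded frames and flow-level plugin output, might look like:

@dataclass
class EnhancedFlowStats:
    """Hypothetical per-flow container: accumulates decoded frames
    and the flow-level results produced by analysis plugins"""
    frames: List[StructuredFrameData] = field(default_factory=list)
    plugin_results: Dict[str, Dict[str, Any]] = field(default_factory=dict)

    def add_frame(self, frame_data: StructuredFrameData):
        self.frames.append(frame_data)

    def get_plugin_results(self, plugin_name: str) -> Optional[Dict[str, Any]]:
        """Flow-level results stored by run_flow_analysis (sketched below)"""
        return self.plugin_results.get(plugin_name)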

class EnhancedFlowManager:
    """Enhanced flow manager with plugin support"""
    
    def __init__(self):
        self.decoders: List[EnhancedDecoder] = []
        self.plugin_registry = AnalysisPluginRegistry()
        self.flows: Dict[Tuple[str, str], EnhancedFlowStats] = {}
    
    def register_decoder(self, decoder: EnhancedDecoder):
        """Register a decoder"""
        self.decoders.append(decoder)
    
    def register_plugin(self, plugin: AnalysisPlugin):
        """Register an analysis plugin"""
        self.plugin_registry.register_plugin(plugin)
    
    def process_packet(self, packet, frame_num: int):
        """Process packet with enhanced decoder and plugin analysis"""
        # Extract transport info
        transport_info = self._extract_transport_info(packet)
        
        # Try decoders
        for decoder in self.decoders:
            confidence = decoder.can_decode(packet, transport_info)
            if confidence > 0.5:  # Threshold for acceptance
                frame_data = decoder.decode_frame(packet, transport_info)
                if frame_data:
                    # Apply analysis plugins
                    self._apply_plugins(frame_data, transport_info)
                    
                    # Update flow with enhanced data
                    self._update_enhanced_flow(frame_data, transport_info)
                    break
    
    def _apply_plugins(self, frame_data: StructuredFrameData, transport_info: Dict):
        """Apply relevant analysis plugins to frame data"""
        plugins = self.plugin_registry.get_plugins_for_decoder(frame_data.decoder_name)
        
        for plugin in plugins:
            if plugin.can_analyze(frame_data):
                analysis_result = plugin.analyze_frame(frame_data, {'transport_info': transport_info})
                
                # Store plugin results in frame metadata
                frame_data.metadata[f'plugin_{plugin.plugin_name}'] = analysis_result
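
_extract_transport_info and _update_enhanced_flow are left undefined above, and nothing yet triggers flow-level analysis. Hypothetical sketches of these EnhancedFlowManager methods, assuming scapy transport layers and the EnhancedFlowStats container sketched earlier:

    # Sketches of the helpers elided above; these would live on
    # EnhancedFlowManager. run_flow_analysis is a hypothetical addition
    # that runs flow-level plugin aggregation after capture.
    def _extract_transport_info(self, packet) -> Dict[str, Any]:
        """Pull addressing from scapy IP/UDP/TCP layers, if present"""
        info: Dict[str, Any] = {}
        if packet.haslayer('IP'):
            info['src_ip'] = packet['IP'].src
            info['dst_ip'] = packet['IP'].dst
        for proto in ('UDP', 'TCP'):
            if packet.haslayer(proto):
                info['protocol'] = proto
                info['src_port'] = packet[proto].sport
                info['dst_port'] = packet[proto].dport
                break
        return info

    def _update_enhanced_flow(self, frame_data: StructuredFrameData,
                              transport_info: Dict):
        """Append the decoded frame to its (source, destination) flow"""
        key = (f"{transport_info.get('src_ip')}:{transport_info.get('src_port')}",
               f"{transport_info.get('dst_ip')}:{transport_info.get('dst_port')}")
        self.flows.setdefault(key, EnhancedFlowStats()).add_frame(frame_data)

    def run_flow_analysis(self):
        """Run each applicable plugin's analyze_flow over every flow"""
        for flow_stats in self.flows.values():
            for plugin in self.plugin_registry.plugins.values():
                frames = [f for f in flow_stats.frames if plugin.can_analyze(f)]
                if frames:
                    flow_stats.plugin_results[plugin.plugin_name] = \
                        plugin.analyze_flow(frames)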

5. Usage Example

# Set up the enhanced system
flow_manager = EnhancedFlowManager()

# Register enhanced decoders
ch10_decoder = Chapter10EnhancedDecoder()
flow_manager.register_decoder(ch10_decoder)

# Register analysis plugins
timing_plugin = Chapter10TimingAnalysisPlugin()
flow_manager.register_plugin(timing_plugin)

# Process packets
for frame_num, packet in enumerate(packets, start=1):
    flow_manager.process_packet(packet, frame_num)

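The flow-level metrics read below come from each plugin's analyze_flow, which nothing above runs automatically; assuming the hypothetical run_flow_analysis helper sketched in section 4, the aggregation step is explicit:

# Aggregate flow-level plugin results (hypothetical helper from section 4;
# populates EnhancedFlowStats.plugin_results via each plugin's analyze_flow)
flow_manager.run_flow_analysis()
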
# Access enhanced analysis results
for flow_key, flow_stats in flow_manager.flows.items():
    timing_analysis = flow_stats.get_plugin_results('CH10_Timing_Analysis')
    if timing_analysis and 'avg_timing_delta' in timing_analysis:
        print(f"Flow {flow_key}: avg clock offset = {timing_analysis['avg_timing_delta']:.6f}s")

This modular framework allows you to:

  1. Extract any field from decoded frames without modifying core code
  2. Create custom analysis plugins for domain-specific insights
  3. Compare internal vs external timestamps for clock drift analysis
  4. Extend analysis without touching the main analyzer code
  5. Chain multiple plugins for complex analysis workflows

Questions for implementation:

  • Which specific Chapter 10 timing analysis would be most valuable first?
  • Should we create plugins for PTP timestamp analysis as well?
  • What other modular analysis capabilities would be useful?