""" Example showing how the enhanced CH10 decoder integrates with the TUI This demonstrates the full data flow from packet to enhanced analysis display """ from analyzer.protocols.enhanced_chapter10 import EnhancedChapter10Decoder from analyzer.plugins.ch10_timing_analysis import Chapter10TimingAnalysisPlugin from analyzer.models.flow_stats import FlowStats, EnhancedAnalysisData import statistics class EnhancedFlowProcessor: """Example processor showing integration of enhanced CH10 analysis""" def __init__(self): self.ch10_decoder = EnhancedChapter10Decoder() self.timing_plugin = Chapter10TimingAnalysisPlugin() self.flows = {} def process_packet(self, packet, frame_num: int): """Process packet with enhanced analysis""" # Extract transport info (simplified) transport_info = {'protocol': 'UDP', 'src_port': 4001, 'dst_port': 4001} # Check if CH10 decoder can handle this packet confidence = self.ch10_decoder.can_decode(packet, transport_info) if confidence > 0.5: # Decode frame with full field extraction frame_data = self.ch10_decoder.decode_frame(packet, transport_info) if frame_data: # Run timing analysis plugin flow_context = {'flow_duration': 60.0, 'flow_key': 'example_flow'} timing_result = self.timing_plugin.analyze_frame(frame_data, flow_context) # Update or create flow with enhanced data self._update_flow_with_enhanced_data(frame_data, timing_result, packet) def _update_flow_with_enhanced_data(self, frame_data, timing_result, packet): """Update flow statistics with enhanced analysis data""" # Create flow key flow_key = "192.168.1.100->239.255.0.1" # Example # Get or create flow if flow_key not in self.flows: self.flows[flow_key] = FlowStats( src_ip="192.168.1.100", dst_ip="239.255.0.1", src_port=4001, dst_port=4001, transport_protocol="UDP", traffic_classification="Multicast" ) flow = self.flows[flow_key] # Update basic flow stats flow.frame_count += 1 flow.total_bytes += len(packet) flow.timestamps.append(float(packet.time)) # Update enhanced analysis data self._update_enhanced_analysis_data(flow, frame_data, timing_result) def _update_enhanced_analysis_data(self, flow: FlowStats, frame_data, timing_result): """Update the enhanced analysis data structure""" enhanced = flow.enhanced_analysis # Set decoder type enhanced.decoder_type = "Chapter10_Enhanced" # Update timing analysis if timing_result.internal_timestamp is not None: enhanced.has_internal_timing = True # Update running averages for timing if timing_result.clock_drift_ppm is not None: if enhanced.avg_clock_drift_ppm == 0: enhanced.avg_clock_drift_ppm = timing_result.clock_drift_ppm else: # Simple running average enhanced.avg_clock_drift_ppm = (enhanced.avg_clock_drift_ppm + timing_result.clock_drift_ppm) / 2 enhanced.max_clock_drift_ppm = max(enhanced.max_clock_drift_ppm, abs(timing_result.clock_drift_ppm)) # Update timing quality (use most recent) enhanced.timing_quality = timing_result.timing_quality # Update anomaly rate if timing_result.anomaly_detected: enhanced.anomaly_rate = (enhanced.anomaly_rate * (flow.frame_count - 1) + 1) / flow.frame_count else: enhanced.anomaly_rate = (enhanced.anomaly_rate * (flow.frame_count - 1)) / flow.frame_count # Update confidence score enhanced.avg_confidence_score = (enhanced.avg_confidence_score + timing_result.confidence_score) / 2 # Update frame quality frame_quality = frame_data.get_field('frame_quality_score', 0) if frame_quality > 0: if enhanced.avg_frame_quality == 0: enhanced.avg_frame_quality = frame_quality else: enhanced.avg_frame_quality = (enhanced.avg_frame_quality + frame_quality) / 2 # Update error counts if frame_data.get_field('rtc_sync_error', False): enhanced.rtc_sync_errors += 1 if frame_data.get_field('format_error', False): enhanced.format_errors += 1 if frame_data.get_field('overflow_error', False): enhanced.overflow_errors += 1 # Update channel information channel_id = frame_data.get_field('channel_id', 0) if channel_id > 0: enhanced.channel_count = max(enhanced.channel_count, channel_id) # Update data type counters if frame_data.get_field('is_analog_data', False): enhanced.analog_channels = max(enhanced.analog_channels, 1) if frame_data.get_field('is_pcm_data', False): enhanced.pcm_channels = max(enhanced.pcm_channels, 1) if frame_data.get_field('is_tmats_data', False): enhanced.tmats_frames += 1 # Set primary data type data_type_name = frame_data.get_field('data_type_name', 'Unknown') if enhanced.primary_data_type == "Unknown": enhanced.primary_data_type = data_type_name def get_example_enhanced_flow(self) -> FlowStats: """Generate an example flow with enhanced analysis for TUI demonstration""" # Create a sample flow with realistic enhanced data flow = FlowStats( src_ip="192.168.1.100", dst_ip="239.255.0.1", src_port=4001, dst_port=4001, transport_protocol="UDP", traffic_classification="Multicast", frame_count=1234, total_bytes=1048576, # 1MB avg_inter_arrival=0.001 # 1ms ) # Enhanced analysis data enhanced = EnhancedAnalysisData( # Timing analysis avg_clock_drift_ppm=15.5, max_clock_drift_ppm=23.8, timing_quality="good", timing_stability="stable", anomaly_rate=0.023, # 2.3% avg_confidence_score=0.87, # Frame quality avg_frame_quality=94.2, sequence_gaps=2, rtc_sync_errors=1, format_errors=0, overflow_errors=0, # Data analysis channel_count=4, analog_channels=3, pcm_channels=0, tmats_frames=5, # General has_internal_timing=True, primary_data_type="Analog Format 2", decoder_type="Chapter10_Enhanced" ) flow.enhanced_analysis = enhanced flow.detected_protocol_types.add("Chapter10") return flow def demonstrate_tui_integration(): """Demonstrate how enhanced data appears in TUI""" processor = EnhancedFlowProcessor() example_flow = processor.get_example_enhanced_flow() print("=== Enhanced TUI Display Example ===") print() # Show how data appears in main flow table print("Main Flow Table:") print("Src:Port Dst:Port Proto Cast #Frames Bytes Encoding Quality Drift ΔT Avg") print("192.168.1.100:4001 239.255.0.1:4001 UDP Mult 1234 1.0M Chapter10 94% 15.5ppm 1.0ms") print() # Show enhanced detail panel print("Enhanced Detail Panel:") print("FLOW DETAILS: 192.168.1.100 -> 239.255.0.1") print() print("Packets: 1234 | Bytes: 1,048,576") print() print("Enhanced Analysis (Chapter10_Enhanced):") print(" Clock Drift: 15.50 PPM (max: 23.80)") print(" Timing Quality: Good | Stability: Stable") print(" Anomaly Rate: 2.3% | Confidence: 0.87") print(" Frame Quality: 94.2%") print(" Errors: Seq: 2 | RTC: 1") print(" Channels: 4 (Analog: 3) (TMATS: 5)") print(" Primary Type: Analog Format 2") print() # Show available enhanced fields print("Available Enhanced Fields for Analysis:") supported_fields = processor.ch10_decoder.supported_fields timing_fields = [f for f in supported_fields if 'time' in f.name.lower() or 'drift' in f.name.lower()] quality_fields = [f for f in supported_fields if 'quality' in f.name.lower() or 'error' in f.name.lower()] data_fields = [f for f in supported_fields if 'analog' in f.name.lower() or 'channel' in f.name.lower()] print(" Timing Fields:") for field in timing_fields[:5]: # Show first 5 print(f" - {field.name}: {field.description}") print(" Quality Fields:") for field in quality_fields[:5]: # Show first 5 print(f" - {field.name}: {field.description}") print(" Data Analysis Fields:") for field in data_fields[:5]: # Show first 5 print(f" - {field.name}: {field.description}") print(f" ... and {len(supported_fields) - 15} more fields available for custom analysis") if __name__ == "__main__": demonstrate_tui_integration()