#!/usr/bin/env python3
"""
Test script to verify enhanced CH10 decode data is being captured.

Running the script loads a capture, feeds every packet through the
analyzer's flow manager, and prints the decoded-frame data stored on each
flow whose decoder type is ``"Chapter10_Enhanced"``. It only prints; it makes
no assertions.
"""

import sys
from itertools import islice

# Must run before the ``analyzer`` imports below so that the package is
# resolved relative to the current working directory.
sys.path.append('.')

from analyzer.analysis import EthernetAnalyzer
from analyzer.utils import PCAPLoader

# Capture file read when no explicit path is supplied.
DEFAULT_PCAP_PATH = "1 PTPGM.pcapng"

# ``decoder_type`` value that selects the flows reported in detail.
CH10_DECODER_TYPE = "Chapter10_Enhanced"

# Number of decoded fields printed per sample frame before eliding the rest.
MAX_FIELDS_SHOWN = 5


def _count_matching(lowered_names, keywords):
    """Return how many of *lowered_names* contain any of *keywords* as a substring."""
    return sum(
        1 for name in lowered_names if any(keyword in name for keyword in keywords)
    )


def _print_sample_frames(sample_decoded_fields):
    """Print every stored sample frame followed by its first few decoded fields."""
    print(" Sample Decoded Data:")
    for frame_key, frame_data in sample_decoded_fields.items():
        field_count = len(frame_data)
        print(f" {frame_key}: {field_count} fields")

        # Show first few fields only; islice avoids copying the whole mapping.
        for field_name, value in islice(frame_data.items(), MAX_FIELDS_SHOWN):
            print(f" {field_name}: {value}")

        if field_count > MAX_FIELDS_SHOWN:
            print(f" ... and {field_count - MAX_FIELDS_SHOWN} more fields")


def _print_field_categories(available_field_names):
    """Print how many field names fall into the timing, quality and data buckets."""
    # Lower-case once so the three substring scans share the same work.
    lowered = [name.lower() for name in available_field_names]
    timing = _count_matching(lowered, ('time',))
    quality = _count_matching(lowered, ('quality', 'error'))
    data = _count_matching(lowered, ('data', 'analog', 'channel'))
    print(f" Field Categories: Timing: {timing}, "
          f"Quality: {quality}, "
          f"Data: {data}")


def _print_flow_report(flow_number, flow):
    """Print the enhanced-analysis summary for one flow, numbered *flow_number*."""
    print(f"\nFlow {flow_number}: {flow.src_ip} -> {flow.dst_ip}")

    enhanced = flow.enhanced_analysis
    print(f" Decoder Type: {enhanced.decoder_type}")
    print(f" Primary Data Type: {enhanced.primary_data_type}")
    print(f" Available Fields: {len(enhanced.available_field_names)}")
    print(f" Sample Frames Stored: {len(enhanced.sample_decoded_fields)}")

    if enhanced.sample_decoded_fields:
        _print_sample_frames(enhanced.sample_decoded_fields)
    else:
        print(" No decoded frame data stored!")

    if enhanced.available_field_names:
        _print_field_categories(enhanced.available_field_names)


def test_decode_data(pcap_path: str = DEFAULT_PCAP_PATH) -> None:
    """Test that decoded frame data is being captured.

    Args:
        pcap_path: Capture file handed to ``PCAPLoader``. Defaults to the
            capture the script has always used, so calling with no
            arguments behaves as before.
    """
    # Create analyzer
    analyzer = EthernetAnalyzer(enable_realtime=False, outlier_threshold_sigma=3.0)

    # Load and process PCAP
    print("Loading and processing PCAP...")
    packets = PCAPLoader(pcap_path).load_all()

    # Process packets through flow manager; packet numbers are 1-based.
    for packet_number, packet in enumerate(packets, start=1):
        analyzer.flow_manager.process_packet(packet, packet_number)

    # Calculate statistics
    analyzer.statistics_engine.calculate_flow_statistics(analyzer.flow_manager.flows)

    print("\n=== ENHANCED DECODE DATA TEST ===")

    # Debug: Show all flows and their decoder types
    flows = analyzer.flow_manager.flows
    print(f"Total flows found: {len(flows)}")
    for flow_key, flow in flows.items():
        print(f"Flow {flow_key}: {flow.enhanced_analysis.decoder_type}, protocols: {flow.detected_protocol_types}")

    # Find CH10 flows
    ch10_flows = [
        flow
        for flow in flows.values()
        if flow.enhanced_analysis.decoder_type == CH10_DECODER_TYPE
    ]
    print(f"Found {len(ch10_flows)} flows with enhanced CH10 decoding")

    for flow_number, flow in enumerate(ch10_flows, start=1):
        _print_flow_report(flow_number, flow)


if __name__ == "__main__":
    test_decode_data()