#!/usr/bin/env python3 """Test script for enhanced outlier tracking""" import sys sys.path.append('.') from analyzer.analysis import EthernetAnalyzer from analyzer.utils import PCAPLoader def test_enhanced_outlier_tracking(pcap_file="1 PTPGM.pcapng", src_ip="192.168.4.89"): """Test enhanced outlier tracking functionality""" print("=== Testing Enhanced Outlier Tracking ===") # Initialize analyzer analyzer = EthernetAnalyzer(enable_realtime=False, outlier_threshold_sigma=3.0) # Load and process packets loader = PCAPLoader(pcap_file) packets = loader.load_all() print(f"Loaded {len(packets)} packets") # Process packets for i, packet in enumerate(packets, 1): analyzer._process_single_packet(packet, i) # Calculate statistics to populate outlier data analyzer.calculate_statistics() # Find the test flow test_flow = None for flow_key, flow in analyzer.flows.items(): if flow.src_ip == src_ip: test_flow = flow break if not test_flow: print(f"❌ No flow found from {src_ip}") return print(f"\n✅ Found flow: {test_flow.src_ip}:{test_flow.src_port} → {test_flow.dst_ip}:{test_flow.dst_port}") print(f" Total packets: {test_flow.frame_count}") # Test frame type outlier tracking print(f"\n=== Frame Type Analysis ===") total_frame_type_outliers = 0 for frame_type, ft_stats in test_flow.frame_types.items(): outlier_count = len(ft_stats.outlier_frames) total_frame_type_outliers += outlier_count if outlier_count > 0: print(f"\n{frame_type}: {outlier_count} outliers") print(f" Avg ΔT: {ft_stats.avg_inter_arrival * 1000:.3f} ms") print(f" Std σ: {ft_stats.std_inter_arrival * 1000:.3f} ms") print(f" Threshold: {(ft_stats.avg_inter_arrival + 3 * ft_stats.std_inter_arrival) * 1000:.3f} ms") # Test enhanced outlier details if hasattr(ft_stats, 'enhanced_outlier_details') and ft_stats.enhanced_outlier_details: print(" ✅ Enhanced outlier details available:") for i, (frame_num, prev_frame_num, delta_t) in enumerate(ft_stats.enhanced_outlier_details[:3]): deviation = (delta_t - ft_stats.avg_inter_arrival) / ft_stats.std_inter_arrival if ft_stats.std_inter_arrival > 0 else 0 print(f" Frame {frame_num} (from {prev_frame_num}): {delta_t * 1000:.3f} ms ({deviation:.1f}σ)") if len(ft_stats.enhanced_outlier_details) > 3: print(f" ... and {len(ft_stats.enhanced_outlier_details) - 3} more") elif ft_stats.outlier_details: print(" ⚠️ Legacy outlier details only:") for i, (frame_num, delta_t) in enumerate(ft_stats.outlier_details[:3]): deviation = (delta_t - ft_stats.avg_inter_arrival) / ft_stats.std_inter_arrival if ft_stats.std_inter_arrival > 0 else 0 print(f" Frame {frame_num}: {delta_t * 1000:.3f} ms ({deviation:.1f}σ)") if len(ft_stats.outlier_details) > 3: print(f" ... and {len(ft_stats.outlier_details) - 3} more") else: print(" ❌ No outlier details found") print(f"\n=== Summary ===") print(f"Total frame-type outliers: {total_frame_type_outliers}") # Check if CH10-Data specifically has outliers ch10_data_stats = test_flow.frame_types.get('CH10-Data') if ch10_data_stats: ch10_outliers = len(ch10_data_stats.outlier_frames) print(f"CH10-Data outliers: {ch10_outliers}") if hasattr(ch10_data_stats, 'enhanced_outlier_details'): enhanced_count = len(ch10_data_stats.enhanced_outlier_details) print(f"CH10-Data enhanced details: {enhanced_count}") if enhanced_count > 0: print("✅ Enhanced outlier tracking is working correctly!") else: print("⚠️ Enhanced outlier tracking not populated") else: print("❌ Enhanced outlier details attribute missing") else: print("❌ No CH10-Data frame type found") if __name__ == "__main__": if len(sys.argv) > 1: test_enhanced_outlier_tracking(sys.argv[1]) else: test_enhanced_outlier_tracking()