#!/usr/bin/env python3 """Comprehensive outlier test to find the frame 1001 issue""" import sys sys.path.append('.') from analyzer.analysis import EthernetAnalyzer from analyzer.utils import PCAPLoader from analyzer.analysis.background_analyzer import BackgroundAnalyzer def comprehensive_outlier_test(pcap_file="1 PTPGM.pcapng", src_ip="192.168.4.89"): """Comprehensive test of outlier detection across different analysis modes""" print("=== Comprehensive Outlier Test ===") # Test 1: Batch processing (our standard method) print("\n1. BATCH PROCESSING:") analyzer1 = EthernetAnalyzer(enable_realtime=False, outlier_threshold_sigma=3.0) loader = PCAPLoader(pcap_file) packets = loader.load_all() for i, packet in enumerate(packets, 1): analyzer1._process_single_packet(packet, i) analyzer1.calculate_statistics() flow1 = None for flow_key, flow in analyzer1.flows.items(): if flow.src_ip == src_ip: flow1 = flow break if flow1: ch10_stats1 = flow1.frame_types.get('CH10-Data') if ch10_stats1: print(f" CH10-Data outliers: {len(ch10_stats1.outlier_frames)}") if hasattr(ch10_stats1, 'enhanced_outlier_details'): for frame_num, prev_frame_num, delta_t in ch10_stats1.enhanced_outlier_details: if frame_num >= 995 and frame_num <= 1005: # Around 1001 print(f" Frame {frame_num} (from {prev_frame_num}): {delta_t * 1000:.3f} ms") # Test 2: Background analyzer (used by TUI) print("\n2. BACKGROUND ANALYZER:") analyzer2 = EthernetAnalyzer(enable_realtime=False, outlier_threshold_sigma=3.0) bg_analyzer = BackgroundAnalyzer(analyzer2, num_threads=1) bg_analyzer.start_parsing(pcap_file) while bg_analyzer.is_parsing: import time time.sleep(0.1) flow2 = None for flow_key, flow in analyzer2.flows.items(): if flow.src_ip == src_ip: flow2 = flow break if flow2: ch10_stats2 = flow2.frame_types.get('CH10-Data') if ch10_stats2: print(f" CH10-Data outliers: {len(ch10_stats2.outlier_frames)}") if hasattr(ch10_stats2, 'enhanced_outlier_details'): for frame_num, prev_frame_num, delta_t in ch10_stats2.enhanced_outlier_details: if frame_num >= 995 and frame_num <= 1005: # Around 1001 print(f" Frame {frame_num} (from {prev_frame_num}): {delta_t * 1000:.3f} ms") # Test 3: Real-time mode print("\n3. REAL-TIME MODE:") analyzer3 = EthernetAnalyzer(enable_realtime=True, outlier_threshold_sigma=3.0) for i, packet in enumerate(packets, 1): analyzer3._process_single_packet(packet, i) # Don't call calculate_statistics for real-time mode flow3 = None for flow_key, flow in analyzer3.flows.items(): if flow.src_ip == src_ip: flow3 = flow break if flow3: ch10_stats3 = flow3.frame_types.get('CH10-Data') if ch10_stats3: print(f" CH10-Data outliers: {len(ch10_stats3.outlier_frames)}") if hasattr(ch10_stats3, 'enhanced_outlier_details'): for frame_num, prev_frame_num, delta_t in ch10_stats3.enhanced_outlier_details: if frame_num >= 995 and frame_num <= 1005: # Around 1001 print(f" Frame {frame_num} (from {prev_frame_num}): {delta_t * 1000:.3f} ms") # Test 4: Check for any outliers that might have wrong references print("\n4. SEARCHING FOR SUSPICIOUS OUTLIERS:") test_flows = [flow1, flow2, flow3] mode_names = ["Batch", "Background", "Real-time"] for i, flow in enumerate(test_flows): if not flow: continue print(f"\n {mode_names[i]} Mode:") for frame_type, ft_stats in flow.frame_types.items(): if hasattr(ft_stats, 'enhanced_outlier_details') and ft_stats.enhanced_outlier_details: for frame_num, prev_frame_num, delta_t in ft_stats.enhanced_outlier_details: # Check if the frame reference looks suspicious # If prev_frame_num is much smaller than frame_num (like 49 vs 1001), that's suspicious frame_gap = frame_num - prev_frame_num if frame_gap > 50: # Suspicious gap print(f" ⚠️ {frame_type}: Frame {frame_num} (from {prev_frame_num}) - Gap: {frame_gap}") # Test 5: Manual verification of frame 1001 in different modes print("\n5. MANUAL FRAME 1001 VERIFICATION:") target_frame = 1001 for i, flow in enumerate(test_flows): if not flow: continue print(f"\n {mode_names[i]} Mode - Frame {target_frame}:") ch10_stats = flow.frame_types.get('CH10-Data') if ch10_stats and target_frame in ch10_stats.frame_numbers: frame_index = ch10_stats.frame_numbers.index(target_frame) if frame_index > 0: expected_prev = ch10_stats.frame_numbers[frame_index - 1] print(f" Expected previous frame: {expected_prev}") # Check if this frame is an outlier is_outlier = False if hasattr(ch10_stats, 'enhanced_outlier_details'): for frame_num, prev_frame_num, delta_t in ch10_stats.enhanced_outlier_details: if frame_num == target_frame: print(f" Found as outlier: Frame {frame_num} (from {prev_frame_num})") if prev_frame_num != expected_prev: print(f" ❌ MISMATCH! Expected {expected_prev}, got {prev_frame_num}") else: print(f" ✅ Frame reference correct") is_outlier = True break if not is_outlier: print(f" Frame {target_frame} is not an outlier") if __name__ == "__main__": if len(sys.argv) > 1: comprehensive_outlier_test(sys.argv[1]) else: comprehensive_outlier_test()