#!/usr/bin/env python3 """Test sequential processing fix for race conditions""" import sys sys.path.append('.') from analyzer.analysis import EthernetAnalyzer from analyzer.analysis.background_analyzer import BackgroundAnalyzer import time def test_sequential_processing(pcap_file="1 PTPGM.pcapng", src_ip="192.168.4.89"): """Test that sequential processing fixes frame reference race conditions""" print("=== Testing Sequential Processing Fix ===") # Test 1: Multi-threaded (old way - should show issues) print("\n1. MULTI-THREADED PROCESSING (may have race conditions):") analyzer1 = EthernetAnalyzer(enable_realtime=False, outlier_threshold_sigma=3.0) bg_analyzer1 = BackgroundAnalyzer(analyzer1, num_threads=4) # Force multi-threaded bg_analyzer1.start_parsing(pcap_file) while bg_analyzer1.is_parsing: time.sleep(0.1) flow1 = None for flow_key, flow in analyzer1.flows.items(): if flow.src_ip == src_ip: flow1 = flow break if flow1: print(f" Multi-threaded outliers:") for frame_type, ft_stats in flow1.frame_types.items(): if hasattr(ft_stats, 'enhanced_outlier_details') and ft_stats.enhanced_outlier_details: for frame_num, prev_frame_num, delta_t in ft_stats.enhanced_outlier_details: # Check for suspicious frame gaps (like 2002 -> 1050) frame_gap = abs(frame_num - prev_frame_num) status = "⚠️ SUSPICIOUS" if frame_gap > 100 else "✅ OK" print(f" {frame_type}: Frame {frame_num} (from {prev_frame_num}): {delta_t*1000:.1f}ms - {status}") # Test 2: Single-threaded (new way - should be correct) print("\n2. SINGLE-THREADED PROCESSING (should be correct):") analyzer2 = EthernetAnalyzer(enable_realtime=False, outlier_threshold_sigma=3.0) bg_analyzer2 = BackgroundAnalyzer(analyzer2, num_threads=1) # Single-threaded bg_analyzer2.start_parsing(pcap_file) while bg_analyzer2.is_parsing: time.sleep(0.1) flow2 = None for flow_key, flow in analyzer2.flows.items(): if flow.src_ip == src_ip: flow2 = flow break if flow2: print(f" Single-threaded outliers:") for frame_type, ft_stats in flow2.frame_types.items(): if hasattr(ft_stats, 'enhanced_outlier_details') and ft_stats.enhanced_outlier_details: for frame_num, prev_frame_num, delta_t in ft_stats.enhanced_outlier_details: # Check for suspicious frame gaps frame_gap = abs(frame_num - prev_frame_num) status = "⚠️ SUSPICIOUS" if frame_gap > 100 else "✅ OK" print(f" {frame_type}: Frame {frame_num} (from {prev_frame_num}): {delta_t*1000:.1f}ms - {status}") # Test 3: Batch processing (reference - should always be correct) print("\n3. BATCH PROCESSING (reference - always correct):") from analyzer.utils import PCAPLoader analyzer3 = EthernetAnalyzer(enable_realtime=False, outlier_threshold_sigma=3.0) loader = PCAPLoader(pcap_file) packets = loader.load_all() for i, packet in enumerate(packets, 1): analyzer3._process_single_packet(packet, i) analyzer3.calculate_statistics() flow3 = None for flow_key, flow in analyzer3.flows.items(): if flow.src_ip == src_ip: flow3 = flow break if flow3: print(f" Batch processing outliers:") for frame_type, ft_stats in flow3.frame_types.items(): if hasattr(ft_stats, 'enhanced_outlier_details') and ft_stats.enhanced_outlier_details: for frame_num, prev_frame_num, delta_t in ft_stats.enhanced_outlier_details: # Check for suspicious frame gaps frame_gap = abs(frame_num - prev_frame_num) status = "⚠️ SUSPICIOUS" if frame_gap > 100 else "✅ OK" print(f" {frame_type}: Frame {frame_num} (from {prev_frame_num}): {delta_t*1000:.1f}ms - {status}") # Compare results print(f"\n=== COMPARISON ===") if flow1 and flow2 and flow3: multi_count = sum(len(ft_stats.enhanced_outlier_details) for ft_stats in flow1.frame_types.values() if hasattr(ft_stats, 'enhanced_outlier_details')) single_count = sum(len(ft_stats.enhanced_outlier_details) for ft_stats in flow2.frame_types.values() if hasattr(ft_stats, 'enhanced_outlier_details')) batch_count = sum(len(ft_stats.enhanced_outlier_details) for ft_stats in flow3.frame_types.values() if hasattr(ft_stats, 'enhanced_outlier_details')) print(f"Multi-threaded outlier count: {multi_count}") print(f"Single-threaded outlier count: {single_count}") print(f"Batch processing outlier count: {batch_count}") if single_count == batch_count: print("✅ Single-threaded matches batch processing - RACE CONDITION FIXED!") else: print("⚠️ Single-threaded doesn't match batch processing") if multi_count != batch_count: print("⚠️ Multi-threaded shows race condition issues") # Cleanup bg_analyzer1.cleanup() bg_analyzer2.cleanup() if __name__ == "__main__": if len(sys.argv) > 1: test_sequential_processing(sys.argv[1]) else: test_sequential_processing()