#!/usr/bin/env python3
"""Test that frame reference issues are now fixed"""

import sys
import time

sys.path.append('.')

from analyzer.analysis import EthernetAnalyzer
from analyzer.analysis.background_analyzer import BackgroundAnalyzer

# Frames that previously reported a wrong "previous frame" reference.
_PROBLEM_FRAMES = (486, 957)


def _first_index_map(frame_numbers):
    """Map each frame number to the index of its first occurrence.

    Mirrors ``list.index`` (first match wins) while allowing O(1) lookups
    instead of a linear scan per outlier.
    """
    first_index = {}
    for index, number in enumerate(frame_numbers):
        first_index.setdefault(number, index)
    return first_index


def _print_outlier_details(stats, details, first_index):
    """Print every enhanced outlier and whether its previous-frame reference
    matches the frame that precedes it in ``stats.frame_numbers``.
    """
    print("\n=== Enhanced Outlier Details ===")
    all_correct = True

    for frame_num, prev_frame_num, delta_t in details:
        frame_index = first_index.get(frame_num)
        if frame_index is None:
            # Outlier frame is not listed for this frame type; nothing to verify.
            continue

        if frame_index > 0:
            expected_prev = stats.frame_numbers[frame_index - 1]
            if prev_frame_num == expected_prev:
                status = "✅ CORRECT"
            else:
                status = f"❌ WRONG (expected {expected_prev})"
                all_correct = False

            # Guard against a zero standard deviation (e.g. too few samples).
            if stats.std_inter_arrival > 0:
                deviation = (delta_t - stats.avg_inter_arrival) / stats.std_inter_arrival
            else:
                deviation = 0
            print(f" Frame {frame_num} (from {prev_frame_num}): {delta_t * 1000:.1f}ms ({deviation:.1f}σ) - {status}")
        else:
            print(f" Frame {frame_num} (from {prev_frame_num}): {delta_t * 1000:.1f}ms - First frame")

    if all_correct:
        print("\n🎉 ALL FRAME REFERENCES ARE CORRECT!")
    else:
        print("\n⚠️ Some frame references still incorrect")


def _print_problem_frames(stats, details, first_index):
    """Report the status of the frames listed in ``_PROBLEM_FRAMES``."""
    print("\n=== Checking Previously Problematic Frames ===")

    for target_frame in _PROBLEM_FRAMES:
        # Only the first matching outlier entry is examined.
        match = next((entry for entry in details if entry[0] == target_frame), None)
        if match is None:
            print(f" ℹ️ Frame {target_frame}: Not an outlier (timing is normal)")
            continue

        _, prev_frame_num, _ = match
        frame_index = first_index.get(target_frame)
        # A frame with no predecessor (or not listed at all) has nothing to
        # compare against, so it is skipped silently.
        if frame_index is not None and frame_index > 0:
            expected_prev = stats.frame_numbers[frame_index - 1]
            if prev_frame_num == expected_prev:
                print(f" ✅ Frame {target_frame}: FIXED! Now correctly shows previous frame {prev_frame_num}")
            else:
                print(f" ❌ Frame {target_frame}: Still wrong - shows {prev_frame_num}, expected {expected_prev}")


def _print_frame_type_summary(flow):
    """Print per-frame-type outlier counts and their total for ``flow``."""
    print("\n=== Frame Type Summary ===")
    total_outliers = 0
    for frame_type, ft_stats in sorted(flow.frame_types.items()):
        outlier_count = len(ft_stats.outlier_frames)
        total_outliers += outlier_count
        if outlier_count > 0:
            print(f"{frame_type}: {outlier_count} outliers")

    print(f"Total outliers: {total_outliers}")


def _report_frame_references(analyzer, src_ip):
    """Locate the first flow from ``src_ip`` and print its CH10-Data checks."""
    # Find test flow
    test_flow = next(
        (flow for flow in analyzer.flows.values() if flow.src_ip == src_ip),
        None,
    )
    if not test_flow:
        print(f"❌ No flow found from {src_ip}")
        return

    print(f"✅ Found flow: {test_flow.src_ip}:{test_flow.src_port} → {test_flow.dst_ip}:{test_flow.dst_port}")

    # Check CH10-Data outliers
    ch10_data_stats = test_flow.frame_types.get('CH10-Data')
    if not ch10_data_stats:
        print("❌ No CH10-Data frame type found")
        return

    print(f"\nCH10-Data: {len(ch10_data_stats.frame_numbers)} frames")
    print(f"CH10-Data outliers: {len(ch10_data_stats.outlier_frames)}")

    details = getattr(ch10_data_stats, 'enhanced_outlier_details', None)
    if details:
        # Built once and shared by both checks below.
        first_index = _first_index_map(ch10_data_stats.frame_numbers)
        _print_outlier_details(ch10_data_stats, details, first_index)
        _print_problem_frames(ch10_data_stats, details, first_index)
    else:
        print("❌ No enhanced outlier details found")

    # Show frame type summary
    _print_frame_type_summary(test_flow)


def test_fixed_frame_references(pcap_file="1 PTPGM.pcapng", src_ip="192.168.4.89"):
    """Test that frame reference issues are now fixed.

    Parses ``pcap_file`` with a single-threaded background analyzer, finds the
    first flow whose source address is ``src_ip`` and prints, for each
    CH10-Data enhanced outlier, whether its recorded previous frame matches
    the preceding entry in the frame-number list.

    Args:
        pcap_file: Path of the capture file to parse.
        src_ip: Source IP address of the flow to inspect.

    Returns:
        None. Results are reported on stdout only.
    """
    print("=== Testing Fixed Frame References ===")

    # Use single-threaded background analyzer (like TUI now does)
    analyzer = EthernetAnalyzer(enable_realtime=False, outlier_threshold_sigma=3.0)
    bg_analyzer = BackgroundAnalyzer(analyzer, num_threads=1)

    try:
        bg_analyzer.start_parsing(pcap_file)
        # Poll until the background parser reports completion.
        while bg_analyzer.is_parsing:
            time.sleep(0.1)

        _report_frame_references(analyzer, src_ip)
    finally:
        # Always release the background analyzer, even if a check raises.
        bg_analyzer.cleanup()


if __name__ == "__main__":
    # An optional first CLI argument overrides the default capture file.
    test_fixed_frame_references(*sys.argv[1:2])