Files
StreamLens/test_fixed_frame_references.py

115 lines
4.8 KiB
Python
Raw Permalink Normal View History

2025-07-30 23:48:32 -04:00
#!/usr/bin/env python3
"""Test that frame reference issues are now fixed"""
import sys
sys.path.append('.')
from analyzer.analysis import EthernetAnalyzer
from analyzer.analysis.background_analyzer import BackgroundAnalyzer
import time
def _first_index_by_frame(frame_numbers):
    """Map each frame number to the position of its first occurrence.

    Mirrors ``list.index`` semantics (first match wins) while making every
    later lookup O(1) instead of a linear scan of the list.
    """
    positions = {}
    for position, frame_num in enumerate(frame_numbers):
        positions.setdefault(frame_num, position)
    return positions


def test_fixed_frame_references(pcap_file="1 PTPGM.pcapng", src_ip="192.168.4.89",
                                problem_frames=(486, 957)):
    """Check that outlier details reference the correct previous frame.

    Parses ``pcap_file`` with a single-threaded ``BackgroundAnalyzer``, picks
    the first flow whose source IP equals ``src_ip`` and, for every entry in
    the ``CH10-Data`` frame type's ``enhanced_outlier_details``, compares the
    recorded previous frame number against the frame that directly precedes
    it in ``frame_numbers``. Results are printed; nothing is returned.

    Args:
        pcap_file: Path of the capture file to parse.
        src_ip: Source IP address used to select the flow under test.
        problem_frames: Frame numbers to report on individually after the
            general check (defaults to the two frames checked historically).
    """
    print("=== Testing Fixed Frame References ===")

    # Use single-threaded background analyzer (like TUI now does)
    analyzer = EthernetAnalyzer(enable_realtime=False, outlier_threshold_sigma=3.0)
    bg_analyzer = BackgroundAnalyzer(analyzer, num_threads=1)

    # try/finally guarantees cleanup() runs on every exit path, including
    # exceptions raised while parsing or while inspecting the results.
    try:
        bg_analyzer.start_parsing(pcap_file)
        while bg_analyzer.is_parsing:
            time.sleep(0.1)

        # Find test flow: the first one whose source address matches.
        test_flow = next(
            (flow for flow in analyzer.flows.values() if flow.src_ip == src_ip),
            None,
        )
        if not test_flow:
            print(f"❌ No flow found from {src_ip}")
            return

        print(
            f"✅ Found flow: {test_flow.src_ip}:{test_flow.src_port}"
            f" → {test_flow.dst_ip}:{test_flow.dst_port}"
        )

        # Check CH10-Data outliers
        ch10_data_stats = test_flow.frame_types.get('CH10-Data')
        if not ch10_data_stats:
            print("❌ No CH10-Data frame type found")
            return

        frame_numbers = ch10_data_stats.frame_numbers
        print(f"\nCH10-Data: {len(frame_numbers)} frames")
        print(f"CH10-Data outliers: {len(ch10_data_stats.outlier_frames)}")

        outlier_details = getattr(ch10_data_stats, 'enhanced_outlier_details', None)
        if outlier_details:
            print("\n=== Enhanced Outlier Details ===")

            # Built once so each outlier is resolved without rescanning the list.
            positions = _first_index_by_frame(frame_numbers)
            # First recorded previous-frame value per outlier frame number,
            # reused by the targeted check below.
            recorded_prev = {}
            all_correct = True

            for frame_num, prev_frame_num, delta_t in outlier_details:
                recorded_prev.setdefault(frame_num, prev_frame_num)

                # Outliers whose frame number is not listed cannot be verified.
                frame_index = positions.get(frame_num)
                if frame_index is None:
                    continue

                if frame_index > 0:
                    # Verify frame reference is correct
                    expected_prev = frame_numbers[frame_index - 1]
                    if prev_frame_num == expected_prev:
                        status = "✅ CORRECT"
                    else:
                        status = f"❌ WRONG (expected {expected_prev})"
                        all_correct = False

                    # Deviation from the mean in sigmas; 0 when sigma is not positive.
                    if ch10_data_stats.std_inter_arrival > 0:
                        deviation = (
                            (delta_t - ch10_data_stats.avg_inter_arrival)
                            / ch10_data_stats.std_inter_arrival
                        )
                    else:
                        deviation = 0
                    print(f" Frame {frame_num} (from {prev_frame_num}): {delta_t * 1000:.1f}ms ({deviation:.1f}σ) - {status}")
                else:
                    print(f" Frame {frame_num} (from {prev_frame_num}): {delta_t * 1000:.1f}ms - First frame")

            if all_correct:
                print("\n🎉 ALL FRAME REFERENCES ARE CORRECT!")
            else:
                print("\n⚠️ Some frame references still incorrect")

            # Check specific frames that were problematic before
            print("\n=== Checking Previously Problematic Frames ===")
            for target_frame in problem_frames:
                if target_frame not in recorded_prev:
                    print(f" Frame {target_frame}: Not an outlier (timing is normal)")
                    continue

                prev_frame_num = recorded_prev[target_frame]
                frame_index = positions.get(target_frame)
                # Only a frame with a predecessor in frame_numbers can be judged.
                if frame_index is not None and frame_index > 0:
                    expected_prev = frame_numbers[frame_index - 1]
                    if prev_frame_num == expected_prev:
                        print(f" ✅ Frame {target_frame}: FIXED! Now correctly shows previous frame {prev_frame_num}")
                    else:
                        print(f" ❌ Frame {target_frame}: Still wrong - shows {prev_frame_num}, expected {expected_prev}")
        else:
            print("❌ No enhanced outlier details found")

        # Show frame type summary
        print("\n=== Frame Type Summary ===")
        total_outliers = 0
        for frame_type, ft_stats in sorted(test_flow.frame_types.items()):
            outlier_count = len(ft_stats.outlier_frames)
            total_outliers += outlier_count
            if outlier_count > 0:
                print(f"{frame_type}: {outlier_count} outliers")
        print(f"Total outliers: {total_outliers}")
    finally:
        bg_analyzer.cleanup()
if __name__ == "__main__":
    # Forward the optional first command-line argument (the capture path);
    # with no argument the slice is empty and the defaults apply.
    test_fixed_frame_references(*sys.argv[1:2])