Files
StreamLens/test_fixed_frame_references.py

115 lines
4.8 KiB
Python
Raw Permalink Blame History

This file contains invisible Unicode characters

This file contains invisible Unicode characters that are indistinguishable to humans but may be processed differently by a computer. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

#!/usr/bin/env python3
"""Test that frame reference issues are now fixed"""
import sys
sys.path.append('.')
from analyzer.analysis import EthernetAnalyzer
from analyzer.analysis.background_analyzer import BackgroundAnalyzer
import time
def test_fixed_frame_references(pcap_file="1 PTPGM.pcapng", src_ip="192.168.4.89"):
"""Test that frame reference issues are now fixed"""
print("=== Testing Fixed Frame References ===")
# Use single-threaded background analyzer (like TUI now does)
analyzer = EthernetAnalyzer(enable_realtime=False, outlier_threshold_sigma=3.0)
bg_analyzer = BackgroundAnalyzer(analyzer, num_threads=1)
bg_analyzer.start_parsing(pcap_file)
while bg_analyzer.is_parsing:
time.sleep(0.1)
# Find test flow
test_flow = None
for flow_key, flow in analyzer.flows.items():
if flow.src_ip == src_ip:
test_flow = flow
break
if not test_flow:
print(f"❌ No flow found from {src_ip}")
bg_analyzer.cleanup()
return
print(f"✅ Found flow: {test_flow.src_ip}:{test_flow.src_port}{test_flow.dst_ip}:{test_flow.dst_port}")
# Check CH10-Data outliers
ch10_data_stats = test_flow.frame_types.get('CH10-Data')
if not ch10_data_stats:
print("❌ No CH10-Data frame type found")
bg_analyzer.cleanup()
return
print(f"\nCH10-Data: {len(ch10_data_stats.frame_numbers)} frames")
print(f"CH10-Data outliers: {len(ch10_data_stats.outlier_frames)}")
if hasattr(ch10_data_stats, 'enhanced_outlier_details') and ch10_data_stats.enhanced_outlier_details:
print(f"\n=== Enhanced Outlier Details ===")
all_correct = True
for frame_num, prev_frame_num, delta_t in ch10_data_stats.enhanced_outlier_details:
# Verify frame reference is correct
if frame_num in ch10_data_stats.frame_numbers:
frame_index = ch10_data_stats.frame_numbers.index(frame_num)
if frame_index > 0:
expected_prev = ch10_data_stats.frame_numbers[frame_index - 1]
status = "✅ CORRECT" if prev_frame_num == expected_prev else f"❌ WRONG (expected {expected_prev})"
if prev_frame_num != expected_prev:
all_correct = False
deviation = (delta_t - ch10_data_stats.avg_inter_arrival) / ch10_data_stats.std_inter_arrival if ch10_data_stats.std_inter_arrival > 0 else 0
print(f" Frame {frame_num} (from {prev_frame_num}): {delta_t * 1000:.1f}ms ({deviation:.1f}σ) - {status}")
else:
print(f" Frame {frame_num} (from {prev_frame_num}): {delta_t * 1000:.1f}ms - First frame")
if all_correct:
print(f"\n🎉 ALL FRAME REFERENCES ARE CORRECT!")
else:
print(f"\n⚠️ Some frame references still incorrect")
# Check specific frames that were problematic before
problem_frames = [486, 957]
print(f"\n=== Checking Previously Problematic Frames ===")
for target_frame in problem_frames:
found = False
for frame_num, prev_frame_num, delta_t in ch10_data_stats.enhanced_outlier_details:
if frame_num == target_frame:
if frame_num in ch10_data_stats.frame_numbers:
frame_index = ch10_data_stats.frame_numbers.index(frame_num)
if frame_index > 0:
expected_prev = ch10_data_stats.frame_numbers[frame_index - 1]
if prev_frame_num == expected_prev:
print(f" ✅ Frame {target_frame}: FIXED! Now correctly shows previous frame {prev_frame_num}")
else:
print(f" ❌ Frame {target_frame}: Still wrong - shows {prev_frame_num}, expected {expected_prev}")
found = True
break
if not found:
print(f" Frame {target_frame}: Not an outlier (timing is normal)")
else:
print("❌ No enhanced outlier details found")
# Show frame type summary
print(f"\n=== Frame Type Summary ===")
total_outliers = 0
for frame_type, ft_stats in sorted(test_flow.frame_types.items()):
outlier_count = len(ft_stats.outlier_frames)
total_outliers += outlier_count
if outlier_count > 0:
print(f"{frame_type}: {outlier_count} outliers")
print(f"Total outliers: {total_outliers}")
bg_analyzer.cleanup()
if __name__ == "__main__":
if len(sys.argv) > 1:
test_fixed_frame_references(sys.argv[1])
else:
test_fixed_frame_references()