115 lines
4.8 KiB
Python
115 lines
4.8 KiB
Python
#!/usr/bin/env python3
|
||
"""Test that frame reference issues are now fixed"""
|
||
|
||
import sys
|
||
sys.path.append('.')
|
||
|
||
from analyzer.analysis import EthernetAnalyzer
|
||
from analyzer.analysis.background_analyzer import BackgroundAnalyzer
|
||
import time
|
||
|
||
def test_fixed_frame_references(pcap_file="1 PTPGM.pcapng", src_ip="192.168.4.89",
                                problem_frames=(486, 957)):
    """Check that each CH10-Data outlier references the correct previous frame.

    Parses *pcap_file* with a single-threaded ``BackgroundAnalyzer``, picks the
    first flow whose ``src_ip`` equals *src_ip*, and for every entry in that
    flow's ``CH10-Data`` ``enhanced_outlier_details`` compares the recorded
    previous frame number with the frame that precedes it in
    ``frame_numbers``. Results are printed; nothing is returned.

    Args:
        pcap_file: Path of the capture file handed to ``start_parsing``.
        src_ip: Source IP address used to select the flow under test.
        problem_frames: Frame numbers that are reported individually in the
            "Previously Problematic Frames" section.

    Returns:
        None. The background analyzer is always cleaned up, including when an
        exception is raised part-way through.
    """
    print("=== Testing Fixed Frame References ===")

    # Use single-threaded background analyzer (like TUI now does)
    analyzer = EthernetAnalyzer(enable_realtime=False, outlier_threshold_sigma=3.0)
    bg_analyzer = BackgroundAnalyzer(analyzer, num_threads=1)

    # One try/finally replaces the cleanup() calls that used to be repeated on
    # every exit path and were skipped entirely when an exception escaped.
    try:
        bg_analyzer.start_parsing(pcap_file)
        while bg_analyzer.is_parsing:
            time.sleep(0.1)

        # Find test flow: first flow originating from src_ip.
        test_flow = next(
            (flow for flow in analyzer.flows.values() if flow.src_ip == src_ip),
            None,
        )
        if not test_flow:
            print(f"❌ No flow found from {src_ip}")
            return

        print(f"✅ Found flow: {test_flow.src_ip}:{test_flow.src_port} → {test_flow.dst_ip}:{test_flow.dst_port}")

        # Check CH10-Data outliers
        ch10_data_stats = test_flow.frame_types.get('CH10-Data')
        if not ch10_data_stats:
            print("❌ No CH10-Data frame type found")
            return

        frame_numbers = ch10_data_stats.frame_numbers
        print(f"\nCH10-Data: {len(frame_numbers)} frames")
        print(f"CH10-Data outliers: {len(ch10_data_stats.outlier_frames)}")

        outlier_details = getattr(ch10_data_stats, 'enhanced_outlier_details', None)
        if outlier_details:
            print("\n=== Enhanced Outlier Details ===")

            # Position of the FIRST occurrence of each frame number, built once
            # so every lookup is O(1) instead of a list scan per outlier.
            # setdefault keeps the same "first match" result list.index() gave.
            first_position = {}
            for position, number in enumerate(frame_numbers):
                first_position.setdefault(number, position)

            avg_delta = ch10_data_stats.avg_inter_arrival
            std_delta = ch10_data_stats.std_inter_arrival

            all_correct = True
            for frame_num, prev_frame_num, delta_t in outlier_details:
                frame_index = first_position.get(frame_num)
                if frame_index is None:
                    # Outliers whose frame number is not in frame_numbers are
                    # skipped without output.
                    continue
                if frame_index == 0:
                    print(f" Frame {frame_num} (from {prev_frame_num}): {delta_t * 1000:.1f}ms - First frame")
                    continue

                # Verify frame reference is correct
                expected_prev = frame_numbers[frame_index - 1]
                if prev_frame_num == expected_prev:
                    status = "✅ CORRECT"
                else:
                    status = f"❌ WRONG (expected {expected_prev})"
                    all_correct = False

                # Distance from the mean inter-arrival time in standard
                # deviations; reported as 0 when the deviation is not positive.
                deviation = (delta_t - avg_delta) / std_delta if std_delta > 0 else 0
                print(f" Frame {frame_num} (from {prev_frame_num}): {delta_t * 1000:.1f}ms ({deviation:.1f}σ) - {status}")

            if all_correct:
                print("\n🎉 ALL FRAME REFERENCES ARE CORRECT!")
            else:
                print("\n⚠️ Some frame references still incorrect")

            # Check specific frames that were problematic before
            print("\n=== Checking Previously Problematic Frames ===")

            # First outlier entry per frame number, so each target is a dict
            # lookup rather than a rescan of the whole outlier list.
            first_outlier = {}
            for frame_num, prev_frame_num, delta_t in outlier_details:
                first_outlier.setdefault(frame_num, prev_frame_num)

            for target_frame in problem_frames:
                if target_frame not in first_outlier:
                    print(f" ℹ️ Frame {target_frame}: Not an outlier (timing is normal)")
                    continue

                prev_frame_num = first_outlier[target_frame]
                frame_index = first_position.get(target_frame)
                # Nothing is printed when the frame has no predecessor to
                # compare against (unknown frame or first frame of the type).
                if frame_index is None or frame_index == 0:
                    continue

                expected_prev = frame_numbers[frame_index - 1]
                if prev_frame_num == expected_prev:
                    print(f" ✅ Frame {target_frame}: FIXED! Now correctly shows previous frame {prev_frame_num}")
                else:
                    print(f" ❌ Frame {target_frame}: Still wrong - shows {prev_frame_num}, expected {expected_prev}")
        else:
            print("❌ No enhanced outlier details found")

        # Show frame type summary
        print("\n=== Frame Type Summary ===")
        total_outliers = 0
        for frame_type, ft_stats in sorted(test_flow.frame_types.items()):
            outlier_count = len(ft_stats.outlier_frames)
            total_outliers += outlier_count
            if outlier_count > 0:
                print(f"{frame_type}: {outlier_count} outliers")

        print(f"Total outliers: {total_outliers}")
    finally:
        bg_analyzer.cleanup()
|
||
|
||
if __name__ == "__main__":
    # An optional first command-line argument replaces the default capture
    # file; the slice is empty when no argument was given, so the function's
    # own default applies.
    cli_pcap = sys.argv[1:2]
    test_fixed_frame_references(*cli_pcap)