Files
StreamLens/test_enhanced_outliers.py

102 lines
4.4 KiB
Python
Raw Permalink Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

#!/usr/bin/env python3
"""Test script for enhanced outlier tracking"""
import sys
sys.path.append('.')
from analyzer.analysis import EthernetAnalyzer
from analyzer.utils import PCAPLoader
def _sigma_deviation(delta_t, avg, std):
    """Return how far ``delta_t`` lies from ``avg``, measured in units of ``std``.

    Returns 0 when ``std`` is not positive, so a frame type with no spread
    does not cause a division by zero.
    """
    return (delta_t - avg) / std if std > 0 else 0


def _print_outlier_details(ft_stats, limit=3):
    """Print at most ``limit`` outlier records for one frame type.

    Prefers ``enhanced_outlier_details`` (frame, previous frame, delta) when
    the attribute exists and is non-empty, falls back to the legacy
    ``outlier_details`` (frame, delta), and otherwise reports that no details
    were recorded.
    """
    avg = ft_stats.avg_inter_arrival
    std = ft_stats.std_inter_arrival
    # getattr with a default covers stats objects that lack the attribute.
    enhanced = getattr(ft_stats, 'enhanced_outlier_details', None)

    if enhanced:
        print(" ✅ Enhanced outlier details available:")
        for frame_num, prev_frame_num, delta_t in enhanced[:limit]:
            deviation = _sigma_deviation(delta_t, avg, std)
            print(f" Frame {frame_num} (from {prev_frame_num}): {delta_t * 1000:.3f} ms ({deviation:.1f}σ)")
        if len(enhanced) > limit:
            print(f" ... and {len(enhanced) - limit} more")
    elif ft_stats.outlier_details:
        legacy = ft_stats.outlier_details
        print(" ⚠️ Legacy outlier details only:")
        for frame_num, delta_t in legacy[:limit]:
            deviation = _sigma_deviation(delta_t, avg, std)
            print(f" Frame {frame_num}: {delta_t * 1000:.3f} ms ({deviation:.1f}σ)")
        if len(legacy) > limit:
            print(f" ... and {len(legacy) - limit} more")
    else:
        print(" ❌ No outlier details found")


def test_enhanced_outlier_tracking(pcap_file="1 PTPGM.pcapng", src_ip="192.168.4.89",
                                   outlier_threshold_sigma=3.0):
    """Run the analyzer over a capture and print an outlier-tracking report.

    Loads every packet from ``pcap_file``, feeds the packets to an
    ``EthernetAnalyzer`` with 1-based frame numbers, calculates statistics,
    and then reports on the first flow whose source IP equals ``src_ip``.
    The report lists each frame type that has outliers and ends with a
    summary for the ``CH10-Data`` frame type.

    Args:
        pcap_file: Path of the capture file to load.
        src_ip: Source IP address that selects the flow to inspect.
        outlier_threshold_sigma: Sigma multiplier passed to the analyzer and
            used for the threshold printed in the report.

    Returns:
        None. All results are printed; nothing is asserted.
    """
    print("=== Testing Enhanced Outlier Tracking ===")

    # Initialize analyzer
    analyzer = EthernetAnalyzer(enable_realtime=False,
                                outlier_threshold_sigma=outlier_threshold_sigma)

    # Load and process packets
    packets = PCAPLoader(pcap_file).load_all()
    print(f"Loaded {len(packets)} packets")
    for frame_number, packet in enumerate(packets, 1):
        analyzer._process_single_packet(packet, frame_number)

    # Calculate statistics to populate outlier data
    analyzer.calculate_statistics()

    # Find the test flow: the first one sent from the requested source IP
    test_flow = next(
        (flow for flow in analyzer.flows.values() if flow.src_ip == src_ip),
        None,
    )
    if test_flow is None:
        print(f"❌ No flow found from {src_ip}")
        return

    # Separate the two endpoints so the port and destination IP do not run together.
    print(f"\n✅ Found flow: {test_flow.src_ip}:{test_flow.src_port} → {test_flow.dst_ip}:{test_flow.dst_port}")
    print(f" Total packets: {test_flow.frame_count}")

    # Test frame type outlier tracking
    print("\n=== Frame Type Analysis ===")
    total_frame_type_outliers = 0
    for frame_type, ft_stats in test_flow.frame_types.items():
        outlier_count = len(ft_stats.outlier_frames)
        total_frame_type_outliers += outlier_count
        if outlier_count == 0:
            continue

        threshold = ft_stats.avg_inter_arrival + outlier_threshold_sigma * ft_stats.std_inter_arrival
        print(f"\n{frame_type}: {outlier_count} outliers")
        print(f" Avg ΔT: {ft_stats.avg_inter_arrival * 1000:.3f} ms")
        print(f" Std σ: {ft_stats.std_inter_arrival * 1000:.3f} ms")
        print(f" Threshold: {threshold * 1000:.3f} ms")

        # Test enhanced outlier details
        _print_outlier_details(ft_stats)

    print("\n=== Summary ===")
    print(f"Total frame-type outliers: {total_frame_type_outliers}")

    # Check if CH10-Data specifically has outliers
    ch10_data_stats = test_flow.frame_types.get('CH10-Data')
    if ch10_data_stats is None:
        print("❌ No CH10-Data frame type found")
        return

    print(f"CH10-Data outliers: {len(ch10_data_stats.outlier_frames)}")
    if not hasattr(ch10_data_stats, 'enhanced_outlier_details'):
        print("❌ Enhanced outlier details attribute missing")
        return

    # `or ()` keeps len() safe if the attribute exists but is None.
    enhanced_count = len(ch10_data_stats.enhanced_outlier_details or ())
    print(f"CH10-Data enhanced details: {enhanced_count}")
    if enhanced_count > 0:
        print("✅ Enhanced outlier tracking is working correctly!")
    else:
        print("⚠️ Enhanced outlier tracking not populated")
if __name__ == "__main__":
    # Optional command-line overrides: argv[1] is the capture file and
    # argv[2] is the source IP. The slice forwards zero, one, or two values,
    # so anything omitted falls back to the function's defaults.
    test_enhanced_outlier_tracking(*sys.argv[1:3])