102 lines
4.4 KiB
Python
102 lines
4.4 KiB
Python
#!/usr/bin/env python3
|
||
"""Test script for enhanced outlier tracking"""
|
||
|
||
import sys
|
||
sys.path.append('.')
|
||
|
||
from analyzer.analysis import EthernetAnalyzer
|
||
from analyzer.utils import PCAPLoader
|
||
|
||
def _sigma_deviation(delta_t, ft_stats):
    """Return how far *delta_t* lies from the mean inter-arrival time, in sigmas.

    Yields 0 when the standard deviation is not positive so the division
    can never be by zero.
    """
    if ft_stats.std_inter_arrival > 0:
        return (delta_t - ft_stats.avg_inter_arrival) / ft_stats.std_inter_arrival
    return 0


def test_enhanced_outlier_tracking(pcap_file="1 PTPGM.pcapng", src_ip="192.168.4.89"):
    """Run the analyzer over a capture and print an outlier-tracking report.

    Every packet returned by ``PCAPLoader(pcap_file).load_all()`` is fed to an
    ``EthernetAnalyzer`` (real-time mode disabled), statistics are calculated,
    and the first flow whose ``src_ip`` equals *src_ip* is inspected. For each
    frame type with outliers the average, standard deviation and threshold are
    printed, followed by up to three outlier detail entries. A final summary
    reports the ``CH10-Data`` frame type specifically.

    Args:
        pcap_file: Capture file path passed to ``PCAPLoader``.
        src_ip: Source IP used to select the flow to report on.

    Returns:
        None. All results are printed; the function returns early when no
        flow matches *src_ip*.
    """
    # Single source of truth for the sigma multiplier: it configures the
    # analyzer and is reused when printing the threshold below.
    sigma = 3.0
    # Maximum number of outlier detail entries printed per frame type.
    preview = 3

    print("=== Testing Enhanced Outlier Tracking ===")

    # Initialize analyzer
    analyzer = EthernetAnalyzer(enable_realtime=False, outlier_threshold_sigma=sigma)

    # Load and process packets
    packets = PCAPLoader(pcap_file).load_all()
    print(f"Loaded {len(packets)} packets")

    # Frame numbers handed to the analyzer start at 1.
    # NOTE(review): this calls a private analyzer method; confirm there is no
    # public entry point intended for batch processing.
    for i, packet in enumerate(packets, 1):
        analyzer._process_single_packet(packet, i)

    # Calculate statistics to populate outlier data
    analyzer.calculate_statistics()

    # Find the test flow: first flow whose source IP matches.
    test_flow = next(
        (flow for flow in analyzer.flows.values() if flow.src_ip == src_ip),
        None,
    )
    if not test_flow:
        print(f"❌ No flow found from {src_ip}")
        return

    print(f"\n✅ Found flow: {test_flow.src_ip}:{test_flow.src_port} → {test_flow.dst_ip}:{test_flow.dst_port}")
    print(f" Total packets: {test_flow.frame_count}")

    # Test frame type outlier tracking
    print("\n=== Frame Type Analysis ===")
    total_frame_type_outliers = 0
    for frame_type, ft_stats in test_flow.frame_types.items():
        outlier_count = len(ft_stats.outlier_frames)
        total_frame_type_outliers += outlier_count

        if outlier_count <= 0:
            continue

        print(f"\n{frame_type}: {outlier_count} outliers")
        # Stats are multiplied by 1000 for display in milliseconds.
        print(f" Avg ΔT: {ft_stats.avg_inter_arrival * 1000:.3f} ms")
        print(f" Std σ: {ft_stats.std_inter_arrival * 1000:.3f} ms")
        print(f" Threshold: {(ft_stats.avg_inter_arrival + sigma * ft_stats.std_inter_arrival) * 1000:.3f} ms")

        # Test enhanced outlier details; fall back to the legacy
        # (frame, delta) pairs when the enhanced list is missing or empty.
        enhanced = getattr(ft_stats, 'enhanced_outlier_details', None)
        if enhanced:
            print(" ✅ Enhanced outlier details available:")
            for frame_num, prev_frame_num, delta_t in enhanced[:preview]:
                deviation = _sigma_deviation(delta_t, ft_stats)
                print(f" Frame {frame_num} (from {prev_frame_num}): {delta_t * 1000:.3f} ms ({deviation:.1f}σ)")
            if len(enhanced) > preview:
                print(f" ... and {len(enhanced) - preview} more")
        elif ft_stats.outlier_details:
            legacy = ft_stats.outlier_details
            print(" ⚠️ Legacy outlier details only:")
            for frame_num, delta_t in legacy[:preview]:
                deviation = _sigma_deviation(delta_t, ft_stats)
                print(f" Frame {frame_num}: {delta_t * 1000:.3f} ms ({deviation:.1f}σ)")
            if len(legacy) > preview:
                print(f" ... and {len(legacy) - preview} more")
        else:
            print(" ❌ No outlier details found")

    print("\n=== Summary ===")
    print(f"Total frame-type outliers: {total_frame_type_outliers}")

    # Check if CH10-Data specifically has outliers
    ch10_data_stats = test_flow.frame_types.get('CH10-Data')
    if not ch10_data_stats:
        print("❌ No CH10-Data frame type found")
        return

    print(f"CH10-Data outliers: {len(ch10_data_stats.outlier_frames)}")

    if not hasattr(ch10_data_stats, 'enhanced_outlier_details'):
        print("❌ Enhanced outlier details attribute missing")
        return

    enhanced_count = len(ch10_data_stats.enhanced_outlier_details)
    print(f"CH10-Data enhanced details: {enhanced_count}")

    if enhanced_count > 0:
        print("✅ Enhanced outlier tracking is working correctly!")
    else:
        print("⚠️ Enhanced outlier tracking not populated")
|
||
|
||
if __name__ == "__main__":
    # Usage: script [pcap_file [src_ip]]
    # Forward at most two positional CLI arguments; any argument that is
    # omitted falls back to the function's own default.
    test_enhanced_outlier_tracking(*sys.argv[1:3])