#!/usr/bin/env python3
"""Test script for enhanced outlier reporting.

Usage (both arguments are optional):
    python <this script> [pcap_file [threshold_sigma]]
"""

import sys

# The search path is extended before the project imports below, so the
# import order here is deliberate (presumably so that the ``analyzer``
# package resolves when the script is run from the project root).
sys.path.append('.')

from analyzer.analysis import EthernetAnalyzer  # noqa: E402
from analyzer.utils import PCAPLoader  # noqa: E402


def _format_sigma_deviation(inter_arrival_time, ft_stats):
    """Return how far *inter_arrival_time* lies from the mean, in sigmas.

    The result is a signed string such as ``"+3.2σ"`` or ``"-1.5σ"``.
    ``"N/A"`` is returned when the frame type has no positive average
    inter-arrival time, and a zero standard deviation yields ``"+0.0σ"``.
    """
    if ft_stats.avg_inter_arrival > 0:
        deviation = inter_arrival_time - ft_stats.avg_inter_arrival
        if ft_stats.std_inter_arrival > 0:
            sigma_dev = deviation / ft_stats.std_inter_arrival
        else:
            sigma_dev = 0
        # The "+" sign option prints the real sign; a hard-coded "+" prefix
        # would render below-mean values as "+-1.5σ".
        return f"{sigma_dev:+.1f}σ"
    return "N/A"


def _print_frame_type_table(frame_types):
    """Print one summary row per frame type, most frequent type first."""
    print("\nFrame Type Breakdown:")
    print(f" {'Type':<15} {'Count':<8} {'Avg ΔT':<12} {'Std Dev':<12} {'Out':<6} {'Out %':<8}")
    print(f" {'-' * 15} {'-' * 8} {'-' * 12} {'-' * 12} {'-' * 6} {'-' * 8}")

    by_count = sorted(frame_types.items(), key=lambda item: item[1].count, reverse=True)
    for frame_type, ft_stats in by_count:
        outlier_count = len(ft_stats.outlier_details)
        # Guard against division by zero for frame types with no frames.
        outlier_pct = (outlier_count / ft_stats.count * 100) if ft_stats.count > 0 else 0
        avg_str = f"{ft_stats.avg_inter_arrival:.6f}s" if ft_stats.avg_inter_arrival > 0 else "N/A"
        std_str = f"{ft_stats.std_inter_arrival:.6f}s" if ft_stats.std_inter_arrival > 0 else "N/A"
        print(f" {frame_type:<15} {ft_stats.count:<8} {avg_str:<12} {std_str:<12} {outlier_count:<6} {outlier_pct:<7.1f}%")


def _print_outlier_rows(ft_stats):
    """Print the per-frame outlier table for a single frame type.

    Uses the 3-tuples in ``enhanced_outlier_details`` when that attribute
    exists and is non-empty; otherwise falls back to the legacy 2-tuples in
    ``outlier_details``.
    """
    enhanced = getattr(ft_stats, 'enhanced_outlier_details', None)
    if enhanced:
        print(f" {'Frame#':<10} {'From Frame':<10} {'Inter-arrival':<15} {'Deviation':<12}")
        print(f" {'-' * 10} {'-' * 10} {'-' * 15} {'-' * 12}")
        for frame_num, prev_frame_num, inter_arrival_time in enhanced:
            dev_str = _format_sigma_deviation(inter_arrival_time, ft_stats)
            print(f" {frame_num:<10} {prev_frame_num:<10} {inter_arrival_time:.6f}s{'':<3} {dev_str:<12}")
    else:
        # Fallback to legacy outlier details
        print(f" {'Frame#':<10} {'Inter-arrival':<15} {'Deviation':<12}")
        print(f" {'-' * 10} {'-' * 15} {'-' * 12}")
        for frame_num, inter_arrival_time in ft_stats.outlier_details:
            dev_str = _format_sigma_deviation(inter_arrival_time, ft_stats)
            print(f" {frame_num:<10} {inter_arrival_time:.6f}s{'':<3} {dev_str:<12}")


def _print_outlier_details(frame_types, threshold_sigma):
    """Print the outlier section of a flow; prints nothing without outliers."""
    if not any(ft_stats.outlier_details for ft_stats in frame_types.values()):
        return

    print("\nOutlier Frame Details:")
    for frame_type, ft_stats in frame_types.items():
        if not ft_stats.outlier_details:
            continue
        print(f"\n {frame_type} Outliers ({len(ft_stats.outlier_details)} frames):")
        if ft_stats.avg_inter_arrival > 0:
            threshold = ft_stats.avg_inter_arrival + (threshold_sigma * ft_stats.std_inter_arrival)
            print(f" Threshold: {threshold:.6f}s (>{threshold_sigma}σ from mean {ft_stats.avg_inter_arrival:.6f}s)")
        _print_outlier_rows(ft_stats)


def _print_flow_report(flow_idx, flow, max_sigma, threshold_sigma):
    """Print the summary, frame-type table and outlier details of one flow."""
    print(f"\n[FLOW {flow_idx}] {flow.src_ip} -> {flow.dst_ip}")
    print("-" * 60)

    # Flow summary
    print(f"Total Packets: {flow.frame_count:,}")
    print(f"Total Bytes: {flow.total_bytes:,}")
    print(f"Max Sigma Deviation: {max_sigma:.2f}σ")
    print(f"Protocols: {', '.join(flow.protocols)}")
    if flow.detected_protocol_types:
        print(f"Enhanced Protocols: {', '.join(flow.detected_protocol_types)}")

    # Both sections are driven by the per-frame-type statistics, so there is
    # nothing to print for a flow that has none.
    if flow.frame_types:
        _print_frame_type_table(flow.frame_types)
        _print_outlier_details(flow.frame_types, threshold_sigma)


def test_enhanced_report(pcap_file="1 PTPGM.pcapng", threshold_sigma=3.0, max_flows=2):
    """Load a capture file, analyze it and print the enhanced outlier report.

    Args:
        pcap_file: Path of the capture file handed to ``PCAPLoader``.
        threshold_sigma: Outlier threshold in standard deviations; passed to
            ``EthernetAnalyzer`` and used for the per-type threshold lines.
        max_flows: Number of flows (after sorting) to print in detail.

    The report is written to stdout; nothing is returned.
    """
    print("=== Testing Enhanced Outlier Report ===")

    # Initialize analyzer
    analyzer = EthernetAnalyzer(enable_realtime=False, outlier_threshold_sigma=threshold_sigma)

    # Load and process packets
    loader = PCAPLoader(pcap_file)
    packets = loader.load_all()
    print(f"Loaded {len(packets)} packets")

    # Packets are numbered from 1 when handed to the analyzer.
    for packet_num, packet in enumerate(packets, 1):
        analyzer._process_single_packet(packet, packet_num)

    # Calculate statistics
    analyzer.calculate_statistics()

    # Generate the enhanced report (copied from main.py)
    summary = analyzer.get_summary()

    print("=" * 80)
    print("COMPREHENSIVE OUTLIER ANALYSIS REPORT")
    print("=" * 80)

    # Analysis parameters
    print(f"Outlier Detection Threshold: {threshold_sigma}σ (sigma)")
    print(f"Total Packets Analyzed: {summary['total_packets']:,}")
    print(f"Unique IP Flows: {summary['unique_flows']}")
    print(f"Unique IP Addresses: {summary['unique_ips']}")

    # Overall statistics
    stats = analyzer.get_summary_statistics()
    if stats:
        print("\nOVERALL TIMING STATISTICS:")
        print(f" Average Inter-arrival Time: {stats.get('overall_avg_inter_arrival', 0):.6f}s")
        print(f" Standard Deviation: {stats.get('overall_std_inter_arrival', 0):.6f}s")
        print(f" Total Outlier Frames: {stats.get('total_outliers', 0)}")
        print(f" Outlier Percentage: {stats.get('outlier_percentage', 0):.2f}%")

    print("\n" + "=" * 80)
    print("DETAILED FLOW ANALYSIS")
    print("=" * 80)

    # Sort descending by maximum sigma deviation, then by frame count.
    max_sigma_of = analyzer.statistics_engine.get_max_sigma_deviation
    flows_sorted = sorted(
        summary['flows'].values(),
        key=lambda flow: (max_sigma_of(flow), flow.frame_count),
        reverse=True,
    )

    for flow_idx, flow in enumerate(flows_sorted[:max_flows], 1):
        _print_flow_report(flow_idx, flow, max_sigma_of(flow), threshold_sigma)

    print("\n" + "=" * 80)
    print("REPORT COMPLETE")
    print("=" * 80)


if __name__ == "__main__":
    # Optional positional arguments: capture file, then sigma threshold.
    if len(sys.argv) > 2:
        test_enhanced_report(sys.argv[1], float(sys.argv[2]))
    elif len(sys.argv) > 1:
        test_enhanced_report(sys.argv[1])
    else:
        test_enhanced_report()