142 lines
6.6 KiB
Python
142 lines
6.6 KiB
Python
#!/usr/bin/env python3
|
||
"""Test script for enhanced outlier reporting"""
|
||
|
||
import sys
|
||
sys.path.append('.')
|
||
|
||
from analyzer.analysis import EthernetAnalyzer
|
||
from analyzer.utils import PCAPLoader
|
||
|
||
def test_enhanced_report(pcap_file="1 PTPGM.pcapng", threshold_sigma=3.0):
|
||
"""Test enhanced outlier reporting functionality"""
|
||
|
||
print("=== Testing Enhanced Outlier Report ===")
|
||
|
||
# Initialize analyzer
|
||
analyzer = EthernetAnalyzer(enable_realtime=False, outlier_threshold_sigma=threshold_sigma)
|
||
|
||
# Load and process packets
|
||
loader = PCAPLoader(pcap_file)
|
||
packets = loader.load_all()
|
||
|
||
print(f"Loaded {len(packets)} packets")
|
||
|
||
# Process packets
|
||
for i, packet in enumerate(packets, 1):
|
||
analyzer._process_single_packet(packet, i)
|
||
|
||
# Calculate statistics
|
||
analyzer.calculate_statistics()
|
||
|
||
# Generate the enhanced report (copied from main.py)
|
||
summary = analyzer.get_summary()
|
||
|
||
print("=" * 80)
|
||
print("COMPREHENSIVE OUTLIER ANALYSIS REPORT")
|
||
print("=" * 80)
|
||
|
||
# Analysis parameters
|
||
print(f"Outlier Detection Threshold: {threshold_sigma}σ (sigma)")
|
||
print(f"Total Packets Analyzed: {summary['total_packets']:,}")
|
||
print(f"Unique IP Flows: {summary['unique_flows']}")
|
||
print(f"Unique IP Addresses: {summary['unique_ips']}")
|
||
|
||
# Overall statistics
|
||
stats = analyzer.get_summary_statistics()
|
||
if stats:
|
||
print(f"\nOVERALL TIMING STATISTICS:")
|
||
print(f" Average Inter-arrival Time: {stats.get('overall_avg_inter_arrival', 0):.6f}s")
|
||
print(f" Standard Deviation: {stats.get('overall_std_inter_arrival', 0):.6f}s")
|
||
print(f" Total Outlier Frames: {stats.get('total_outliers', 0)}")
|
||
print(f" Outlier Percentage: {stats.get('outlier_percentage', 0):.2f}%")
|
||
|
||
print("\n" + "=" * 80)
|
||
print("DETAILED FLOW ANALYSIS")
|
||
print("=" * 80)
|
||
|
||
flows_sorted = sorted(summary['flows'].values(), key=lambda x: (
|
||
analyzer.statistics_engine.get_max_sigma_deviation(x),
|
||
x.frame_count
|
||
), reverse=True)
|
||
|
||
for flow_idx, flow in enumerate(flows_sorted[:2], 1): # Show first 2 flows
|
||
max_sigma = analyzer.statistics_engine.get_max_sigma_deviation(flow)
|
||
print(f"\n[FLOW {flow_idx}] {flow.src_ip} -> {flow.dst_ip}")
|
||
print("-" * 60)
|
||
|
||
# Flow summary
|
||
print(f"Total Packets: {flow.frame_count:,}")
|
||
print(f"Total Bytes: {flow.total_bytes:,}")
|
||
print(f"Max Sigma Deviation: {max_sigma:.2f}σ")
|
||
print(f"Protocols: {', '.join(flow.protocols)}")
|
||
if flow.detected_protocol_types:
|
||
print(f"Enhanced Protocols: {', '.join(flow.detected_protocol_types)}")
|
||
|
||
# Frame type analysis
|
||
if flow.frame_types:
|
||
print(f"\nFrame Type Breakdown:")
|
||
print(f" {'Type':<15} {'Count':<8} {'Avg ΔT':<12} {'Std Dev':<12} {'Out':<6} {'Out %':<8}")
|
||
print(f" {'-' * 15} {'-' * 8} {'-' * 12} {'-' * 12} {'-' * 6} {'-' * 8}")
|
||
|
||
sorted_frame_types = sorted(flow.frame_types.items(),
|
||
key=lambda x: x[1].count, reverse=True)
|
||
|
||
for frame_type, ft_stats in sorted_frame_types:
|
||
outlier_count = len(ft_stats.outlier_details)
|
||
outlier_pct = (outlier_count / ft_stats.count * 100) if ft_stats.count > 0 else 0
|
||
|
||
avg_str = f"{ft_stats.avg_inter_arrival:.6f}s" if ft_stats.avg_inter_arrival > 0 else "N/A"
|
||
std_str = f"{ft_stats.std_inter_arrival:.6f}s" if ft_stats.std_inter_arrival > 0 else "N/A"
|
||
|
||
print(f" {frame_type:<15} {ft_stats.count:<8} {avg_str:<12} {std_str:<12} {outlier_count:<6} {outlier_pct:<7.1f}%")
|
||
|
||
# Detailed outlier frames
|
||
has_outliers = any(ft_stats.outlier_details for ft_stats in flow.frame_types.values())
|
||
|
||
if has_outliers:
|
||
print(f"\nOutlier Frame Details:")
|
||
for frame_type, ft_stats in flow.frame_types.items():
|
||
if ft_stats.outlier_details:
|
||
print(f"\n {frame_type} Outliers ({len(ft_stats.outlier_details)} frames):")
|
||
if ft_stats.avg_inter_arrival > 0:
|
||
threshold = ft_stats.avg_inter_arrival + (threshold_sigma * ft_stats.std_inter_arrival)
|
||
print(f" Threshold: {threshold:.6f}s (>{threshold_sigma}σ from mean {ft_stats.avg_inter_arrival:.6f}s)")
|
||
|
||
# Use enhanced outlier details if available
|
||
if hasattr(ft_stats, 'enhanced_outlier_details') and ft_stats.enhanced_outlier_details:
|
||
print(f" {'Frame#':<10} {'From Frame':<10} {'Inter-arrival':<15} {'Deviation':<12}")
|
||
print(f" {'-' * 10} {'-' * 10} {'-' * 15} {'-' * 12}")
|
||
|
||
for frame_num, prev_frame_num, inter_arrival_time in ft_stats.enhanced_outlier_details:
|
||
if ft_stats.avg_inter_arrival > 0:
|
||
deviation = inter_arrival_time - ft_stats.avg_inter_arrival
|
||
sigma_dev = deviation / ft_stats.std_inter_arrival if ft_stats.std_inter_arrival > 0 else 0
|
||
dev_str = f"+{sigma_dev:.1f}σ"
|
||
else:
|
||
dev_str = "N/A"
|
||
|
||
print(f" {frame_num:<10} {prev_frame_num:<10} {inter_arrival_time:.6f}s{'':<3} {dev_str:<12}")
|
||
else:
|
||
# Fallback to legacy outlier details
|
||
print(f" {'Frame#':<10} {'Inter-arrival':<15} {'Deviation':<12}")
|
||
print(f" {'-' * 10} {'-' * 15} {'-' * 12}")
|
||
|
||
for frame_num, inter_arrival_time in ft_stats.outlier_details:
|
||
if ft_stats.avg_inter_arrival > 0:
|
||
deviation = inter_arrival_time - ft_stats.avg_inter_arrival
|
||
sigma_dev = deviation / ft_stats.std_inter_arrival if ft_stats.std_inter_arrival > 0 else 0
|
||
dev_str = f"+{sigma_dev:.1f}σ"
|
||
else:
|
||
dev_str = "N/A"
|
||
|
||
print(f" {frame_num:<10} {inter_arrival_time:.6f}s{'':<3} {dev_str:<12}")
|
||
|
||
print(f"\n" + "=" * 80)
|
||
print("REPORT COMPLETE")
|
||
print("=" * 80)
|
||
|
||
if __name__ == "__main__":
|
||
if len(sys.argv) > 1:
|
||
test_enhanced_report(sys.argv[1])
|
||
else:
|
||
test_enhanced_report() |