Files
StreamLens/verify_frame_outliers.py

77 lines
2.9 KiB
Python

#!/usr/bin/env python3
"""Verify frame-type-specific outlier counts"""
import sys
sys.path.append('.')
from analyzer.analysis import EthernetAnalyzer
from analyzer.utils import PCAPLoader
def verify_outliers(
    pcap_file,
    src_ip="192.168.4.89",
    *,
    check_frame_type="CH10-Data",
    expected_count=20,
):
    """Print frame-type-specific outlier counts for one flow of a capture.

    Loads every packet from ``pcap_file``, feeds them through an
    ``EthernetAnalyzer`` (3-sigma outlier threshold), and reports, for the
    first flow whose ``src_ip`` matches, the number of outlier frames per
    frame type alongside the flow-level outlier count.

    Args:
        pcap_file: Path of the capture handed to ``PCAPLoader``.
        src_ip: Source IP address identifying the flow to inspect.
        check_frame_type: Frame type whose outlier count is compared against
            ``expected_count``.
        expected_count: Outlier count expected for ``check_frame_type``.

    Returns:
        None. All results are written to stdout; if no flow matches
        ``src_ip`` a "not found" message is printed and nothing else.
    """
    analyzer = EthernetAnalyzer(outlier_threshold_sigma=3.0)

    loader = PCAPLoader(pcap_file)
    packets = loader.load_all()

    # The second argument is the packet's 1-based position in the capture
    # (presumably its frame number -- confirm against the analyzer).
    for frame_number, packet in enumerate(packets, 1):
        analyzer._process_single_packet(packet, frame_number)

    analyzer.calculate_statistics()

    # First flow originating from src_ip, in the analyzer's own iteration order.
    target_flow = next(
        (flow for flow in analyzer.flows.values() if flow.src_ip == src_ip),
        None,
    )
    if target_flow is None:
        print(f"Flow from {src_ip} not found!")
        return

    print("=== FRAME-TYPE-SPECIFIC OUTLIER VERIFICATION ===")
    print(f"Flow: {target_flow.src_ip}:{target_flow.src_port} -> {target_flow.dst_ip}:{target_flow.dst_port}")

    # Sum of per-frame-type outliers: the value the main flow row should show.
    total_frame_type_outliers = 0
    print("\nFrame Type Outlier Breakdown:")
    # Frame types with the most outliers come first; ties keep dict order.
    ranked_frame_types = sorted(
        target_flow.frame_types.items(),
        key=lambda item: len(item[1].outlier_frames),
        reverse=True,
    )
    for frame_type, ft_stats in ranked_frame_types:
        outlier_count = len(ft_stats.outlier_frames)
        total_frame_type_outliers += outlier_count
        print(f" {frame_type}: {outlier_count} outliers")
        if outlier_count > 0:
            print(f" Frames: {sorted(ft_stats.outlier_frames)}")

    # A frame type absent from the flow counts as zero outliers.
    checked_stats = target_flow.frame_types.get(check_frame_type)
    checked_outliers = 0 if checked_stats is None else len(checked_stats.outlier_frames)

    print("\n=== UI DISPLAY VALUES ===")
    print(f"Main flow row 'Out' column should show: {total_frame_type_outliers}")
    print(f"{check_frame_type} subrow 'Out' column should show: {checked_outliers}")

    if checked_outliers == expected_count:
        print(f"\n✅ CONFIRMED: {check_frame_type} shows {checked_outliers} outliers!")
    else:
        print(f"\n⚠️ {check_frame_type} shows {checked_outliers} outliers (you reported seeing {expected_count})")

    # Compare the flow-level count with the per-frame-type total.
    flow_level_outliers = len(target_flow.outlier_frames)
    difference = total_frame_type_outliers - flow_level_outliers
    print("\n=== COMPARISON ===")
    print(f"Old method (flow-level): {flow_level_outliers} outliers")
    print(f"New method (frame-type): {total_frame_type_outliers} outliers")
    if difference >= 0:
        print(f"Improvement: Now showing {difference} more relevant outliers!")
    else:
        # Avoid reporting a negative number of "more" outliers.
        print(f"Difference: frame-type method shows {-difference} fewer outliers than the flow-level method")
if __name__ == "__main__":
    # Analyze the capture named on the command line, or the default
    # capture file name when no argument is given.
    cli_args = sys.argv[1:]
    capture_path = cli_args[0] if cli_args else "1 PTPGM.pcapng"
    verify_outliers(capture_path)