#!/usr/bin/env python3 """Debug frame type timing isolation""" import sys sys.path.append('.') from analyzer.analysis import EthernetAnalyzer from analyzer.utils import PCAPLoader def debug_frame_timing_isolation(pcap_file, src_ip="192.168.4.89"): """Debug timing calculations per frame type""" # Create analyzer analyzer = EthernetAnalyzer(outlier_threshold_sigma=3.0) # Load PCAP loader = PCAPLoader(pcap_file) packets = loader.load_all() print(f"Loaded {len(packets)} packets") # Process packets for i, packet in enumerate(packets, 1): analyzer._process_single_packet(packet, i) # Calculate statistics analyzer.calculate_statistics() # Find the specific flow target_flow = None for flow_key, flow in analyzer.flows.items(): if flow.src_ip == src_ip: target_flow = flow break if not target_flow: print(f"Flow from {src_ip} not found!") return print(f"\n=== FLOW: {target_flow.src_ip}:{target_flow.src_port} -> {target_flow.dst_ip}:{target_flow.dst_port} ===") print(f"Total packets: {target_flow.frame_count}") print(f"Flow-level outliers: {len(target_flow.outlier_frames)} {sorted(target_flow.outlier_frames)}") print(f"Flow avg ΔT: {target_flow.avg_inter_arrival * 1000:.3f} ms") print(f"Flow std σ: {target_flow.std_inter_arrival * 1000:.3f} ms") print(f"\n=== FRAME TYPE TIMING ISOLATION ===") # Show each frame type's timing in detail for frame_type, stats in sorted(target_flow.frame_types.items(), key=lambda x: x[1].count, reverse=True): print(f"\n--- {frame_type} ---") print(f"Packets: {stats.count}") print(f"Frame numbers: {stats.frame_numbers[:10]}{'...' if len(stats.frame_numbers) > 10 else ''}") print(f"Inter-arrival times count: {len(stats.inter_arrival_times)}") if len(stats.inter_arrival_times) >= 2: print(f"Avg ΔT: {stats.avg_inter_arrival * 1000:.3f} ms") print(f"Std σ: {stats.std_inter_arrival * 1000:.3f} ms") print(f"3σ threshold: {(stats.avg_inter_arrival + 3 * stats.std_inter_arrival) * 1000:.3f} ms") print(f"Outliers: {len(stats.outlier_frames)} {sorted(stats.outlier_frames)}") # Show first 10 inter-arrival times for this frame type print("First 10 inter-arrival times:") for i, t in enumerate(stats.inter_arrival_times[:10]): frame_num = stats.frame_numbers[i+1] if i+1 < len(stats.frame_numbers) else "?" is_outlier = frame_num in stats.outlier_frames if isinstance(frame_num, int) else False outlier_mark = " *OUTLIER*" if is_outlier else "" print(f" Frame {frame_num}: {t * 1000:.3f} ms{outlier_mark}") # Show timing around known problematic frames problematic_frames = [1576, 1582, 1634, 1640] for prob_frame in problematic_frames: if prob_frame in stats.frame_numbers: idx = stats.frame_numbers.index(prob_frame) if idx > 0 and idx-1 < len(stats.inter_arrival_times): inter_time = stats.inter_arrival_times[idx-1] deviation = (inter_time - stats.avg_inter_arrival) / stats.std_inter_arrival if stats.std_inter_arrival > 0 else 0 print(f" >> Frame {prob_frame}: {inter_time * 1000:.3f} ms ({deviation:.1f}σ)") else: print("Not enough data for timing analysis") print(f"\n=== CROSS-CONTAMINATION CHECK ===") # Check if timing from one frame type is affecting another # Look for cases where inter-arrival times might be calculated across frame types print("Checking for inter-frame-type timing calculations...") # Get all frame timestamps in order all_frame_data = [] for frame_type, stats in target_flow.frame_types.items(): for i, (frame_num, timestamp) in enumerate(zip(stats.frame_numbers, stats.timestamps)): all_frame_data.append((frame_num, timestamp, frame_type)) # Sort by frame number all_frame_data.sort(key=lambda x: x[0]) print(f"\nFirst 20 frames in order:") for i, (frame_num, timestamp, frame_type) in enumerate(all_frame_data[:20]): if i > 0: prev_timestamp = all_frame_data[i-1][1] prev_frame_type = all_frame_data[i-1][2] inter_time = timestamp - prev_timestamp cross_type = frame_type != prev_frame_type print(f" Frame {frame_num} ({frame_type}): ΔT={inter_time * 1000:.3f} ms {'[CROSS-TYPE]' if cross_type else ''}") print(f"\n=== PROBLEMATIC FRAMES ANALYSIS ===") # Check each problematic frame to see which frame type it belongs to problematic_frames = [1576, 1582, 1634, 1640] for prob_frame in problematic_frames: frame_type_found = None for frame_type, stats in target_flow.frame_types.items(): if prob_frame in stats.frame_numbers: frame_type_found = frame_type break print(f"Frame {prob_frame}: belongs to {frame_type_found}") # Check if this frame is an outlier in its frame type if frame_type_found: stats = target_flow.frame_types[frame_type_found] is_outlier = prob_frame in stats.outlier_frames print(f" -> Is outlier in {frame_type_found}: {is_outlier}") if __name__ == "__main__": if len(sys.argv) > 1: debug_frame_timing_isolation(sys.argv[1]) else: debug_frame_timing_isolation("1 PTPGM.pcapng")