Files
StreamLens/debug_frame_timing_isolation.py

130 lines
5.6 KiB
Python
Raw Permalink Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

#!/usr/bin/env python3
"""Debug frame type timing isolation"""
import sys
sys.path.append('.')
from analyzer.analysis import EthernetAnalyzer
from analyzer.utils import PCAPLoader
def debug_frame_timing_isolation(pcap_file, src_ip="192.168.4.89"):
"""Debug timing calculations per frame type"""
# Create analyzer
analyzer = EthernetAnalyzer(outlier_threshold_sigma=3.0)
# Load PCAP
loader = PCAPLoader(pcap_file)
packets = loader.load_all()
print(f"Loaded {len(packets)} packets")
# Process packets
for i, packet in enumerate(packets, 1):
analyzer._process_single_packet(packet, i)
# Calculate statistics
analyzer.calculate_statistics()
# Find the specific flow
target_flow = None
for flow_key, flow in analyzer.flows.items():
if flow.src_ip == src_ip:
target_flow = flow
break
if not target_flow:
print(f"Flow from {src_ip} not found!")
return
print(f"\n=== FLOW: {target_flow.src_ip}:{target_flow.src_port} -> {target_flow.dst_ip}:{target_flow.dst_port} ===")
print(f"Total packets: {target_flow.frame_count}")
print(f"Flow-level outliers: {len(target_flow.outlier_frames)} {sorted(target_flow.outlier_frames)}")
print(f"Flow avg ΔT: {target_flow.avg_inter_arrival * 1000:.3f} ms")
print(f"Flow std σ: {target_flow.std_inter_arrival * 1000:.3f} ms")
print(f"\n=== FRAME TYPE TIMING ISOLATION ===")
# Show each frame type's timing in detail
for frame_type, stats in sorted(target_flow.frame_types.items(), key=lambda x: x[1].count, reverse=True):
print(f"\n--- {frame_type} ---")
print(f"Packets: {stats.count}")
print(f"Frame numbers: {stats.frame_numbers[:10]}{'...' if len(stats.frame_numbers) > 10 else ''}")
print(f"Inter-arrival times count: {len(stats.inter_arrival_times)}")
if len(stats.inter_arrival_times) >= 2:
print(f"Avg ΔT: {stats.avg_inter_arrival * 1000:.3f} ms")
print(f"Std σ: {stats.std_inter_arrival * 1000:.3f} ms")
print(f"3σ threshold: {(stats.avg_inter_arrival + 3 * stats.std_inter_arrival) * 1000:.3f} ms")
print(f"Outliers: {len(stats.outlier_frames)} {sorted(stats.outlier_frames)}")
# Show first 10 inter-arrival times for this frame type
print("First 10 inter-arrival times:")
for i, t in enumerate(stats.inter_arrival_times[:10]):
frame_num = stats.frame_numbers[i+1] if i+1 < len(stats.frame_numbers) else "?"
is_outlier = frame_num in stats.outlier_frames if isinstance(frame_num, int) else False
outlier_mark = " *OUTLIER*" if is_outlier else ""
print(f" Frame {frame_num}: {t * 1000:.3f} ms{outlier_mark}")
# Show timing around known problematic frames
problematic_frames = [1576, 1582, 1634, 1640]
for prob_frame in problematic_frames:
if prob_frame in stats.frame_numbers:
idx = stats.frame_numbers.index(prob_frame)
if idx > 0 and idx-1 < len(stats.inter_arrival_times):
inter_time = stats.inter_arrival_times[idx-1]
deviation = (inter_time - stats.avg_inter_arrival) / stats.std_inter_arrival if stats.std_inter_arrival > 0 else 0
print(f" >> Frame {prob_frame}: {inter_time * 1000:.3f} ms ({deviation:.1f}σ)")
else:
print("Not enough data for timing analysis")
print(f"\n=== CROSS-CONTAMINATION CHECK ===")
# Check if timing from one frame type is affecting another
# Look for cases where inter-arrival times might be calculated across frame types
print("Checking for inter-frame-type timing calculations...")
# Get all frame timestamps in order
all_frame_data = []
for frame_type, stats in target_flow.frame_types.items():
for i, (frame_num, timestamp) in enumerate(zip(stats.frame_numbers, stats.timestamps)):
all_frame_data.append((frame_num, timestamp, frame_type))
# Sort by frame number
all_frame_data.sort(key=lambda x: x[0])
print(f"\nFirst 20 frames in order:")
for i, (frame_num, timestamp, frame_type) in enumerate(all_frame_data[:20]):
if i > 0:
prev_timestamp = all_frame_data[i-1][1]
prev_frame_type = all_frame_data[i-1][2]
inter_time = timestamp - prev_timestamp
cross_type = frame_type != prev_frame_type
print(f" Frame {frame_num} ({frame_type}): ΔT={inter_time * 1000:.3f} ms {'[CROSS-TYPE]' if cross_type else ''}")
print(f"\n=== PROBLEMATIC FRAMES ANALYSIS ===")
# Check each problematic frame to see which frame type it belongs to
problematic_frames = [1576, 1582, 1634, 1640]
for prob_frame in problematic_frames:
frame_type_found = None
for frame_type, stats in target_flow.frame_types.items():
if prob_frame in stats.frame_numbers:
frame_type_found = frame_type
break
print(f"Frame {prob_frame}: belongs to {frame_type_found}")
# Check if this frame is an outlier in its frame type
if frame_type_found:
stats = target_flow.frame_types[frame_type_found]
is_outlier = prob_frame in stats.outlier_frames
print(f" -> Is outlier in {frame_type_found}: {is_outlier}")
if __name__ == "__main__":
if len(sys.argv) > 1:
debug_frame_timing_isolation(sys.argv[1])
else:
debug_frame_timing_isolation("1 PTPGM.pcapng")