Files
StreamLens/verify_pcap_frames.py

92 lines
3.5 KiB
Python

#!/usr/bin/env python3
"""Verify frame sequence directly from PCAP"""
import sys
sys.path.append('.')
from analyzer.utils import PCAPLoader
try:
from scapy.all import IP, UDP
except ImportError:
print("Scapy not available")
sys.exit(1)
def _collect_ch10_data_frames(packets, src_ip, min_payload):
    """Return ``(frame_number, timestamp)`` for each likely CH10-Data packet.

    Frame numbers are 1-based positions in *packets*. A packet qualifies
    when it has an IP layer whose source equals *src_ip* and a UDP layer
    whose payload is longer than *min_payload* bytes.
    """
    frames = []
    for frame_num, packet in enumerate(packets, 1):
        if not packet.haslayer(IP) or packet[IP].src != src_ip:
            continue
        if not packet.haslayer(UDP):
            continue
        udp_layer = packet[UDP]
        payload_size = len(udp_layer.payload) if udp_layer.payload else 0
        # Simple heuristic: CH10-Data frames typically carry substantial
        # payloads; TMATS and other control frames might be different sizes.
        if payload_size > min_payload:
            frames.append((frame_num, float(packet.time)))
    return frames


def verify_pcap_frames(pcap_file="1 PTPGM.pcapng", src_ip="192.168.4.89", *,
                       target_frame=1001, window=3, min_payload=100,
                       outlier_threshold=0.200):
    """Verify frame sequence directly from PCAP file.

    Loads every packet through ``PCAPLoader``, keeps the UDP packets sent
    by *src_ip* that look like CH10-Data (by payload size), and prints a
    report of the frames surrounding *target_frame* together with the time
    delta to the previous CH10-Data frame.

    Args:
        pcap_file: Capture path handed to ``PCAPLoader``.
        src_ip: Only packets whose IP source equals this value are kept.
        target_frame: 1-based packet number in the capture to inspect.
        window: Frames whose number is within ``±window`` of
            *target_frame* are listed in the "Frames around" section.
        min_payload: A UDP payload must be strictly longer than this many
            bytes to be counted as CH10-Data.
        outlier_threshold: Time delta in seconds above which the target
            frame is flagged as a possible outlier.

    Returns:
        None. All results are printed to stdout.

    With the default keyword values the printed report is identical to the
    previous hard-coded behaviour (frame 1001, ±3, >100 bytes, >200 ms).
    """
    print("=== Verifying PCAP Frame Sequence ===")

    loader = PCAPLoader(pcap_file)
    packets = loader.load_all()
    print(f"Loaded {len(packets)} packets")

    # Track CH10-Data frames from the specified source
    ch10_data_frames = _collect_ch10_data_frames(packets, src_ip, min_payload)
    print(f"Found {len(ch10_data_frames)} likely CH10-Data frames")

    # Look specifically around the target frame
    print(f"\n=== Frames around {target_frame} ===")

    # One pass: list the neighbours and remember where the target sits.
    # Frame numbers are unique, so at most one index can match.
    target_idx = None
    for idx, (frame_num, timestamp) in enumerate(ch10_data_frames):
        if frame_num == target_frame:
            target_idx = idx
        if abs(frame_num - target_frame) > window:
            continue
        if idx > 0:
            prev_frame_num, prev_timestamp = ch10_data_frames[idx - 1]
            delta_t = timestamp - prev_timestamp
            print(f"Frame {frame_num}: prev={prev_frame_num}, Δt={delta_t*1000:.3f}ms")
        else:
            print(f"Frame {frame_num}: (first frame)")

    # Report whether the target frame exists and what its previous frame is
    if target_idx is None:
        print(f"\n❌ Frame {target_frame} not found in CH10-Data frames")
    elif target_idx == 0:
        print(f"\n✅ Frame {target_frame} is the first CH10-Data frame")
    else:
        _, timestamp = ch10_data_frames[target_idx]
        prev_frame_num, prev_timestamp = ch10_data_frames[target_idx - 1]
        delta_t = timestamp - prev_timestamp
        print(f"\n✅ Frame {target_frame} found!")
        print(f" Previous CH10-Data frame: {prev_frame_num}")
        print(f" Time delta: {delta_t*1000:.3f} ms")

        # Check if this would be an outlier (rough calculation)
        if delta_t > outlier_threshold:
            print(f" ⚠️ This might be an outlier (>{outlier_threshold * 1000:g}ms)")
        else:
            print(f" ✅ Normal timing")

    # Show a sample of the sequence to verify our logic
    print(f"\n=== Sample CH10-Data Frame Sequence ===")
    for i, (frame_num, timestamp) in enumerate(ch10_data_frames[:10]):
        print(f" [{i}] Frame {frame_num}: {timestamp}")
if __name__ == "__main__":
    # Forward the first command-line argument (the capture path) when one
    # is given; with no argument the slice is empty and the function's own
    # default path is used. Any further arguments are ignored.
    verify_pcap_frames(*sys.argv[1:2])