Files
airstream/frametypes/ptp_delay.py

53 lines
1.7 KiB
Python
Raw Permalink Normal View History

2025-08-03 20:20:55 -04:00
import statistics
from typing import Dict, List, Any
from scapy.all import Packet
from .base import FrameTypeInterface
class PTPDelayStats(FrameTypeInterface):
    """PTP Delay Request/Response statistics.

    Accumulates the packet count, byte count and first/last timestamps of
    every frame passed to :meth:`add`. The per-message-type counters and the
    round-trip / asymmetry sample lists are initialised here, but nothing in
    this class populates them yet (see the TODO in :meth:`add`).
    """

    def __init__(self):
        super().__init__()
        self.name = "PTP Delay"

        # Totals over every frame handed to add().
        self.count = 0
        self.bytes = 0

        # Timestamps of the first and most recent frame; None until the
        # first call to add().
        self.first_time = None
        self.last_time = None

        # Per-message-type counters (not incremented by this class yet).
        self.delay_req_count = 0
        self.delay_resp_count = 0
        self.pdelay_req_count = 0
        self.pdelay_resp_count = 0

        # Sample lists feeding the summary (not appended to by this class yet).
        self.round_trip_times = []
        self.asymmetry_values = []

    def add(self, timestamp: float, size: int, packet: Packet):
        """Record one frame.

        Args:
            timestamp: Time associated with the frame; stored as
                ``first_time`` on the first call and as ``last_time`` on
                every call.
            size: Frame size, added to the running ``bytes`` total.
            packet: The frame itself. Currently unused.
        """
        self.count += 1
        self.bytes += size
        if self.first_time is None:
            self.first_time = timestamp
        self.last_time = timestamp
        # TODO: in a real implementation, classify the message type and match
        # delay requests with responses so the per-type counters and
        # round_trip_times are filled in.

    def get_summary_dict(self) -> Dict[str, Any]:
        """Return the summary row for this frame type.

        Returns:
            A dict whose keys match the names in
            :meth:`get_column_definitions`. ``'Avg RTT'`` is the mean of
            ``round_trip_times`` multiplied by 1000 and rounded to three
            decimals, or 0 when no samples have been collected.
        """
        # Guard the empty case: statistics.mean() raises on an empty sequence.
        avg_rtt = statistics.mean(self.round_trip_times) if self.round_trip_times else 0
        return {
            'Pkts': self.count,
            'Delay Req': self.delay_req_count,
            'Delay Resp': self.delay_resp_count,
            'PDelay Req': self.pdelay_req_count,
            'PDelay Resp': self.pdelay_resp_count,
            # Convert to ms (presumably samples are stored in seconds --
            # confirm once round_trip_times is actually populated).
            'Avg RTT': round(avg_rtt * 1000, 3),
        }

    def get_column_definitions(self) -> List[tuple]:
        """Return ``(column name, format)`` pairs for the summary columns.

        The names match the keys of :meth:`get_summary_dict`. The second
        element looks like a format-spec string (``'d'``, ``'.3f'``); how it
        is applied is up to the caller.
        """
        return [
            ('Pkts', 'd'),
            ('Delay Req', 'd'),
            ('Delay Resp', 'd'),
            ('PDelay Req', 'd'),
            ('PDelay Resp', 'd'),
            ('Avg RTT', '.3f'),
        ]