58 lines
1.8 KiB
Python
58 lines
1.8 KiB
Python
|
|
import statistics
|
||
|
|
from typing import Dict, List, Any
|
||
|
|
from scapy.all import Packet
|
||
|
|
|
||
|
|
from .base import FrameTypeInterface
|
||
|
|
|
||
|
|
|
||
|
|
class PTPSyncStats(FrameTypeInterface):
    """PTP Sync Message Statistics.

    Accumulates packet and byte totals, the first and last timestamps seen,
    and the gaps between consecutive packets. PTP message decoding is not
    implemented in this class: ``add`` never modifies ``sync_count``,
    ``follow_up_count``, ``two_step_count``, ``one_step_count`` or
    ``clock_ids``, so they keep their initial values unless other code
    updates them.
    """

    def __init__(self) -> None:
        """Initialise all counters to zero and all collections to empty."""
        super().__init__()
        self.name = "PTP Sync"

        # Totals over every packet passed to add().
        self.count = 0
        self.bytes = 0

        # Timestamps of the first and the most recent packet (None until seen).
        self.first_time = None
        self.last_time = None

        # Per-message-type counters; not updated by add() in this class.
        self.sync_count = 0
        self.follow_up_count = 0
        self.two_step_count = 0
        self.one_step_count = 0

        # Gaps between consecutive packets, in the unit of the timestamps.
        self.sync_intervals = []
        # Distinct clock identities; only its size is reported.
        self.clock_ids = set()

    def add(self, timestamp: float, size: int, packet: Packet) -> None:
        """Record one packet.

        Args:
            timestamp: Time at which the packet was seen.
            size: Packet size, added to the running byte total.
            packet: The packet itself. Currently unused because message
                decoding is not implemented.
        """
        self.count += 1
        self.bytes += size

        if self.first_time is None:
            self.first_time = timestamp
        elif self.last_time is not None and self.sync_count > 0:
            # Compare against None explicitly: a previous timestamp of 0.0
            # is a valid value and must not be mistaken for "no packet yet".
            # Intervals are only collected once sync_count is non-zero.
            self.sync_intervals.append(timestamp - self.last_time)
        self.last_time = timestamp

        # In real implementation, decode PTP sync messages

    def get_summary_dict(self) -> Dict[str, Any]:
        """Return the current statistics keyed by column name.

        ``'Avg Interval'`` is the mean of the recorded intervals rounded to
        six decimal places, or 0 when no interval has been recorded.
        """
        avg_interval = (
            statistics.mean(self.sync_intervals) if self.sync_intervals else 0
        )
        return {
            'Pkts': self.count,
            'Sync Msgs': self.sync_count,
            'Follow-ups': self.follow_up_count,
            'Two-step': self.two_step_count,
            'One-step': self.one_step_count,
            'Clock IDs': len(self.clock_ids),
            'Avg Interval': round(avg_interval, 6),
        }

    def get_column_definitions(self) -> List[tuple]:
        """Return ``(column name, format)`` pairs for the summary columns.

        The names match the keys of ``get_summary_dict``, in the same order.
        The second element of each pair is presumably a ``format()`` spec
        applied by the caller -- TODO confirm against the consumer.
        """
        return [
            ('Pkts', 'd'),
            ('Sync Msgs', 'd'),
            ('Follow-ups', 'd'),
            ('Two-step', 'd'),
            ('One-step', 'd'),
            ('Clock IDs', 'd'),
            ('Avg Interval', '.6f'),
        ]
|