from collections import defaultdict from typing import Dict, List, Any from scapy.all import Packet from .base import FrameTypeInterface class PTPAnnounceStats(FrameTypeInterface): """PTP Announce Message Statistics""" def __init__(self): super().__init__() self.name = "PTP Announce" self.count = 0 self.bytes = 0 self.first_time = None self.last_time = None self.announce_count = 0 self.grandmaster_changes = 0 self.priority_changes = 0 self.grandmaster_ids = set() self.clock_classes = defaultdict(int) def add(self, timestamp: float, size: int, packet: Packet): self.count += 1 self.bytes += size if self.first_time is None: self.first_time = timestamp self.last_time = timestamp # In real implementation, decode PTP announce messages def get_summary_dict(self) -> Dict[str, Any]: duration = (self.last_time or 0) - (self.first_time or 0) return { 'Pkts': self.count, 'Announce': self.announce_count, 'GM Changes': self.grandmaster_changes, 'Priority Changes': self.priority_changes, 'Grandmasters': len(self.grandmaster_ids), 'Clock Classes': len(self.clock_classes) } def get_column_definitions(self) -> List[tuple]: return [ ('Pkts', 'd'), ('Announce', 'd'), ('GM Changes', 'd'), ('Priority Changes', 'd'), ('Grandmasters', 'd'), ('Clock Classes', 'd') ]