from typing import Dict, List, Any, Type, Optional
import time


class BaseStats:
    """Base statistics class for packet flow analysis.

    Accumulates packet/byte counters, packet sizes and inter-arrival times
    as packets are fed in through :meth:`add`, and renders them as a flat
    dictionary through :meth:`get_summary_dict`.
    """

    def __init__(self) -> None:
        self.packet_count = 0
        self.byte_count = 0
        self.first_timestamp = None
        self.last_timestamp = None
        self.packet_sizes = []
        self.inter_arrival_times = []
        self.last_packet_time = None

    def add(self, timestamp: float, size: int, packet: Any) -> None:
        """Add a packet to statistics.

        Args:
            timestamp: Arrival time of the packet.
            size: Packet size, added to the byte counter.
            packet: The packet object, forwarded to ``_process_packet``.
        """
        self.packet_count += 1
        self.byte_count += size
        self.packet_sizes.append(size)

        if self.first_timestamp is None:
            self.first_timestamp = timestamp
        self.last_timestamp = timestamp

        # An inter-arrival delta exists only from the second packet onwards.
        if self.last_packet_time is not None:
            self.inter_arrival_times.append(timestamp - self.last_packet_time)
        self.last_packet_time = timestamp

        # Call protocol-specific handler
        self._process_packet(packet)

    def _process_packet(self, packet: Any) -> None:
        """Override in subclasses for protocol-specific processing."""
        pass

    def get_summary_dict(self) -> Dict[str, Any]:
        """Get summary statistics as dictionary.

        Returns:
            A dict with the keys ``Pkts``, ``Bytes``, ``Duration``,
            ``Avg Size``, ``Avg TimeΔ``, ``Time 1σ``, ``Pkt/s`` and ``B/s``.
            Values that cannot be computed (no packets, a single packet,
            non-positive duration) are reported as 0.
        """
        # Compare against None explicitly: a first timestamp of 0.0 is a
        # valid value and must not be mistaken for "no packets seen".
        if self.first_timestamp is not None and self.last_timestamp is not None:
            duration = self.last_timestamp - self.first_timestamp
        else:
            duration = 0

        if self.packet_sizes:
            avg_size = round(sum(self.packet_sizes) / len(self.packet_sizes), 1)
        else:
            avg_size = 0

        summary = {
            'Pkts': self.packet_count,
            'Bytes': self.byte_count,
            'Duration': round(duration, 3) if duration else 0,
            'Avg Size': avg_size,
        }

        deltas = self.inter_arrival_times
        if deltas:
            mean = sum(deltas) / len(deltas)
            summary['Avg TimeΔ'] = round(mean, 6)

            if len(deltas) > 1:
                # Standard deviation over all deltas (divides by N, not N - 1).
                variance = sum((x - mean) ** 2 for x in deltas) / len(deltas)
                summary['Time 1σ'] = round(variance ** 0.5, 6)
            else:
                summary['Time 1σ'] = 0
        else:
            summary['Avg TimeΔ'] = 0
            summary['Time 1σ'] = 0

        # Rates are only meaningful over a positive time span.
        if duration > 0:
            summary['Pkt/s'] = round(self.packet_count / duration, 1)
            summary['B/s'] = round(self.byte_count / duration, 1)
        else:
            summary['Pkt/s'] = 0
            summary['B/s'] = 0

        return summary


class OverviewStats(BaseStats):
    """Overview statistics for general packet analysis."""
    pass


class PTPStats(BaseStats):
    """PTP-specific statistics.

    In addition to the base counters, counts packets per value of the
    ``v2_messagetype`` attribute of the packet's ``ptp`` attribute.
    """

    def __init__(self) -> None:
        super().__init__()
        self.ptp_message_types = {}

    def _process_packet(self, packet: Any) -> None:
        """Process PTP-specific packet information.

        Packets without a ``ptp`` attribute are ignored. A ``ptp`` layer
        without ``v2_messagetype`` is counted under ``'unknown'``.
        """
        # Check if packet has PTP layer
        if not hasattr(packet, 'ptp'):
            return

        try:
            if hasattr(packet.ptp, 'v2_messagetype'):
                msg_type = packet.ptp.v2_messagetype
            else:
                msg_type = 'unknown'
            self.ptp_message_types[msg_type] = (
                self.ptp_message_types.get(msg_type, 0) + 1
            )
        except Exception:
            # Best effort: a packet whose PTP fields cannot be read or
            # counted is skipped. Unlike a bare ``except``, this lets
            # KeyboardInterrupt and SystemExit propagate.
            pass

    def get_summary_dict(self) -> Dict[str, Any]:
        """Return the base summary plus ``PTP Types`` when any were seen."""
        summary = super().get_summary_dict()

        # Add PTP-specific metrics
        if self.ptp_message_types:
            summary['PTP Types'] = str(self.ptp_message_types)

        return summary


class MultiStats:
    """Container for multiple stats instances."""

    def __init__(self, stats_classes: Optional[List[Type[BaseStats]]] = None):
        """Instantiate one stats object per class.

        Args:
            stats_classes: Stats classes to instantiate; defaults to
                ``[OverviewStats]`` when ``None``.
        """
        if stats_classes is None:
            stats_classes = [OverviewStats]

        # Materialise once so classes and instances stay index-aligned even
        # if the caller passed a one-shot iterable or later mutates its list.
        self.stats_classes = list(stats_classes)
        self.stats_instances = [cls() for cls in self.stats_classes]

    def add(self, timestamp: float, size: int, packet: Any) -> None:
        """Add packet to all stats instances."""
        for stats in self.stats_instances:
            stats.add(timestamp, size, packet)

    def get_combined_summary(self) -> Dict[str, Any]:
        """Combine summaries from all stats instances.

        The common columns ``Pkts``, ``Bytes`` and ``Duration`` are taken
        from the first instance only. Every other column is prefixed with
        the class name (minus ``'Stats'``) when more than one stats class
        is configured, and left unprefixed otherwise.
        """
        common_columns = ('Pkts', 'Bytes', 'Duration')
        use_prefix = len(self.stats_classes) > 1
        combined = {}

        pairs = zip(self.stats_classes, self.stats_instances)
        for i, (stats_cls, stats) in enumerate(pairs):
            class_name = stats_cls.__name__.replace('Stats', '')

            for key, value in stats.get_summary_dict().items():
                if key in common_columns:
                    if i == 0:  # Only include once
                        combined[key] = value
                elif use_prefix:
                    # Prefix to avoid column name conflicts
                    combined[f"{class_name}:{key}"] = value
                else:
                    combined[key] = value

        return combined


# Stats registry for easy lookup
STATS_TYPES = {
    'overview': OverviewStats,
    'ptp': PTPStats,
}