Files
airstream/pyshark_poc/stats.py
2025-08-03 20:20:55 -04:00

145 lines
5.0 KiB
Python
Raw Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

from typing import Dict, List, Any, Type, Optional
import time
class BaseStats:
"""Base statistics class for packet flow analysis."""
def __init__(self):
self.packet_count = 0
self.byte_count = 0
self.first_timestamp = None
self.last_timestamp = None
self.packet_sizes = []
self.inter_arrival_times = []
self.last_packet_time = None
def add(self, timestamp: float, size: int, packet: Any):
"""Add a packet to statistics."""
self.packet_count += 1
self.byte_count += size
self.packet_sizes.append(size)
if self.first_timestamp is None:
self.first_timestamp = timestamp
self.last_timestamp = timestamp
if self.last_packet_time is not None:
delta = timestamp - self.last_packet_time
self.inter_arrival_times.append(delta)
self.last_packet_time = timestamp
# Call protocol-specific handler
self._process_packet(packet)
def _process_packet(self, packet: Any):
"""Override in subclasses for protocol-specific processing."""
pass
def get_summary_dict(self) -> Dict[str, Any]:
"""Get summary statistics as dictionary."""
duration = (self.last_timestamp - self.first_timestamp) if self.first_timestamp and self.last_timestamp else 0
summary = {
'Pkts': self.packet_count,
'Bytes': self.byte_count,
'Duration': round(duration, 3) if duration else 0,
'Avg Size': round(sum(self.packet_sizes) / len(self.packet_sizes), 1) if self.packet_sizes else 0,
}
if self.inter_arrival_times:
avg_delta = sum(self.inter_arrival_times) / len(self.inter_arrival_times)
summary['Avg TimeΔ'] = round(avg_delta, 6)
# Calculate standard deviation
if len(self.inter_arrival_times) > 1:
mean = avg_delta
variance = sum((x - mean) ** 2 for x in self.inter_arrival_times) / len(self.inter_arrival_times)
std_dev = variance ** 0.5
summary['Time 1σ'] = round(std_dev, 6)
else:
summary['Time 1σ'] = 0
else:
summary['Avg TimeΔ'] = 0
summary['Time 1σ'] = 0
if duration > 0:
summary['Pkt/s'] = round(self.packet_count / duration, 1)
summary['B/s'] = round(self.byte_count / duration, 1)
else:
summary['Pkt/s'] = 0
summary['B/s'] = 0
return summary
class OverviewStats(BaseStats):
"""Overview statistics for general packet analysis."""
pass
class PTPStats(BaseStats):
"""PTP-specific statistics."""
def __init__(self):
super().__init__()
self.ptp_message_types = {}
def _process_packet(self, packet: Any):
"""Process PTP-specific packet information."""
# Check if packet has PTP layer
if hasattr(packet, 'ptp'):
try:
msg_type = packet.ptp.v2_messagetype if hasattr(packet.ptp, 'v2_messagetype') else 'unknown'
self.ptp_message_types[msg_type] = self.ptp_message_types.get(msg_type, 0) + 1
except:
pass
def get_summary_dict(self) -> Dict[str, Any]:
summary = super().get_summary_dict()
# Add PTP-specific metrics
if self.ptp_message_types:
summary['PTP Types'] = str(self.ptp_message_types)
return summary
class MultiStats:
"""Container for multiple stats instances."""
def __init__(self, stats_classes: Optional[List[Type[BaseStats]]] = None):
if stats_classes is None:
stats_classes = [OverviewStats]
self.stats_instances = [cls() for cls in stats_classes]
self.stats_classes = stats_classes
def add(self, timestamp: float, size: int, packet: Any):
"""Add packet to all stats instances."""
for stats in self.stats_instances:
stats.add(timestamp, size, packet)
def get_combined_summary(self) -> Dict[str, Any]:
"""Combine summaries from all stats instances."""
combined = {}
for i, stats in enumerate(self.stats_instances):
summary = stats.get_summary_dict()
class_name = self.stats_classes[i].__name__.replace('Stats', '')
# Add prefix to avoid column name conflicts
for key, value in summary.items():
if key in ['Pkts', 'Bytes', 'Duration']: # Common columns
if i == 0: # Only include once
combined[key] = value
else:
# Only add prefix if we have multiple stats classes
if len(self.stats_classes) > 1:
combined[f"{class_name}:{key}"] = value
else:
combined[key] = value
return combined
# Stats registry for easy lookup
STATS_TYPES = {
'overview': OverviewStats,
'ptp': PTPStats,
}