145 lines
5.0 KiB
Python
145 lines
5.0 KiB
Python
|
|
from typing import Dict, List, Any, Type, Optional
|
|||
|
|
import time
|
|||
|
|
|
|||
|
|
|
|||
|
|
class BaseStats:
|
|||
|
|
"""Base statistics class for packet flow analysis."""
|
|||
|
|
|
|||
|
|
def __init__(self):
|
|||
|
|
self.packet_count = 0
|
|||
|
|
self.byte_count = 0
|
|||
|
|
self.first_timestamp = None
|
|||
|
|
self.last_timestamp = None
|
|||
|
|
self.packet_sizes = []
|
|||
|
|
self.inter_arrival_times = []
|
|||
|
|
self.last_packet_time = None
|
|||
|
|
|
|||
|
|
def add(self, timestamp: float, size: int, packet: Any):
|
|||
|
|
"""Add a packet to statistics."""
|
|||
|
|
self.packet_count += 1
|
|||
|
|
self.byte_count += size
|
|||
|
|
self.packet_sizes.append(size)
|
|||
|
|
|
|||
|
|
if self.first_timestamp is None:
|
|||
|
|
self.first_timestamp = timestamp
|
|||
|
|
self.last_timestamp = timestamp
|
|||
|
|
|
|||
|
|
if self.last_packet_time is not None:
|
|||
|
|
delta = timestamp - self.last_packet_time
|
|||
|
|
self.inter_arrival_times.append(delta)
|
|||
|
|
self.last_packet_time = timestamp
|
|||
|
|
|
|||
|
|
# Call protocol-specific handler
|
|||
|
|
self._process_packet(packet)
|
|||
|
|
|
|||
|
|
def _process_packet(self, packet: Any):
|
|||
|
|
"""Override in subclasses for protocol-specific processing."""
|
|||
|
|
pass
|
|||
|
|
|
|||
|
|
def get_summary_dict(self) -> Dict[str, Any]:
|
|||
|
|
"""Get summary statistics as dictionary."""
|
|||
|
|
duration = (self.last_timestamp - self.first_timestamp) if self.first_timestamp and self.last_timestamp else 0
|
|||
|
|
|
|||
|
|
summary = {
|
|||
|
|
'Pkts': self.packet_count,
|
|||
|
|
'Bytes': self.byte_count,
|
|||
|
|
'Duration': round(duration, 3) if duration else 0,
|
|||
|
|
'Avg Size': round(sum(self.packet_sizes) / len(self.packet_sizes), 1) if self.packet_sizes else 0,
|
|||
|
|
}
|
|||
|
|
|
|||
|
|
if self.inter_arrival_times:
|
|||
|
|
avg_delta = sum(self.inter_arrival_times) / len(self.inter_arrival_times)
|
|||
|
|
summary['Avg TimeΔ'] = round(avg_delta, 6)
|
|||
|
|
|
|||
|
|
# Calculate standard deviation
|
|||
|
|
if len(self.inter_arrival_times) > 1:
|
|||
|
|
mean = avg_delta
|
|||
|
|
variance = sum((x - mean) ** 2 for x in self.inter_arrival_times) / len(self.inter_arrival_times)
|
|||
|
|
std_dev = variance ** 0.5
|
|||
|
|
summary['Time 1σ'] = round(std_dev, 6)
|
|||
|
|
else:
|
|||
|
|
summary['Time 1σ'] = 0
|
|||
|
|
else:
|
|||
|
|
summary['Avg TimeΔ'] = 0
|
|||
|
|
summary['Time 1σ'] = 0
|
|||
|
|
|
|||
|
|
if duration > 0:
|
|||
|
|
summary['Pkt/s'] = round(self.packet_count / duration, 1)
|
|||
|
|
summary['B/s'] = round(self.byte_count / duration, 1)
|
|||
|
|
else:
|
|||
|
|
summary['Pkt/s'] = 0
|
|||
|
|
summary['B/s'] = 0
|
|||
|
|
|
|||
|
|
return summary
|
|||
|
|
|
|||
|
|
|
|||
|
|
class OverviewStats(BaseStats):
|
|||
|
|
"""Overview statistics for general packet analysis."""
|
|||
|
|
pass
|
|||
|
|
|
|||
|
|
|
|||
|
|
class PTPStats(BaseStats):
|
|||
|
|
"""PTP-specific statistics."""
|
|||
|
|
|
|||
|
|
def __init__(self):
|
|||
|
|
super().__init__()
|
|||
|
|
self.ptp_message_types = {}
|
|||
|
|
|
|||
|
|
def _process_packet(self, packet: Any):
|
|||
|
|
"""Process PTP-specific packet information."""
|
|||
|
|
# Check if packet has PTP layer
|
|||
|
|
if hasattr(packet, 'ptp'):
|
|||
|
|
try:
|
|||
|
|
msg_type = packet.ptp.v2_messagetype if hasattr(packet.ptp, 'v2_messagetype') else 'unknown'
|
|||
|
|
self.ptp_message_types[msg_type] = self.ptp_message_types.get(msg_type, 0) + 1
|
|||
|
|
except:
|
|||
|
|
pass
|
|||
|
|
|
|||
|
|
def get_summary_dict(self) -> Dict[str, Any]:
|
|||
|
|
summary = super().get_summary_dict()
|
|||
|
|
# Add PTP-specific metrics
|
|||
|
|
if self.ptp_message_types:
|
|||
|
|
summary['PTP Types'] = str(self.ptp_message_types)
|
|||
|
|
return summary
|
|||
|
|
|
|||
|
|
|
|||
|
|
class MultiStats:
|
|||
|
|
"""Container for multiple stats instances."""
|
|||
|
|
|
|||
|
|
def __init__(self, stats_classes: Optional[List[Type[BaseStats]]] = None):
|
|||
|
|
if stats_classes is None:
|
|||
|
|
stats_classes = [OverviewStats]
|
|||
|
|
self.stats_instances = [cls() for cls in stats_classes]
|
|||
|
|
self.stats_classes = stats_classes
|
|||
|
|
|
|||
|
|
def add(self, timestamp: float, size: int, packet: Any):
|
|||
|
|
"""Add packet to all stats instances."""
|
|||
|
|
for stats in self.stats_instances:
|
|||
|
|
stats.add(timestamp, size, packet)
|
|||
|
|
|
|||
|
|
def get_combined_summary(self) -> Dict[str, Any]:
|
|||
|
|
"""Combine summaries from all stats instances."""
|
|||
|
|
combined = {}
|
|||
|
|
for i, stats in enumerate(self.stats_instances):
|
|||
|
|
summary = stats.get_summary_dict()
|
|||
|
|
class_name = self.stats_classes[i].__name__.replace('Stats', '')
|
|||
|
|
|
|||
|
|
# Add prefix to avoid column name conflicts
|
|||
|
|
for key, value in summary.items():
|
|||
|
|
if key in ['Pkts', 'Bytes', 'Duration']: # Common columns
|
|||
|
|
if i == 0: # Only include once
|
|||
|
|
combined[key] = value
|
|||
|
|
else:
|
|||
|
|
# Only add prefix if we have multiple stats classes
|
|||
|
|
if len(self.stats_classes) > 1:
|
|||
|
|
combined[f"{class_name}:{key}"] = value
|
|||
|
|
else:
|
|||
|
|
combined[key] = value
|
|||
|
|
return combined
|
|||
|
|
|
|||
|
|
|
|||
|
|
# Stats registry for easy lookup
|
|||
|
|
STATS_TYPES = {
|
|||
|
|
'overview': OverviewStats,
|
|||
|
|
'ptp': PTPStats,
|
|||
|
|
}
|