Files
airstream/core/stats.py

31 lines
1.2 KiB
Python
Raw Permalink Normal View History

2025-08-03 20:20:55 -04:00
from typing import Dict, List, Any, Type
from scapy.all import Packet
from frametypes import FrameTypeInterface
class MultiStats:
    """Container that drives several stats collectors as a single unit.

    Each class passed to the constructor is instantiated once with no
    arguments.  Every packet handed to :meth:`add` is forwarded to all of
    the instances, and :meth:`get_combined_summary` merges their
    per-instance summaries into one flat dictionary.
    """

    # Column names treated as common to all collectors: they are emitted
    # once, without a class prefix, in the combined summary.
    _COMMON_COLUMNS = ("Pkts", "Bytes", "Duration")

    def __init__(self, stats_classes: List[Type["FrameTypeInterface"]]):
        """Instantiate one collector per entry in *stats_classes*.

        Args:
            stats_classes: Collector classes, each callable with no
                arguments.  Any iterable is accepted; it is consumed once.
        """
        # Materialize first so a one-shot iterable is not exhausted before
        # it is stored, and so later mutation of the caller's list cannot
        # desynchronize classes from instances.
        self.stats_classes = list(stats_classes)
        self.stats_instances = [cls() for cls in self.stats_classes]

    def add(self, timestamp: float, size: int, packet: "Packet"):
        """Forward one packet observation to every collector.

        Args:
            timestamp: Passed through unchanged to each collector's ``add``.
            size: Passed through unchanged to each collector's ``add``.
            packet: Passed through unchanged to each collector's ``add``.
        """
        for stats in self.stats_instances:
            stats.add(timestamp, size, packet)

    def get_combined_summary(self) -> Dict[str, Any]:
        """Combine summaries from all stats instances.

        Calls ``get_summary_dict()`` on each collector in construction
        order.  Keys listed in ``_COMMON_COLUMNS`` appear once, unprefixed,
        with the value from the first collector that reports them.  Every
        other key is stored as ``"<ClassName>:<key>"``, where ``<ClassName>``
        is the collector's class name with ``'Stats'`` removed.

        Returns:
            A flat dictionary holding the merged summary; empty when there
            are no collectors.
        """
        combined: Dict[str, Any] = {}
        for cls, stats in zip(self.stats_classes, self.stats_instances):
            summary = stats.get_summary_dict()
            class_name = cls.__name__.replace('Stats', '')
            for key, value in summary.items():
                if key in self._COMMON_COLUMNS:
                    # First reporter wins: a common column is kept even if
                    # the first collector's summary happens to lack it.
                    combined.setdefault(key, value)
                else:
                    # Prefix with the class name to avoid key conflicts
                    # between collectors.
                    combined[f"{class_name}:{key}"] = value
        return combined