Files
airstream/frametypes/overview.py

64 lines
2.0 KiB
Python
Raw Permalink Normal View History

2025-08-03 20:20:55 -04:00
import statistics
from typing import Dict, List, Any
from scapy.all import Packet
from .base import FrameTypeInterface
class Overview(FrameTypeInterface):
    """Running totals and inter-arrival timing statistics for observed frames.

    Each call to :meth:`add` updates the frame count, the byte total, the
    first/last timestamps, and the list of time deltas between consecutive
    frames. The derived statistics are exposed as read-only properties and
    bundled for display by :meth:`get_summary_dict`.
    """

    def __init__(self):
        self.name = "Overview"
        self.count = 0            # number of frames added
        self.first_time = None    # timestamp of the first frame added
        self.last_time = None     # timestamp of the most recent frame added
        self.Tdeltas = []         # timestamp differences between consecutive frames
        self.bytes = 0            # sum of the sizes passed to add()
        self.frames_list = []     # every packet passed to add(), in order

    def add(self, timestamp: float, size: int, packet: Packet):
        """Record one frame.

        Args:
            timestamp: Time of the frame; differences between consecutive
                timestamps are stored in ``Tdeltas``.
            size: Size of the frame, added to the byte total.
            packet: The packet object, retained in ``frames_list``.
        """
        self.count += 1
        self.bytes += size
        self.frames_list.append(packet)
        if self.first_time is None:
            # First frame: there is no predecessor, so no delta is recorded.
            self.first_time = timestamp
        else:
            self.Tdeltas.append(timestamp - self.last_time)
        self.last_time = timestamp

    @property
    def duration(self) -> float:
        """Last timestamp minus first timestamp; 0 when no frame was added."""
        return (self.last_time or 0) - (self.first_time or 0)

    @property
    def avg_size(self) -> float:
        """Mean frame size (bytes / count), or 0 when no frame was added."""
        return self.bytes / self.count if self.count else 0

    @property
    def avg_delta(self) -> float:
        """Mean of the recorded time deltas, or 0 when there are none."""
        return statistics.mean(self.Tdeltas) if self.Tdeltas else 0

    @property
    def std_delta(self) -> float:
        """Sample standard deviation of the time deltas.

        Returns 0 when fewer than two deltas exist, because
        ``statistics.stdev`` requires at least two data points.
        """
        return statistics.stdev(self.Tdeltas) if len(self.Tdeltas) > 1 else 0

    @property
    def pkt_rate(self) -> float:
        """Frames per unit of duration, or 0 when duration is not positive."""
        elapsed = self.duration
        return self.count / elapsed if elapsed > 0 else 0

    @property
    def byte_rate(self) -> float:
        """Bytes per unit of duration, or 0 when duration is not positive."""
        elapsed = self.duration
        return self.bytes / elapsed if elapsed > 0 else 0

    def get_summary_dict(self) -> Dict[str, Any]:
        """Return the rounded statistics keyed by column name."""
        return {
            'Pkts': self.count,
            'Bytes': self.bytes,
            'Duration': round(self.duration, 3),
            'Avg Size': round(self.avg_size, 1),
            'Avg TimeΔ': round(self.avg_delta, 6),
            'Time 1σ': round(self.std_delta, 6),
            'Pkt/s': round(self.pkt_rate, 1),
            'B/s': round(self.byte_rate, 1),
        }

    def get_column_definitions(self) -> List[tuple]:
        """Return ``(column name, format spec)`` pairs, one per summary entry.

        The names are kept identical to the keys of :meth:`get_summary_dict`,
        in the same order. Previously the two timing columns were named
        'Avg Delta' / 'Std Delta', which did not match any summary key.
        NOTE(review): presumably the consumer looks summary values up by
        column name; confirm against the FrameTypeInterface caller.
        """
        return [
            ('Pkts', 'd'),
            ('Bytes', 'd'),
            ('Duration', '.3f'),
            ('Avg Size', '.1f'),
            ('Avg TimeΔ', '.6f'),
            ('Time 1σ', '.6f'),
            ('Pkt/s', '.1f'),
            ('B/s', '.1f'),
        ]