64 lines
2.0 KiB
Python
64 lines
2.0 KiB
Python
import statistics
|
||
from typing import Dict, List, Any
|
||
from scapy.all import Packet
|
||
|
||
from .base import FrameTypeInterface
|
||
|
||
|
||
class Overview(FrameTypeInterface):
|
||
def __init__(self):
|
||
self.name = "Overview"
|
||
self.count = 0
|
||
self.first_time = None
|
||
self.last_time = None
|
||
self.Tdeltas = []
|
||
self.bytes = 0
|
||
self.frames_list = []
|
||
|
||
def add(self, timestamp: float, size: int, packet: Packet):
|
||
self.count += 1
|
||
self.bytes += size
|
||
self.frames_list.append(packet)
|
||
|
||
if self.first_time is None:
|
||
self.first_time = timestamp
|
||
else:
|
||
self.Tdeltas.append(timestamp - self.last_time)
|
||
self.last_time = timestamp
|
||
|
||
@property
|
||
def duration(self): return (self.last_time or 0) - (self.first_time or 0)
|
||
@property
|
||
def avg_size(self): return self.bytes / self.count if self.count else 0
|
||
@property
|
||
def avg_delta(self): return statistics.mean(self.Tdeltas) if self.Tdeltas else 0
|
||
@property
|
||
def std_delta(self): return statistics.stdev(self.Tdeltas) if len(self.Tdeltas) > 1 else 0
|
||
@property
|
||
def pkt_rate(self): return self.count / self.duration if self.duration > 0 else 0
|
||
@property
|
||
def byte_rate(self): return self.bytes / self.duration if self.duration > 0 else 0
|
||
|
||
def get_summary_dict(self) -> Dict[str, Any]:
|
||
return {
|
||
'Pkts': self.count,
|
||
'Bytes': self.bytes,
|
||
'Duration': round(self.duration, 3),
|
||
'Avg Size': round(self.avg_size, 1),
|
||
'Avg TimeΔ': round(self.avg_delta, 6),
|
||
'Time 1σ': round(self.std_delta, 6),
|
||
'Pkt/s': round(self.pkt_rate, 1),
|
||
'B/s': round(self.byte_rate, 1)
|
||
}
|
||
|
||
def get_column_definitions(self) -> List[tuple]:
|
||
return [
|
||
('Pkts', 'd'),
|
||
('Bytes', 'd'),
|
||
('Duration', '.3f'),
|
||
('Avg Size', '.1f'),
|
||
('Avg Delta', '.6f'),
|
||
('Std Delta', '.6f'),
|
||
('Pkt/s', '.1f'),
|
||
('B/s', '.1f')
|
||
] |