57 lines
1.8 KiB
Python
57 lines
1.8 KiB
Python
from collections import defaultdict
|
|
from typing import Dict, List, Any
|
|
from scapy.all import Packet
|
|
|
|
from .base import FrameTypeInterface
|
|
|
|
|
|
class Chapter10Stats(FrameTypeInterface):
    """Running statistics for "Chapter 10" frames.

    Accumulates packet and byte totals plus the time span covered by the
    packets handed to :meth:`add`, and reports them as a summary row via
    :meth:`get_summary_dict` / :meth:`get_column_definitions`.

    The channel, data-type and sequence-tracking attributes are created
    here but nothing in this class populates them yet: per-packet header
    decoding is still a TODO (see :meth:`add`), so ``Channels`` and
    ``Seq Errors`` currently always report 0.
    """

    def __init__(self) -> None:
        super().__init__()
        self.name = "Chapter 10"
        # Totals over every packet passed to add().
        self.count = 0
        self.bytes = 0
        # Earliest / latest timestamp seen so far; None until the first add().
        self.first_time = None
        self.last_time = None
        # Placeholders for header-derived data; not filled in yet.
        self.channel_ids = set()
        self.data_types = defaultdict(int)
        self.sequence_errors = 0
        self.last_sequence = {}

    def add(self, timestamp: float, size: int, packet: Packet) -> None:
        """Record one packet.

        Args:
            timestamp: Capture time of the packet (presumably seconds;
                the summary reports ``Pkt/s`` on that assumption).
            size: Packet size in bytes, added to the running byte total.
            packet: The packet itself. Currently unused because header
                decoding is not implemented.
        """
        self.count += 1
        self.bytes += size

        # Track the min/max timestamp rather than "first seen"/"last seen",
        # so out-of-order packets cannot produce a negative duration.
        # For in-order input this is identical to first/last.
        if self.first_time is None or timestamp < self.first_time:
            self.first_time = timestamp
        if self.last_time is None or timestamp > self.last_time:
            self.last_time = timestamp

        # TODO: decode the Chapter 10 header from `packet` and update
        # channel_ids, data_types, last_sequence and sequence_errors, e.g.:
        #   channel_id = extract_channel_id(packet)
        #   data_type = extract_data_type(packet)
        #   sequence = extract_sequence_number(packet)

    def get_summary_dict(self) -> Dict[str, Any]:
        """Return the summary row for this frame type.

        The keys match the column names from :meth:`get_column_definitions`.
        ``Duration`` is the span between the earliest and latest timestamp
        (0.0 before any packet was added); ``Pkt/s`` is 0.0 whenever that
        span is not positive, which avoids a division by zero.
        """
        if self.first_time is None or self.last_time is None:
            duration = 0.0
        else:
            duration = self.last_time - self.first_time

        # Always a float so the value agrees with the '.1f' column format.
        rate = round(self.count / duration, 1) if duration > 0 else 0.0

        return {
            'Pkts': self.count,
            'Bytes': self.bytes,
            'Duration': round(duration, 3),
            'Channels': len(self.channel_ids),
            'Seq Errors': self.sequence_errors,
            'Pkt/s': rate,
        }

    def get_column_definitions(self) -> List[tuple]:
        """Return ``(column name, format)`` pairs for the summary row.

        Column names are the keys of :meth:`get_summary_dict`; the second
        element looks like a ``format()`` spec for that value (assumption:
        the consumer lives outside this class).
        """
        return [
            ('Pkts', 'd'),
            ('Bytes', 'd'),
            ('Duration', '.3f'),
            ('Channels', 'd'),
            ('Seq Errors', 'd'),
            ('Pkt/s', '.1f'),
        ]