""" Data structures for flow and frame type statistics """ from dataclasses import dataclass, field from typing import Dict, List, Set, Tuple @dataclass class FrameTypeStats: """Statistics for a specific frame type within a flow""" frame_type: str count: int = 0 total_bytes: int = 0 timestamps: List[float] = field(default_factory=list) frame_numbers: List[int] = field(default_factory=list) inter_arrival_times: List[float] = field(default_factory=list) avg_inter_arrival: float = 0.0 std_inter_arrival: float = 0.0 outlier_frames: List[int] = field(default_factory=list) outlier_details: List[Tuple[int, float]] = field(default_factory=list) # (frame_num, delta_t) - legacy enhanced_outlier_details: List[Tuple[int, int, float]] = field(default_factory=list) # (frame_num, prev_frame_num, delta_t) @dataclass class EnhancedAnalysisData: """Enhanced analysis data from specialized decoders""" # CH10 Timing Analysis avg_clock_drift_ppm: float = 0.0 max_clock_drift_ppm: float = 0.0 timing_quality: str = "Unknown" # excellent, good, moderate, poor timing_stability: str = "Unknown" # stable, variable anomaly_rate: float = 0.0 # Percentage of frames with timing anomalies avg_confidence_score: float = 0.0 # CH10 Frame Quality avg_frame_quality: float = 0.0 sequence_gaps: int = 0 rtc_sync_errors: int = 0 format_errors: int = 0 overflow_errors: int = 0 # CH10 Data Analysis channel_count: int = 0 analog_channels: int = 0 pcm_channels: int = 0 tmats_frames: int = 0 # General Enhanced Data has_internal_timing: bool = False primary_data_type: str = "Unknown" decoder_type: str = "Standard" # Standard, Chapter10_Enhanced, PTP_Enhanced, etc. # Decoded Frame Data Storage sample_decoded_fields: Dict[str, any] = field(default_factory=dict) # Sample of actual decoded fields for display available_field_names: List[str] = field(default_factory=list) # List of all available field names from decoder field_count: int = 0 # Total number of fields decoded frame_types: Set[str] = field(default_factory=set) # Set of unique frame types encountered timing_accuracy: float = 0.0 # Timing accuracy percentage signal_quality: float = 0.0 # Signal quality percentage @dataclass class FlowStats: """Statistics for a source-destination IP pair""" src_ip: str dst_ip: str src_port: int = 0 # Source port (0 if not applicable/unknown) dst_port: int = 0 # Destination port (0 if not applicable/unknown) transport_protocol: str = "Unknown" # TCP, UDP, ICMP, IGMP, etc. traffic_classification: str = "Unknown" # Unicast, Multicast, Broadcast frame_count: int = 0 timestamps: List[float] = field(default_factory=list) frame_numbers: List[int] = field(default_factory=list) inter_arrival_times: List[float] = field(default_factory=list) avg_inter_arrival: float = 0.0 std_inter_arrival: float = 0.0 outlier_frames: List[int] = field(default_factory=list) outlier_details: List[Tuple[int, float]] = field(default_factory=list) # (frame_number, time_delta) total_bytes: int = 0 protocols: Set[str] = field(default_factory=set) detected_protocol_types: Set[str] = field(default_factory=set) # Enhanced protocol detection (CH10, PTP, IENA, etc) frame_types: Dict[str, FrameTypeStats] = field(default_factory=dict) # Per-frame-type statistics enhanced_analysis: EnhancedAnalysisData = field(default_factory=EnhancedAnalysisData) # Enhanced decoder analysis first_seen: float = 0.0 # Timestamp of the first frame in this flow last_seen: float = 0.0 # Timestamp of the last frame in this flow duration: float = 0.0 # Duration of the flow in seconds jitter: float = 0.0 # Network jitter measurement