# StreamLens Data Flow Architecture

## Overview

StreamLens is a real-time network traffic analyzer that processes packets, groups them into flows, detects protocols, and provides enhanced analysis for specialized protocols like Chapter 10, PTP, and IENA.

## Complete Data Flow

### 1. Packet Input Layer

```python
# analyzer/analysis/core.py:36-43
def analyze_pcap(self, pcap_file: str):
    packets = rdpcap(pcap_file)       # Scapy reads PCAP
    self._process_packets(packets)    # Process each packet
```

**Sources**:
- PCAP/PCAPNG files
- Live network interfaces via Scapy

### 2. Packet Processing Engine

```python
# analyzer/analysis/core.py:65-67
def _process_packets(self, packets):
    for i, packet in enumerate(packets):
        self.dissector.dissect_packet(packet, i + 1)
```

**Per-packet operations**:
- Frame number assignment
- Timestamp extraction
- Initial protocol detection

### 3. Flow Management & Classification

```python
# analyzer/analysis/flow_manager.py:38-58
def process_packet(self, packet, frame_num):
    # Extract network identifiers
    src_ip, dst_ip = ip_layer.src, ip_layer.dst
    transport_info = self._extract_transport_info(packet)  # ports, protocol

    # Create unique flow key
    flow_key = (src_ip, dst_ip)

    # Initialize new flow or use existing
    if flow_key not in self.flows:
        self.flows[flow_key] = FlowStats(...)
```

**Flow Bucketing**:
- Packets grouped by `(src_ip, dst_ip)` pairs
- Each flow tracked in a `FlowStats` object
- O(1) flow lookup via hash map

### 4. Protocol Detection Pipeline

```python
# analyzer/analysis/flow_manager.py:89-96
# Basic protocols (UDP, TCP, ICMP)
protocols = self._detect_basic_protocols(packet)

# Enhanced protocol detection
dissection_results = self._dissect_packet(packet, frame_num)
enhanced_protocols = self._extract_enhanced_protocols(dissection_results)
flow.detected_protocol_types.update(enhanced_protocols)

# Fallback detection
fallback_protocols = self._detect_fallback_protocols(packet, dissection_results)
```

**Protocol Hierarchy**:
1. **Basic**: Transport layer (UDP/TCP/ICMP)
2. **Enhanced**: Application layer (CH10/PTP/IENA) via specialized dissectors
3. **Fallback**: Port-based heuristics for unidentified protocols

### 5. Frame Type Classification

```python
# analyzer/analysis/flow_manager.py:98-100
frame_type = self._classify_frame_type(packet, dissection_results)
self._update_frame_type_stats(flow, frame_type, frame_num, timestamp, packet_size)
```

**Sub-Flow Binning**:
- Each packet classified into specific frame types
- Enhanced protocols: `CH10-Data`, `CH10-ACTTS`, `PTP-Sync`, `IENA-Control`
- Standard protocols: `UDP-Data`, `TCP-Stream`

### 6. Statistical Analysis Engine

```python
# analyzer/analysis/flow_manager.py:105-112
if len(flow.timestamps) > 1:
    inter_arrival = timestamp - flow.timestamps[-2]
    flow.inter_arrival_times.append(inter_arrival)

# Real-time statistics if enabled
if self.statistics_engine and self.statistics_engine.enable_realtime:
    self.statistics_engine.update_realtime_statistics(flow_key, flow)
```

**Timing Calculations**:
- **Flow-level**: Aggregate timing across all packets
- **Frame-type level**: Per-subtype timing in `FrameTypeStats`
- **Outlier detection**: Based on configurable sigma thresholds
- **Real-time updates**: Live statistical calculations during capture

### 7. Enhanced Protocol Analysis

```python
# analyzer/analysis/flow_manager.py:102-103
self._perform_enhanced_analysis(packet, flow, frame_num, transport_info)
```

**Specialized Decoders**:
- **Chapter 10**: Timing analysis, sequence validation, TMATS parsing
- **PTP**: Clock synchronization analysis, message type classification
- **IENA**: Packet structure validation, timing quality assessment

### 8. TUI Presentation Layer

```python
# analyzer/tui/textual/widgets/flow_table_v2.py:143-153
if self._should_show_subrows(flow):
    enhanced_frame_types = self._get_enhanced_frame_types(flow)
    combinations = self._get_enhanced_protocol_frame_combinations(flow, enhanced_frame_types)

    for extended_proto, frame_type, count, percentage in combinations:
        sub_key = table.add_row(*sub_row, key=f"flow_{i}_sub_{j}")
        self.row_to_subflow_map[sub_key] = (i, frame_type)
```

**Presentation Logic**:
- **Main Rows**: Flow-level aggregates (all packets in flow)
- **Sub-Rows**: Only enhanced protocols (`CH10-*`, `PTP-*`, `IENA-*`)
- **Standard protocols**: Aggregated into main flow timing
- **Timing display**: ΔT (avg inter-arrival), σ (std deviation), outlier count

## Core Data Types

### FlowStats

Main container for flow-level statistics:

```python
@dataclass
class FlowStats:
    # Network identifiers
    src_ip: str
    dst_ip: str
    src_port: int = 0  # 0 if not applicable
    dst_port: int = 0
    transport_protocol: str = "Unknown"      # TCP, UDP, ICMP, IGMP, etc.
    traffic_classification: str = "Unknown"  # Unicast, Multicast, Broadcast

    # Aggregate statistics
    frame_count: int = 0
    total_bytes: int = 0
    first_seen: float = 0.0  # Timestamp of first frame
    last_seen: float = 0.0   # Timestamp of last frame
    duration: float = 0.0    # Flow duration in seconds

    # Timing data
    timestamps: List[float] = field(default_factory=list)
    frame_numbers: List[int] = field(default_factory=list)
    inter_arrival_times: List[float] = field(default_factory=list)
    avg_inter_arrival: float = 0.0
    std_inter_arrival: float = 0.0
    jitter: float = 0.0

    # Outlier tracking
    outlier_frames: List[int] = field(default_factory=list)
    outlier_details: List[Tuple[int, float]] = field(default_factory=list)  # (frame_number, time_delta)

    # Protocol classification
    protocols: Set[str] = field(default_factory=set)                # Basic protocols seen
    detected_protocol_types: Set[str] = field(default_factory=set)  # Enhanced protocols (CH10, PTP, IENA)

    # Sub-flow bins
    frame_types: Dict[str, FrameTypeStats] = field(default_factory=dict)

    # Enhanced analysis results
    enhanced_analysis: EnhancedAnalysisData = field(default_factory=EnhancedAnalysisData)
```

### FrameTypeStats

Statistics for specific frame types within a flow:

```python
@dataclass
class FrameTypeStats:
    # Frame type identifier
    frame_type: str  # e.g., "CH10-Data", "PTP-Sync", "UDP-Data"

    # Basic statistics
    count: int = 0
    total_bytes: int = 0

    # Timing data
    timestamps: List[float] = field(default_factory=list)
    frame_numbers: List[int] = field(default_factory=list)
    inter_arrival_times: List[float] = field(default_factory=list)
    avg_inter_arrival: float = 0.0
    std_inter_arrival: float = 0.0

    # Outlier tracking
    outlier_frames: List[int] = field(default_factory=list)
    outlier_details: List[Tuple[int, float]] = field(default_factory=list)
```

### Enhanced Analysis Data Models

#### New Modular Structure

The enhanced analysis data has been restructured into focused components for better organization and extensibility:

```python
@dataclass
class TimingAnalysis:
    """Timing analysis results for enhanced protocols"""
    avg_clock_drift_ppm: float = 0.0
    max_clock_drift_ppm: float = 0.0
    min_clock_drift_ppm: float = 0.0
    drift_variance: float = 0.0

    quality: TimingQuality = TimingQuality.UNKNOWN        # EXCELLENT, GOOD, MODERATE, POOR
    stability: TimingStability = TimingStability.UNKNOWN  # STABLE, VARIABLE, UNSTABLE

    timing_accuracy_percent: float = 0.0
    sync_errors: int = 0
    timing_anomalies: int = 0
    anomaly_rate_percent: float = 0.0

    has_internal_timing: bool = False
    rtc_sync_available: bool = False


@dataclass
class QualityMetrics:
    """Quality metrics for enhanced protocol data"""
    avg_frame_quality_percent: float = 0.0
    frame_quality_samples: List[float] = field(default_factory=list)

    avg_signal_quality_percent: float = 0.0
    signal_quality_samples: List[float] = field(default_factory=list)

    sequence_gaps: int = 0
    format_errors: int = 0
    overflow_errors: int = 0
    checksum_errors: int = 0

    avg_confidence_score: float = 0.0
    confidence_samples: List[float] = field(default_factory=list)
    low_confidence_frames: int = 0

    corrupted_frames: int = 0
    missing_frames: int = 0
    duplicate_frames: int = 0


@dataclass
class DecodedData:
    """Container for decoded protocol data"""
    channel_count: int = 0
    analog_channels: int = 0
    pcm_channels: int = 0
    discrete_channels: int = 0

    primary_data_type: DataType = DataType.UNKNOWN  # ANALOG, PCM, DISCRETE, TIME, VIDEO, TMATS
    secondary_data_types: Set[DataType] = field(default_factory=set)

    sample_decoded_fields: Dict[str, Any] = field(default_factory=dict)
    available_field_names: List[str] = field(default_factory=list)
    field_count: int = 0
    critical_fields: List[str] = field(default_factory=list)

    frame_types: Set[str] = field(default_factory=set)
    frame_type_distribution: Dict[str, int] = field(default_factory=dict)

    tmats_frames: int = 0
    setup_frames: int = 0
    data_frames: int = 0

    decoder_type: str = "Standard"
    decoder_version: Optional[str] = None
    decode_success_rate: float = 1.0


@dataclass
class EnhancedAnalysisData:
    """Complete enhanced analysis data combining all analysis types"""
    timing: TimingAnalysis = field(default_factory=TimingAnalysis)
    quality: QualityMetrics = field(default_factory=QualityMetrics)
    decoded: DecodedData = field(default_factory=DecodedData)

    # Legacy compatibility fields maintained for backward compatibility
    # (automatically synchronized with component data)
```

#### Protocol Information Models

```python
@dataclass
class DecodedField:
    """Represents a single decoded field from a protocol"""
    name: str
    value: Any
    field_type: FieldType  # INTEGER, FLOAT, STRING, BOOLEAN, TIMESTAMP, etc.
    description: Optional[str] = None
    unit: Optional[str] = None  # e.g., "ms", "bytes", "ppm"
    confidence: float = 1.0     # 0.0 to 1.0
    is_critical: bool = False


@dataclass
class ProtocolInfo:
    """Information about a detected protocol"""
    protocol_type: ProtocolType
    name: str
    category: ProtocolCategory  # TRANSPORT, NETWORK, ENHANCED, TIMING, TELEMETRY
    version: Optional[str] = None
    confidence: float = 1.0
    port: Optional[int] = None
    subtype: Optional[str] = None  # e.g., "CH10-Data", "PTP-Sync"
    vendor: Optional[str] = None


@dataclass
class ProtocolDecodeResult:
    """Result of protocol decoding"""
    protocol_info: ProtocolInfo
    fields: List[DecodedField] = field(default_factory=list)
    frame_type: Optional[str] = None
    payload_size: int = 0
    errors: List[str] = field(default_factory=list)
    warnings: List[str] = field(default_factory=list)
```

## Key Processing Components

### EthernetAnalyzer

Top-level orchestrator with background processing capabilities:

```python
class EthernetAnalyzer:
    def __init__(self, enable_realtime: bool = False, outlier_threshold_sigma: float = 3.0):
        self.statistics_engine = StatisticsEngine(outlier_threshold_sigma, enable_realtime)
        self.flow_manager = FlowManager(self.statistics_engine)
        self.all_packets: List[Packet] = []
        self.flows = self.flow_manager.flows  # Exposed for UI access
        self.background_analyzer = None       # For large PCAP background processing
```

### BackgroundAnalyzer

Thread-pool-based PCAP processing for large files:

```python
class BackgroundAnalyzer:
    def __init__(self, analyzer: EthernetAnalyzer,
                 num_threads: int = 4,
                 batch_size: int = 1000,
                 progress_callback: Optional[Callable[[ParsingProgress], None]] = None,
                 flow_update_callback: Optional[Callable[[], None]] = None):
        self.analyzer = analyzer
        self.executor = ThreadPoolExecutor(max_workers=num_threads)
        self.batch_size = batch_size
        self.progress_callback = progress_callback
        self.flow_update_callback = flow_update_callback
        self.is_running = False
        self._shutdown_event = threading.Event()
```

### FlowManager

Packet processing and flow creation with enhanced protocol support:

```python
class FlowManager:
    def __init__(self, statistics_engine=None):
        self.flows: Dict[Tuple[str, str], FlowStats] = {}
        self.specialized_dissectors = {
            'chapter10': Chapter10Dissector(),
            'ptp': PTPDissector(),
            'iena': IENADissector()
        }
        self.enhanced_ch10_decoder = EnhancedChapter10Decoder()
        self.ch10_timing_plugin = Chapter10TimingAnalysisPlugin()
        self.protocol_registry = ProtocolRegistry()  # New protocol management
```

### ProtocolRegistry

Centralized protocol information management:

```python
class ProtocolRegistry:
    def __init__(self):
        self._protocols: Dict[ProtocolType, ProtocolInfo] = {}
        self._register_standard_protocols()  # UDP, TCP, ICMP, IGMP
        self._register_enhanced_protocols()  # CH10, PTP, IENA, NTP

    # Signatures only; method bodies are omitted here
    def get_enhanced_protocols(self) -> List[ProtocolInfo]: ...
    def is_enhanced_protocol(self, protocol_type: ProtocolType) -> bool: ...
    def get_protocols_by_category(self, category: ProtocolCategory) -> List[ProtocolInfo]: ...
```

## Enhanced Protocol Definitions

### Chapter 10 (CH10)

IRIG 106 Chapter 10 data recording format:
- Frame types: `CH10-Data`, `CH10-ACTTS`, `CH10-GPS`, `CH10-ACMI`, `CH10-Timing`
- Enhanced timing analysis with clock drift detection
- TMATS metadata parsing

### Precision Time Protocol (PTP)

IEEE 1588 precision clock synchronization:
- Frame types: `PTP-Sync`, `PTP-Delay_Req`, `PTP-Follow_Up`
- Clock synchronization quality analysis

### IENA

iNET-compatible Ethernet protocol:
- Frame types: `IENA-Control`, `IENA-Data`
- Packet structure validation

## TUI Display Rules

### Main Flow Row Shows:

- Source/Destination IP:Port
- Transport protocol (UDP/TCP)
- Total packets and volume
- **Full timing statistics** (ΔT, σ, outliers) if no enhanced sub-flows exist
- **Basic timeline only** (duration, first/last seen) if enhanced sub-flows exist

### Sub-Rows Show (Enhanced Protocols Only):

- Frame type name (e.g., `CH10-Data`)
- Packet count and percentage of flow
- Individual timing statistics (ΔT, σ, outliers)
- Protocol-specific enhanced analysis

### Standard Protocols (UDP/TCP):

- Aggregated into main flow statistics
- No sub-rows created
- Timing calculated across all standard packets in flow

## Processing Efficiency

- **Streaming**: Packets processed one-by-one (memory efficient)
- **Bucketing**: O(1) flow lookup via hash map
- **Hierarchical**: Statistics calculated at multiple levels simultaneously
- **Real-time**: Live updates during capture without full recalculation
- **Selective display**: Only enhanced protocols get detailed sub-flow analysis
- **Background processing**: Large PCAP files processed in thread pools with progressive UI updates
- **Thread-safe updates**: Concurrent data access protected with locks and atomic operations
- **Modular data structures**: Separated timing, quality, and decoded data for focused analysis

## Recent Architecture Enhancements

### Background Processing System

- **ThreadPoolExecutor**: Multi-threaded PCAP processing for large files
- **Progress callbacks**: Real-time parsing progress updates to TUI
- **Flow update callbacks**: Progressive flow data updates every 50 packets
- **Thread-safe shutdown**: Proper cleanup and signal handling

### Modular Data Models

- **TimingAnalysis**: Dedicated timing metrics with quality classifications
- **QualityMetrics**: Comprehensive error tracking and confidence scoring
- **DecodedData**: Structured protocol field and channel information
- **ProtocolRegistry**: Centralized protocol information management
- **Legacy compatibility**: Maintains existing API while enabling new features

### Enhanced TUI Features

- **Progressive loading**: TUI appears immediately while PCAP loads in background
- **Real-time updates**: Flow table refreshes as new packets are processed
- **Split panel layout**: Separate main flow and sub-flow detail views
- **Enhanced-only sub-rows**: Standard protocols aggregate into main flow
- **Timing column visibility**: Context-aware display based on sub-flow presence

This architecture enables **real-time analysis of multi-protocol network traffic** with **enhanced timing analysis** for specialized protocols while maintaining **efficient memory usage**, **responsive UI updates**, and **scalable background processing** for large PCAP files.

## Sample Outlier Report

```
Flow: 192.168.4.89:49154 → 239.1.2.10:8400
Protocol: UDP
Total Packets: 1452
Total Frame-Type Outliers: 20

=== Frame Type Outlier Analysis ===

CH10-Data: 20 outliers
  Frames: 1001, 1101, 1152, 1201, 1251, 1302, 1352, 1403, 1452, 1501, 1552, 1601, 1651, 1701, 1751, 1801, 1853, 1904, 1952, 2002
  Avg ΔT: 49.626 ms
  Std σ: 10848.285 ms
  3σ Threshold: 32594.481 ms
  Outlier Details:
    Frame 1001: 51498.493 ms (4.7σ)
    Frame 1101: 54598.390 ms (5.0σ)
    Frame 1152: 54598.393 ms (5.0σ)
    Frame 1201: 54498.392 ms (5.0σ)
    Frame 2002: 53398.517 ms (4.9σ)
    ... and 15 more
```
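
The figures in the report above are consistent with a threshold of the mean inter-arrival time plus three standard deviations, and with each outlier's deviation expressed in multiples of σ. The following is a minimal sketch of that calculation, not the StreamLens implementation: the function names `inter_arrival_times` and `find_outliers` are hypothetical, and the choice of population standard deviation and a one-sided (late-packet) test are assumptions.

```python
from statistics import mean, pstdev
from typing import List, Tuple


def inter_arrival_times(timestamps: List[float]) -> List[float]:
    """Return the gaps between consecutive timestamps (same unit as the input)."""
    return [later - earlier for earlier, later in zip(timestamps, timestamps[1:])]


def find_outliers(
    frame_numbers: List[int],
    timestamps: List[float],
    sigma: float = 3.0,
) -> Tuple[float, float, float, List[Tuple[int, float, float]]]:
    """Return (avg, std, threshold, outliers) for one flow or frame type.

    Each outlier is (frame_number, delta, deviation_in_sigmas). A frame is
    flagged when the gap before it exceeds avg + sigma * std.
    """
    deltas = inter_arrival_times(timestamps)
    if len(deltas) < 2:
        return 0.0, 0.0, 0.0, []

    avg = mean(deltas)
    std = pstdev(deltas)  # Assumption: population std, not sample std
    threshold = avg + sigma * std
    if std == 0:
        return avg, std, threshold, []

    # deltas[i] is the gap that ends at frame_numbers[i + 1]
    outliers = [
        (frame, delta, (delta - avg) / std)
        for frame, delta in zip(frame_numbers[1:], deltas)
        if delta > threshold
    ]
    return avg, std, threshold, outliers


# Cross-check against the sample report (values in milliseconds)
avg_ms, std_ms = 49.626, 10848.285
threshold_ms = avg_ms + 3.0 * std_ms              # 32594.481, the report's 3σ threshold
deviation_1001 = (51498.493 - avg_ms) / std_ms    # about 4.74, shown as 4.7σ for frame 1001
```

Because the mean and σ are computed from the same samples that contain the outliers, a few very large gaps inflate σ and raise the threshold, which is worth keeping in mind when reading reports such as the one above.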