2025-07-28 18:28:26 -04:00
# StreamLens Data Flow Architecture
## Overview
StreamLens is a real-time network traffic analyzer that processes packets, groups them into flows, detects protocols, and provides enhanced analysis for specialized protocols like Chapter 10, PTP, and IENA.
## Complete Data Flow
### 1. Packet Input Layer
```python
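# analyzer/analysis/core.py:36-43
from scapy.all import rdpcap  # module-level import (location in core.py assumed)

def analyze_pcap(self, pcap_file: str):
    packets = rdpcap(pcap_file)     # Scapy reads the entire PCAP/PCAPNG into memory
    self._process_packets(packets)  # Process each packet in order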
```
**Sources**:
- PCAP/PCAPNG files
- Live network interfaces via Scapy
### 2. Packet Processing Engine
```python
# analyzer/analysis/core.py:65-67
def _process_packets(self, packets):
    for i, packet in enumerate(packets):
        self.dissector.dissect_packet(packet, i + 1)  # frame numbers are 1-based
```
**Per-packet operations**:
- Frame number assignment
- Timestamp extraction
- Initial protocol detection
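The per-packet loop can be sketched as follows. This is a minimal illustration, assuming packets expose their capture time as a `.time` attribute the way Scapy packets do; `FakePacket` and `iter_frames` are hypothetical stand-ins, not StreamLens code.

```python
from dataclasses import dataclass
from typing import Iterable, Iterator, Tuple


@dataclass
class FakePacket:
    """Hypothetical stand-in for a Scapy packet, which exposes its capture time as `.time`."""
    time: float
    payload: bytes = b""


def iter_frames(packets: Iterable[FakePacket]) -> Iterator[Tuple[int, float, FakePacket]]:
    """Yield (frame_number, timestamp, packet) with 1-based frame numbers."""
    for frame_num, packet in enumerate(packets, start=1):
        # float() normalizes the timestamp (recent Scapy versions may store a Decimal subclass)
        yield frame_num, float(packet.time), packet


frames = list(iter_frames([FakePacket(10.0), FakePacket(10.05), FakePacket(10.1)]))
print([(num, ts) for num, ts, _ in frames])  # [(1, 10.0), (2, 10.05), (3, 10.1)]
```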
### 3. Flow Management & Classification
```python
# analyzer/analysis/flow_manager.py:38-58
def process_packet(self, packet, frame_num):
    # Extract network identifiers (how ip_layer is obtained is elided in this excerpt)
    src_ip, dst_ip = ip_layer.src, ip_layer.dst
    transport_info = self._extract_transport_info(packet)  # ports, protocol

    # Create the flow key: ports are not part of the key
    flow_key = (src_ip, dst_ip)

    # Initialize a new flow or reuse the existing one
    if flow_key not in self.flows:
        self.flows[flow_key] = FlowStats(...)
```
**Flow Bucketing**:
- Packets grouped by `(src_ip, dst_ip)` pairs
- Each flow tracked in a `FlowStats` object
- O(1) flow lookup via hash map
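A minimal sketch of this bucketing, assuming packets have already been reduced to `(frame_num, src_ip, dst_ip, dst_port)` tuples. `MiniFlow` is a hypothetical, trimmed-down stand-in for `FlowStats`, and `bucket` is illustrative only.

```python
from dataclasses import dataclass, field
from typing import Dict, List, Tuple

FlowKey = Tuple[str, str]


@dataclass
class MiniFlow:
    """Hypothetical, trimmed-down stand-in for FlowStats."""
    src_ip: str
    dst_ip: str
    frame_count: int = 0
    frame_numbers: List[int] = field(default_factory=list)


def bucket(packets: List[Tuple[int, str, str, int]]) -> Dict[FlowKey, MiniFlow]:
    """Group packets by (src_ip, dst_ip); ports are ignored in the key,
    so different destination ports between the same hosts share one flow."""
    flows: Dict[FlowKey, MiniFlow] = {}
    for frame_num, src_ip, dst_ip, _dst_port in packets:
        key = (src_ip, dst_ip)
        flow = flows.get(key)  # average O(1) dict lookup
        if flow is None:       # first packet seen for this IP pair
            flow = flows[key] = MiniFlow(src_ip, dst_ip)
        flow.frame_count += 1
        flow.frame_numbers.append(frame_num)
    return flows


flows = bucket([
    (1, "192.168.4.89", "239.1.2.10", 8400),
    (2, "192.168.4.89", "239.1.2.10", 319),
    (3, "10.0.0.1", "10.0.0.2", 53),
])
print(len(flows))  # 2
```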
### 4. Protocol Detection Pipeline
```python
# analyzer/analysis/flow_manager.py:89-96
# Basic protocols (UDP, TCP, ICMP)
protocols = self._detect_basic_protocols(packet)
# Enhanced protocol detection
dissection_results = self._dissect_packet(packet, frame_num)
enhanced_protocols = self._extract_enhanced_protocols(dissection_results)
flow.detected_protocol_types.update(enhanced_protocols)
# Fallback detection
fallback_protocols = self._detect_fallback_protocols(packet, dissection_results)
```
**Protocol Hierarchy**:
1. **Basic**: Transport layer (UDP/TCP/ICMP)
2. **Enhanced**: Application layer (CH10/PTP/IENA) via specialized dissectors
3. **Fallback**: Port-based heuristics for unidentified protocols
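The three tiers can be sketched as a single function. This is an illustration under stated assumptions: dissectors are modeled as hypothetical callables that return a protocol name or `None`, and the fallback only fires when no dissector matched. The fallback table holds just the IANA-registered PTP ports (319 event, 320 general); any other entries would be deployment-specific.

```python
from typing import Callable, Dict, Optional, Set

# Hypothetical dissector signature: return an enhanced protocol name or None.
Dissector = Callable[[bytes], Optional[str]]

# Illustrative fallback table: 319/320 are the registered PTP event/general ports.
FALLBACK_PORTS: Dict[int, str] = {319: "PTP", 320: "PTP"}


def detect_protocols(transport: str, dst_port: int, payload: bytes,
                     dissectors: Dict[str, Dissector]) -> Set[str]:
    """Tiered detection: basic transport, then specialized dissectors,
    then a port-based fallback only if no dissector identified the payload."""
    found: Set[str] = {transport}                       # 1. basic
    for dissect in dissectors.values():                 # 2. enhanced
        name = dissect(payload)
        if name:
            found.add(name)
    if len(found) == 1 and dst_port in FALLBACK_PORTS:  # 3. fallback
        found.add(FALLBACK_PORTS[dst_port])
    return found


# Toy dissector recognizing a made-up marker, for illustration only.
dissectors = {"toy": lambda p: "CH10" if p.startswith(b"CH10") else None}
print(sorted(detect_protocols("UDP", 8400, b"CH10...", dissectors)))  # ['CH10', 'UDP']
print(sorted(detect_protocols("UDP", 319, b"\x00\x02", dissectors)))  # ['PTP', 'UDP']
print(sorted(detect_protocols("UDP", 5000, b"", dissectors)))         # ['UDP']
```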
### 5. Frame Type Classification
```python
# analyzer/analysis/flow_manager.py:98-100
frame_type = self._classify_frame_type(packet, dissection_results)
self._update_frame_type_stats(flow, frame_type, frame_num, timestamp, packet_size)
```
**Sub-Flow Binning**:
- Each packet is classified into a specific frame type
- Enhanced protocols: `CH10-Data`, `CH10-ACTTS`, `PTP-Sync`, `IENA-Control`
- Standard protocols: `UDP-Data`, `TCP-Stream`
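A minimal sketch of how frame-type labels and per-type bins could be produced, assuming each packet has already been reduced to `(transport, enhanced_protocol, subtype)`. `classify_frame_type` and `bin_frames` are hypothetical, and defaulting an enhanced packet without a subtype to `-Data` is an assumption.

```python
from collections import defaultdict
from typing import Dict, Iterable, Optional, Tuple


def classify_frame_type(transport: str, enhanced: Optional[str], subtype: Optional[str]) -> str:
    """Enhanced protocols get '<PROTO>-<subtype>'; everything else gets a standard label."""
    if enhanced:
        return f"{enhanced}-{subtype or 'Data'}"  # assumed default subtype
    return "TCP-Stream" if transport == "TCP" else f"{transport}-Data"


def bin_frames(frames: Iterable[Tuple[str, Optional[str], Optional[str]]]) -> Dict[str, int]:
    """Count packets per frame type (the sub-flow bins)."""
    counts: Dict[str, int] = defaultdict(int)
    for transport, enhanced, subtype in frames:
        counts[classify_frame_type(transport, enhanced, subtype)] += 1
    return dict(counts)


print(bin_frames([("UDP", "CH10", "Data"), ("UDP", "CH10", "ACTTS"),
                  ("UDP", "PTP", "Sync"), ("UDP", None, None), ("TCP", None, None)]))
# {'CH10-Data': 1, 'CH10-ACTTS': 1, 'PTP-Sync': 1, 'UDP-Data': 1, 'TCP-Stream': 1}
```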
### 6. Statistical Analysis Engine
```python
# analyzer/analysis/flow_manager.py:105-112
if len(flow.timestamps) > 1:
    # The current packet's timestamp is already at flow.timestamps[-1]
    inter_arrival = timestamp - flow.timestamps[-2]
    flow.inter_arrival_times.append(inter_arrival)

# Real-time statistics if enabled
if self.statistics_engine and self.statistics_engine.enable_realtime:
    self.statistics_engine.update_realtime_statistics(flow_key, flow)
```
**Timing Calculations**:
- **Flow-level**: Aggregate timing across all packets
- **Frame-type level**: Per-subtype timing in `FrameTypeStats`
- **Outlier detection**: Based on configurable sigma thresholds
- **Real-time updates**: Live statistical calculations during capture
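The timing and outlier calculations can be sketched for one flow or one frame type as below. Each inter-arrival time is charged to the later frame of the pair, and a frame counts as an outlier when its ΔT exceeds `avg + sigma * σ`. The function name and the use of the sample standard deviation are assumptions; the real `StatisticsEngine` may differ.

```python
import statistics
from typing import List, Sequence, Tuple


def timing_stats(timestamps: Sequence[float], frame_numbers: Sequence[int],
                 sigma: float = 3.0) -> Tuple[float, float, float, List[Tuple[int, float]]]:
    """Return (avg ΔT, σ, threshold, [(frame_number, ΔT), ...] outliers)."""
    deltas = [b - a for a, b in zip(timestamps, timestamps[1:])]
    if len(deltas) < 2:  # not enough data for a standard deviation
        return (deltas[0] if deltas else 0.0), 0.0, float("inf"), []
    avg = statistics.mean(deltas)
    std = statistics.stdev(deltas)  # sample standard deviation (assumption)
    threshold = avg + sigma * std
    outliers = [(frame, d) for frame, d in zip(frame_numbers[1:], deltas) if d > threshold]
    return avg, std, threshold, outliers


# 50 ms cadence with a single 5 s gap before frame 12
ts = [i * 0.05 for i in range(11)] + [5.5 + i * 0.05 for i in range(11)]
frames = list(range(1, len(ts) + 1))
avg, std, thr, out = timing_stats(ts, frames)
print([f for f, _ in out])  # [12]
```

A single large gap inflates σ, so with very few samples a 3σ rule may flag nothing; that is a property of the rule itself, not of this sketch.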
### 7. Enhanced Protocol Analysis
```python
# analyzer/analysis/flow_manager.py:102-103
self._perform_enhanced_analysis(packet, flow, frame_num, transport_info)
```
**Specialized Decoders**:
- **Chapter 10**: Timing analysis, sequence validation, TMATS parsing
- **PTP**: Clock synchronization analysis, message type classification
- **IENA**: Packet structure validation, timing quality assessment
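`TimingAnalysis` reports clock drift in parts per million (`avg_clock_drift_ppm` and related fields). A minimal sketch of one way to compute such a figure, assuming paired samples of a device clock (for example a recorder's internal time) and a reference clock (for example capture time). The function name and inputs are hypothetical.

```python
from typing import List, Sequence


def drift_ppm(device_times: Sequence[float], reference_times: Sequence[float]) -> List[float]:
    """Per-interval drift of a device clock against a reference clock, in ppm:
    (device_elapsed - reference_elapsed) / reference_elapsed * 1e6"""
    drifts: List[float] = []
    for i in range(1, min(len(device_times), len(reference_times))):
        dev = device_times[i] - device_times[i - 1]
        ref = reference_times[i] - reference_times[i - 1]
        if ref > 0:  # skip zero or negative reference intervals
            drifts.append((dev - ref) / ref * 1e6)
    return drifts


# A device clock running 50 ppm fast relative to the reference
drifts = drift_ppm([0.0, 1.00005, 2.0001], [0.0, 1.0, 2.0])
print([round(d, 3) for d in drifts])  # [50.0, 50.0]
```

Average, minimum, and maximum over this list would map naturally onto the `avg_`, `min_`, and `max_clock_drift_ppm` fields.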
### 8. TUI Presentation Layer
```python
# analyzer/tui/textual/widgets/flow_table_v2.py:143-153
if self._should_show_subrows(flow):
    enhanced_frame_types = self._get_enhanced_frame_types(flow)
    combinations = self._get_enhanced_protocol_frame_combinations(flow, enhanced_frame_types)
    for extended_proto, frame_type, count, percentage in combinations:
        # sub_row and the sub-row index j are built here (elided in this excerpt)
        sub_key = table.add_row(*sub_row, key=f"flow_{i}_sub_{j}")
        self.row_to_subflow_map[sub_key] = (i, frame_type)
```
**Presentation Logic**:
- **Main Rows**: Flow-level aggregates (all packets in the flow)
- **Sub-Rows**: Enhanced protocols only (`CH10-*`, `PTP-*`, `IENA-*`)
- **Standard protocols**: Aggregated into the main flow timing
- **Timing display**: ΔT (average inter-arrival time), σ (standard deviation), outlier count
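The row-selection rules can be sketched independently of the Textual widget. This assumes per-flow frame-type counts are available as a dict. `build_rows` and `main_row_shows_timing` are hypothetical helpers that produce plain tuples instead of table rows.

```python
from typing import Dict, List, Tuple

ENHANCED_PREFIXES = ("CH10-", "PTP-", "IENA-")
Row = Tuple[str, str, int, float]  # (row_kind, label, count, percent)


def build_rows(flow_label: str, frame_type_counts: Dict[str, int]) -> List[Row]:
    """One main row for the whole flow plus one sub-row per enhanced frame type;
    standard frame types (e.g. UDP-Data) only contribute to the main row."""
    total = sum(frame_type_counts.values())
    rows: List[Row] = [("main", flow_label, total, 100.0 if total else 0.0)]
    for frame_type, count in sorted(frame_type_counts.items()):
        if frame_type.startswith(ENHANCED_PREFIXES):
            rows.append(("sub", frame_type, count, 100.0 * count / total))
    return rows


def main_row_shows_timing(rows: List[Row]) -> bool:
    """Full timing on the main row only when there are no enhanced sub-rows."""
    return all(kind == "main" for kind, *_ in rows)


rows = build_rows("A → B", {"CH10-Data": 90, "PTP-Sync": 5, "UDP-Data": 5})
for row in rows:
    print(row)
print(main_row_shows_timing(rows))  # False
```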
## Core Data Types
### FlowStats
Main container for flow-level statistics:
```python
from dataclasses import dataclass, field
from typing import Dict, List, Set, Tuple

# FrameTypeStats and EnhancedAnalysisData are defined below
@dataclass
class FlowStats:
    # Network identifiers
    src_ip: str
    dst_ip: str
    src_port: int = 0                        # 0 if not applicable
    dst_port: int = 0
    transport_protocol: str = "Unknown"      # TCP, UDP, ICMP, IGMP, etc.
    traffic_classification: str = "Unknown"  # Unicast, Multicast, Broadcast

    # Aggregate statistics
    frame_count: int = 0
    total_bytes: int = 0
    first_seen: float = 0.0  # Timestamp of first frame
    last_seen: float = 0.0   # Timestamp of last frame
    duration: float = 0.0    # Flow duration in seconds

    # Timing data
    timestamps: List[float] = field(default_factory=list)
    frame_numbers: List[int] = field(default_factory=list)
    inter_arrival_times: List[float] = field(default_factory=list)
    avg_inter_arrival: float = 0.0
    std_inter_arrival: float = 0.0
    jitter: float = 0.0

    # Outlier tracking
    outlier_frames: List[int] = field(default_factory=list)
    outlier_details: List[Tuple[int, float]] = field(default_factory=list)  # (frame_number, time_delta)

    # Protocol classification
    protocols: Set[str] = field(default_factory=set)                # Basic protocols seen
    detected_protocol_types: Set[str] = field(default_factory=set)  # Enhanced protocols (CH10, PTP, IENA)

    # Sub-flow bins
    frame_types: Dict[str, FrameTypeStats] = field(default_factory=dict)

    # Enhanced analysis results
    enhanced_analysis: EnhancedAnalysisData = field(default_factory=EnhancedAnalysisData)
```
### FrameTypeStats
Statistics for specific frame types within a flow:
```python
@dataclass
class FrameTypeStats:
    # Frame type identifier
    frame_type: str  # e.g., "CH10-Data", "PTP-Sync", "UDP-Data"

    # Basic statistics
    count: int = 0
    total_bytes: int = 0

    # Timing data
    timestamps: List[float] = field(default_factory=list)
    frame_numbers: List[int] = field(default_factory=list)
    inter_arrival_times: List[float] = field(default_factory=list)
    avg_inter_arrival: float = 0.0
    std_inter_arrival: float = 0.0

    # Outlier tracking
    outlier_frames: List[int] = field(default_factory=list)
    outlier_details: List[Tuple[int, float]] = field(default_factory=list)
```
### Enhanced Analysis Data Models
#### New Modular Structure
The enhanced analysis data is split into focused components (`TimingAnalysis`, `QualityMetrics`, `DecodedData`) that `EnhancedAnalysisData` combines, which keeps each concern separate and easier to extend:
```python
@dataclass
class TimingAnalysis:
    """Timing analysis results for enhanced protocols"""
    avg_clock_drift_ppm: float = 0.0
    max_clock_drift_ppm: float = 0.0
    min_clock_drift_ppm: float = 0.0
    drift_variance: float = 0.0
    quality: TimingQuality = TimingQuality.UNKNOWN        # EXCELLENT, GOOD, MODERATE, POOR
    stability: TimingStability = TimingStability.UNKNOWN  # STABLE, VARIABLE, UNSTABLE
    timing_accuracy_percent: float = 0.0
    sync_errors: int = 0
    timing_anomalies: int = 0
    anomaly_rate_percent: float = 0.0
    has_internal_timing: bool = False
    rtc_sync_available: bool = False


@dataclass
class QualityMetrics:
    """Quality metrics for enhanced protocol data"""
    avg_frame_quality_percent: float = 0.0
    frame_quality_samples: List[float] = field(default_factory=list)
    avg_signal_quality_percent: float = 0.0
    signal_quality_samples: List[float] = field(default_factory=list)
    sequence_gaps: int = 0
    format_errors: int = 0
    overflow_errors: int = 0
    checksum_errors: int = 0
    avg_confidence_score: float = 0.0
    confidence_samples: List[float] = field(default_factory=list)
    low_confidence_frames: int = 0
    corrupted_frames: int = 0
    missing_frames: int = 0
    duplicate_frames: int = 0


@dataclass
class DecodedData:
    """Container for decoded protocol data"""
    channel_count: int = 0
    analog_channels: int = 0
    pcm_channels: int = 0
    discrete_channels: int = 0
    primary_data_type: DataType = DataType.UNKNOWN  # ANALOG, PCM, DISCRETE, TIME, VIDEO, TMATS
    secondary_data_types: Set[DataType] = field(default_factory=set)
    sample_decoded_fields: Dict[str, Any] = field(default_factory=dict)
    available_field_names: List[str] = field(default_factory=list)
    field_count: int = 0
    critical_fields: List[str] = field(default_factory=list)
    frame_types: Set[str] = field(default_factory=set)
    frame_type_distribution: Dict[str, int] = field(default_factory=dict)
    tmats_frames: int = 0
    setup_frames: int = 0
    data_frames: int = 0
    decoder_type: str = "Standard"
    decoder_version: Optional[str] = None
    decode_success_rate: float = 1.0


@dataclass
class EnhancedAnalysisData:
    """Complete enhanced analysis data combining all analysis types"""
    timing: TimingAnalysis = field(default_factory=TimingAnalysis)
    quality: QualityMetrics = field(default_factory=QualityMetrics)
    decoded: DecodedData = field(default_factory=DecodedData)
    # Legacy compatibility fields maintained for backward compatibility
    # (automatically synchronized with component data)
```
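The comment above says the legacy fields are "automatically synchronized" with the component data but does not say how. One common Python approach, shown here as a hypothetical sketch rather than the actual implementation, is a property that delegates to the component so only one value is ever stored. `TimingPart`, `AnalysisData`, and the legacy attribute name are assumptions.

```python
from dataclasses import dataclass, field


@dataclass
class TimingPart:
    """Hypothetical trimmed stand-in for TimingAnalysis."""
    avg_clock_drift_ppm: float = 0.0


@dataclass
class AnalysisData:
    """Hypothetical component-based container that keeps an old flat
    attribute name working by delegating to the timing component."""
    timing: TimingPart = field(default_factory=TimingPart)

    @property
    def avg_clock_drift_ppm(self) -> float:  # legacy flat name (assumed)
        return self.timing.avg_clock_drift_ppm

    @avg_clock_drift_ppm.setter
    def avg_clock_drift_ppm(self, value: float) -> None:
        self.timing.avg_clock_drift_ppm = value


data = AnalysisData()
data.avg_clock_drift_ppm = 12.5          # old-style write
print(data.timing.avg_clock_drift_ppm)   # 12.5
```

Because the legacy name reads and writes the component directly, the two views cannot drift apart.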
#### Protocol Information Models
```python
@dataclass
class DecodedField:
    """Represents a single decoded field from a protocol"""
    name: str
    value: Any
    field_type: FieldType  # INTEGER, FLOAT, STRING, BOOLEAN, TIMESTAMP, etc.
    description: Optional[str] = None
    unit: Optional[str] = None  # e.g., "ms", "bytes", "ppm"
    confidence: float = 1.0     # 0.0 to 1.0
    is_critical: bool = False


@dataclass
class ProtocolInfo:
    """Information about a detected protocol"""
    protocol_type: ProtocolType
    name: str
    category: ProtocolCategory  # TRANSPORT, NETWORK, ENHANCED, TIMING, TELEMETRY
    version: Optional[str] = None
    confidence: float = 1.0
    port: Optional[int] = None
    subtype: Optional[str] = None  # e.g., "CH10-Data", "PTP-Sync"
    vendor: Optional[str] = None


@dataclass
class ProtocolDecodeResult:
    """Result of protocol decoding"""
    protocol_info: ProtocolInfo
    fields: List[DecodedField] = field(default_factory=list)
    frame_type: Optional[str] = None
    payload_size: int = 0
    errors: List[str] = field(default_factory=list)
    warnings: List[str] = field(default_factory=list)
```
## Key Processing Components
### EthernetAnalyzer
Top-level orchestrator with background processing capabilities:
```python
class EthernetAnalyzer:
    def __init__(self, enable_realtime: bool = False, outlier_threshold_sigma: float = 3.0):
        self.statistics_engine = StatisticsEngine(outlier_threshold_sigma, enable_realtime)
        self.flow_manager = FlowManager(self.statistics_engine)
        self.all_packets: List[Packet] = []
        self.flows = self.flow_manager.flows  # Exposed for UI access
        self.background_analyzer = None       # For large PCAP background processing
```
### BackgroundAnalyzer
Thread-pool-based PCAP processing for large files:
```python
class BackgroundAnalyzer:
    def __init__(self, analyzer: EthernetAnalyzer,
                 num_threads: int = 4,
                 batch_size: int = 1000,
                 progress_callback: Optional[Callable[[ParsingProgress], None]] = None,
                 flow_update_callback: Optional[Callable[[], None]] = None):
        self.analyzer = analyzer
        self.executor = ThreadPoolExecutor(max_workers=num_threads)
        self.batch_size = batch_size
        self.progress_callback = progress_callback
        self.flow_update_callback = flow_update_callback
        self.is_running = False
        self._shutdown_event = threading.Event()
```
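A minimal sketch of the batch pattern these parameters suggest, assuming packets have been reduced to `(src_ip, dst_ip)` keys. Each worker counts its batch locally, merges into the shared dict under a lock, and reports progress. `process_in_batches` is hypothetical. Order-sensitive work such as inter-arrival times would need sequential processing or a merge step that sorts by frame number; this sketch only counts packets.

```python
import threading
from concurrent.futures import ThreadPoolExecutor
from typing import Callable, Dict, Optional, Sequence, Tuple

FlowKey = Tuple[str, str]  # hypothetical (src_ip, dst_ip) stand-in for a packet


def process_in_batches(packets: Sequence[FlowKey], batch_size: int = 1000, num_threads: int = 4,
                       progress: Optional[Callable[[int, int], None]] = None) -> Dict[FlowKey, int]:
    """Count packets per flow using a thread pool, merging results under a lock."""
    flows: Dict[FlowKey, int] = {}
    lock = threading.Lock()
    done = 0

    def work(batch: Sequence[FlowKey]) -> None:
        nonlocal done
        local: Dict[FlowKey, int] = {}
        for key in batch:  # no shared state touched here
            local[key] = local.get(key, 0) + 1
        with lock:  # shared dict and counter only change while holding the lock
            for key, n in local.items():
                flows[key] = flows.get(key, 0) + n
            done += len(batch)
            if progress:
                progress(done, len(packets))

    batches = [packets[i:i + batch_size] for i in range(0, len(packets), batch_size)]
    with ThreadPoolExecutor(max_workers=num_threads) as pool:
        for future in [pool.submit(work, b) for b in batches]:
            future.result()  # re-raise any worker exception
    return flows


pkts = [("10.0.0.1", "10.0.0.2")] * 2500 + [("10.0.0.3", "239.1.2.10")] * 500
seen = []
result = process_in_batches(pkts, batch_size=1000, progress=lambda d, t: seen.append(d))
print(result[("10.0.0.1", "10.0.0.2")], seen[-1])  # 2500 3000
```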
### FlowManager
Packet processing and flow creation with enhanced protocol support:
```python
class FlowManager:
    def __init__(self, statistics_engine=None):
        self.flows: Dict[Tuple[str, str], FlowStats] = {}
        self.specialized_dissectors = {
            'chapter10': Chapter10Dissector(),
            'ptp': PTPDissector(),
            'iena': IENADissector()
        }
        self.enhanced_ch10_decoder = EnhancedChapter10Decoder()
        self.ch10_timing_plugin = Chapter10TimingAnalysisPlugin()
        self.protocol_registry = ProtocolRegistry()  # New protocol management
```
### ProtocolRegistry
Centralized protocol information management:
```python
class ProtocolRegistry:
    def __init__(self):
        self._protocols: Dict[ProtocolType, ProtocolInfo] = {}
        self._register_standard_protocols()  # UDP, TCP, ICMP, IGMP
        self._register_enhanced_protocols()  # CH10, PTP, IENA, NTP

    # Method bodies omitted
    def get_enhanced_protocols(self) -> List[ProtocolInfo]: ...
    def is_enhanced_protocol(self, protocol_type: ProtocolType) -> bool: ...
    def get_protocols_by_category(self, category: ProtocolCategory) -> List[ProtocolInfo]: ...
```
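A self-contained sketch of the registry idea, assuming protocols are keyed by name rather than by `ProtocolType` and using only two categories. `Category`, `Info`, and `MiniRegistry` are hypothetical, trimmed stand-ins for the real types.

```python
from dataclasses import dataclass
from enum import Enum
from typing import Dict, List


class Category(Enum):  # trimmed stand-in for ProtocolCategory
    STANDARD = "standard"
    ENHANCED = "enhanced"


@dataclass(frozen=True)
class Info:  # trimmed stand-in for ProtocolInfo
    name: str
    category: Category


class MiniRegistry:
    """Hypothetical registry keyed by protocol name."""

    def __init__(self) -> None:
        self._protocols: Dict[str, Info] = {}
        for name in ("UDP", "TCP", "ICMP", "IGMP"):
            self.register(Info(name, Category.STANDARD))
        for name in ("CH10", "PTP", "IENA", "NTP"):
            self.register(Info(name, Category.ENHANCED))

    def register(self, info: Info) -> None:
        self._protocols[info.name] = info

    def get_protocols_by_category(self, category: Category) -> List[Info]:
        return [p for p in self._protocols.values() if p.category is category]

    def is_enhanced_protocol(self, name: str) -> bool:
        info = self._protocols.get(name)
        return info is not None and info.category is Category.ENHANCED


reg = MiniRegistry()
print([p.name for p in reg.get_protocols_by_category(Category.ENHANCED)])  # ['CH10', 'PTP', 'IENA', 'NTP']
print(reg.is_enhanced_protocol("UDP"))  # False
```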
## Enhanced Protocol Definitions
### Chapter 10 (CH10)
IRIG 106 Chapter 10 data recording format:
- Frame types: `CH10-Data`, `CH10-ACTTS`, `CH10-GPS`, `CH10-ACMI`, `CH10-Timing`
- Enhanced timing analysis with clock drift detection
- TMATS metadata parsing
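Sequence validation can rely on the 8-bit, per-channel sequence number in the IRIG 106 Chapter 10 packet header, which wraps from 255 back to 0. A minimal sketch, assuming `(channel_id, sequence_number)` pairs have already been parsed from each header; the function is hypothetical, and its result would feed a counter like `QualityMetrics.sequence_gaps`. Heavy reordering would be miscounted as gaps.

```python
from typing import Dict, Iterable, Tuple


def count_sequence_gaps(packets: Iterable[Tuple[int, int]]) -> Dict[int, int]:
    """Count missing packets per channel from (channel_id, sequence_number) pairs."""
    last: Dict[int, int] = {}
    gaps: Dict[int, int] = {}
    for channel, seq in packets:
        if channel not in last:
            gaps.setdefault(channel, 0)   # first packet on this channel
        elif seq == last[channel]:
            continue                      # duplicate, not a gap
        else:
            expected = (last[channel] + 1) % 256  # 8-bit wraparound
            gaps[channel] += (seq - expected) % 256
        last[channel] = seq
    return gaps


print(count_sequence_gaps([(1, 254), (1, 255), (1, 0), (1, 1), (1, 3), (2, 10), (2, 11)]))
# {1: 1, 2: 0}
```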
### Precision Time Protocol (PTP)
IEEE 1588 precision clock synchronization:
- Frame types: `PTP-Sync`, `PTP-Delay_Req`, `PTP-Follow_Up`
- Clock synchronization quality analysis
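The `PTP-*` frame types correspond to the `messageType` field in the PTP header. A minimal sketch, assuming PTPv2 (IEEE 1588-2008) carried over UDP, where the low nibble of byte 0 is `messageType` and the low nibble of byte 1 is `versionPTP`. The function is hypothetical, and labels for types not listed above (such as `PTP-Announce`) are assumptions.

```python
PTP_MESSAGE_TYPES = {
    0x0: "Sync",
    0x1: "Delay_Req",
    0x2: "Pdelay_Req",
    0x3: "Pdelay_Resp",
    0x8: "Follow_Up",
    0x9: "Delay_Resp",
    0xA: "Pdelay_Resp_Follow_Up",
    0xB: "Announce",
    0xC: "Signaling",
    0xD: "Management",
}


def ptp_frame_type(payload: bytes) -> str:
    """Map a PTPv2 UDP payload to a 'PTP-<messageType>' frame-type label."""
    if len(payload) < 2 or (payload[1] & 0x0F) != 2:  # versionPTP must be 2
        return "PTP-Unknown"
    # Low nibble of byte 0 is messageType; the high nibble is transportSpecific.
    return "PTP-" + PTP_MESSAGE_TYPES.get(payload[0] & 0x0F, "Unknown")


print(ptp_frame_type(bytes([0x00, 0x02])))  # PTP-Sync
print(ptp_frame_type(bytes([0x08, 0x02])))  # PTP-Follow_Up
```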
### IENA
iNET-compatible Ethernet protocol:
- Frame types: `IENA-Control`, `IENA-Data`
- Packet structure validation
## TUI Display Rules
### Main Flow Row Shows:
- Source/Destination IP:Port
- Transport protocol (UDP/TCP)
- Total packets and volume
- **Full timing statistics** (ΔT, σ, outliers) if no enhanced sub-flows exist
- **Basic timeline only** (duration, first/last seen) if enhanced sub-flows exist
### Sub-Rows Show (Enhanced Protocols Only):
- Frame type name (e.g., `CH10-Data`)
- Packet count and percentage of flow
- Individual timing statistics (ΔT, σ, outliers)
- Protocol-specific enhanced analysis
### Standard Protocols (UDP/TCP):
- Aggregated into main flow statistics
- No sub-rows created
- Timing calculated across all standard packets in flow
## Processing Efficiency
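- **Streaming**: Packets pass through the pipeline one at a time; note that `rdpcap` (step 1) reads the entire capture into memory first, so memory use still grows with file size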
- **Bucketing**: O(1) flow lookup via hash map
- **Hierarchical**: Statistics calculated at multiple levels simultaneously
- **Real-time**: Live updates during capture without full recalculation
- **Selective display**: Only enhanced protocols get detailed sub-flow analysis
- **Background processing**: Large PCAP files processed in thread pools with progressive UI updates
- **Thread-safe updates**: Concurrent data access protected with locks and atomic operations
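"Live updates without full recalculation" implies statistics that update incrementally as each inter-arrival time arrives. Welford's online algorithm is one standard way to do this in O(1) per value; whether `StatisticsEngine` uses it is an assumption, and `RunningStats` is a hypothetical name.

```python
import math
from dataclasses import dataclass


@dataclass
class RunningStats:
    """Welford's online mean/variance: O(1) work per new value."""
    n: int = 0
    mean: float = 0.0
    m2: float = 0.0  # sum of squared deviations from the running mean

    def push(self, x: float) -> None:
        self.n += 1
        delta = x - self.mean
        self.mean += delta / self.n
        self.m2 += delta * (x - self.mean)  # uses the updated mean

    @property
    def std(self) -> float:
        # Sample standard deviation; 0.0 until two values have been seen.
        return math.sqrt(self.m2 / (self.n - 1)) if self.n > 1 else 0.0


rs = RunningStats()
for x in [2, 4, 4, 4, 5, 5, 7, 9]:
    rs.push(x)
print(round(rs.mean, 6), round(rs.std, 4))  # 5.0 2.1381
```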
- **Modular data structures**: Separated timing, quality, and decoded data for focused analysis
## Recent Architecture Enhancements
### Background Processing System
- **ThreadPoolExecutor**: Multi-threaded PCAP processing for large files
- **Progress callbacks**: Real-time parsing progress updates to TUI
- **Flow update callbacks**: Progressive flow data updates every 50 packets
- **Thread-safe shutdown**: Proper cleanup and signal handling
### Modular Data Models
- **TimingAnalysis**: Dedicated timing metrics with quality classifications
- **QualityMetrics**: Comprehensive error tracking and confidence scoring
- **DecodedData**: Structured protocol field and channel information
- **ProtocolRegistry**: Centralized protocol information management
- **Legacy compatibility**: Maintains existing API while enabling new features
### Enhanced TUI Features
- **Progressive loading**: TUI appears immediately while PCAP loads in background
- **Real-time updates**: Flow table refreshes as new packets are processed
- **Split panel layout**: Separate main flow and sub-flow detail views
- **Enhanced-only sub-rows**: Standard protocols aggregate into main flow
- **Timing column visibility**: Context-aware display based on sub-flow presence
2025-07-30 23:48:32 -04:00
This architecture enables **real-time analysis of multi-protocol network traffic** with **enhanced timing analysis** for specialized protocols while maintaining **efficient memory usage**, **responsive UI updates**, and **scalable background processing** for large PCAP files.
```text
Flow: 192.168.4.89:49154 → 239.1.2.10:8400
Protocol: UDP
Total Packets: 1452
Total Frame-Type Outliers: 20
=== Frame Type Outlier Analysis ===
CH10-Data: 20 outliers
Frames: 1001, 1101, 1152, 1201, 1251, 1302, 1352, 1403, 1452, 1501, 1552, 1601, 1651, 1701, 1751, 1801, 1853, 1904, 1952, 2002
Avg ΔT: 49.626 ms
Std σ: 10848.285 ms
3σ Threshold: 32594.481 ms
Outlier Details:
Frame 1001: 51498.493 ms (4.7σ)
Frame 1101: 54598.390 ms (5.0σ)
Frame 1152: 54598.393 ms (5.0σ)
Frame 1201: 54498.392 ms (5.0σ)
Frame 2002: 53398.517 ms (4.9σ)
... and 15 more
```
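The threshold and σ-multiples in this CH10-Data report follow the avg + 3σ rule. Checking the reported numbers for the threshold and for frame 1001:

$$
T_{3\sigma} = \overline{\Delta T} + 3\sigma = 49.626 + 3 \times 10848.285 = 32594.481\ \text{ms}
$$

$$
z_{1001} = \frac{51498.493 - 49.626}{10848.285} \approx 4.74
$$

The value 4.74 rounds to the 4.7σ shown for frame 1001.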