# StreamLens Data Flow Architecture

## Overview

StreamLens is a real-time network traffic analyzer that processes packets, groups them into flows, detects protocols, and provides enhanced analysis for specialized protocols like Chapter 10, PTP, and IENA.

## Complete Data Flow

### 1. Packet Input Layer
```python
# analyzer/analysis/core.py:36-43
def analyze_pcap(self, pcap_file: str):
    packets = rdpcap(pcap_file)  # Scapy reads PCAP
    self._process_packets(packets)  # Process each packet
```

**Sources**:
- PCAP/PCAPNG files
- Live network interfaces via Scapy

### 2. Packet Processing Engine
```python
# analyzer/analysis/core.py:65-67
def _process_packets(self, packets):
    for i, packet in enumerate(packets):
        self.dissector.dissect_packet(packet, i + 1)  # frame numbers are 1-based
```

**Per-packet operations**:
- Frame number assignment
- Timestamp extraction
- Initial protocol detection

### 3. Flow Management & Classification
```python
# analyzer/analysis/flow_manager.py:38-58
def process_packet(self, packet, frame_num):
    # Extract network identifiers
    src_ip, dst_ip = ip_layer.src, ip_layer.dst
    transport_info = self._extract_transport_info(packet)  # ports, protocol

    # Build the flow key (directional IP pair; ports are not part of the key)
    flow_key = (src_ip, dst_ip)

    # Initialize new flow or use existing
    if flow_key not in self.flows:
        self.flows[flow_key] = FlowStats(...)
```

**Flow Bucketing**:
- Packets grouped by directional `(src_ip, dst_ip)` pairs, so each direction of a conversation is its own flow
- Each flow tracked in a `FlowStats` object
- O(1) average-case flow lookup via hash map

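The bucketing step can be sketched in isolation as follows. This is a minimal illustration, not the StreamLens implementation: `MiniFlow` and `bucket_packet` are hypothetical names, and `MiniFlow` carries only a few of the `FlowStats` fields.

```python
from dataclasses import dataclass, field
from typing import Dict, List, Tuple

FlowKey = Tuple[str, str]  # (src_ip, dst_ip), as in the excerpt above


@dataclass
class MiniFlow:
    """Hypothetical, trimmed-down stand-in for FlowStats."""
    src_ip: str
    dst_ip: str
    frame_count: int = 0
    total_bytes: int = 0
    timestamps: List[float] = field(default_factory=list)


def bucket_packet(flows: Dict[FlowKey, MiniFlow], src_ip: str, dst_ip: str,
                  timestamp: float, size: int) -> MiniFlow:
    """Add one packet to its flow, creating the flow on first sight."""
    key = (src_ip, dst_ip)
    flow = flows.get(key)  # average O(1) dict lookup
    if flow is None:
        flow = flows[key] = MiniFlow(src_ip, dst_ip)
    flow.frame_count += 1
    flow.total_bytes += size
    flow.timestamps.append(timestamp)
    return flow


flows: Dict[FlowKey, MiniFlow] = {}
bucket_packet(flows, "192.168.4.89", "239.1.2.10", 0.00, 1200)
bucket_packet(flows, "192.168.4.89", "239.1.2.10", 0.05, 1200)
bucket_packet(flows, "239.1.2.10", "192.168.4.89", 0.06, 64)  # reverse direction
```

Because the key is directional, the third call creates a second flow rather than updating the first.
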
### 4. Protocol Detection Pipeline
```python
# analyzer/analysis/flow_manager.py:89-96
# Basic protocols (UDP, TCP, ICMP)
protocols = self._detect_basic_protocols(packet)

# Enhanced protocol detection
dissection_results = self._dissect_packet(packet, frame_num)
enhanced_protocols = self._extract_enhanced_protocols(dissection_results)
flow.detected_protocol_types.update(enhanced_protocols)

# Fallback detection
fallback_protocols = self._detect_fallback_protocols(packet, dissection_results)
```

**Protocol Hierarchy**:
1. **Basic**: Transport layer (UDP/TCP/ICMP)
2. **Enhanced**: Application layer (CH10/PTP/IENA) via specialized dissectors
3. **Fallback**: Port-based heuristics for unidentified protocols

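A rough sketch of how the three tiers could combine for a single packet is shown below. The function name and the port table are assumptions made for illustration: UDP ports 319 and 320 are the standard PTP ports, while the IENA and CH10 entries are placeholders, not values taken from StreamLens.

```python
from typing import Optional, Set

# Assumed port table for the fallback tier. PTP's UDP ports 319/320 are
# standard; the IENA and CH10 ports below are placeholders only.
FALLBACK_PORTS = {319: "PTP", 320: "PTP", 50000: "IENA", 8400: "CH10"}


def detect_protocols(transport: str, dst_port: Optional[int],
                     dissected: Set[str]) -> Set[str]:
    """Combine basic, enhanced, and fallback detection for one packet."""
    protocols = {transport}               # tier 1: transport layer
    if dissected:                         # tier 2: specialized dissectors matched
        protocols |= dissected
    elif dst_port in FALLBACK_PORTS:      # tier 3: port heuristic, only if tier 2 found nothing
        protocols.add(FALLBACK_PORTS[dst_port])
    return protocols


ptp_guess = detect_protocols("UDP", 319, set())       # no dissector hit, port fallback
ch10_hit = detect_protocols("UDP", 8400, {"CH10"})    # dissector result wins
plain_tcp = detect_protocols("TCP", 443, set())       # nothing beyond transport
```

In this sketch the fallback tier only runs when no dissector matched, which keeps a port guess from overriding a positive dissection result.
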
### 5. Frame Type Classification
```python
# analyzer/analysis/flow_manager.py:98-100
frame_type = self._classify_frame_type(packet, dissection_results)
self._update_frame_type_stats(flow, frame_type, frame_num, timestamp, packet_size)
```

**Sub-Flow Binning**:
- Each packet classified into specific frame types
- Enhanced protocols: `CH10-Data`, `CH10-ACTTS`, `PTP-Sync`, `IENA-Control`
- Standard protocols: `UDP-Data`, `TCP-Stream`

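The binning step might look roughly like the sketch below. `MiniFrameTypeStats` and `update_bin` are hypothetical stand-ins for `FrameTypeStats` and `_update_frame_type_stats`; the assumption that ΔT is measured against the previous frame of the same type is inferred from the per-subtype timing described in this document.

```python
from dataclasses import dataclass, field
from typing import Dict, List


@dataclass
class MiniFrameTypeStats:
    """Hypothetical, trimmed-down stand-in for FrameTypeStats."""
    frame_type: str
    count: int = 0
    total_bytes: int = 0
    timestamps: List[float] = field(default_factory=list)
    frame_numbers: List[int] = field(default_factory=list)
    inter_arrival_times: List[float] = field(default_factory=list)


def update_bin(bins: Dict[str, MiniFrameTypeStats], frame_type: str,
               frame_num: int, timestamp: float, size: int) -> None:
    """Record one packet in the bin for its frame type."""
    stats = bins.setdefault(frame_type, MiniFrameTypeStats(frame_type))
    if stats.timestamps:
        # Assumed: delta is taken against the previous frame of the same type
        stats.inter_arrival_times.append(timestamp - stats.timestamps[-1])
    stats.count += 1
    stats.total_bytes += size
    stats.timestamps.append(timestamp)
    stats.frame_numbers.append(frame_num)


bins: Dict[str, MiniFrameTypeStats] = {}
update_bin(bins, "CH10-Data", 1, 0.0, 1200)
update_bin(bins, "PTP-Sync", 2, 0.5, 90)
update_bin(bins, "CH10-Data", 3, 1.0, 1200)
```

Interleaved frame types do not disturb each other's timing: the `PTP-Sync` packet at 0.5 s has no effect on the `CH10-Data` delta.
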
### 6. Statistical Analysis Engine
```python
# analyzer/analysis/flow_manager.py:105-112
# Presumably the current timestamp has already been appended, so [-2] is the previous packet
if len(flow.timestamps) > 1:
    inter_arrival = timestamp - flow.timestamps[-2]
    flow.inter_arrival_times.append(inter_arrival)

# Real-time statistics if enabled
if self.statistics_engine and self.statistics_engine.enable_realtime:
    self.statistics_engine.update_realtime_statistics(flow_key, flow)
```

**Timing Calculations**:
- **Flow-level**: Aggregate timing across all packets
- **Frame-type level**: Per-subtype timing in `FrameTypeStats`
- **Outlier detection**: Based on configurable sigma thresholds
- **Real-time updates**: Live statistical calculations during capture

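The sigma-threshold outlier rule can be sketched as below. This is an assumption-laden illustration rather than the `StatisticsEngine` code: the function name is hypothetical, the population standard deviation is an arbitrary choice, and only gaps above `avg + sigma * std` are flagged.

```python
import statistics
from typing import List, Tuple


def timing_outliers(timestamps: List[float], frame_numbers: List[int],
                    sigma: float = 3.0) -> Tuple[float, float, List[Tuple[int, float]]]:
    """Return (avg delta, std, [(frame_number, delta), ...]) for one series."""
    deltas = [b - a for a, b in zip(timestamps, timestamps[1:])]
    if len(deltas) < 2:
        return (deltas[0] if deltas else 0.0), 0.0, []
    avg = statistics.fmean(deltas)
    std = statistics.pstdev(deltas)  # population std; a sample std is equally plausible
    threshold = avg + sigma * std
    # deltas[i] belongs to the packet that closed the gap, i.e. frame_numbers[i + 1]
    outliers = [(frame_numbers[i + 1], d) for i, d in enumerate(deltas) if d > threshold]
    return avg, std, outliers


# 22 frames at 10 ms spacing with a single 1 s gap before frame 12
ts, t = [], 0.0
for n in range(1, 23):
    ts.append(t)
    t += 1.0 if n == 11 else 0.01
avg, std, outliers = timing_outliers(ts, list(range(1, 23)))
```

Here the only flagged entry is frame 12, whose delta of about 1 s is well above the threshold of roughly 0.69 s.
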
### 7. Enhanced Protocol Analysis
```python
# analyzer/analysis/flow_manager.py:102-103
self._perform_enhanced_analysis(packet, flow, frame_num, transport_info)
```

**Specialized Decoders**:
- **Chapter 10**: Timing analysis, sequence validation, TMATS parsing
- **PTP**: Clock synchronization analysis, message type classification
- **IENA**: Packet structure validation, timing quality assessment

### 8. TUI Presentation Layer
```python
# analyzer/tui/textual/widgets/flow_table_v2.py:143-153
if self._should_show_subrows(flow):
    enhanced_frame_types = self._get_enhanced_frame_types(flow)
    combinations = self._get_enhanced_protocol_frame_combinations(flow, enhanced_frame_types)

    for extended_proto, frame_type, count, percentage in combinations:
        sub_key = table.add_row(*sub_row, key=f"flow_{i}_sub_{j}")
        self.row_to_subflow_map[sub_key] = (i, frame_type)
```

**Presentation Logic**:
- **Main Rows**: Flow-level aggregates (all packets in flow)
- **Sub-Rows**: Only enhanced protocols (`CH10-*`, `PTP-*`, `IENA-*`)
- **Standard protocols**: Aggregated into main flow timing
- **Timing display**: ΔT (avg inter-arrival), σ (std deviation), outlier count

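The sub-row selection rule can be illustrated with a small filter. The helper name `enhanced_subrows` and the prefix tuple are assumptions derived from the `CH10-*`, `PTP-*`, `IENA-*` patterns listed above; the real widget works on `FlowStats` objects rather than a plain count dictionary.

```python
from typing import Dict, List, Tuple

ENHANCED_PREFIXES = ("CH10-", "PTP-", "IENA-")  # assumed from the patterns above


def enhanced_subrows(frame_type_counts: Dict[str, int]) -> List[Tuple[str, int, float]]:
    """Return (frame_type, count, percent_of_flow) for enhanced frame types only."""
    total = sum(frame_type_counts.values())
    if total == 0:
        return []
    rows = [
        (name, count, 100.0 * count / total)
        for name, count in frame_type_counts.items()
        if name.startswith(ENHANCED_PREFIXES)  # str.startswith accepts a tuple
    ]
    return sorted(rows, key=lambda row: row[1], reverse=True)


rows = enhanced_subrows({"CH10-Data": 900, "UDP-Data": 50, "CH10-ACTTS": 50})
show_flow_timing = not rows  # full timing on the main row only without sub-rows
```

Percentages are relative to the whole flow, so the standard `UDP-Data` packets still count toward the denominator even though they get no sub-row.
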
## Core Data Types

### FlowStats
Main container for flow-level statistics:

```python
@dataclass
class FlowStats:
    # Network identifiers
    src_ip: str
    dst_ip: str
    src_port: int = 0  # 0 if not applicable
    dst_port: int = 0
    transport_protocol: str = "Unknown"  # TCP, UDP, ICMP, IGMP, etc.
    traffic_classification: str = "Unknown"  # Unicast, Multicast, Broadcast

    # Aggregate statistics
    frame_count: int = 0
    total_bytes: int = 0
    first_seen: float = 0.0  # Timestamp of first frame
    last_seen: float = 0.0  # Timestamp of last frame
    duration: float = 0.0  # Flow duration in seconds

    # Timing data
    timestamps: List[float] = field(default_factory=list)
    frame_numbers: List[int] = field(default_factory=list)
    inter_arrival_times: List[float] = field(default_factory=list)
    avg_inter_arrival: float = 0.0
    std_inter_arrival: float = 0.0
    jitter: float = 0.0

    # Outlier tracking
    outlier_frames: List[int] = field(default_factory=list)
    outlier_details: List[Tuple[int, float]] = field(default_factory=list)  # (frame_number, time_delta)

    # Protocol classification
    protocols: Set[str] = field(default_factory=set)  # Basic protocols seen
    detected_protocol_types: Set[str] = field(default_factory=set)  # Enhanced protocols (CH10, PTP, IENA)

    # Sub-flow bins
    frame_types: Dict[str, FrameTypeStats] = field(default_factory=dict)

    # Enhanced analysis results
    enhanced_analysis: EnhancedAnalysisData = field(default_factory=EnhancedAnalysisData)
```

### FrameTypeStats
Statistics for specific frame types within a flow:

```python
@dataclass
class FrameTypeStats:
    # Frame type identifier
    frame_type: str  # e.g., "CH10-Data", "PTP-Sync", "UDP-Data"

    # Basic statistics
    count: int = 0
    total_bytes: int = 0

    # Timing data
    timestamps: List[float] = field(default_factory=list)
    frame_numbers: List[int] = field(default_factory=list)
    inter_arrival_times: List[float] = field(default_factory=list)
    avg_inter_arrival: float = 0.0
    std_inter_arrival: float = 0.0

    # Outlier tracking
    outlier_frames: List[int] = field(default_factory=list)
    outlier_details: List[Tuple[int, float]] = field(default_factory=list)
```

### Enhanced Analysis Data Models

#### New Modular Structure
The enhanced analysis data has been restructured into focused components for better organization and extensibility:

```python
@dataclass
class TimingAnalysis:
    """Timing analysis results for enhanced protocols"""
    avg_clock_drift_ppm: float = 0.0
    max_clock_drift_ppm: float = 0.0
    min_clock_drift_ppm: float = 0.0
    drift_variance: float = 0.0

    quality: TimingQuality = TimingQuality.UNKNOWN  # EXCELLENT, GOOD, MODERATE, POOR
    stability: TimingStability = TimingStability.UNKNOWN  # STABLE, VARIABLE, UNSTABLE

    timing_accuracy_percent: float = 0.0
    sync_errors: int = 0
    timing_anomalies: int = 0
    anomaly_rate_percent: float = 0.0

    has_internal_timing: bool = False
    rtc_sync_available: bool = False

@dataclass
class QualityMetrics:
    """Quality metrics for enhanced protocol data"""
    avg_frame_quality_percent: float = 0.0
    frame_quality_samples: List[float] = field(default_factory=list)

    avg_signal_quality_percent: float = 0.0
    signal_quality_samples: List[float] = field(default_factory=list)

    sequence_gaps: int = 0
    format_errors: int = 0
    overflow_errors: int = 0
    checksum_errors: int = 0

    avg_confidence_score: float = 0.0
    confidence_samples: List[float] = field(default_factory=list)
    low_confidence_frames: int = 0

    corrupted_frames: int = 0
    missing_frames: int = 0
    duplicate_frames: int = 0

@dataclass
class DecodedData:
    """Container for decoded protocol data"""
    channel_count: int = 0
    analog_channels: int = 0
    pcm_channels: int = 0
    discrete_channels: int = 0

    primary_data_type: DataType = DataType.UNKNOWN  # ANALOG, PCM, DISCRETE, TIME, VIDEO, TMATS
    secondary_data_types: Set[DataType] = field(default_factory=set)

    sample_decoded_fields: Dict[str, Any] = field(default_factory=dict)
    available_field_names: List[str] = field(default_factory=list)
    field_count: int = 0
    critical_fields: List[str] = field(default_factory=list)

    frame_types: Set[str] = field(default_factory=set)
    frame_type_distribution: Dict[str, int] = field(default_factory=dict)

    tmats_frames: int = 0
    setup_frames: int = 0
    data_frames: int = 0

    decoder_type: str = "Standard"
    decoder_version: Optional[str] = None
    decode_success_rate: float = 1.0

@dataclass
class EnhancedAnalysisData:
    """Complete enhanced analysis data combining all analysis types"""
    timing: TimingAnalysis = field(default_factory=TimingAnalysis)
    quality: QualityMetrics = field(default_factory=QualityMetrics)
    decoded: DecodedData = field(default_factory=DecodedData)

    # Legacy compatibility fields maintained for backward compatibility
    # (automatically synchronized with component data)
```

#### Protocol Information Models

```python
@dataclass
class DecodedField:
    """Represents a single decoded field from a protocol"""
    name: str
    value: Any
    field_type: FieldType  # INTEGER, FLOAT, STRING, BOOLEAN, TIMESTAMP, etc.
    description: Optional[str] = None
    unit: Optional[str] = None  # e.g., "ms", "bytes", "ppm"
    confidence: float = 1.0  # 0.0 to 1.0
    is_critical: bool = False

@dataclass
class ProtocolInfo:
    """Information about a detected protocol"""
    protocol_type: ProtocolType
    name: str
    category: ProtocolCategory  # TRANSPORT, NETWORK, ENHANCED, TIMING, TELEMETRY
    version: Optional[str] = None
    confidence: float = 1.0

    port: Optional[int] = None
    subtype: Optional[str] = None  # e.g., "CH10-Data", "PTP-Sync"
    vendor: Optional[str] = None

@dataclass
class ProtocolDecodeResult:
    """Result of protocol decoding"""
    protocol_info: ProtocolInfo
    fields: List[DecodedField] = field(default_factory=list)
    frame_type: Optional[str] = None
    payload_size: int = 0
    errors: List[str] = field(default_factory=list)
    warnings: List[str] = field(default_factory=list)
```

## Key Processing Components

### EthernetAnalyzer
Top-level orchestrator with background processing capabilities:
```python
class EthernetAnalyzer:
    def __init__(self, enable_realtime: bool = False, outlier_threshold_sigma: float = 3.0):
        self.statistics_engine = StatisticsEngine(outlier_threshold_sigma, enable_realtime)
        self.flow_manager = FlowManager(self.statistics_engine)
        self.all_packets: List[Packet] = []
        self.flows = self.flow_manager.flows  # Exposed for UI access
        self.background_analyzer = None  # For large PCAP background processing
```

### BackgroundAnalyzer
Thread-pool-based PCAP processing for large files:
```python
class BackgroundAnalyzer:
    def __init__(self, analyzer: EthernetAnalyzer,
                 num_threads: int = 4,
                 batch_size: int = 1000,
                 progress_callback: Optional[Callable[[ParsingProgress], None]] = None,
                 flow_update_callback: Optional[Callable[[], None]] = None):
        self.analyzer = analyzer
        self.executor = ThreadPoolExecutor(max_workers=num_threads)
        self.batch_size = batch_size
        self.progress_callback = progress_callback
        self.flow_update_callback = flow_update_callback
        self.is_running = False
        self._shutdown_event = threading.Event()  # Set to request a clean stop
```

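The batching pattern behind this constructor can be sketched with the standard library alone. Everything below is illustrative: `process_in_batches`, the `Packet` tuple, and the two-argument progress callback are hypothetical and much simpler than `BackgroundAnalyzer` and `ParsingProgress`.

```python
import threading
from concurrent.futures import ThreadPoolExecutor
from typing import Callable, Dict, List, Optional, Tuple

Packet = Tuple[str, str, int]  # hypothetical (src_ip, dst_ip, size) stand-in


def process_in_batches(packets: List[Packet], batch_size: int = 1000,
                       num_threads: int = 4,
                       progress: Optional[Callable[[int, int], None]] = None
                       ) -> Dict[Tuple[str, str], int]:
    """Count packets per flow using a thread pool, one task per batch."""
    counts: Dict[Tuple[str, str], int] = {}
    lock = threading.Lock()
    done = 0

    def work(batch: List[Packet]) -> None:
        nonlocal done
        local: Dict[Tuple[str, str], int] = {}
        for src, dst, _size in batch:  # per-batch work needs no lock
            local[(src, dst)] = local.get((src, dst), 0) + 1
        with lock:  # merge results and report progress under the lock
            for key, n in local.items():
                counts[key] = counts.get(key, 0) + n
            done += len(batch)
            if progress:
                progress(done, len(packets))

    batches = [packets[i:i + batch_size] for i in range(0, len(packets), batch_size)]
    with ThreadPoolExecutor(max_workers=num_threads) as pool:
        list(pool.map(work, batches))  # consuming the iterator re-raises worker errors
    return counts


seen: List[Tuple[int, int]] = []
packets = [("10.0.0.1", "10.0.0.2", 100)] * 2500 + [("10.0.0.3", "10.0.0.4", 60)] * 500
result = process_in_batches(packets, progress=lambda d, t: seen.append((d, t)))
```

Batches may finish in any order, so a design like this suits order-independent aggregates such as counts; order-dependent statistics such as inter-arrival times would need packets to be applied in capture order.
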
### FlowManager
Packet processing and flow creation with enhanced protocol support:
```python
class FlowManager:
    def __init__(self, statistics_engine=None):
        self.flows: Dict[Tuple[str, str], FlowStats] = {}
        self.specialized_dissectors = {
            'chapter10': Chapter10Dissector(),
            'ptp': PTPDissector(),
            'iena': IENADissector()
        }
        self.enhanced_ch10_decoder = EnhancedChapter10Decoder()
        self.ch10_timing_plugin = Chapter10TimingAnalysisPlugin()
        self.protocol_registry = ProtocolRegistry()  # New protocol management
```

### ProtocolRegistry
Centralized protocol information management (method bodies omitted):
```python
class ProtocolRegistry:
    def __init__(self):
        self._protocols: Dict[ProtocolType, ProtocolInfo] = {}
        self._register_standard_protocols()  # UDP, TCP, ICMP, IGMP
        self._register_enhanced_protocols()  # CH10, PTP, IENA, NTP

    def get_enhanced_protocols(self) -> List[ProtocolInfo]: ...
    def is_enhanced_protocol(self, protocol_type: ProtocolType) -> bool: ...
    def get_protocols_by_category(self, category: ProtocolCategory) -> List[ProtocolInfo]: ...
```

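A self-contained sketch of such a registry is shown below. `MiniRegistry`, `Info`, and `Category` are hypothetical reductions of `ProtocolRegistry`, `ProtocolInfo`, and `ProtocolCategory`, keyed by name instead of `ProtocolType` to keep the example short.

```python
from dataclasses import dataclass
from enum import Enum
from typing import Dict, List


class Category(Enum):  # hypothetical subset of ProtocolCategory
    TRANSPORT = "transport"
    ENHANCED = "enhanced"


@dataclass(frozen=True)
class Info:  # hypothetical subset of ProtocolInfo
    name: str
    category: Category


class MiniRegistry:
    def __init__(self) -> None:
        self._protocols: Dict[str, Info] = {}
        for name in ("UDP", "TCP", "ICMP", "IGMP"):
            self.register(Info(name, Category.TRANSPORT))
        for name in ("CH10", "PTP", "IENA", "NTP"):
            self.register(Info(name, Category.ENHANCED))

    def register(self, info: Info) -> None:
        self._protocols[info.name] = info

    def is_enhanced_protocol(self, name: str) -> bool:
        info = self._protocols.get(name)  # unknown names are simply not enhanced
        return info is not None and info.category is Category.ENHANCED

    def get_protocols_by_category(self, category: Category) -> List[Info]:
        return [p for p in self._protocols.values() if p.category is category]


registry = MiniRegistry()
enhanced_names = [p.name for p in registry.get_protocols_by_category(Category.ENHANCED)]
```

Keeping the protocol list in one place means the TUI and the flow manager can ask the same object whether a protocol deserves sub-rows, instead of each hard-coding its own list.
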
## Enhanced Protocol Definitions

### Chapter 10 (CH10)
IRIG 106 Chapter 10 data recording format:
- Frame types: `CH10-Data`, `CH10-ACTTS`, `CH10-GPS`, `CH10-ACMI`, `CH10-Timing`
- Enhanced timing analysis with clock drift detection
- TMATS metadata parsing

### Precision Time Protocol (PTP)
IEEE 1588 precision clock synchronization:
- Frame types: `PTP-Sync`, `PTP-Delay_Req`, `PTP-Follow_Up`
- Clock synchronization quality analysis

### IENA
iNET-compatible Ethernet protocol:
- Frame types: `IENA-Control`, `IENA-Data`
- Packet structure validation

## TUI Display Rules

### Main Flow Row Shows:
- Source/Destination IP:Port
- Transport protocol (UDP/TCP)
- Total packets and volume
- **Full timing statistics** (ΔT, σ, outliers) if no enhanced sub-flows exist
- **Basic timeline only** (duration, first/last seen) if enhanced sub-flows exist

### Sub-Rows Show (Enhanced Protocols Only):
- Frame type name (e.g., `CH10-Data`)
- Packet count and percentage of flow
- Individual timing statistics (ΔT, σ, outliers)
- Protocol-specific enhanced analysis

### Standard Protocols (UDP/TCP):
- Aggregated into main flow statistics
- No sub-rows created
- Timing calculated across all standard packets in flow

## Processing Efficiency

- **Streaming**: Packets processed one-by-one (memory efficient)
- **Bucketing**: O(1) flow lookup via hash map
- **Hierarchical**: Statistics calculated at multiple levels simultaneously
- **Real-time**: Live updates during capture without full recalculation
- **Selective display**: Only enhanced protocols get detailed sub-flow analysis
- **Background processing**: Large PCAP files processed in thread pools with progressive UI updates
- **Thread-safe updates**: Concurrent data access protected with locks and atomic operations
- **Modular data structures**: Separated timing, quality, and decoded data for focused analysis

## Recent Architecture Enhancements

### Background Processing System
- **ThreadPoolExecutor**: Multi-threaded PCAP processing for large files
- **Progress callbacks**: Real-time parsing progress updates to TUI
- **Flow update callbacks**: Progressive flow data updates every 50 packets
- **Thread-safe shutdown**: Proper cleanup and signal handling

### Modular Data Models
- **TimingAnalysis**: Dedicated timing metrics with quality classifications
- **QualityMetrics**: Comprehensive error tracking and confidence scoring
- **DecodedData**: Structured protocol field and channel information
- **ProtocolRegistry**: Centralized protocol information management
- **Legacy compatibility**: Maintains existing API while enabling new features

### Enhanced TUI Features
- **Progressive loading**: TUI appears immediately while PCAP loads in background
- **Real-time updates**: Flow table refreshes as new packets are processed
- **Split panel layout**: Separate main flow and sub-flow detail views
- **Enhanced-only sub-rows**: Standard protocols aggregate into main flow
- **Timing column visibility**: Context-aware display based on sub-flow presence

This architecture enables **real-time analysis of multi-protocol network traffic** with **enhanced timing analysis** for specialized protocols while maintaining **efficient memory usage**, **responsive UI updates**, and **scalable background processing** for large PCAP files.

Flow: 192.168.4.89:49154 → 239.1.2.10:8400
Protocol: UDP
Total Packets: 1452
Total Frame-Type Outliers: 20

=== Frame Type Outlier Analysis ===

CH10-Data: 20 outliers
Frames: 1001, 1101, 1152, 1201, 1251, 1302, 1352, 1403, 1452, 1501, 1552, 1601, 1651, 1701, 1751, 1801, 1853, 1904, 1952, 2002
Avg ΔT: 49.626 ms
Std σ: 10848.285 ms
3σ Threshold: 32594.481 ms
Outlier Details:
Frame 1001: 51498.493 ms (4.7σ)
Frame 1101: 54598.390 ms (5.0σ)
Frame 1152: 54598.393 ms (5.0σ)
Frame 1201: 54498.392 ms (5.0σ)
Frame 2002: 53398.517 ms (4.9σ)
... and 15 more
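
The figures in the outlier report above are consistent with a threshold of `avg + 3σ` and a per-frame score of `(ΔT - avg) / σ`. The short check below reproduces them from the printed values; the formulas are inferred from the numbers, not taken from the StreamLens source.

```python
# Values copied from the report above (milliseconds)
avg_ms, std_ms, sigma = 49.626, 10848.285, 3.0

threshold_ms = avg_ms + sigma * std_ms


def sigma_score(delta_ms: float) -> float:
    """How many standard deviations a gap lies above the average (inferred formula)."""
    return (delta_ms - avg_ms) / std_ms


print(f"{threshold_ms:.3f}")            # 32594.481
print(f"{sigma_score(51498.493):.1f}")  # 4.7 (frame 1001)
print(f"{sigma_score(54598.390):.1f}")  # 5.0 (frame 1101)
print(f"{sigma_score(53398.517):.1f}")  # 4.9 (frame 2002)
```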