StreamLens Data Flow Architecture
Overview
StreamLens is a real-time network traffic analyzer that processes packets, groups them into flows, detects protocols, and provides enhanced analysis for specialized protocols like Chapter 10, PTP, and IENA.
Complete Data Flow
1. Packet Input Layer
# analyzer/analysis/core.py:36-43
from scapy.all import rdpcap

def analyze_pcap(self, pcap_file: str):
    packets = rdpcap(pcap_file)  # Scapy loads the whole PCAP/PCAPNG file into memory
    self._process_packets(packets)  # Process each packet
Sources:
- PCAP/PCAPNG files
- Live network interfaces via Scapy
2. Packet Processing Engine
# analyzer/analysis/core.py:65-67
def _process_packets(self, packets):
    for i, packet in enumerate(packets):
        self.dissector.dissect_packet(packet, i + 1)  # frame numbers start at 1
Per-packet operations:
- Frame number assignment
- Timestamp extraction
- Initial protocol detection
3. Flow Management & Classification
# analyzer/analysis/flow_manager.py:38-58
def process_packet(self, packet, frame_num):
    # Extract network identifiers
    # (ip_layer, the packet's IP layer, is obtained in lines elided from this excerpt)
    src_ip, dst_ip = ip_layer.src, ip_layer.dst
    transport_info = self._extract_transport_info(packet)  # ports, protocol
    # Create the flow key (IP pair only; ports are not part of it)
    flow_key = (src_ip, dst_ip)
    # Initialize new flow or use existing
    if flow_key not in self.flows:
        self.flows[flow_key] = FlowStats(...)
Flow Bucketing:
- Packets grouped by (src_ip, dst_ip) pairs
- Each flow tracked in a FlowStats object
- O(1) flow lookup via hash map
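The bucketing rule can be illustrated with a minimal sketch. MiniFlow and bucket are hypothetical stand-ins (not StreamLens code); the input tuples replace real Scapy packets. It shows that ports do not split a flow, and that with a (src_ip, dst_ip) key the reverse direction forms its own flow.

```python
from dataclasses import dataclass, field
from typing import Dict, List, Tuple

@dataclass
class MiniFlow:
    """Hypothetical, trimmed-down stand-in for FlowStats."""
    src_ip: str
    dst_ip: str
    frame_count: int = 0
    frame_numbers: List[int] = field(default_factory=list)

def bucket(packets: List[Tuple[str, str, int, int]]) -> Dict[Tuple[str, str], MiniFlow]:
    """Group (src_ip, dst_ip, src_port, dst_port) tuples by IP pair only."""
    flows: Dict[Tuple[str, str], MiniFlow] = {}
    for frame_num, (src_ip, dst_ip, _sport, _dport) in enumerate(packets, start=1):
        key = (src_ip, dst_ip)      # ports are deliberately not part of the key
        flow = flows.get(key)       # average O(1) dict lookup
        if flow is None:
            flow = flows[key] = MiniFlow(src_ip, dst_ip)
        flow.frame_count += 1
        flow.frame_numbers.append(frame_num)
    return flows

flows = bucket([
    ("10.0.0.1", "10.0.0.2", 5000, 8400),
    ("10.0.0.1", "10.0.0.2", 5001, 319),   # different ports, same flow
    ("10.0.0.2", "10.0.0.1", 8400, 5000),  # reverse direction: separate flow
])
print(len(flows), flows[("10.0.0.1", "10.0.0.2")].frame_numbers)
```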
4. Protocol Detection Pipeline
# analyzer/analysis/flow_manager.py:89-96
# Basic protocols (UDP, TCP, ICMP)
protocols = self._detect_basic_protocols(packet)
# Enhanced protocol detection
dissection_results = self._dissect_packet(packet, frame_num)
enhanced_protocols = self._extract_enhanced_protocols(dissection_results)
flow.detected_protocol_types.update(enhanced_protocols)
# Fallback detection
fallback_protocols = self._detect_fallback_protocols(packet, dissection_results)
Protocol Hierarchy:
- Basic: Transport layer (UDP/TCP/ICMP)
- Enhanced: Application layer (CH10/PTP/IENA) via specialized dissectors
- Fallback: Port-based heuristics for unidentified protocols
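The three-level hierarchy can be sketched as follows. This assumes the fallback stage is consulted only when no dissector recognised the payload; detect_protocols and FALLBACK_PORTS are hypothetical. UDP ports 319/320 are the registered PTP event/general ports, while 8400 for CH10 is only an illustrative choice.

```python
from typing import Dict, Optional, Set

# Hypothetical fallback table: 319/320 are the registered PTP event/general
# ports; 8400 for CH10 is an example value, not a standard assignment.
FALLBACK_PORTS: Dict[int, str] = {319: "PTP", 320: "PTP", 8400: "CH10"}

def detect_protocols(transport: str, dst_port: int,
                     dissected: Optional[str]) -> Set[str]:
    """Basic (transport), then enhanced (dissector), then port fallback."""
    protocols = {transport}                      # basic: transport layer
    if dissected is not None:
        protocols.add(dissected)                 # enhanced: dissector matched payload
    elif dst_port in FALLBACK_PORTS:
        protocols.add(FALLBACK_PORTS[dst_port])  # fallback: port heuristic only
    return protocols

print(detect_protocols("UDP", 319, None))
```

Keeping the port table as the last resort means a payload-level match always wins over a port guess.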
5. Frame Type Classification
# analyzer/analysis/flow_manager.py:98-100
frame_type = self._classify_frame_type(packet, dissection_results)
self._update_frame_type_stats(flow, frame_type, frame_num, timestamp, packet_size)
Sub-Flow Binning:
- Each packet classified into specific frame types
- Enhanced protocols: CH10-Data, CH10-ACTTS, PTP-Sync, IENA-Control
- Standard protocols: UDP-Data, TCP-Stream
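A minimal sketch of sub-flow binning, assuming the dissector reports a protocol name and subtype. classify_frame_type is hypothetical and is not the real _classify_frame_type; leaving non-UDP/TCP transports such as ICMP unchanged is also an assumption.

```python
from collections import Counter
from typing import Optional

ENHANCED = ("CH10", "PTP", "IENA")

def classify_frame_type(transport: str, protocol: Optional[str] = None,
                        subtype: Optional[str] = None) -> str:
    """Return a sub-flow bin name such as 'CH10-Data' or 'UDP-Data'."""
    if protocol in ENHANCED and subtype:
        return f"{protocol}-{subtype}"   # enhanced bin, e.g. "PTP-Sync"
    if transport == "TCP":
        return "TCP-Stream"
    if transport == "UDP":
        return "UDP-Data"
    return transport                     # e.g. "ICMP" is left as-is

# Bin a few (transport, protocol, subtype) results by frame type
bins = Counter(classify_frame_type(*r) for r in [
    ("UDP", "CH10", "Data"), ("UDP", "CH10", "Data"),
    ("UDP", "CH10", "ACTTS"), ("UDP", None, None),
])
print(bins)
```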
6. Statistical Analysis Engine
# analyzer/analysis/flow_manager.py:105-112
# flow.timestamps already includes the current packet, so [-2] is the previous one
if len(flow.timestamps) > 1:
    inter_arrival = timestamp - flow.timestamps[-2]
    flow.inter_arrival_times.append(inter_arrival)
# Real-time statistics if enabled
if self.statistics_engine and self.statistics_engine.enable_realtime:
    self.statistics_engine.update_realtime_statistics(flow_key, flow)
Timing Calculations:
- Flow-level: Aggregate timing across all packets
- Frame-type level: Per-subtype timing in FrameTypeStats
- Outlier detection: Based on configurable sigma thresholds
- Real-time updates: Live statistical calculations during capture
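Live updates without full recalculation can be done with Welford's online algorithm. The source does not say which method the statistics engine uses, so treat this as one possible technique. RunningTiming is hypothetical, and using the sample standard deviation is an assumption. The outlier rule (mean + k·σ) matches the threshold in the sample report at the end of this document.

```python
import math
from typing import Optional

class RunningTiming:
    """Incremental mean/std of inter-arrival times (Welford's algorithm).

    Hypothetical helper: each new timestamp updates the statistics in O(1)
    without re-scanning a stored list of inter-arrival times.
    """

    def __init__(self) -> None:
        self.last_ts: Optional[float] = None
        self.n = 0          # number of inter-arrival samples
        self.mean = 0.0
        self._m2 = 0.0      # running sum of squared deviations from the mean

    def add_timestamp(self, ts: float) -> None:
        if self.last_ts is not None:
            delta = ts - self.last_ts
            self.n += 1
            d = delta - self.mean
            self.mean += d / self.n
            self._m2 += d * (delta - self.mean)
        self.last_ts = ts

    @property
    def std(self) -> float:
        # Sample standard deviation (an assumption); 0.0 until two samples exist
        return math.sqrt(self._m2 / (self.n - 1)) if self.n > 1 else 0.0

    def is_outlier(self, delta: float, sigma: float = 3.0) -> bool:
        # Flag gaps above mean + sigma * std
        return self.n > 1 and delta > self.mean + sigma * self.std

rt = RunningTiming()
for ts in (0.0, 0.1, 0.2, 0.35):
    rt.add_timestamp(ts)
print(rt.n, round(rt.mean, 4), round(rt.std, 4))
```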
7. Enhanced Protocol Analysis
# analyzer/analysis/flow_manager.py:102-103
self._perform_enhanced_analysis(packet, flow, frame_num, transport_info)
Specialized Decoders:
- Chapter 10: Timing analysis, sequence validation, TMATS parsing
- PTP: Clock synchronization analysis, message type classification
- IENA: Packet structure validation, timing quality assessment
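Sequence validation for Chapter 10 can be sketched as gap counting on a wrapping counter. This assumes the 8-bit, per-channel sequence number of the Chapter 10 packet header, which increments by one and wraps from 255 to 0. count_sequence_gaps is hypothetical; a count like this could feed QualityMetrics.sequence_gaps.

```python
from typing import Iterable, Optional

def count_sequence_gaps(seqs: Iterable[int], modulus: int = 256) -> int:
    """Count packets missing from a wrapping per-channel sequence counter.

    Assumes the counter increments by one per packet and wraps to 0.
    Duplicates (step 0) are ignored; reordered packets would be counted
    as large gaps, which a production decoder would need to handle.
    """
    missing = 0
    prev: Optional[int] = None
    for seq in seqs:
        if prev is not None:
            step = (seq - prev) % modulus   # modulo arithmetic handles 255 -> 0
            if step > 1:
                missing += step - 1
        prev = seq
    return missing

print(count_sequence_gaps([254, 255, 0, 1]), count_sequence_gaps([10, 11, 14]))
```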
8. TUI Presentation Layer
# analyzer/tui/textual/widgets/flow_table_v2.py:143-153
if self._should_show_subrows(flow):
    enhanced_frame_types = self._get_enhanced_frame_types(flow)
    combinations = self._get_enhanced_protocol_frame_combinations(flow, enhanced_frame_types)
    for extended_proto, frame_type, count, percentage in combinations:
        # i (flow index), j (sub-row index) and sub_row come from lines
        # outside or elided from this excerpt
        sub_key = table.add_row(*sub_row, key=f"flow_{i}_sub_{j}")
        self.row_to_subflow_map[sub_key] = (i, frame_type)
Presentation Logic:
- Main Rows: Flow-level aggregates (all packets in flow)
- Sub-Rows: Only enhanced protocols (CH10-*, PTP-*, IENA-*)
- Standard protocols: Aggregated into main flow timing
- Timing display: ΔT (avg inter-arrival), σ (std deviation), outlier count
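The sub-row rule can be sketched over a flow's frame-type counts. should_show_subrows and subrow_combinations are hypothetical simplifications, not the widget's _should_show_subrows or _get_enhanced_protocol_frame_combinations. Percentages are taken over all frames in the flow, including standard ones.

```python
from typing import Dict, List, Tuple

ENHANCED_PREFIXES = ("CH10-", "PTP-", "IENA-")

def should_show_subrows(frame_type_counts: Dict[str, int]) -> bool:
    """Sub-rows only when the flow carries at least one enhanced frame type."""
    return any(ft.startswith(ENHANCED_PREFIXES) for ft in frame_type_counts)

def subrow_combinations(frame_type_counts: Dict[str, int]) -> List[Tuple[str, int, float]]:
    """(frame_type, count, percentage of flow) for enhanced types only."""
    total = sum(frame_type_counts.values())
    return [
        (ft, n, 100.0 * n / total)
        for ft, n in sorted(frame_type_counts.items())
        if ft.startswith(ENHANCED_PREFIXES)   # standard types stay in the main row
    ]

counts = {"CH10-Data": 90, "CH10-ACTTS": 5, "UDP-Data": 5}
print(should_show_subrows(counts), subrow_combinations(counts))
```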
Core Data Types
FlowStats
Main container for flow-level statistics:
from dataclasses import dataclass, field
from typing import Dict, List, Set, Tuple

@dataclass
class FlowStats:
    # Network identifiers
    src_ip: str
    dst_ip: str
    src_port: int = 0  # 0 if not applicable
    dst_port: int = 0
    transport_protocol: str = "Unknown"  # TCP, UDP, ICMP, IGMP, etc.
    traffic_classification: str = "Unknown"  # Unicast, Multicast, Broadcast
    # Aggregate statistics
    frame_count: int = 0
    total_bytes: int = 0
    first_seen: float = 0.0  # Timestamp of first frame
    last_seen: float = 0.0  # Timestamp of last frame
    duration: float = 0.0  # Flow duration in seconds
    # Timing data
    timestamps: List[float] = field(default_factory=list)
    frame_numbers: List[int] = field(default_factory=list)
    inter_arrival_times: List[float] = field(default_factory=list)
    avg_inter_arrival: float = 0.0
    std_inter_arrival: float = 0.0
    jitter: float = 0.0
    # Outlier tracking
    outlier_frames: List[int] = field(default_factory=list)
    outlier_details: List[Tuple[int, float]] = field(default_factory=list)  # (frame_number, time_delta)
    # Protocol classification
    protocols: Set[str] = field(default_factory=set)  # Basic protocols seen
    detected_protocol_types: Set[str] = field(default_factory=set)  # Enhanced protocols (CH10, PTP, IENA)
    # Sub-flow bins
    frame_types: Dict[str, FrameTypeStats] = field(default_factory=dict)
    # Enhanced analysis results
    enhanced_analysis: EnhancedAnalysisData = field(default_factory=EnhancedAnalysisData)
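The traffic_classification field can be derived from the destination address with the standard ipaddress module. classify_traffic is a hypothetical helper; it only recognises the limited broadcast address, because a subnet-directed broadcast cannot be identified without the netmask.

```python
import ipaddress

def classify_traffic(dst_ip: str) -> str:
    """Map a destination address to Unicast / Multicast / Broadcast.

    Only 255.255.255.255 counts as broadcast here; subnet-directed
    broadcasts need the netmask, which a packet alone does not carry.
    """
    addr = ipaddress.ip_address(dst_ip)   # accepts IPv4 and IPv6 strings
    if addr.is_multicast:
        return "Multicast"
    if addr == ipaddress.ip_address("255.255.255.255"):
        return "Broadcast"
    return "Unicast"

print(classify_traffic("239.1.2.10"), classify_traffic("192.168.4.89"))
```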
FrameTypeStats
Statistics for specific frame types within a flow:
from dataclasses import dataclass, field
from typing import List, Tuple

@dataclass
class FrameTypeStats:
    # Frame type identifier
    frame_type: str  # e.g., "CH10-Data", "PTP-Sync", "UDP-Data"
    # Basic statistics
    count: int = 0
    total_bytes: int = 0
    # Timing data
    timestamps: List[float] = field(default_factory=list)
    frame_numbers: List[int] = field(default_factory=list)
    inter_arrival_times: List[float] = field(default_factory=list)
    avg_inter_arrival: float = 0.0
    std_inter_arrival: float = 0.0
    # Outlier tracking
    outlier_frames: List[int] = field(default_factory=list)
    outlier_details: List[Tuple[int, float]] = field(default_factory=list)
Enhanced Analysis Data Models
New Modular Structure
The enhanced analysis data has been restructured into focused components for better organization and extensibility:
from dataclasses import dataclass, field
from typing import Any, Dict, List, Optional, Set

@dataclass
class TimingAnalysis:
    """Timing analysis results for enhanced protocols"""
    avg_clock_drift_ppm: float = 0.0
    max_clock_drift_ppm: float = 0.0
    min_clock_drift_ppm: float = 0.0
    drift_variance: float = 0.0
    quality: TimingQuality = TimingQuality.UNKNOWN  # EXCELLENT, GOOD, MODERATE, POOR
    stability: TimingStability = TimingStability.UNKNOWN  # STABLE, VARIABLE, UNSTABLE
    timing_accuracy_percent: float = 0.0
    sync_errors: int = 0
    timing_anomalies: int = 0
    anomaly_rate_percent: float = 0.0
    has_internal_timing: bool = False
    rtc_sync_available: bool = False

@dataclass
class QualityMetrics:
    """Quality metrics for enhanced protocol data"""
    avg_frame_quality_percent: float = 0.0
    frame_quality_samples: List[float] = field(default_factory=list)
    avg_signal_quality_percent: float = 0.0
    signal_quality_samples: List[float] = field(default_factory=list)
    sequence_gaps: int = 0
    format_errors: int = 0
    overflow_errors: int = 0
    checksum_errors: int = 0
    avg_confidence_score: float = 0.0
    confidence_samples: List[float] = field(default_factory=list)
    low_confidence_frames: int = 0
    corrupted_frames: int = 0
    missing_frames: int = 0
    duplicate_frames: int = 0

@dataclass
class DecodedData:
    """Container for decoded protocol data"""
    channel_count: int = 0
    analog_channels: int = 0
    pcm_channels: int = 0
    discrete_channels: int = 0
    primary_data_type: DataType = DataType.UNKNOWN  # ANALOG, PCM, DISCRETE, TIME, VIDEO, TMATS
    secondary_data_types: Set[DataType] = field(default_factory=set)
    sample_decoded_fields: Dict[str, Any] = field(default_factory=dict)
    available_field_names: List[str] = field(default_factory=list)
    field_count: int = 0
    critical_fields: List[str] = field(default_factory=list)
    frame_types: Set[str] = field(default_factory=set)
    frame_type_distribution: Dict[str, int] = field(default_factory=dict)
    tmats_frames: int = 0
    setup_frames: int = 0
    data_frames: int = 0
    decoder_type: str = "Standard"
    decoder_version: Optional[str] = None
    decode_success_rate: float = 1.0

@dataclass
class EnhancedAnalysisData:
    """Complete enhanced analysis data combining all analysis types"""
    timing: TimingAnalysis = field(default_factory=TimingAnalysis)
    quality: QualityMetrics = field(default_factory=QualityMetrics)
    decoded: DecodedData = field(default_factory=DecodedData)
    # Legacy fields are kept for backward compatibility
    # (automatically synchronized with the component data)
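The drift fields of TimingAnalysis are expressed in parts per million. A minimal sketch of one way to fill them compares interval lengths on a local clock against a reference clock: drift = (local interval − reference interval) / reference interval × 10⁶. Which pair of clocks StreamLens compares (for example a Chapter 10 time counter versus capture time) is not stated here, so drift_ppm and summarize_drift are hypothetical, and using the population variance for drift_variance is an assumption.

```python
from statistics import mean, pvariance
from typing import Dict, List, Sequence

def drift_ppm(local_ts: Sequence[float], ref_ts: Sequence[float]) -> List[float]:
    """Per-interval drift of a local clock against a reference clock, in ppm."""
    drifts = []
    for i in range(1, min(len(local_ts), len(ref_ts))):
        ref_dt = ref_ts[i] - ref_ts[i - 1]      # must be non-zero
        local_dt = local_ts[i] - local_ts[i - 1]
        drifts.append((local_dt - ref_dt) / ref_dt * 1e6)
    return drifts

def summarize_drift(drifts: List[float]) -> Dict[str, float]:
    """Values shaped like the TimingAnalysis drift fields."""
    return {
        "avg_clock_drift_ppm": mean(drifts),
        "max_clock_drift_ppm": max(drifts),
        "min_clock_drift_ppm": min(drifts),
        "drift_variance": pvariance(drifts),  # population variance: an assumption
    }

# A local clock gaining 10 microseconds per reference second (about +10 ppm)
ref = [0.0, 1.0, 2.0, 3.0]
local = [0.0, 1.00001, 2.00002, 3.00003]
summary = summarize_drift(drift_ppm(local, ref))
print(summary)
```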
Protocol Information Models
from dataclasses import dataclass, field
from typing import Any, List, Optional

@dataclass
class DecodedField:
    """Represents a single decoded field from a protocol"""
    name: str
    value: Any
    field_type: FieldType  # INTEGER, FLOAT, STRING, BOOLEAN, TIMESTAMP, etc.
    description: Optional[str] = None
    unit: Optional[str] = None  # e.g., "ms", "bytes", "ppm"
    confidence: float = 1.0  # 0.0 to 1.0
    is_critical: bool = False

@dataclass
class ProtocolInfo:
    """Information about a detected protocol"""
    protocol_type: ProtocolType
    name: str
    category: ProtocolCategory  # TRANSPORT, NETWORK, ENHANCED, TIMING, TELEMETRY
    version: Optional[str] = None
    confidence: float = 1.0
    port: Optional[int] = None
    subtype: Optional[str] = None  # e.g., "CH10-Data", "PTP-Sync"
    vendor: Optional[str] = None

@dataclass
class ProtocolDecodeResult:
    """Result of protocol decoding"""
    protocol_info: ProtocolInfo
    fields: List[DecodedField] = field(default_factory=list)
    frame_type: Optional[str] = None
    payload_size: int = 0
    errors: List[str] = field(default_factory=list)
    warnings: List[str] = field(default_factory=list)
Key Processing Components
EthernetAnalyzer
Top-level orchestrator with background processing capabilities:
class EthernetAnalyzer:
    def __init__(self, enable_realtime: bool = False, outlier_threshold_sigma: float = 3.0):
        self.statistics_engine = StatisticsEngine(outlier_threshold_sigma, enable_realtime)
        self.flow_manager = FlowManager(self.statistics_engine)
        self.all_packets: List[Packet] = []
        self.flows = self.flow_manager.flows  # Exposed for UI access
        self.background_analyzer = None  # For large PCAP background processing
BackgroundAnalyzer
Thread-pool-based PCAP processing for large files:
import threading
from concurrent.futures import ThreadPoolExecutor
from typing import Callable, Optional

class BackgroundAnalyzer:
    def __init__(self, analyzer: EthernetAnalyzer,
                 num_threads: int = 4,
                 batch_size: int = 1000,
                 progress_callback: Optional[Callable[[ParsingProgress], None]] = None,
                 flow_update_callback: Optional[Callable[[], None]] = None):
        self.analyzer = analyzer
        self.executor = ThreadPoolExecutor(max_workers=num_threads)
        self.batch_size = batch_size
        self.progress_callback = progress_callback
        self.flow_update_callback = flow_update_callback
        self.is_running = False
        self._shutdown_event = threading.Event()
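The batching pattern behind BackgroundAnalyzer can be sketched with concurrent.futures. MiniBackground is hypothetical: it splits packets into batch_size slices, processes them on a thread pool, serialises updates to the shared flow table with a lock, and reports (done, total) progress after each batch. The plain tuples stand in for real packets.

```python
import threading
from concurrent.futures import ThreadPoolExecutor
from typing import Callable, Dict, List, Optional, Tuple

Packet = Tuple[str, str, float]  # (src_ip, dst_ip, timestamp) stand-in

class MiniBackground:
    """Hypothetical sketch of batched, lock-protected pool processing."""

    def __init__(self, num_threads: int = 4, batch_size: int = 1000,
                 progress_callback: Optional[Callable[[int, int], None]] = None):
        self.executor = ThreadPoolExecutor(max_workers=num_threads)
        self.batch_size = batch_size
        self.progress_callback = progress_callback
        self.flows: Dict[Tuple[str, str], int] = {}   # flow key -> packet count
        self._lock = threading.Lock()
        self._done = 0

    def _process_batch(self, batch: List[Packet], total: int) -> None:
        with self._lock:  # the flow table is shared between worker threads
            for src, dst, _ts in batch:
                key = (src, dst)
                self.flows[key] = self.flows.get(key, 0) + 1
            self._done += len(batch)
            done = self._done
        if self.progress_callback:
            self.progress_callback(done, total)  # called from a worker thread

    def run(self, packets: List[Packet]) -> None:
        batches = [packets[i:i + self.batch_size]
                   for i in range(0, len(packets), self.batch_size)]
        futures = [self.executor.submit(self._process_batch, b, len(packets))
                   for b in batches]
        for f in futures:
            f.result()  # re-raises any exception from a worker
        self.executor.shutdown(wait=True)

progress: List[Tuple[int, int]] = []
bg = MiniBackground(num_threads=2, batch_size=3,
                    progress_callback=lambda done, total: progress.append((done, total)))
bg.run([("10.0.0.1", "239.1.2.10", i * 0.05) for i in range(7)]
       + [("10.0.0.2", "10.0.0.1", i * 0.1) for i in range(3)])
print(bg.flows)
```

Only order-independent counts are updated in this sketch. Inter-arrival timing depends on packet order, so batches processed concurrently would have to be applied in order (or re-sorted per flow) before timing statistics are computed. Under CPython's GIL, threads mainly help overlap I/O rather than speed up pure-Python dissection.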
FlowManager
Packet processing and flow creation with enhanced protocol support:
class FlowManager:
    def __init__(self, statistics_engine=None):
        self.flows: Dict[Tuple[str, str], FlowStats] = {}
        self.specialized_dissectors = {
            'chapter10': Chapter10Dissector(),
            'ptp': PTPDissector(),
            'iena': IENADissector()
        }
        self.enhanced_ch10_decoder = EnhancedChapter10Decoder()
        self.ch10_timing_plugin = Chapter10TimingAnalysisPlugin()
        self.protocol_registry = ProtocolRegistry()  # New protocol management
ProtocolRegistry
Centralized protocol information management:
class ProtocolRegistry:
    def __init__(self):
        self._protocols: Dict[ProtocolType, ProtocolInfo] = {}
        self._register_standard_protocols()  # UDP, TCP, ICMP, IGMP
        self._register_enhanced_protocols()  # CH10, PTP, IENA, NTP

    # Query methods (bodies omitted)
    def get_enhanced_protocols(self) -> List[ProtocolInfo]: ...
    def is_enhanced_protocol(self, protocol_type: ProtocolType) -> bool: ...
    def get_protocols_by_category(self, category: ProtocolCategory) -> List[ProtocolInfo]: ...
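A reduced, runnable version of the registry idea is sketched below. MiniRegistry, ProtoEntry and Category are hypothetical; they are keyed by protocol name instead of ProtocolType, and the category assigned to each protocol is an illustrative assumption. The method names mirror the ones listed above.

```python
from dataclasses import dataclass
from enum import Enum
from typing import Dict, List

class Category(Enum):
    TRANSPORT = "transport"
    NETWORK = "network"
    TIMING = "timing"
    TELEMETRY = "telemetry"

@dataclass(frozen=True)
class ProtoEntry:
    name: str
    category: Category
    enhanced: bool = False

class MiniRegistry:
    """Hypothetical reduced registry keyed by protocol name."""

    def __init__(self) -> None:
        self._protocols: Dict[str, ProtoEntry] = {}
        # Standard protocols (category choices are illustrative)
        for name, cat in (("UDP", Category.TRANSPORT), ("TCP", Category.TRANSPORT),
                          ("ICMP", Category.NETWORK), ("IGMP", Category.NETWORK)):
            self._protocols[name] = ProtoEntry(name, cat)
        # Enhanced protocols
        for name, cat in (("CH10", Category.TELEMETRY), ("IENA", Category.TELEMETRY),
                          ("PTP", Category.TIMING), ("NTP", Category.TIMING)):
            self._protocols[name] = ProtoEntry(name, cat, enhanced=True)

    def get_enhanced_protocols(self) -> List[ProtoEntry]:
        return [p for p in self._protocols.values() if p.enhanced]

    def is_enhanced_protocol(self, name: str) -> bool:
        entry = self._protocols.get(name)
        return entry is not None and entry.enhanced

    def get_protocols_by_category(self, category: Category) -> List[ProtoEntry]:
        return [p for p in self._protocols.values() if p.category == category]

registry = MiniRegistry()
print([p.name for p in registry.get_enhanced_protocols()])
```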
Enhanced Protocol Definitions
Chapter 10 (CH10)
IRIG 106 Chapter 10 data recording format:
- Frame types: CH10-Data, CH10-ACTTS, CH10-GPS, CH10-ACMI, CH10-Timing
- Enhanced timing analysis with clock drift detection
- TMATS metadata parsing
Precision Time Protocol (PTP)
IEEE 1588 precision clock synchronization:
- Frame types: PTP-Sync, PTP-Delay_Req, PTP-Follow_Up
- Clock synchronization quality analysis
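PTP frame types can be derived from the PTPv2 common header. In IEEE 1588-2008 the low nibble of byte 0 is messageType and the low nibble of byte 1 is versionPTP, and the common header is 34 bytes long. ptp_frame_type is a hypothetical helper, not the PTPDissector API.

```python
from typing import Optional

# IEEE 1588-2008 (PTPv2) messageType values (low nibble of header byte 0)
PTP_MESSAGE_TYPES = {
    0x0: "Sync",
    0x1: "Delay_Req",
    0x2: "Pdelay_Req",
    0x3: "Pdelay_Resp",
    0x8: "Follow_Up",
    0x9: "Delay_Resp",
    0xA: "Pdelay_Resp_Follow_Up",
    0xB: "Announce",
    0xC: "Signaling",
    0xD: "Management",
}

def ptp_frame_type(payload: bytes) -> Optional[str]:
    """Return e.g. 'PTP-Sync' for a PTPv2 UDP payload, else None."""
    if len(payload) < 34:              # PTPv2 common header is 34 bytes
        return None
    if (payload[1] & 0x0F) != 2:       # versionPTP must be 2
        return None
    name = PTP_MESSAGE_TYPES.get(payload[0] & 0x0F)
    return f"PTP-{name}" if name else None

print(ptp_frame_type(bytes([0x08, 0x02]) + bytes(32)))
```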
IENA
iNET-compatible Ethernet protocol:
- Frame types: IENA-Control, IENA-Data
- Packet structure validation
TUI Display Rules
Main Flow Row Shows:
- Source/Destination IP:Port
- Transport protocol (UDP/TCP)
- Total packets and volume
- Full timing statistics (ΔT, σ, outliers) if no enhanced sub-flows exist
- Basic timeline only (duration, first/last seen) if enhanced sub-flows exist
Sub-Rows Show (Enhanced Protocols Only):
- Frame type name (e.g., CH10-Data)
- Packet count and percentage of flow
- Individual timing statistics (ΔT, σ, outliers)
- Protocol-specific enhanced analysis
Standard Protocols (UDP/TCP):
- Aggregated into main flow statistics
- No sub-rows created
- Timing calculated across all standard packets in flow
Processing Efficiency
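- Per-packet processing: Packets are dissected and binned one at a time (rdpcap still loads the whole file first and EthernetAnalyzer keeps all_packets, so memory use grows with capture size)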
- Bucketing: O(1) flow lookup via hash map
- Hierarchical: Statistics calculated at multiple levels simultaneously
- Real-time: Live updates during capture without full recalculation
- Selective display: Only enhanced protocols get detailed sub-flow analysis
- Background processing: Large PCAP files processed in thread pools with progressive UI updates
- Thread-safe updates: Concurrent data access protected with locks and atomic operations
- Modular data structures: Separated timing, quality, and decoded data for focused analysis
Recent Architecture Enhancements
Background Processing System
- ThreadPoolExecutor: Multi-threaded PCAP processing for large files
- Progress callbacks: Real-time parsing progress updates to TUI
- Flow update callbacks: Progressive flow data updates every 50 packets
- Thread-safe shutdown: Proper cleanup and signal handling
Modular Data Models
- TimingAnalysis: Dedicated timing metrics with quality classifications
- QualityMetrics: Comprehensive error tracking and confidence scoring
- DecodedData: Structured protocol field and channel information
- ProtocolRegistry: Centralized protocol information management
- Legacy compatibility: Maintains existing API while enabling new features
Enhanced TUI Features
- Progressive loading: TUI appears immediately while PCAP loads in background
- Real-time updates: Flow table refreshes as new packets are processed
- Split panel layout: Separate main flow and sub-flow detail views
- Enhanced-only sub-rows: Standard protocols aggregate into main flow
- Timing column visibility: Context-aware display based on sub-flow presence
This architecture enables real-time analysis of multi-protocol network traffic with enhanced timing analysis for specialized protocols while maintaining efficient memory usage, responsive UI updates, and scalable background processing for large PCAP files.
Flow: 192.168.4.89:49154 → 239.1.2.10:8400
Protocol: UDP
Total Packets: 1452
Total Frame-Type Outliers: 20

=== Frame Type Outlier Analysis ===
CH10-Data: 20 outliers
  Frames: 1001, 1101, 1152, 1201, 1251, 1302, 1352, 1403, 1452, 1501, 1552, 1601, 1651, 1701, 1751, 1801, 1853, 1904, 1952, 2002
  Avg ΔT: 49.626 ms
  Std σ: 10848.285 ms
  3σ Threshold: 32594.481 ms
  Outlier Details:
    Frame 1001: 51498.493 ms (4.7σ)
    Frame 1101: 54598.390 ms (5.0σ)
    Frame 1152: 54598.393 ms (5.0σ)
    Frame 1201: 54498.392 ms (5.0σ)
    Frame 2002: 53398.517 ms (4.9σ)
    ... and 15 more
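The figures in the report above are consistent with an outlier threshold of mean + 3σ (49.626 + 3 × 10848.285 = 32594.481 ms) and a σ-score of (ΔT − mean) / σ for each flagged gap. The helper names below are hypothetical; the sketch simply reproduces that arithmetic.

```python
def sigma_threshold(avg_ms: float, std_ms: float, k: float = 3.0) -> float:
    """Upper outlier bound: mean + k * standard deviation."""
    return avg_ms + k * std_ms

def sigma_score(delta_ms: float, avg_ms: float, std_ms: float) -> float:
    """How many standard deviations a gap lies above the mean."""
    return (delta_ms - avg_ms) / std_ms

avg, std = 49.626, 10848.285   # CH10-Data values from the report above
threshold = sigma_threshold(avg, std)
print(round(threshold, 3), round(sigma_score(51498.493, avg, std), 1))
```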