
StreamLens Data Flow Architecture

Overview

StreamLens is a real-time network traffic analyzer that processes packets, groups them into flows, detects protocols, and provides enhanced analysis for specialized protocols like Chapter 10, PTP, and IENA.

Complete Data Flow

1. Packet Input Layer

# analyzer/analysis/core.py:36-43
from scapy.all import rdpcap

def analyze_pcap(self, pcap_file: str):
    packets = rdpcap(pcap_file)  # Scapy reads the whole PCAP/PCAPNG file into memory
    self._process_packets(packets)  # Process each packet in capture order

Sources:

  • PCAP/PCAPNG files
  • Live network interfaces via Scapy

2. Packet Processing Engine

# analyzer/analysis/core.py:65-67
def _process_packets(self, packets):
    # Frame numbers are 1-based, matching Wireshark's numbering
    for frame_num, packet in enumerate(packets, start=1):
        self.dissector.dissect_packet(packet, frame_num)

Per-packet operations:

  • Frame number assignment
  • Timestamp extraction
  • Initial protocol detection
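StreamLens reads captures with Scapy's rdpcap, which loads the whole file before _process_packets runs. To show the per-packet operations above without Scapy, here is a minimal standard-library sketch that streams a classic libpcap file record by record, assigning 1-based frame numbers and extracting timestamps. It is an illustration, not StreamLens code: the name iter_pcap_records is hypothetical, and PCAPNG files are deliberately not handled.

```python
import io
import struct
from typing import BinaryIO, Iterator, Tuple

PCAP_MAGIC_USEC = 0xA1B2C3D4  # classic libpcap, microsecond timestamps
PCAP_MAGIC_NSEC = 0xA1B23C4D  # classic libpcap, nanosecond timestamps


def iter_pcap_records(stream: BinaryIO) -> Iterator[Tuple[int, float, bytes]]:
    """Yield (frame_number, timestamp, frame_bytes) one record at a time."""
    header = stream.read(24)  # global header: magic, version, zone, sigfigs, snaplen, linktype
    if len(header) < 24:
        raise ValueError("truncated pcap global header")
    for endian in ("<", ">"):  # the magic number reveals the writer's byte order
        (magic,) = struct.unpack(endian + "I", header[:4])
        if magic in (PCAP_MAGIC_USEC, PCAP_MAGIC_NSEC):
            break
    else:
        raise ValueError("not a classic pcap file (PCAPNG is not handled by this sketch)")
    divisor = 1e6 if magic == PCAP_MAGIC_USEC else 1e9

    frame_number = 0
    while True:
        record = stream.read(16)  # record header: ts_sec, ts_frac, incl_len, orig_len
        if len(record) < 16:
            return  # end of file (a truncated trailing header is ignored)
        ts_sec, ts_frac, incl_len, _orig_len = struct.unpack(endian + "IIII", record)
        data = stream.read(incl_len)
        if len(data) < incl_len:
            return  # truncated final packet
        frame_number += 1  # 1-based, like the frame numbers in _process_packets
        yield frame_number, ts_sec + ts_frac / divisor, data


# Usage: two hand-built records in an in-memory little-endian capture
_demo = struct.pack("<IHHiIII", PCAP_MAGIC_USEC, 2, 4, 0, 0, 65535, 1)
_demo += struct.pack("<IIII", 10, 500000, 3, 3) + b"abc"
_demo += struct.pack("<IIII", 11, 0, 2, 2) + b"de"
records = list(iter_pcap_records(io.BytesIO(_demo)))
```

Because the generator holds only one record at a time, memory use stays flat regardless of capture size.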

3. Flow Management & Classification

# analyzer/analysis/flow_manager.py:38-58
from scapy.layers.inet import IP

def process_packet(self, packet, frame_num):
    # Extract network identifiers (handling of non-IP packets omitted in this excerpt)
    ip_layer = packet[IP]
    src_ip, dst_ip = ip_layer.src, ip_layer.dst
    transport_info = self._extract_transport_info(packet)  # ports, protocol

    # Directional flow key: (src, dst) only; ports are not part of the key
    flow_key = (src_ip, dst_ip)

    # Initialize new flow or use existing
    if flow_key not in self.flows:
        self.flows[flow_key] = FlowStats(...)
    flow = self.flows[flow_key]

Flow Bucketing:

  • Packets grouped by directional (src_ip, dst_ip) pairs; ports are not part of the key, so A→B and B→A are separate flows
  • Each flow tracked in a FlowStats object
  • Average-case O(1) flow lookup via hash map
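A minimal sketch of the bucketing rule, assuming packets have already been reduced to (src_ip, dst_ip, timestamp, size) tuples. MiniFlow and bucket_packets are hypothetical names standing in for FlowStats and FlowManager.process_packet; only the keying rule mirrors the source.

```python
from dataclasses import dataclass, field
from typing import Dict, Iterable, List, Tuple

FlowKey = Tuple[str, str]  # (src_ip, dst_ip); direction matters, ports are ignored


@dataclass
class MiniFlow:
    """Hypothetical, trimmed-down stand-in for FlowStats."""
    src_ip: str
    dst_ip: str
    frame_count: int = 0
    total_bytes: int = 0
    timestamps: List[float] = field(default_factory=list)


def bucket_packets(packets: Iterable[Tuple[str, str, float, int]]) -> Dict[FlowKey, MiniFlow]:
    """Group (src_ip, dst_ip, timestamp, size) tuples into per-flow buckets."""
    flows: Dict[FlowKey, MiniFlow] = {}
    for src_ip, dst_ip, timestamp, size in packets:
        key = (src_ip, dst_ip)
        flow = flows.get(key)  # average-case O(1) dict lookup
        if flow is None:
            flow = flows[key] = MiniFlow(src_ip, dst_ip)
        flow.frame_count += 1
        flow.total_bytes += size
        flow.timestamps.append(timestamp)
    return flows


# Usage: two packets A->B and one reply B->A give two separate flows
flows = bucket_packets([
    ("10.0.0.1", "10.0.0.2", 0.0, 100),
    ("10.0.0.2", "10.0.0.1", 0.1, 60),
    ("10.0.0.1", "10.0.0.2", 0.2, 100),
])
```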

4. Protocol Detection Pipeline

# analyzer/analysis/flow_manager.py:89-96
# Basic protocols (UDP, TCP, ICMP)
protocols = self._detect_basic_protocols(packet)

# Enhanced protocol detection
dissection_results = self._dissect_packet(packet, frame_num)
enhanced_protocols = self._extract_enhanced_protocols(dissection_results)
flow.detected_protocol_types.update(enhanced_protocols)

# Fallback detection
fallback_protocols = self._detect_fallback_protocols(packet, dissection_results)

Protocol Hierarchy:

  1. Basic: Transport- and network-layer protocols (UDP/TCP/ICMP)
  2. Enhanced: Application layer (CH10/PTP/IENA) via specialized dissectors
  3. Fallback: Port-based heuristics for unidentified protocols
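The three tiers can be sketched as a single function. This is an illustration under stated assumptions rather than the FlowManager logic: dissectors are modeled as plain predicates over the UDP payload, and the fallback table is hypothetical apart from the two PTP entries, which are the standard IEEE 1588 UDP ports.

```python
from typing import Callable, Dict, Set

# Hypothetical fallback table. Only the PTP entries are standard:
# IEEE 1588 over UDP uses port 319 for event and 320 for general messages.
FALLBACK_PORT_HINTS: Dict[int, str] = {319: "PTP", 320: "PTP"}


def detect_protocols(transport: str, dst_port: int, payload: bytes,
                     dissectors: Dict[str, Callable[[bytes], bool]]) -> Set[str]:
    """Three-tier detection: basic, then enhanced dissectors, then port fallback."""
    found = {transport}  # 1. basic: already known from the packet headers
    enhanced = {name for name, matches in dissectors.items() if matches(payload)}
    found |= enhanced  # 2. enhanced: every dissector that accepts the payload
    if not enhanced:  # 3. fallback only when no dissector recognized the payload
        hint = FALLBACK_PORT_HINTS.get(dst_port)
        if hint is not None:
            found.add(hint)
    return found


# Usage with a toy dissector (hypothetical) that accepts payloads starting with b"TOY"
toy_dissectors = {"TOY": lambda payload: payload.startswith(b"TOY")}
```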

5. Frame Type Classification

# analyzer/analysis/flow_manager.py:98-100
frame_type = self._classify_frame_type(packet, dissection_results)
self._update_frame_type_stats(flow, frame_type, frame_num, timestamp, packet_size)

Sub-Flow Binning:

  • Each packet is classified into exactly one frame type, with per-type statistics kept in FrameTypeStats
  • Enhanced protocols: CH10-Data, CH10-ACTTS, PTP-Sync, IENA-Control
  • Standard protocols: UDP-Data, TCP-Stream
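The per-type binning can be sketched as follows, assuming a trimmed-down FrameTypeBin in place of FrameTypeStats (the class and function names are hypothetical). The key point is that each bin keeps its own timestamp history, so a CH10-Data interval is never measured against an interleaved CH10-ACTTS frame.

```python
from dataclasses import dataclass, field
from typing import Dict, List


@dataclass
class FrameTypeBin:
    """Hypothetical, trimmed-down stand-in for FrameTypeStats."""
    frame_type: str
    count: int = 0
    total_bytes: int = 0
    timestamps: List[float] = field(default_factory=list)
    inter_arrival_times: List[float] = field(default_factory=list)


def update_frame_type_bin(bins: Dict[str, FrameTypeBin], frame_type: str,
                          timestamp: float, size: int) -> None:
    """Add one packet to its frame-type bin, creating the bin on first use."""
    ft_bin = bins.get(frame_type)
    if ft_bin is None:
        ft_bin = bins[frame_type] = FrameTypeBin(frame_type)
    if ft_bin.timestamps:
        # Inter-arrival is measured between frames of the same type only
        ft_bin.inter_arrival_times.append(timestamp - ft_bin.timestamps[-1])
    ft_bin.timestamps.append(timestamp)
    ft_bin.count += 1
    ft_bin.total_bytes += size


# Usage: CH10-ACTTS arrives between CH10-Data frames but does not affect their timing
bins: Dict[str, FrameTypeBin] = {}
for ftype, ts in [("CH10-Data", 0.0), ("CH10-ACTTS", 0.5), ("CH10-Data", 1.0), ("CH10-Data", 2.5)]:
    update_frame_type_bin(bins, ftype, ts, 100)
```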

6. Statistical Analysis Engine

# analyzer/analysis/flow_manager.py:105-112
if len(flow.timestamps) > 1:
    # flow.timestamps[-1] is the current packet's timestamp (already appended)
    inter_arrival = timestamp - flow.timestamps[-2]
    flow.inter_arrival_times.append(inter_arrival)
    
    # Real-time statistics if enabled
    if self.statistics_engine and self.statistics_engine.enable_realtime:
        self.statistics_engine.update_realtime_statistics(flow_key, flow)

Timing Calculations:

  • Flow-level: Aggregate timing across all packets
  • Frame-type level: Per-subtype timing in FrameTypeStats
  • Outlier detection: Based on configurable sigma thresholds
  • Real-time updates: Live statistical calculations during capture
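The exact outlier formula is not shown here, but the example report at the end of this document gives a "3σ Threshold" equal to the average ΔT plus three standard deviations. This sketch therefore flags inter-arrival times above mean + k·σ, with k playing the role of outlier_threshold_sigma (default 3.0 in EthernetAnalyzer). The use of the population standard deviation and the function name find_outliers are assumptions.

```python
import statistics
from typing import List, Sequence, Tuple


def find_outliers(frame_numbers: Sequence[int], inter_arrivals: Sequence[float],
                  sigma: float = 3.0) -> List[Tuple[int, float, float]]:
    """Return (frame_number, delta, sigma_score) for every delta above mean + sigma * std.

    frame_numbers[i] is the frame that closes inter_arrivals[i], i.e. pass
    FlowStats.frame_numbers[1:] alongside FlowStats.inter_arrival_times.
    """
    if len(inter_arrivals) < 2:
        return []
    mean = statistics.mean(inter_arrivals)
    std = statistics.pstdev(inter_arrivals)  # assumption: population std
    if std == 0:
        return []  # perfectly regular timing has no outliers
    threshold = mean + sigma * std
    return [(frame, delta, (delta - mean) / std)
            for frame, delta in zip(frame_numbers, inter_arrivals)
            if delta > threshold]


# Usage: twenty 1.0 s gaps and one 10.0 s gap closed by frame 22
report = find_outliers(list(range(2, 23)), [1.0] * 20 + [10.0])
```

Note that the outliers themselves inflate σ, which is why a handful of very long gaps can produce a threshold far above the typical ΔT.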

7. Enhanced Protocol Analysis

# analyzer/analysis/flow_manager.py:102-103
self._perform_enhanced_analysis(packet, flow, frame_num, transport_info)

Specialized Decoders:

  • Chapter 10: Timing analysis, sequence validation, TMATS parsing
  • PTP: Clock synchronization analysis, message type classification
  • IENA: Packet structure validation, timing quality assessment

8. TUI Presentation Layer

# analyzer/tui/textual/widgets/flow_table_v2.py:143-153
# Runs inside the loop over flows; i is the flow's index
if self._should_show_subrows(flow):
    enhanced_frame_types = self._get_enhanced_frame_types(flow)
    combinations = self._get_enhanced_protocol_frame_combinations(flow, enhanced_frame_types)

    for j, (extended_proto, frame_type, count, percentage) in enumerate(combinations):
        # sub_row holds the cell values built from this tuple (construction omitted in this excerpt)
        sub_key = table.add_row(*sub_row, key=f"flow_{i}_sub_{j}")
        self.row_to_subflow_map[sub_key] = (i, frame_type)  # row key -> (flow index, frame type)

Presentation Logic:

  • Main Rows: Flow-level aggregates (all packets in flow)
  • Sub-Rows: Only enhanced protocols (CH10-*, PTP-*, IENA-*)
  • Standard protocols: Aggregated into main flow timing
  • Timing display: ΔT (avg inter-arrival), σ (std deviation), outlier count
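The sub-row rule can be sketched as a filter over per-frame-type packet counts. The prefix test and the largest-first ordering are assumptions for this illustration (enhanced_subrows is a hypothetical name); _get_enhanced_protocol_frame_combinations may select and order rows differently.

```python
from typing import Dict, List, Tuple

ENHANCED_PREFIXES = ("CH10-", "PTP-", "IENA-")


def enhanced_subrows(frame_type_counts: Dict[str, int],
                     flow_total: int) -> List[Tuple[str, int, float]]:
    """(frame_type, count, percent_of_flow) for enhanced frame types only."""
    rows: List[Tuple[str, int, float]] = []
    # Largest sub-flows first (an ordering choice made for this sketch)
    for frame_type, count in sorted(frame_type_counts.items(), key=lambda kv: kv[1], reverse=True):
        if not frame_type.startswith(ENHANCED_PREFIXES):
            continue  # standard types (UDP-Data, TCP-Stream) stay in the main row
        percent = 100.0 * count / flow_total if flow_total else 0.0
        rows.append((frame_type, count, percent))
    return rows
```

An empty result means the main row should carry the full timing statistics, matching the display rules below.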

Core Data Types

FlowStats

Main container for flow-level statistics:

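from dataclasses import dataclass, field
from typing import Dict, List, Set, Tuple

# FrameTypeStats and EnhancedAnalysisData (below) must be defined earlier in the module
@dataclass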
class FlowStats:
    # Network identifiers
    src_ip: str
    dst_ip: str
    src_port: int = 0  # 0 if not applicable
    dst_port: int = 0
    transport_protocol: str = "Unknown"  # TCP, UDP, ICMP, IGMP, etc.
    traffic_classification: str = "Unknown"  # Unicast, Multicast, Broadcast
    
    # Aggregate statistics
    frame_count: int = 0
    total_bytes: int = 0
    first_seen: float = 0.0  # Timestamp of first frame
    last_seen: float = 0.0   # Timestamp of last frame
    duration: float = 0.0    # Flow duration in seconds
    
    # Timing data
    timestamps: List[float] = field(default_factory=list)
    frame_numbers: List[int] = field(default_factory=list)
    inter_arrival_times: List[float] = field(default_factory=list)
    avg_inter_arrival: float = 0.0
    std_inter_arrival: float = 0.0
    jitter: float = 0.0
    
    # Outlier tracking
    outlier_frames: List[int] = field(default_factory=list)
    outlier_details: List[Tuple[int, float]] = field(default_factory=list)  # (frame_number, time_delta)
    
    # Protocol classification
    protocols: Set[str] = field(default_factory=set)  # Basic protocols seen
    detected_protocol_types: Set[str] = field(default_factory=set)  # Enhanced protocols (CH10, PTP, IENA)
    
    # Sub-flow bins
    frame_types: Dict[str, FrameTypeStats] = field(default_factory=dict)
    
    # Enhanced analysis results
    enhanced_analysis: EnhancedAnalysisData = field(default_factory=EnhancedAnalysisData)

FrameTypeStats

Statistics for specific frame types within a flow:

@dataclass
class FrameTypeStats:
    # Frame type identifier
    frame_type: str  # e.g., "CH10-Data", "PTP-Sync", "UDP-Data"
    
    # Basic statistics
    count: int = 0
    total_bytes: int = 0
    
    # Timing data
    timestamps: List[float] = field(default_factory=list)
    frame_numbers: List[int] = field(default_factory=list)
    inter_arrival_times: List[float] = field(default_factory=list)
    avg_inter_arrival: float = 0.0
    std_inter_arrival: float = 0.0
    
    # Outlier tracking
    outlier_frames: List[int] = field(default_factory=list)
    outlier_details: List[Tuple[int, float]] = field(default_factory=list)

Enhanced Analysis Data Models

New Modular Structure

The enhanced analysis data has been restructured into focused components for better organization and extensibility:

@dataclass
class TimingAnalysis:
    """Timing analysis results for enhanced protocols"""
    avg_clock_drift_ppm: float = 0.0
    max_clock_drift_ppm: float = 0.0
    min_clock_drift_ppm: float = 0.0
    drift_variance: float = 0.0
    
    quality: TimingQuality = TimingQuality.UNKNOWN  # EXCELLENT, GOOD, MODERATE, POOR
    stability: TimingStability = TimingStability.UNKNOWN  # STABLE, VARIABLE, UNSTABLE
    
    timing_accuracy_percent: float = 0.0
    sync_errors: int = 0
    timing_anomalies: int = 0
    anomaly_rate_percent: float = 0.0
    
    has_internal_timing: bool = False
    rtc_sync_available: bool = False

@dataclass 
class QualityMetrics:
    """Quality metrics for enhanced protocol data"""
    avg_frame_quality_percent: float = 0.0
    frame_quality_samples: List[float] = field(default_factory=list)
    
    avg_signal_quality_percent: float = 0.0
    signal_quality_samples: List[float] = field(default_factory=list)
    
    sequence_gaps: int = 0
    format_errors: int = 0
    overflow_errors: int = 0
    checksum_errors: int = 0
    
    avg_confidence_score: float = 0.0
    confidence_samples: List[float] = field(default_factory=list)
    low_confidence_frames: int = 0
    
    corrupted_frames: int = 0
    missing_frames: int = 0
    duplicate_frames: int = 0

@dataclass
class DecodedData:
    """Container for decoded protocol data"""
    channel_count: int = 0
    analog_channels: int = 0
    pcm_channels: int = 0
    discrete_channels: int = 0
    
    primary_data_type: DataType = DataType.UNKNOWN  # ANALOG, PCM, DISCRETE, TIME, VIDEO, TMATS
    secondary_data_types: Set[DataType] = field(default_factory=set)
    
    sample_decoded_fields: Dict[str, Any] = field(default_factory=dict)
    available_field_names: List[str] = field(default_factory=list)
    field_count: int = 0
    critical_fields: List[str] = field(default_factory=list)
    
    frame_types: Set[str] = field(default_factory=set)
    frame_type_distribution: Dict[str, int] = field(default_factory=dict)
    
    tmats_frames: int = 0
    setup_frames: int = 0
    data_frames: int = 0
    
    decoder_type: str = "Standard"
    decoder_version: Optional[str] = None
    decode_success_rate: float = 1.0

@dataclass
class EnhancedAnalysisData:
    """Complete enhanced analysis data combining all analysis types"""
    timing: TimingAnalysis = field(default_factory=TimingAnalysis)
    quality: QualityMetrics = field(default_factory=QualityMetrics)
    decoded: DecodedData = field(default_factory=DecodedData)
    
    # Legacy flat fields (omitted here) are kept for backward compatibility
    # and are automatically synchronized with the component data
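The source does not show how the legacy fields stay synchronized. One common way is a property that delegates to the component, sketched below with TimingAnalysis trimmed to a single field. The flat avg_clock_drift_ppm attribute is a hypothetical example of a legacy field, and StreamLens may instead copy values between the two.

```python
from dataclasses import dataclass, field


@dataclass
class TimingAnalysis:
    """Trimmed to a single field for this sketch."""
    avg_clock_drift_ppm: float = 0.0


@dataclass
class EnhancedAnalysisData:
    timing: TimingAnalysis = field(default_factory=TimingAnalysis)

    # Hypothetical legacy flat attribute. It stores nothing itself: reads and
    # writes go straight to the timing component, so the two cannot disagree.
    @property
    def avg_clock_drift_ppm(self) -> float:
        return self.timing.avg_clock_drift_ppm

    @avg_clock_drift_ppm.setter
    def avg_clock_drift_ppm(self, value: float) -> None:
        self.timing.avg_clock_drift_ppm = value


data = EnhancedAnalysisData()
data.avg_clock_drift_ppm = 12.5  # old-style write lands in data.timing
```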

Protocol Information Models

@dataclass
class DecodedField:
    """Represents a single decoded field from a protocol"""
    name: str
    value: Any
    field_type: FieldType  # INTEGER, FLOAT, STRING, BOOLEAN, TIMESTAMP, etc.
    description: Optional[str] = None
    unit: Optional[str] = None  # e.g., "ms", "bytes", "ppm"
    confidence: float = 1.0  # 0.0 to 1.0
    is_critical: bool = False

@dataclass
class ProtocolInfo:
    """Information about a detected protocol"""
    protocol_type: ProtocolType
    name: str
    category: ProtocolCategory  # TRANSPORT, NETWORK, ENHANCED, TIMING, TELEMETRY
    version: Optional[str] = None
    confidence: float = 1.0
    
    port: Optional[int] = None
    subtype: Optional[str] = None  # e.g., "CH10-Data", "PTP-Sync"
    vendor: Optional[str] = None

@dataclass
class ProtocolDecodeResult:
    """Result of protocol decoding"""
    protocol_info: ProtocolInfo
    fields: List[DecodedField] = field(default_factory=list)
    frame_type: Optional[str] = None
    payload_size: int = 0
    errors: List[str] = field(default_factory=list)
    warnings: List[str] = field(default_factory=list)

Key Processing Components

EthernetAnalyzer

Top-level orchestrator with background processing capabilities:

class EthernetAnalyzer:
    def __init__(self, enable_realtime: bool = False, outlier_threshold_sigma: float = 3.0):
        self.statistics_engine = StatisticsEngine(outlier_threshold_sigma, enable_realtime)
        self.flow_manager = FlowManager(self.statistics_engine)
        self.all_packets: List[Packet] = []
        self.flows = self.flow_manager.flows  # Exposed for UI access
        self.background_analyzer = None  # For large PCAP background processing

BackgroundAnalyzer

Thread-pool-based PCAP processing for large files:

class BackgroundAnalyzer:
    def __init__(self, analyzer: EthernetAnalyzer, 
                 num_threads: int = 4,
                 batch_size: int = 1000,
                 progress_callback: Optional[Callable[[ParsingProgress], None]] = None,
                 flow_update_callback: Optional[Callable[[], None]] = None):
        self.analyzer = analyzer
        self.executor = ThreadPoolExecutor(max_workers=num_threads)
        self.batch_size = batch_size
        self.progress_callback = progress_callback
        self.flow_update_callback = flow_update_callback
        self.is_running = False
        self._shutdown_event = threading.Event()
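BackgroundAnalyzer's processing loop is not shown. A minimal sketch of the same pattern (batching by batch_size, a pool of num_threads workers, a progress callback, and lock-protected shared state) might look like this; process_in_batches, parse, and flow_counts are hypothetical names, and the flow table is reduced to a per-key packet count.

```python
import threading
from concurrent.futures import ThreadPoolExecutor
from typing import Callable, Dict, List, Optional, Sequence, Tuple


def process_in_batches(raw_packets: Sequence[bytes],
                       parse: Callable[[bytes], str],
                       flow_counts: Dict[str, int],
                       lock: threading.Lock,
                       batch_size: int = 1000,
                       num_threads: int = 4,
                       progress: Optional[Callable[[int, int], None]] = None) -> None:
    """Parse batches on a thread pool, then merge results into shared state in order."""
    batches = [raw_packets[i:i + batch_size] for i in range(0, len(raw_packets), batch_size)]

    def parse_batch(batch: Sequence[bytes]) -> List[str]:
        return [parse(raw) for raw in batch]

    done = 0
    with ThreadPoolExecutor(max_workers=num_threads) as pool:
        # map() yields results in submission order, so merges happen in frame order
        for keys in pool.map(parse_batch, batches):
            with lock:  # readers (e.g. a UI refresh) take the same lock
                for key in keys:
                    flow_counts[key] = flow_counts.get(key, 0) + 1
            done += len(keys)
            if progress is not None:
                progress(done, len(raw_packets))


# Usage: 30 toy "packets" in batches of 4, recording every progress update
counts: Dict[str, int] = {}
updates: List[Tuple[int, int]] = []
process_in_batches([b"a", b"b", b"a"] * 10, lambda raw: raw.decode(), counts,
                   threading.Lock(), batch_size=4,
                   progress=lambda done, total: updates.append((done, total)))
```

Executor.map returns results in submission order even when batches finish out of order, so merging on the calling thread keeps updates in frame order, which matters for inter-arrival times.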

FlowManager

Packet processing and flow creation with enhanced protocol support:

class FlowManager:
    def __init__(self, statistics_engine=None):
        self.flows: Dict[Tuple[str, str], FlowStats] = {}
        self.specialized_dissectors = {
            'chapter10': Chapter10Dissector(),
            'ptp': PTPDissector(),
            'iena': IENADissector()
        }
        self.enhanced_ch10_decoder = EnhancedChapter10Decoder()
        self.ch10_timing_plugin = Chapter10TimingAnalysisPlugin()
        self.protocol_registry = ProtocolRegistry()  # New protocol management

ProtocolRegistry

Centralized protocol information management:

class ProtocolRegistry:
    def __init__(self):
        self._protocols: Dict[ProtocolType, ProtocolInfo] = {}
        self._register_standard_protocols()  # UDP, TCP, ICMP, IGMP
        self._register_enhanced_protocols()  # CH10, PTP, IENA, NTP
    
    def get_enhanced_protocols(self) -> List[ProtocolInfo]: ...
    def is_enhanced_protocol(self, protocol_type: ProtocolType) -> bool: ...
    def get_protocols_by_category(self, category: ProtocolCategory) -> List[ProtocolInfo]: ...
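A minimal registry sketch, assuming only two categories and four protocol types (the real ProtocolCategory also has NETWORK, TIMING, and TELEMETRY). The register method is a hypothetical helper, and treating "enhanced" as "category is ENHANCED" is an assumption of this sketch.

```python
from dataclasses import dataclass
from enum import Enum, auto
from typing import Dict, List


class ProtocolCategory(Enum):
    TRANSPORT = auto()
    ENHANCED = auto()


class ProtocolType(Enum):
    UDP = auto()
    TCP = auto()
    CH10 = auto()
    PTP = auto()


@dataclass
class ProtocolInfo:
    protocol_type: ProtocolType
    name: str
    category: ProtocolCategory


class ProtocolRegistry:
    def __init__(self) -> None:
        self._protocols: Dict[ProtocolType, ProtocolInfo] = {}
        for ptype in (ProtocolType.UDP, ProtocolType.TCP):
            self.register(ProtocolInfo(ptype, ptype.name, ProtocolCategory.TRANSPORT))
        for ptype in (ProtocolType.CH10, ProtocolType.PTP):
            self.register(ProtocolInfo(ptype, ptype.name, ProtocolCategory.ENHANCED))

    def register(self, info: ProtocolInfo) -> None:
        """Hypothetical helper; a later registration replaces an earlier one."""
        self._protocols[info.protocol_type] = info

    def get_protocols_by_category(self, category: ProtocolCategory) -> List[ProtocolInfo]:
        return [p for p in self._protocols.values() if p.category is category]

    def get_enhanced_protocols(self) -> List[ProtocolInfo]:
        return self.get_protocols_by_category(ProtocolCategory.ENHANCED)

    def is_enhanced_protocol(self, protocol_type: ProtocolType) -> bool:
        info = self._protocols.get(protocol_type)
        return info is not None and info.category is ProtocolCategory.ENHANCED


registry = ProtocolRegistry()
```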

Enhanced Protocol Definitions

Chapter 10 (CH10)

IRIG 106 Chapter 10 data recording format:

  • Frame types: CH10-Data, CH10-ACTTS, CH10-GPS, CH10-ACMI, CH10-Timing
  • Enhanced timing analysis with clock drift detection
  • TMATS metadata parsing
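Clock drift in ppm compares how much time an internal clock says has elapsed with how much a reference clock says has elapsed over the same interval. A minimal sketch, assuming paired internal and reference timestamps are already available for each packet (how StreamLens pairs them is not shown here); both function names are hypothetical.

```python
from statistics import mean
from typing import List, Sequence, Tuple


def interval_drift_ppm(internal: Sequence[float], reference: Sequence[float]) -> List[float]:
    """Drift of an internal clock against a reference clock, per interval, in ppm."""
    drifts: List[float] = []
    for i in range(1, min(len(internal), len(reference))):
        d_ref = reference[i] - reference[i - 1]
        if d_ref <= 0:
            continue  # skip zero or backwards reference steps
        d_int = internal[i] - internal[i - 1]
        drifts.append((d_int - d_ref) / d_ref * 1e6)  # positive = internal clock runs fast
    return drifts


def drift_summary(drifts: Sequence[float]) -> Tuple[float, float, float]:
    """(avg, min, max) drift, mirroring the TimingAnalysis *_clock_drift_ppm fields."""
    return mean(drifts), min(drifts), max(drifts)


# Usage: the internal clock gains about 10 us, then about 20 us, per reference second
drifts = interval_drift_ppm([0.0, 1.00001, 2.00003], [0.0, 1.0, 2.0])
```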

Precision Time Protocol (PTP)

IEEE 1588 precision clock synchronization:

  • Frame types: PTP-Sync, PTP-Delay_Req, PTP-Follow_Up
  • Clock synchronization quality analysis
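PTP frame types can be derived from the first two bytes of the PTPv2 common header: messageType is the low nibble of byte 0 and versionPTP is the low nibble of byte 1. A sketch, assuming the UDP payload is passed in directly; ptp_frame_type is a hypothetical name, and the "PTP-<Name>" labels follow this document's frame-type naming.

```python
from typing import Dict, Optional

# messageType values from the IEEE 1588-2008 (PTPv2) common message header
PTP_MESSAGE_TYPES: Dict[int, str] = {
    0x0: "Sync", 0x1: "Delay_Req", 0x2: "Pdelay_Req", 0x3: "Pdelay_Resp",
    0x8: "Follow_Up", 0x9: "Delay_Resp", 0xA: "Pdelay_Resp_Follow_Up",
    0xB: "Announce", 0xC: "Signaling", 0xD: "Management",
}
PTP_HEADER_LEN = 34  # size of the PTPv2 common header in bytes


def ptp_frame_type(payload: bytes) -> Optional[str]:
    """Map a PTPv2 UDP payload to a frame-type label such as 'PTP-Sync'."""
    if len(payload) < PTP_HEADER_LEN:
        return None
    if (payload[1] & 0x0F) != 2:  # versionPTP lives in the low nibble of byte 1
        return None
    name = PTP_MESSAGE_TYPES.get(payload[0] & 0x0F)  # messageType: low nibble of byte 0
    return f"PTP-{name}" if name is not None else None
```

Masking with 0x0F ignores the high nibbles (transportSpecific and minorVersionPTP), so the label depends only on the message type.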

IENA

iNET-compatible Ethernet protocol:

  • Frame types: IENA-Control, IENA-Data
  • Packet structure validation

TUI Display Rules

Main Flow Row Shows:

  • Source/Destination IP:Port
  • Transport protocol (UDP/TCP)
  • Total packets and volume
  • Full timing statistics (ΔT, σ, outliers) if no enhanced sub-flows exist
  • Basic timeline only (duration, first/last seen) if enhanced sub-flows exist

Sub-Rows Show (Enhanced Protocols Only):

  • Frame type name (e.g., CH10-Data)
  • Packet count and percentage of flow
  • Individual timing statistics (ΔT, σ, outliers)
  • Protocol-specific enhanced analysis

Standard Protocols (UDP/TCP):

  • Aggregated into main flow statistics
  • No sub-rows created
  • Timing calculated across all standard packets in flow

Processing Efficiency

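  • Per-packet processing: Packets are handled one at a time in capture order (rdpcap still loads the full capture and EthernetAnalyzer keeps all_packets, so memory grows with file size)
  • Bucketing: Average-case O(1) flow lookup via hash map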
  • Hierarchical: Statistics calculated at multiple levels simultaneously
  • Real-time: Live updates during capture without full recalculation
  • Selective display: Only enhanced protocols get detailed sub-flow analysis
  • Background processing: Large PCAP files processed in thread pools with progressive UI updates
  • Thread-safe updates: Concurrent data access protected with locks and atomic operations
  • Modular data structures: Separated timing, quality, and decoded data for focused analysis
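The document does not name the algorithm behind its real-time updates. Welford's online algorithm is one standard way to keep a running mean and standard deviation without recalculating over every stored sample; the sketch below uses it as a stand-in, with RunningStats as a hypothetical class name.

```python
import math
from dataclasses import dataclass


@dataclass
class RunningStats:
    """Welford's online algorithm: O(1) update per new inter-arrival sample."""
    n: int = 0
    mean: float = 0.0
    m2: float = 0.0  # running sum of squared deviations from the mean

    def update(self, x: float) -> None:
        self.n += 1
        delta = x - self.mean
        self.mean += delta / self.n
        self.m2 += delta * (x - self.mean)  # uses the already-updated mean

    @property
    def std(self) -> float:
        """Population standard deviation; divide by (n - 1) for the sample estimate."""
        return math.sqrt(self.m2 / self.n) if self.n else 0.0


# Usage: feed samples one at a time as they arrive
stats = RunningStats()
for sample in [2.0, 4.0, 4.0, 4.0, 5.0, 5.0, 7.0, 9.0]:
    stats.update(sample)
```

Compared with the naive sum-of-squares formula, Welford's update avoids catastrophic cancellation when the mean is large relative to the spread.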

Recent Architecture Enhancements

Background Processing System

  • ThreadPoolExecutor: Multi-threaded PCAP processing for large files
  • Progress callbacks: Real-time parsing progress updates to TUI
  • Flow update callbacks: Progressive flow data updates every 50 packets
  • Thread-safe shutdown: Proper cleanup and signal handling

Modular Data Models

  • TimingAnalysis: Dedicated timing metrics with quality classifications
  • QualityMetrics: Comprehensive error tracking and confidence scoring
  • DecodedData: Structured protocol field and channel information
  • ProtocolRegistry: Centralized protocol information management
  • Legacy compatibility: Maintains existing API while enabling new features

Enhanced TUI Features

  • Progressive loading: TUI appears immediately while PCAP loads in background
  • Real-time updates: Flow table refreshes as new packets are processed
  • Split panel layout: Separate main flow and sub-flow detail views
  • Enhanced-only sub-rows: Standard protocols aggregate into main flow
  • Timing column visibility: Context-aware display based on sub-flow presence

This architecture enables real-time analysis of multi-protocol network traffic with enhanced timing analysis for specialized protocols while maintaining efficient memory usage, responsive UI updates, and scalable background processing for large PCAP files.

Example output of the frame-type outlier analysis for one flow:

Flow: 192.168.4.89:49154 → 239.1.2.10:8400
Protocol: UDP
Total Packets: 1452
Total Frame-Type Outliers: 20

=== Frame Type Outlier Analysis ===

CH10-Data: 20 outliers
Frames: 1001, 1101, 1152, 1201, 1251, 1302, 1352, 1403, 1452, 1501, 1552, 1601, 1651, 1701, 1751, 1801, 1853, 1904, 1952, 2002
Avg ΔT: 49.626 ms
Std σ: 10848.285 ms
3σ Threshold: 32594.481 ms
Outlier Details:
  Frame 1001: 51498.493 ms (4.7σ)
  Frame 1101: 54598.390 ms (5.0σ)
  Frame 1152: 54598.393 ms (5.0σ)
  Frame 1201: 54498.392 ms (5.0σ)
  Frame 2002: 53398.517 ms (4.9σ)
  ... and 15 more
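The figures in this report are consistent with a threshold of the average ΔT plus three standard deviations, and with each outlier's σ value being its distance from the average in units of σ. Checking the threshold and Frame 1001:

```latex
\begin{aligned}
T_{3\sigma} &= \overline{\Delta T} + 3\sigma = 49.626 + 3 \times 10848.285 = 32594.481\ \text{ms} \\
z_{1001} &= \frac{51498.493 - 49.626}{10848.285} = \frac{51448.867}{10848.285} \approx 4.74
\end{aligned}
```

The report rounds z_{1001} to 4.7σ.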