diff --git a/.envrc b/.envrc new file mode 100644 index 0000000..d737163 --- /dev/null +++ b/.envrc @@ -0,0 +1,3 @@ +#!/bin/bash +# Automatically activate Python virtual environment +source venv/bin/activate \ No newline at end of file diff --git a/.gitignore b/.gitignore index 09f70d9..864072a 100644 --- a/.gitignore +++ b/.gitignore @@ -21,3 +21,7 @@ wheels/ *.egg-info/ .installed.cfg *.egg + +# Environment files +.env +.envrc.local diff --git a/.swarm/memory.db b/.swarm/memory.db new file mode 100644 index 0000000..194e0fb Binary files /dev/null and b/.swarm/memory.db differ diff --git a/analyzer/analysis/background_analyzer.py b/analyzer/analysis/background_analyzer.py new file mode 100644 index 0000000..6d82f12 --- /dev/null +++ b/analyzer/analysis/background_analyzer.py @@ -0,0 +1,364 @@ +""" +Background PCAP analyzer with thread pool support for progressive loading +""" + +import threading +import queue +from concurrent.futures import ThreadPoolExecutor, as_completed +from typing import Callable, Optional, List +import time +from dataclasses import dataclass +import logging + +try: + from scapy.all import rdpcap, PcapReader, Packet +except ImportError: + print("Error: scapy library required. Install with: pip install scapy") + import sys + sys.exit(1) + +from .core import EthernetAnalyzer + + +@dataclass +class ParsingProgress: + """Progress information for PCAP parsing""" + total_packets: int + processed_packets: int + percent_complete: float + packets_per_second: float + elapsed_time: float + estimated_time_remaining: float + is_complete: bool = False + error: Optional[str] = None + + +class BackgroundAnalyzer: + """Analyzer that processes PCAP files in background threads""" + + def __init__(self, analyzer: EthernetAnalyzer, + num_threads: int = 4, + batch_size: int = 1000, + progress_callback: Optional[Callable[[ParsingProgress], None]] = None, + flow_update_callback: Optional[Callable[[], None]] = None): + """ + Initialize background analyzer + + Args: + analyzer: Core analyzer instance + num_threads: Number of worker threads + batch_size: Packets to process per batch + progress_callback: Callback for progress updates + flow_update_callback: Callback for flow data updates + """ + self.analyzer = analyzer + self.num_threads = num_threads + self.batch_size = batch_size + self.progress_callback = progress_callback + self.flow_update_callback = flow_update_callback + + # Threading components + self.executor = ThreadPoolExecutor(max_workers=num_threads) + self.packet_queue = queue.Queue(maxsize=num_threads * 2) + self.stop_event = threading.Event() + self.parse_lock = threading.Lock() + + # Progress tracking + self.total_packets = 0 + self.processed_packets = 0 + self.start_time = None + self.is_parsing = False + + # Flow update synchronization + self.flow_lock = threading.RLock() + + # Flow update batching + self.packets_since_update = 0 + self.update_batch_size = 50 # Update UI every 50 packets (more frequent) + self.update_lock = threading.Lock() + + logging.basicConfig(level=logging.INFO) + self.logger = logging.getLogger(__name__) + + def start_parsing(self, pcap_file: str) -> None: + """Start parsing PCAP file in background""" + if self.is_parsing: + self.logger.warning("Already parsing a file") + return + + self.is_parsing = True + self.stop_event.clear() + self.start_time = time.time() + self.processed_packets = 0 + + # Start reader thread + reader_thread = threading.Thread( + target=self._read_pcap_file, + args=(pcap_file,), + daemon=True + ) + reader_thread.start() + 
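# Keep a handle to the reader thread so cleanup() can check its liveness later +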
self.reader_thread = reader_thread + + # Start worker threads + futures = [] + for _ in range(self.num_threads): + future = self.executor.submit(self._process_packet_batches) + futures.append(future) + + # Monitor progress in separate thread + monitor_thread = threading.Thread( + target=self._monitor_progress, + args=(futures,), + daemon=True + ) + monitor_thread.start() + self.monitor_thread = monitor_thread + + def _read_pcap_file(self, pcap_file: str) -> None: + """Read PCAP file and queue packets for processing""" + try: + self.logger.info(f"Starting to read {pcap_file}") + + # First, get total packet count for progress tracking + with PcapReader(pcap_file) as reader: + # Quick pass to count packets + count = 0 + for _ in reader: + count += 1 + self.total_packets = count + + self.logger.info(f"Found {self.total_packets} packets to process") + + # Now read and queue packets + with PcapReader(pcap_file) as reader: + batch = [] + batch_num = 0 + + for i, packet in enumerate(reader): + if self.stop_event.is_set(): + break + + batch.append((i + 1, packet)) + + if len(batch) >= self.batch_size: + self.packet_queue.put(batch) + batch = [] + batch_num += 1 + + # Queue remaining packets + if batch: + self.packet_queue.put(batch) + + except Exception as e: + self.logger.error(f"Error reading PCAP: {e}") + self._report_progress(error=str(e)) + finally: + # Signal end of packets + for _ in range(self.num_threads): + self.packet_queue.put(None) + + def _process_packet_batches(self) -> None: + """Worker thread to process packet batches""" + while not self.stop_event.is_set(): + try: + batch = self.packet_queue.get(timeout=0.5) # Shorter timeout for faster exit + if batch is None: # End signal + break + + # Process batch of packets + for frame_num, packet in batch: + if self.stop_event.is_set(): + break + + try: + # Thread-safe packet processing + with self.flow_lock: + self.analyzer.flow_manager.process_packet(packet, frame_num) + + # Update progress + with self.parse_lock: + self.processed_packets += 1 + + # Check if we should trigger a flow update + should_update = False + with self.update_lock: + self.packets_since_update += 1 + if self.packets_since_update >= self.update_batch_size: + self.packets_since_update = 0 + should_update = True + + # Trigger flow update callback if needed + if should_update and self.flow_update_callback: + try: + self.flow_update_callback() + except Exception as e: + self.logger.error(f"Error in flow update callback: {e}") + + except Exception as e: + self.logger.error(f"Error processing packet {frame_num}: {e}") + continue + + except queue.Empty: + # Check stop event more frequently + if self.stop_event.is_set(): + break + continue + except KeyboardInterrupt: + self.logger.info("Packet processing interrupted") + break + except Exception as e: + self.logger.error(f"Error processing batch: {e}") + if self.stop_event.is_set(): + break + + def _monitor_progress(self, futures: List) -> None: + """Monitor parsing progress and send updates""" + last_update_time = time.time() + last_packet_count = 0 + + while self.is_parsing and not self.stop_event.is_set(): + try: + current_time = time.time() + + # Update every 0.5 seconds + if current_time - last_update_time >= 0.5: + with self.parse_lock: + current_packets = self.processed_packets + + # Calculate metrics + elapsed = current_time - self.start_time + packets_processed = current_packets - last_packet_count + time_delta = current_time - last_update_time + packets_per_second = packets_processed / time_delta if time_delta > 0 
else 0 + + # Update for next iteration + last_update_time = current_time + last_packet_count = current_packets + + # Report progress + self._report_progress( + packets_per_second=packets_per_second, + elapsed_time=elapsed + ) + + # Check if all workers are done + if all(f.done() for f in futures): + break + + time.sleep(0.1) + except KeyboardInterrupt: + self.logger.info("Monitor thread interrupted") + break + except Exception as e: + self.logger.error(f"Error in monitor thread: {e}") + break + + # Final update + self.is_parsing = False + self._report_progress(is_complete=True) + + # Final flow update + if self.flow_update_callback: + try: + self.flow_update_callback() + except Exception as e: + self.logger.error(f"Error in final flow update callback: {e}") + + # Calculate final statistics + with self.flow_lock: + self.analyzer.statistics_engine.calculate_all_statistics() + + def _report_progress(self, packets_per_second: float = 0, + elapsed_time: float = 0, + is_complete: bool = False, + error: Optional[str] = None) -> None: + """Report parsing progress""" + with self.parse_lock: + processed = self.processed_packets + total = self.total_packets + + if total > 0: + percent = (processed / total) * 100 + + # Estimate time remaining + if packets_per_second > 0 and processed < total: + remaining_packets = total - processed + eta = remaining_packets / packets_per_second + else: + eta = 0 + else: + percent = 0 + eta = 0 + + progress = ParsingProgress( + total_packets=total, + processed_packets=processed, + percent_complete=percent, + packets_per_second=packets_per_second, + elapsed_time=elapsed_time, + estimated_time_remaining=eta, + is_complete=is_complete, + error=error + ) + + if self.progress_callback: + self.progress_callback(progress) + + def stop_parsing(self) -> None: + """Stop background parsing""" + self.logger.info("Stopping background parsing") + self.stop_event.set() + self.is_parsing = False + + def get_current_flows(self): + """Get current flows (thread-safe)""" + with self.flow_lock: + return dict(self.analyzer.flows) + + def get_summary(self): + """Get current summary statistics (thread-safe)""" + with self.flow_lock: + return self.analyzer.get_summary() + + def cleanup(self): + """Cleanup resources""" + self.logger.info("Starting cleanup...") + self.stop_parsing() + + try: + # Clear the queue to unblock waiting workers + while not self.packet_queue.empty(): + try: + self.packet_queue.get_nowait() + except queue.Empty: + break + + # Send stop signals to all workers + for _ in range(self.num_threads): + try: + self.packet_queue.put(None, timeout=0.1) + except queue.Full: + pass + + # Wait briefly for threads to see stop signal + time.sleep(0.1) + + # Shut down the executor without waiting; workers exit once they observe the stop signal + try: + self.executor.shutdown(wait=False) + except Exception: + pass + + # Reader and monitor are daemon threads; skip joining them so shutdown never blocks + if hasattr(self, 'reader_thread') and self.reader_thread.is_alive(): + # Daemon thread is abandoned here; it terminates with the interpreter + pass + + if hasattr(self, 'monitor_thread') and self.monitor_thread.is_alive(): + # Daemon thread is abandoned here; it terminates with the interpreter + pass + + self.logger.info("Cleanup complete") + except Exception as e: + self.logger.error(f"Error during cleanup: {e}") \ No newline at end of file diff --git a/analyzer/analysis/flow_manager.py b/analyzer/analysis/flow_manager.py index 08cba9e..65e7770 100644 --- a/analyzer/analysis/flow_manager.py +++ b/analyzer/analysis/flow_manager.py @@ -86,6 +86,17 @@ class
FlowManager: flow.total_bytes += packet_size flow.protocols.update(protocols) + # Update timeline statistics + if flow.frame_count == 1: + # First packet in flow + flow.first_seen = timestamp + flow.last_seen = timestamp + flow.duration = 0.0 + else: + # Update last seen and duration + flow.last_seen = timestamp + flow.duration = flow.last_seen - flow.first_seen + # Enhanced protocol detection dissection_results = self._dissect_packet(packet, frame_num) enhanced_protocols = self._extract_enhanced_protocols(dissection_results) diff --git a/analyzer/analysis/statistics.py b/analyzer/analysis/statistics.py index f4e952f..9eee319 100644 --- a/analyzer/analysis/statistics.py +++ b/analyzer/analysis/statistics.py @@ -29,6 +29,12 @@ class StatisticsEngine: def _calculate_single_flow_statistics(self, flow: FlowStats) -> None: """Calculate statistics for a single flow""" + # Ensure timeline statistics are calculated + if len(flow.timestamps) >= 2: + flow.duration = flow.timestamps[-1] - flow.timestamps[0] + flow.first_seen = flow.timestamps[0] + flow.last_seen = flow.timestamps[-1] + if len(flow.inter_arrival_times) < 2: return @@ -36,9 +42,19 @@ class StatisticsEngine: flow.avg_inter_arrival = statistics.mean(flow.inter_arrival_times) flow.std_inter_arrival = statistics.stdev(flow.inter_arrival_times) + # Calculate jitter as coefficient of variation (normalized standard deviation) + if flow.avg_inter_arrival > 0: + flow.jitter = flow.std_inter_arrival / flow.avg_inter_arrival + else: + flow.jitter = 0.0 + # Detect outliers (frames whose inter-arrival time exceeds the mean by more than outlier_threshold_sigma standard deviations) threshold = flow.avg_inter_arrival + (self.outlier_threshold_sigma * flow.std_inter_arrival) + # Clear existing outliers to recalculate + flow.outlier_frames.clear() + flow.outlier_details.clear() + for i, inter_time in enumerate(flow.inter_arrival_times): if inter_time > threshold: # Frame number is i+2 because inter_arrival_times[i] is between frame i+1 and i+2 diff --git a/analyzer/main.py b/analyzer/main.py index b3bb68d..08f2e8a 100644 --- a/analyzer/main.py +++ b/analyzer/main.py @@ -72,8 +72,8 @@ def main(): print(f"Last packet: {last_time}") return - # Load PCAP file - if args.pcap: + # Load PCAP file (skip for textual mode which uses background parsing) + if args.pcap and not args.textual: try: loader = PCAPLoader(args.pcap) if not loader.validate_file(): @@ -93,6 +93,16 @@ def main(): except Exception as e: print(f"Error loading PCAP file: {e}") sys.exit(1) + elif args.pcap and args.textual: + # For textual mode, just validate the file exists + try: + loader = PCAPLoader(args.pcap) + if not loader.validate_file(): + print(f"Error: Invalid or inaccessible PCAP file: {args.pcap}") + sys.exit(1) + except Exception as e: + print(f"Error validating PCAP file: {e}") + sys.exit(1) # Handle console output mode if args.no_tui: @@ -106,9 +116,23 @@ def main(): # TUI mode - choose between classic, modern curses, and textual interface if args.textual: - # Use new Textual-based interface (TipTop-inspired) + # Use new Textual-based interface (TipTop-inspired) with background parsing app = StreamLensAppV2(analyzer) - app.run() + + try: + # Start background parsing if PCAP file provided + if args.pcap: + app.start_background_parsing(args.pcap) + + app.run() + except KeyboardInterrupt: + print("\nShutting down...") + finally: + # Cleanup background threads + try: + app.cleanup() + except Exception as e: + print(f"Cleanup error: {e}") return elif args.classic: tui = TUIInterface(analyzer)
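To make the outlier rule in statistics.py above concrete, here is a minimal standalone sketch; the timings are illustrative, and the hard-coded 3 stands in for the configurable StatisticsEngine.outlier_threshold_sigma:

```python
import statistics

# ~10 ms cadence with one 250 ms gap (illustrative values)
inter_arrival_times = [0.010] * 19 + [0.250]

mean = statistics.mean(inter_arrival_times)   # ~0.022 s
std = statistics.stdev(inter_arrival_times)   # ~0.054 s
threshold = mean + 3 * std                    # ~0.183 s

# inter_arrival_times[i] separates frame i+1 and frame i+2,
# matching the frame-number convention in the loop above
outliers = [(i + 2, t) for i, t in enumerate(inter_arrival_times) if t > threshold]
print(outliers)  # [(21, 0.25)]
```

diff --git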
a/analyzer/models/__init__.py b/analyzer/models/__init__.py index 5e47c94..f58d6ec 100644 --- a/analyzer/models/__init__.py +++ b/analyzer/models/__init__.py @@ -1,8 +1,54 @@ """ -Data models for the Ethernet Traffic Analyzer +StreamLens Data Models + +This module provides the core data structures used throughout StreamLens for +representing network flows, protocol information, and decoded packet data. + +The models are organized into several categories: +- Core models: FlowStats, FrameTypeStats +- Protocol models: ProtocolInfo, DecodedField, ProtocolRegistry +- Analysis models: EnhancedAnalysisData, TimingAnalysis +- Result models: AnalysisResult, DissectionResult """ +# Core data models from .flow_stats import FlowStats, FrameTypeStats -from .analysis_results import AnalysisResult +from .analysis_results import AnalysisResult, DissectionResult -__all__ = ['FlowStats', 'FrameTypeStats', 'AnalysisResult'] \ No newline at end of file +# Protocol models (new) +from .protocols import ( + ProtocolInfo, + DecodedField, + ProtocolRegistry, + StandardProtocol, + EnhancedProtocol +) + +# Enhanced analysis models (refactored) +from .enhanced_analysis import ( + EnhancedAnalysisData, + TimingAnalysis, + QualityMetrics, + DecodedData +) + +__all__ = [ + # Core models + 'FlowStats', + 'FrameTypeStats', + 'AnalysisResult', + 'DissectionResult', + + # Protocol models + 'ProtocolInfo', + 'DecodedField', + 'ProtocolRegistry', + 'StandardProtocol', + 'EnhancedProtocol', + + # Enhanced analysis + 'EnhancedAnalysisData', + 'TimingAnalysis', + 'QualityMetrics', + 'DecodedData' +] \ No newline at end of file diff --git a/analyzer/models/enhanced_analysis.py b/analyzer/models/enhanced_analysis.py new file mode 100644 index 0000000..8f907dc --- /dev/null +++ b/analyzer/models/enhanced_analysis.py @@ -0,0 +1,289 @@ +""" +Enhanced Analysis Data Models + +This module defines data structures for enhanced protocol analysis including +timing analysis, quality metrics, and decoded data representation. 
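+EnhancedAnalysisData additionally exposes flat legacy attributes that mirror these structured models for backward compatibility.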
+""" + +from dataclasses import dataclass, field +from typing import Dict, List, Set, Any, Optional +from enum import Enum + + +class TimingQuality(Enum): + """Timing quality classifications""" + EXCELLENT = "excellent" # < 1ppm drift, stable + GOOD = "good" # 1-10ppm drift, mostly stable + MODERATE = "moderate" # 10-100ppm drift, variable + POOR = "poor" # > 100ppm drift, unstable + UNKNOWN = "unknown" + + +class TimingStability(Enum): + """Timing stability classifications""" + STABLE = "stable" # Consistent timing behavior + VARIABLE = "variable" # Some timing variations + UNSTABLE = "unstable" # Highly variable timing + UNKNOWN = "unknown" + + +class DataType(Enum): + """Primary data types in enhanced protocols""" + ANALOG = "analog" + PCM = "pcm" + DISCRETE = "discrete" + TIME = "time" + VIDEO = "video" + TMATS = "tmats" + UNKNOWN = "unknown" + + +@dataclass +class TimingAnalysis: + """Timing analysis results for enhanced protocols""" + avg_clock_drift_ppm: float = 0.0 + max_clock_drift_ppm: float = 0.0 + min_clock_drift_ppm: float = 0.0 + drift_variance: float = 0.0 + + quality: TimingQuality = TimingQuality.UNKNOWN + stability: TimingStability = TimingStability.UNKNOWN + + # Timing accuracy metrics + timing_accuracy_percent: float = 0.0 + sync_errors: int = 0 + timing_anomalies: int = 0 + anomaly_rate_percent: float = 0.0 + + # Internal timing capabilities + has_internal_timing: bool = False + rtc_sync_available: bool = False + + def calculate_quality(self) -> TimingQuality: + """Calculate timing quality based on drift measurements""" + max_drift = abs(max(self.max_clock_drift_ppm, self.min_clock_drift_ppm, key=abs)) + + if max_drift < 1.0: + return TimingQuality.EXCELLENT + elif max_drift < 10.0: + return TimingQuality.GOOD + elif max_drift < 100.0: + return TimingQuality.MODERATE + else: + return TimingQuality.POOR + + def calculate_stability(self) -> TimingStability: + """Calculate timing stability based on variance""" + if self.drift_variance < 1.0: + return TimingStability.STABLE + elif self.drift_variance < 25.0: + return TimingStability.VARIABLE + else: + return TimingStability.UNSTABLE + + +@dataclass +class QualityMetrics: + """Quality metrics for enhanced protocol data""" + # Frame quality metrics + avg_frame_quality_percent: float = 0.0 + frame_quality_samples: List[float] = field(default_factory=list) + + # Signal quality metrics + avg_signal_quality_percent: float = 0.0 + signal_quality_samples: List[float] = field(default_factory=list) + + # Error counts + sequence_gaps: int = 0 + format_errors: int = 0 + overflow_errors: int = 0 + checksum_errors: int = 0 + + # Confidence metrics + avg_confidence_score: float = 0.0 + confidence_samples: List[float] = field(default_factory=list) + low_confidence_frames: int = 0 + + # Data integrity + corrupted_frames: int = 0 + missing_frames: int = 0 + duplicate_frames: int = 0 + + def calculate_overall_quality(self) -> float: + """Calculate overall quality score (0-100)""" + if not self.frame_quality_samples and not self.signal_quality_samples: + return 0.0 + + frame_score = self.avg_frame_quality_percent if self.frame_quality_samples else 100.0 + signal_score = self.avg_signal_quality_percent if self.signal_quality_samples else 100.0 + confidence_score = self.avg_confidence_score * 100 if self.confidence_samples else 100.0 + + # Weight the scores + weighted_score = (frame_score * 0.4 + signal_score * 0.4 + confidence_score * 0.2) + + # Apply error penalties + total_frames = len(self.frame_quality_samples) or 1 + error_rate = 
(self.format_errors + self.overflow_errors + self.corrupted_frames) / total_frames + penalty = min(error_rate * 50, 50) # Max 50% penalty + + return max(0.0, weighted_score - penalty) + + +@dataclass +class DecodedData: + """Container for decoded protocol data""" + # Channel information + channel_count: int = 0 + analog_channels: int = 0 + pcm_channels: int = 0 + discrete_channels: int = 0 + + # Data type classification + primary_data_type: DataType = DataType.UNKNOWN + secondary_data_types: Set[DataType] = field(default_factory=set) + + # Decoded field information + sample_decoded_fields: Dict[str, Any] = field(default_factory=dict) + available_field_names: List[str] = field(default_factory=list) + field_count: int = 0 + critical_fields: List[str] = field(default_factory=list) + + # Frame type analysis + frame_types: Set[str] = field(default_factory=set) + frame_type_distribution: Dict[str, int] = field(default_factory=dict) + + # Special frame counts + tmats_frames: int = 0 + setup_frames: int = 0 + data_frames: int = 0 + + # Decoder metadata + decoder_type: str = "Standard" + decoder_version: Optional[str] = None + decode_success_rate: float = 1.0 + + def add_frame_type(self, frame_type: str): + """Add a frame type to the analysis""" + self.frame_types.add(frame_type) + self.frame_type_distribution[frame_type] = self.frame_type_distribution.get(frame_type, 0) + 1 + + def get_dominant_frame_type(self) -> Optional[str]: + """Get the most common frame type""" + if not self.frame_type_distribution: + return None + return max(self.frame_type_distribution.items(), key=lambda x: x[1])[0] + + def update_data_type_classification(self): + """Update primary data type based on channel counts""" + if self.analog_channels > 0 and self.analog_channels >= self.pcm_channels: + self.primary_data_type = DataType.ANALOG + elif self.pcm_channels > 0: + self.primary_data_type = DataType.PCM + elif self.discrete_channels > 0: + self.primary_data_type = DataType.DISCRETE + elif self.tmats_frames > 0: + self.primary_data_type = DataType.TMATS + + # Add secondary types + if self.analog_channels > 0: + self.secondary_data_types.add(DataType.ANALOG) + if self.pcm_channels > 0: + self.secondary_data_types.add(DataType.PCM) + if self.discrete_channels > 0: + self.secondary_data_types.add(DataType.DISCRETE) + if self.tmats_frames > 0: + self.secondary_data_types.add(DataType.TMATS) + + +@dataclass +class EnhancedAnalysisData: + """Complete enhanced analysis data combining all analysis types""" + timing: TimingAnalysis = field(default_factory=TimingAnalysis) + quality: QualityMetrics = field(default_factory=QualityMetrics) + decoded: DecodedData = field(default_factory=DecodedData) + + # Legacy compatibility fields (will be deprecated) + avg_clock_drift_ppm: float = field(init=False) + max_clock_drift_ppm: float = field(init=False) + timing_quality: str = field(init=False) + timing_stability: str = field(init=False) + anomaly_rate: float = field(init=False) + avg_confidence_score: float = field(init=False) + avg_frame_quality: float = field(init=False) + sequence_gaps: int = field(init=False) + rtc_sync_errors: int = field(init=False) + format_errors: int = field(init=False) + overflow_errors: int = field(init=False) + channel_count: int = field(init=False) + analog_channels: int = field(init=False) + pcm_channels: int = field(init=False) + tmats_frames: int = field(init=False) + has_internal_timing: bool = field(init=False) + primary_data_type: str = field(init=False) + decoder_type: str = field(init=False) + 
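# Remaining legacy mirrors below are filled from the structured components in _update_legacy_fields() +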
sample_decoded_fields: Dict[str, Any] = field(init=False) + available_field_names: List[str] = field(init=False) + field_count: int = field(init=False) + frame_types: Set[str] = field(init=False) + timing_accuracy: float = field(init=False) + signal_quality: float = field(init=False) + + def __post_init__(self): + """Initialize legacy compatibility properties""" + self._update_legacy_fields() + + def _update_legacy_fields(self): + """Update legacy fields from new structured data""" + # Timing fields + self.avg_clock_drift_ppm = self.timing.avg_clock_drift_ppm + self.max_clock_drift_ppm = self.timing.max_clock_drift_ppm + self.timing_quality = self.timing.quality.value + self.timing_stability = self.timing.stability.value + self.anomaly_rate = self.timing.anomaly_rate_percent + self.has_internal_timing = self.timing.has_internal_timing + self.timing_accuracy = self.timing.timing_accuracy_percent + + # Quality fields + self.avg_confidence_score = self.quality.avg_confidence_score + self.avg_frame_quality = self.quality.avg_frame_quality_percent + self.sequence_gaps = self.quality.sequence_gaps + self.rtc_sync_errors = self.timing.sync_errors + self.format_errors = self.quality.format_errors + self.overflow_errors = self.quality.overflow_errors + self.signal_quality = self.quality.avg_signal_quality_percent + + # Decoded data fields + self.channel_count = self.decoded.channel_count + self.analog_channels = self.decoded.analog_channels + self.pcm_channels = self.decoded.pcm_channels + self.tmats_frames = self.decoded.tmats_frames + self.primary_data_type = self.decoded.primary_data_type.value + self.decoder_type = self.decoded.decoder_type + self.sample_decoded_fields = self.decoded.sample_decoded_fields + self.available_field_names = self.decoded.available_field_names + self.field_count = self.decoded.field_count + self.frame_types = self.decoded.frame_types + + def update_from_components(self): + """Update legacy fields when component objects change""" + self._update_legacy_fields() + + def get_overall_health_score(self) -> float: + """Calculate overall health score for the enhanced analysis""" + quality_score = self.quality.calculate_overall_quality() + + # Factor in timing quality + timing_score = 100.0 + if self.timing.quality == TimingQuality.EXCELLENT: + timing_score = 100.0 + elif self.timing.quality == TimingQuality.GOOD: + timing_score = 80.0 + elif self.timing.quality == TimingQuality.MODERATE: + timing_score = 60.0 + elif self.timing.quality == TimingQuality.POOR: + timing_score = 30.0 + else: + timing_score = 50.0 # Unknown + + # Weight quality more heavily than timing + return (quality_score * 0.7 + timing_score * 0.3) \ No newline at end of file diff --git a/analyzer/models/protocols.py b/analyzer/models/protocols.py new file mode 100644 index 0000000..3122faa --- /dev/null +++ b/analyzer/models/protocols.py @@ -0,0 +1,258 @@ +""" +Protocol Information Data Models + +This module defines data structures for representing protocol information, +decoded fields, and protocol registry management. 
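+A module-level PROTOCOL_REGISTRY instance provides shared protocol lookups.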
+""" + +from dataclasses import dataclass, field +from typing import Dict, List, Set, Optional, Any, Union +from enum import Enum, IntEnum +from abc import ABC, abstractmethod + + +class ProtocolType(IntEnum): + """Protocol type identifiers""" + UNKNOWN = 0 + + # Standard protocols + UDP = 10 + TCP = 11 + ICMP = 12 + IGMP = 13 + + # Enhanced protocols + CHAPTER10 = 100 + CH10 = 100 # Alias for CHAPTER10 + PTP = 101 + IENA = 102 + NTP = 103 + + +class ProtocolCategory(Enum): + """Protocol categories for organization""" + TRANSPORT = "transport" # UDP, TCP, ICMP + NETWORK = "network" # IP, IGMP + ENHANCED = "enhanced" # CH10, PTP, IENA + TIMING = "timing" # PTP, NTP + TELEMETRY = "telemetry" # CH10, IENA + + +class FieldType(Enum): + """Types of decoded fields""" + INTEGER = "integer" + FLOAT = "float" + STRING = "string" + BOOLEAN = "boolean" + TIMESTAMP = "timestamp" + IP_ADDRESS = "ip_address" + MAC_ADDRESS = "mac_address" + BINARY = "binary" + ENUM = "enum" + + +@dataclass +class DecodedField: + """Represents a single decoded field from a protocol""" + name: str + value: Any + field_type: FieldType + description: Optional[str] = None + unit: Optional[str] = None # e.g., "ms", "bytes", "ppm" + confidence: float = 1.0 # 0.0 to 1.0 + is_critical: bool = False # Critical field for protocol operation + + def __str__(self) -> str: + unit_str = f" {self.unit}" if self.unit else "" + return f"{self.name}: {self.value}{unit_str}" + + def format_value(self) -> str: + """Format the value for display""" + if self.field_type == FieldType.TIMESTAMP: + import datetime + if isinstance(self.value, (int, float)): + dt = datetime.datetime.fromtimestamp(self.value) + return dt.strftime("%H:%M:%S.%f")[:-3] + elif self.field_type == FieldType.FLOAT: + return f"{self.value:.3f}" + elif self.field_type == FieldType.IP_ADDRESS: + return str(self.value) + elif self.field_type == FieldType.BINARY: + if isinstance(self.value, bytes): + return self.value.hex()[:16] + "..." 
if len(self.value) > 8 else self.value.hex() + + return str(self.value) + + +@dataclass +class ProtocolInfo: + """Information about a detected protocol""" + protocol_type: ProtocolType + name: str + category: ProtocolCategory + version: Optional[str] = None + confidence: float = 1.0 # Detection confidence 0.0 to 1.0 + + # Protocol-specific metadata + port: Optional[int] = None + subtype: Optional[str] = None # e.g., "CH10-Data", "PTP-Sync" + vendor: Optional[str] = None + + def __str__(self) -> str: + version_str = f" v{self.version}" if self.version else "" + subtype_str = f"-{self.subtype}" if self.subtype else "" + return f"{self.name}{subtype_str}{version_str}" + + @property + def is_enhanced(self) -> bool: + """Check if this is an enhanced protocol requiring special handling""" + return self.category in [ProtocolCategory.ENHANCED, ProtocolCategory.TIMING, ProtocolCategory.TELEMETRY] + + +class StandardProtocol: + """Standard protocol definitions""" + + UDP = ProtocolInfo( + protocol_type=ProtocolType.UDP, + name="UDP", + category=ProtocolCategory.TRANSPORT + ) + + TCP = ProtocolInfo( + protocol_type=ProtocolType.TCP, + name="TCP", + category=ProtocolCategory.TRANSPORT + ) + + ICMP = ProtocolInfo( + protocol_type=ProtocolType.ICMP, + name="ICMP", + category=ProtocolCategory.NETWORK + ) + + IGMP = ProtocolInfo( + protocol_type=ProtocolType.IGMP, + name="IGMP", + category=ProtocolCategory.NETWORK + ) + + +class EnhancedProtocol: + """Enhanced protocol definitions""" + + CHAPTER10 = ProtocolInfo( + protocol_type=ProtocolType.CHAPTER10, + name="Chapter 10", + category=ProtocolCategory.TELEMETRY + ) + + PTP = ProtocolInfo( + protocol_type=ProtocolType.PTP, + name="PTP", + category=ProtocolCategory.TIMING + ) + + IENA = ProtocolInfo( + protocol_type=ProtocolType.IENA, + name="IENA", + category=ProtocolCategory.TELEMETRY + ) + + NTP = ProtocolInfo( + protocol_type=ProtocolType.NTP, + name="NTP", + category=ProtocolCategory.TIMING + ) + + +@dataclass +class ProtocolDecodeResult: + """Result of protocol decoding""" + protocol_info: ProtocolInfo + fields: List[DecodedField] = field(default_factory=list) + frame_type: Optional[str] = None # e.g., "CH10-Data", "PTP-Sync" + payload_size: int = 0 + errors: List[str] = field(default_factory=list) + warnings: List[str] = field(default_factory=list) + + def get_field(self, name: str) -> Optional[DecodedField]: + """Get a specific field by name""" + for field in self.fields: + if field.name == name: + return field + return None + + def get_critical_fields(self) -> List[DecodedField]: + """Get all critical fields""" + return [f for f in self.fields if f.is_critical] + + def has_errors(self) -> bool: + """Check if decode result has any errors""" + return len(self.errors) > 0 + + +class ProtocolRegistry: + """Registry for managing protocol information and detection""" + + def __init__(self): + self._protocols: Dict[ProtocolType, ProtocolInfo] = {} + self._register_standard_protocols() + self._register_enhanced_protocols() + + def _register_standard_protocols(self): + """Register standard protocols""" + for attr_name in dir(StandardProtocol): + if not attr_name.startswith('_'): + protocol = getattr(StandardProtocol, attr_name) + if isinstance(protocol, ProtocolInfo): + self._protocols[protocol.protocol_type] = protocol + + def _register_enhanced_protocols(self): + """Register enhanced protocols""" + for attr_name in dir(EnhancedProtocol): + if not attr_name.startswith('_'): + protocol = getattr(EnhancedProtocol, attr_name) + if isinstance(protocol, 
ProtocolInfo): + self._protocols[protocol.protocol_type] = protocol + + def get_protocol(self, protocol_type: ProtocolType) -> Optional[ProtocolInfo]: + """Get protocol info by type""" + return self._protocols.get(protocol_type) + + def get_protocol_by_name(self, name: str) -> Optional[ProtocolInfo]: + """Get protocol info by name""" + for protocol in self._protocols.values(): + if protocol.name.lower() == name.lower(): + return protocol + return None + + def get_enhanced_protocols(self) -> List[ProtocolInfo]: + """Get all enhanced protocols""" + return [p for p in self._protocols.values() if p.is_enhanced] + + def get_protocols_by_category(self, category: ProtocolCategory) -> List[ProtocolInfo]: + """Get all protocols in a category""" + return [p for p in self._protocols.values() if p.category == category] + + def register_protocol(self, protocol_info: ProtocolInfo): + """Register a new protocol""" + self._protocols[protocol_info.protocol_type] = protocol_info + + def is_enhanced_protocol(self, protocol_type: ProtocolType) -> bool: + """Check if protocol type is enhanced""" + protocol = self.get_protocol(protocol_type) + return protocol.is_enhanced if protocol else False + + +# Global protocol registry instance +PROTOCOL_REGISTRY = ProtocolRegistry() + + +def get_protocol_info(protocol_type: ProtocolType) -> Optional[ProtocolInfo]: + """Convenience function to get protocol info""" + return PROTOCOL_REGISTRY.get_protocol(protocol_type) + + +def is_enhanced_protocol(protocol_type: ProtocolType) -> bool: + """Convenience function to check if protocol is enhanced""" + return PROTOCOL_REGISTRY.is_enhanced_protocol(protocol_type) \ No newline at end of file diff --git a/analyzer/tui/textual/app_v2.py b/analyzer/tui/textual/app_v2.py index 46c52eb..83b4f4b 100644 --- a/analyzer/tui/textual/app_v2.py +++ b/analyzer/tui/textual/app_v2.py @@ -15,12 +15,15 @@ from rich.console import Group from rich.panel import Panel from rich.table import Table import time +import signal +import sys from .widgets.sparkline import SparklineWidget from .widgets.metric_card import MetricCard from .widgets.flow_table_v2 import EnhancedFlowTable from .widgets.split_flow_details import FlowMainDetailsPanel, SubFlowDetailsPanel from .widgets.debug_panel import DebugPanel +from ...analysis.background_analyzer import BackgroundAnalyzer if TYPE_CHECKING: from ...analysis.core import EthernetAnalyzer @@ -50,6 +53,7 @@ class StreamLensAppV2(App): ("4", "sort('quality')", "Sort Quality"), ("p", "toggle_pause", "Pause"), ("d", "show_details", "Details"), + ("v", "toggle_view_mode", "Toggle View"), ("?", "toggle_help", "Help"), ] @@ -73,6 +77,17 @@ class StreamLensAppV2(App): self.sub_title = "Network Flow Analysis" self.paused = False + # Background parsing support + self.background_analyzer = BackgroundAnalyzer( + analyzer=analyzer, + num_threads=4, + batch_size=1000, + progress_callback=None, + flow_update_callback=self._on_flow_update + ) + self.pcap_file = None + + # Metrics history for sparklines self.packets_history = [] self.bytes_history = [] @@ -127,11 +142,20 @@ class StreamLensAppV2(App): except: pass # Debug panel not visible + # Set initial subtitle with view mode + try: + flow_table = self.query_one("#flow-table", EnhancedFlowTable) + view_mode = flow_table.get_current_view_mode() + status = "PAUSED" if self.paused else "LIVE" + self.sub_title = f"Network Flow Analysis - {status} - {view_mode} VIEW" + except: + pass + self.update_metrics() - # Set up update intervals like TipTop - self.metric_timer = 
self.set_interval(0.5, self.update_metrics) # 2Hz for smooth graphs - self.flow_timer = self.set_interval(1.0, self.update_flows) # 1Hz for flow data + # Set up update intervals like TipTop (reduced frequency since we have real-time updates) + self.metric_timer = self.set_interval(2.0, self.update_metrics) # 0.5Hz for background updates + self.flow_timer = self.set_interval(5.0, self.update_flows) # 0.2Hz for fallback flow updates # Initialize sparkline history self._initialize_history() @@ -177,13 +201,23 @@ class StreamLensAppV2(App): self.packets_per_sec = self.total_packets / elapsed self.bytes_per_sec = summary.get('total_bytes', 0) / elapsed - # Count enhanced and outliers + # Count enhanced and outliers (thread-safe access) enhanced = 0 outliers = 0 - for flow in self.analyzer.flows.values(): - if flow.enhanced_analysis.decoder_type != "Standard": - enhanced += 1 - outliers += len(flow.outlier_frames) + try: + # Use background analyzer's thread-safe flow access + flows = self.background_analyzer.get_current_flows() + for flow in flows.values(): + if flow.enhanced_analysis.decoder_type != "Standard": + enhanced += 1 + outliers += len(flow.outlier_frames) + except Exception: + # Fallback to direct access if background analyzer not available + for flow in self.analyzer.flows.values(): + if flow.enhanced_analysis.decoder_type != "Standard": + enhanced += 1 + outliers += len(flow.outlier_frames) + self.enhanced_flows = enhanced self.outlier_count = outliers @@ -256,6 +290,53 @@ class StreamLensAppV2(App): flow_table = self.query_one("#flow-table", EnhancedFlowTable) flow_table.refresh_data() + + def _on_flow_update(self): + """Handle flow data updates from background parser""" + try: + # Use call_from_thread to safely update UI from background thread + self.call_from_thread(self._update_flow_ui) + except Exception: + # Ignore errors during shutdown + pass + + def _update_flow_ui(self): + """Update flow UI (called from main thread)""" + try: + # Update flow table + flow_table = self.query_one("#flow-table", EnhancedFlowTable) + flow_table.refresh_data() + + # Also update metrics in real-time + self.update_metrics() + except Exception: + # Flow table widget may not be available yet + pass + + def start_background_parsing(self, pcap_file: str): + """Start parsing PCAP file in background""" + self.pcap_file = pcap_file + + # Start background parsing + self.background_analyzer.start_parsing(pcap_file) + + def stop_background_parsing(self): + """Stop background parsing""" + self.background_analyzer.stop_parsing() + + def cleanup(self): + """Cleanup resources when app shuts down""" + try: + self.background_analyzer.cleanup() + # Cancel any pending timers + if self.metric_timer: + self.metric_timer.stop() + if self.flow_timer: + self.flow_timer.stop() + except Exception as e: + # Don't let cleanup errors prevent shutdown + pass + def on_enhanced_flow_table_flow_selected(self, event: EnhancedFlowTable.FlowSelected) -> None: """Handle flow selection events""" try: @@ -290,7 +371,14 @@ class StreamLensAppV2(App): """Toggle pause state""" self.paused = not self.paused status = "PAUSED" if self.paused else "LIVE" - self.sub_title = f"Network Flow Analysis - {status}" + + # Get current view mode to maintain it in subtitle + try: + flow_table = self.query_one("#flow-table", EnhancedFlowTable) + view_mode = flow_table.get_current_view_mode() + self.sub_title = f"Network Flow Analysis - {status} - {view_mode} VIEW" + except: + self.sub_title = f"Network Flow Analysis - {status}" def action_sort(self, 
key: str) -> None: """Sort flow table by specified key""" @@ -302,10 +390,29 @@ class StreamLensAppV2(App): # TODO: Implement detailed flow modal pass + def action_toggle_view_mode(self) -> None: + """Toggle between simplified and detailed view modes""" + flow_table = self.query_one("#flow-table", EnhancedFlowTable) + flow_table.toggle_view_mode() + + # Update subtitle to show current view mode + view_mode = flow_table.get_current_view_mode() + status = "PAUSED" if self.paused else "LIVE" + self.sub_title = f"Network Flow Analysis - {status} - {view_mode} VIEW" + def on_mouse_down(self, event: MouseDown) -> None: """Prevent default mouse down behavior to disable mouse interaction.""" event.prevent_default() def on_mouse_move(self, event: MouseMove) -> None: """Prevent default mouse move behavior to disable mouse interaction.""" - event.prevent_default() \ No newline at end of file + event.prevent_default() + + def action_quit(self) -> None: + """Quit the application with proper cleanup""" + self.cleanup() + self.exit() + + def on_unmount(self) -> None: + """Called when app is being unmounted - ensure cleanup""" + self.cleanup() \ No newline at end of file diff --git a/analyzer/tui/textual/widgets/flow_table_v2.py b/analyzer/tui/textual/widgets/flow_table_v2.py index d76f1f3..d62ded8 100644 --- a/analyzer/tui/textual/widgets/flow_table_v2.py +++ b/analyzer/tui/textual/widgets/flow_table_v2.py @@ -6,7 +6,7 @@ from textual.widgets import DataTable from textual.containers import Vertical from textual.reactive import reactive from textual.message import Message -from typing import TYPE_CHECKING, List, Optional +from typing import TYPE_CHECKING, List, Optional, Dict, Tuple from rich.text import Text from rich.box import ROUNDED @@ -43,6 +43,7 @@ class EnhancedFlowTable(Vertical): selected_flow_index = reactive(0) sort_key = reactive("flows") + simplified_view = reactive(False) # Toggle between detailed and simplified view def __init__(self, analyzer: 'EthernetAnalyzer', **kwargs): super().__init__(**kwargs) @@ -50,6 +51,7 @@ class EnhancedFlowTable(Vertical): self.flows_list = [] self.row_to_flow_map = {} # Map row keys to flow indices self.flow_metrics = {} # Store per-flow metrics history + self.view_mode_changed = False # Track when view mode changes def compose(self): """Create the enhanced flow table""" @@ -64,25 +66,49 @@ class EnhancedFlowTable(Vertical): def on_mount(self): """Initialize the table""" - table = self.query_one("#flows-data-table", DataTable) - - # Compact columns optimized for data density - table.add_column("#", width=2, key="num") - table.add_column("Source", width=18, key="source") - table.add_column("Proto", width=4, key="proto") - table.add_column("Destination", width=18, key="dest") - table.add_column("Extended", width=8, key="extended") - table.add_column("Frame Type", width=10, key="frame_type") - table.add_column("Pkts", width=6, key="rate") - table.add_column("Size", width=8, key="volume") - table.add_column("ΔT(ms)", width=8, key="delta_t") - table.add_column("σ(ms)", width=8, key="sigma") - table.add_column("Out", width=5, key="outliers") - + self._setup_table_columns() self.refresh_data() + def _setup_table_columns(self): + """Setup table columns based on current view mode""" + table = self.query_one("#flows-data-table", DataTable) + + # Clear existing columns if any + if table.columns: + table.clear(columns=True) + + if self.simplified_view: + # Simplified view - only main flows with summary data + table.add_column("#", width=3, key="num") + 
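# Wider aggregate columns replace the per-frame timing columns of the detailed view +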
table.add_column("Source", width=18, key="source") + table.add_column("Destination", width=18, key="dest") + table.add_column("Protocol", width=8, key="protocol") + table.add_column("Packets", width=8, key="packets") + table.add_column("Volume", width=10, key="volume") + table.add_column("Avg ΔT", width=8, key="avg_delta") + table.add_column("Quality", width=8, key="quality") + table.add_column("Status", width=10, key="status") + else: + # Detailed view - original layout with subflows + table.add_column("#", width=2, key="num") + table.add_column("Source", width=18, key="source") + table.add_column("Proto", width=4, key="proto") + table.add_column("Destination", width=18, key="dest") + table.add_column("Extended", width=8, key="extended") + table.add_column("Frame Type", width=10, key="frame_type") + table.add_column("Pkts", width=6, key="rate") + table.add_column("Size", width=8, key="volume") + table.add_column("ΔT(ms)", width=8, key="delta_t") + table.add_column("σ(ms)", width=8, key="sigma") + table.add_column("Out", width=5, key="outliers") + def refresh_data(self): - """Refresh flow table with enhanced visualizations""" + """Refresh flow table with current view mode""" + # Check if view mode changed and rebuild table structure if needed + if self.view_mode_changed: + self._setup_table_columns() + self.view_mode_changed = False + table = self.query_one("#flows-data-table", DataTable) # Preserve cursor and scroll positions @@ -103,7 +129,39 @@ class EnhancedFlowTable(Vertical): # Get and sort flows self.flows_list = self._get_sorted_flows() - # Add flows with enhanced display + if self.simplified_view: + self._populate_simplified_view() + else: + self._populate_detailed_view() + + # Restore cursor position + if selected_row_key and selected_row_key in table.rows: + row_index = list(table.rows.keys()).index(selected_row_key) + table.move_cursor(row=row_index, column=cursor_column, animate=False) + elif table.row_count > 0: + # If original selection not found, try to maintain row position + new_row = min(cursor_row, table.row_count - 1) + table.move_cursor(row=new_row, column=cursor_column, animate=False) + + # Restore scroll position + table.scroll_to(x=scroll_x, y=scroll_y, animate=False) + + def _populate_simplified_view(self): + """Populate table with simplified flow summary data""" + table = self.query_one("#flows-data-table", DataTable) + + for i, flow in enumerate(self.flows_list): + # Create simplified row data - no subflows shown + row_data = self._create_simplified_row(i + 1, flow) + row_key = table.add_row(*row_data, key=f"flow_{i}") + + # Map row key to flow index + self.row_to_flow_map[row_key] = i + + def _populate_detailed_view(self): + """Populate table with detailed flow data including subflows""" + table = self.query_one("#flows-data-table", DataTable) + for i, flow in enumerate(self.flows_list): # Track metrics for this flow flow_key = f"{flow.src_ip}:{flow.src_port}-{flow.dst_ip}:{flow.dst_port}" @@ -127,20 +185,14 @@ class EnhancedFlowTable(Vertical): metrics['last_packet_count'] = flow.frame_count metrics['last_update'] = flow.last_seen - # Create row with visualizations + # Create row with detailed visualizations row_data = self._create_enhanced_row(i + 1, flow, metrics) row_key = table.add_row(*row_data, key=f"flow_{i}") # Map row key to flow index self.row_to_flow_map[row_key] = i - # Apply row styling based on status - style = self._get_flow_style(flow) - if style: - # Note: DataTable doesn't have set_row_style, using CSS classes instead - pass - - # Add 
sub-rows for protocol breakdown + # Add sub-rows for protocol breakdown (only in detailed view) if self._should_show_subrows(flow): sub_rows = self._create_protocol_subrows(flow) combinations = self._get_protocol_frame_combinations(flow) @@ -151,18 +203,6 @@ class EnhancedFlowTable(Vertical): if j < len(combinations): _, frame_type, _, _ = combinations[j] self.row_to_subflow_map[sub_key] = (i, frame_type) - - # Restore cursor position - if selected_row_key and selected_row_key in table.rows: - row_index = list(table.rows.keys()).index(selected_row_key) - table.move_cursor(row=row_index, column=cursor_column, animate=False) - elif table.row_count > 0: - # If original selection not found, try to maintain row position - new_row = min(cursor_row, table.row_count - 1) - table.move_cursor(row=new_row, column=cursor_column, animate=False) - - # Restore scroll position - table.scroll_to(x=scroll_x, y=scroll_y, animate=False) def _create_enhanced_row(self, num: int, flow: 'FlowStats', metrics: dict) -> List[Text]: """Create enhanced row with inline visualizations""" @@ -229,6 +269,64 @@ class EnhancedFlowTable(Vertical): delta_t_text, sigma_text, outlier_text ] + def _create_simplified_row(self, num: int, flow: 'FlowStats') -> List[Text]: + """Create simplified row with summary data only""" + # Flow number + num_text = Text(str(num), justify="right") + + # Source (IP only for simplified view) + source_text = Text(flow.src_ip) + + # Destination (IP only for simplified view) + dest_text = Text(flow.dst_ip) + + # Main protocol (transport + extended if available) + extended = self._get_extended_protocol(flow) + if extended != "-": + protocol_str = f"{flow.transport_protocol}/{extended}" + else: + protocol_str = flow.transport_protocol + protocol_text = Text(protocol_str, style="bold cyan") + + # Total packet count + packets_text = Text(str(flow.frame_count), justify="right") + + # Total volume + volume_text = Text(self._format_bytes(flow.total_bytes), justify="right") + + # Average delta T + if flow.avg_inter_arrival > 0: + delta_t_ms = flow.avg_inter_arrival * 1000 + if delta_t_ms >= 1000: + avg_delta_str = f"{delta_t_ms/1000:.1f}s" + else: + avg_delta_str = f"{delta_t_ms:.1f}ms" + else: + avg_delta_str = "N/A" + avg_delta_text = Text(avg_delta_str, justify="right") + + # Quality score as percentage + quality_score = self._get_quality_score(flow) + quality_text = Text(f"{quality_score}%", justify="right", + style="green" if quality_score >= 90 else + "yellow" if quality_score >= 70 else "red") + + # Flow status + status = self._get_flow_status(flow) + status_color = { + "Enhanced": "bold blue", + "Alert": "bold red", + "Warning": "yellow", + "Normal": "green" + }.get(status, "white") + status_text = Text(status, style=status_color) + + return [ + num_text, source_text, dest_text, protocol_text, + packets_text, volume_text, avg_delta_text, + quality_text, status_text + ] + def _create_rate_sparkline(self, history: List[float]) -> str: """Create mini sparkline for rate""" if not history: @@ -319,16 +417,60 @@ class EnhancedFlowTable(Vertical): def _should_show_subrows(self, flow: 'FlowStats') -> bool: """Determine if flow should show protocol breakdown""" - # Show subrows for flows with multiple frame types or enhanced analysis - return (len(flow.frame_types) > 1 or - flow.enhanced_analysis.decoder_type != "Standard") + # Only show subrows if there are enhanced frame types + enhanced_frame_types = self._get_enhanced_frame_types(flow) + return len(enhanced_frame_types) > 0 + + def 
_get_enhanced_frame_types(self, flow: 'FlowStats') -> Dict[str, 'FrameTypeStats']: + """Get only frame types that belong to enhanced protocols""" + enhanced_protocols = {'CHAPTER10', 'CH10', 'PTP', 'IENA'} + enhanced_frame_types = {} + + for frame_type, stats in flow.frame_types.items(): + # Check if this frame type belongs to an enhanced protocol + if any(enhanced_proto in frame_type for enhanced_proto in enhanced_protocols): + enhanced_frame_types[frame_type] = stats + elif frame_type.startswith(('CH10-', 'PTP-', 'IENA-')): + enhanced_frame_types[frame_type] = stats + elif frame_type in ('TMATS', 'TMATS-Data'): # TMATS is part of Chapter 10 + enhanced_frame_types[frame_type] = stats + + return enhanced_frame_types + + def _get_enhanced_protocol_frame_combinations(self, flow: 'FlowStats', enhanced_frame_types: Dict[str, 'FrameTypeStats']) -> List[Tuple[str, str, int, float]]: + """Get protocol/frame combinations for enhanced protocols only""" + combinations = [] + total_packets = flow.frame_count + + # Group enhanced frame types by extended protocol + protocol_frames = {} + + for frame_type, ft_stats in enhanced_frame_types.items(): + # Determine extended protocol for this frame type + extended_proto = self._get_extended_protocol_for_frame(flow, frame_type) + + if extended_proto not in protocol_frames: + protocol_frames[extended_proto] = [] + + protocol_frames[extended_proto].append((frame_type, ft_stats.count)) + + # Convert to list of tuples with percentages + for extended_proto, frame_list in protocol_frames.items(): + for frame_type, count in frame_list: + percentage = (count / total_packets * 100) if total_packets > 0 else 0 + combinations.append((extended_proto, frame_type, count, percentage)) + + # Sort by count (descending) + combinations.sort(key=lambda x: x[2], reverse=True) + return combinations def _create_protocol_subrows(self, flow: 'FlowStats') -> List[List[Text]]: - """Create sub-rows for protocol/frame type breakdown""" + """Create sub-rows for enhanced protocol/frame type breakdown only""" subrows = [] - combinations = self._get_protocol_frame_combinations(flow) + enhanced_frame_types = self._get_enhanced_frame_types(flow) + combinations = self._get_enhanced_protocol_frame_combinations(flow, enhanced_frame_types) - for extended_proto, frame_type, count, percentage in combinations: # Show all subrows + for extended_proto, frame_type, count, percentage in combinations: # Show all enhanced subrows # Calculate timing for this frame type if available frame_delta_t = "" frame_sigma = "" @@ -385,6 +527,16 @@ class EnhancedFlowTable(Vertical): self.sort_key = key self.refresh_data() + def toggle_view_mode(self): + """Toggle between simplified and detailed view modes""" + self.simplified_view = not self.simplified_view + self.view_mode_changed = True + self.refresh_data() + + def get_current_view_mode(self) -> str: + """Get current view mode as string""" + return "SIMPLIFIED" if self.simplified_view else "DETAILED" + class FlowSelected(Message): """Message sent when a flow is selected""" def __init__(self, flow: Optional['FlowStats'], subflow_type: Optional[str] = None) -> None: @@ -451,6 +603,19 @@ class EnhancedFlowTable(Vertical): self.post_message(self.FlowSelected(selected_flow, subflow_type)) # Helper methods from original implementation + def _get_extended_protocol_for_frame(self, flow: 'FlowStats', frame_type: str) -> str: + """Get extended protocol for a specific frame type""" + if frame_type.startswith('CH10') or frame_type == 'TMATS': + return 'CH10' + elif 
frame_type.startswith('PTP'): + return 'PTP' + elif frame_type == 'IENA': + return 'IENA' + elif frame_type == 'NTP': + return 'NTP' + else: + return self._get_extended_protocol(flow) + def _get_extended_protocol(self, flow: 'FlowStats') -> str: """Get extended protocol""" if flow.detected_protocol_types: diff --git a/analyzer/tui/textual/widgets/progress_bar.py b/analyzer/tui/textual/widgets/progress_bar.py new file mode 100644 index 0000000..d98be2f --- /dev/null +++ b/analyzer/tui/textual/widgets/progress_bar.py @@ -0,0 +1,121 @@ +""" +Progress Bar Widget for PCAP parsing progress +""" + +from textual.widget import Widget +from textual.reactive import reactive +from rich.console import RenderableType +from rich.progress import Progress, BarColumn, TextColumn, TaskProgressColumn, MofNCompleteColumn, TimeRemainingColumn +from rich.text import Text +import time + + +class ParsingProgressBar(Widget): + """Progress bar widget for PCAP parsing with rich formatting""" + + DEFAULT_CSS = """ + ParsingProgressBar { + height: 3; + margin: 1; + padding: 1; + background: $surface; + border: solid $accent; + } + """ + + # Reactive attributes + progress = reactive(0.0) + total_packets = reactive(0) + processed_packets = reactive(0) + packets_per_second = reactive(0.0) + estimated_time_remaining = reactive(0.0) + is_complete = reactive(False) + is_visible = reactive(False) + error_message = reactive("") + + def __init__(self, **kwargs): + super().__init__(**kwargs) + self.start_time = None + + def render(self) -> RenderableType: + """Render the progress bar""" + if not self.is_visible: + return Text("") + + if self.error_message: + return Text(f"❌ Error: {self.error_message}", style="red") + + if self.is_complete: + elapsed = time.time() - self.start_time if self.start_time else 0 + return Text( + f"✅ Parsing complete! 
{self.processed_packets:,} packets processed in {elapsed:.1f}s", + style="green" + ) + + # Create rich progress bar + progress = Progress( + TextColumn("[bold blue]Parsing PCAP..."), + BarColumn(bar_width=40), + TaskProgressColumn(), + MofNCompleteColumn(), + TextColumn("•"), + TextColumn("{task.fields[rate]}"), + TextColumn("•"), + TimeRemainingColumn(), + expand=False + ) + + # Format rate display + if self.packets_per_second >= 1000: + rate_str = f"{self.packets_per_second/1000:.1f}K pkt/s" + else: + rate_str = f"{self.packets_per_second:.0f} pkt/s" + + task = progress.add_task( + "parsing", + total=self.total_packets, + completed=self.processed_packets, + rate=rate_str + ) + + return progress + + def start_parsing(self, total_packets: int): + """Start showing progress for parsing""" + self.total_packets = total_packets + self.processed_packets = 0 + self.progress = 0.0 + self.is_complete = False + self.is_visible = True + self.error_message = "" + self.start_time = time.time() + self.refresh() + + def update_progress(self, processed: int, total: int, pps: float, eta: float): + """Update progress values""" + self.processed_packets = processed + self.total_packets = total + self.packets_per_second = pps + self.estimated_time_remaining = eta + self.progress = (processed / total * 100) if total > 0 else 0 + self.refresh() + + def complete_parsing(self): + """Mark parsing as complete""" + self.is_complete = True + self.refresh() + # Hide after 3 seconds + self.set_timer(3.0, self.hide_progress) + + def show_error(self, error: str): + """Show error message""" + self.error_message = error + self.is_visible = True + self.refresh() + # Hide after 5 seconds + self.set_timer(5.0, self.hide_progress) + + def hide_progress(self): + """Hide the progress bar""" + self.is_visible = False + self.refresh() \ No newline at end of file diff --git a/analyzer/tui/textual/widgets/split_flow_details.py b/analyzer/tui/textual/widgets/split_flow_details.py index 22eabf9..000cba8 100644 --- a/analyzer/tui/textual/widgets/split_flow_details.py +++ b/analyzer/tui/textual/widgets/split_flow_details.py @@ -10,7 +10,7 @@ from rich.text import Text from rich.panel import Panel from rich.console import RenderableType, Group from rich.table import Table -from typing import TYPE_CHECKING, Optional +from typing import TYPE_CHECKING, Optional, Dict if TYPE_CHECKING: from ....models import FlowStats, FrameTypeStats @@ -106,19 +106,18 @@ class FlowMainDetailsPanel(Vertical): sections.append(Text("Enhanced Analysis", style="bold green")) sections.append(enhanced_table) - # Timing analysis - only show if no sub-flows exist + # Timing analysis - only show if no enhanced sub-flows exist # Match the same logic as _should_show_subrows in flow_table_v2.py - has_subflows = (len(flow.frame_types) > 1 or - flow.enhanced_analysis.decoder_type != "Standard") + has_enhanced_subflows = self._has_enhanced_subflows(flow) # Debug output try: debug_panel = self.app.query_one("#debug-panel") - debug_panel.add_debug_message(f"TIMING_LOGIC: {flow.src_ip}:{flow.src_port} - types={len(flow.frame_types)}, decoder={flow.enhanced_analysis.decoder_type}, has_subflows={has_subflows}") + debug_panel.add_debug_message(f"TIMING_LOGIC: {flow.src_ip}:{flow.src_port} - types={len(flow.frame_types)}, decoder={flow.enhanced_analysis.decoder_type}, has_enhanced_subflows={has_enhanced_subflows}") except: pass - if not has_subflows: + if not has_enhanced_subflows: try: debug_panel = self.app.query_one("#debug-panel") debug_panel.add_debug_message(f"BRANCH: Taking 
diff --git a/analyzer/tui/textual/widgets/split_flow_details.py b/analyzer/tui/textual/widgets/split_flow_details.py
index 22eabf9..000cba8 100644
--- a/analyzer/tui/textual/widgets/split_flow_details.py
+++ b/analyzer/tui/textual/widgets/split_flow_details.py
@@ -10,7 +10,7 @@ from rich.text import Text
 from rich.panel import Panel
 from rich.console import RenderableType, Group
 from rich.table import Table
-from typing import TYPE_CHECKING, Optional
+from typing import TYPE_CHECKING, Optional, Dict
 
 if TYPE_CHECKING:
     from ....models import FlowStats, FrameTypeStats
@@ -106,19 +106,18 @@ class FlowMainDetailsPanel(Vertical):
             sections.append(Text("Enhanced Analysis", style="bold green"))
             sections.append(enhanced_table)
 
-        # Timing analysis - only show if no sub-flows exist
+        # Timing analysis - only show if no enhanced sub-flows exist
         # Match the same logic as _should_show_subrows in flow_table_v2.py
-        has_subflows = (len(flow.frame_types) > 1 or
-                        flow.enhanced_analysis.decoder_type != "Standard")
+        has_enhanced_subflows = self._has_enhanced_subflows(flow)
 
         # Debug output
         try:
             debug_panel = self.app.query_one("#debug-panel")
-            debug_panel.add_debug_message(f"TIMING_LOGIC: {flow.src_ip}:{flow.src_port} - types={len(flow.frame_types)}, decoder={flow.enhanced_analysis.decoder_type}, has_subflows={has_subflows}")
+            debug_panel.add_debug_message(f"TIMING_LOGIC: {flow.src_ip}:{flow.src_port} - types={len(flow.frame_types)}, decoder={flow.enhanced_analysis.decoder_type}, has_enhanced_subflows={has_enhanced_subflows}")
         except:
             pass
 
-        if not has_subflows:
+        if not has_enhanced_subflows:
             try:
                 debug_panel = self.app.query_one("#debug-panel")
                 debug_panel.add_debug_message(f"BRANCH: Taking FULL timing branch for {flow.src_ip}:{flow.src_port}")
@@ -160,6 +159,19 @@ class FlowMainDetailsPanel(Vertical):
 
         return Group(*sections)
 
+    def _has_enhanced_subflows(self, flow: 'FlowStats') -> bool:
+        """Check if flow has enhanced frame types that warrant sub-rows"""
+        enhanced_protocols = {'CHAPTER10', 'CH10', 'PTP', 'IENA'}
+
+        for frame_type in flow.frame_types.keys():
+            # Check if this frame type belongs to an enhanced protocol
+            if any(enhanced_proto in frame_type for enhanced_proto in enhanced_protocols):
+                return True
+            elif frame_type.startswith(('CH10-', 'PTP-', 'IENA-')):
+                return True
+
+        return False
+
     def _format_bytes(self, bytes_count: int) -> str:
         """Format byte count with units"""
         if bytes_count >= 1_000_000_000:
@@ -276,14 +288,23 @@ class SubFlowDetailsPanel(Vertical):
         return Group(*sections)
 
     def _create_subflow_summary(self, flow: 'FlowStats') -> RenderableType:
-        """Create summary of all sub-flows"""
-        if not flow.frame_types or len(flow.frame_types) <= 1:
+        """Create summary of all sub-flows for enhanced flows"""
+        # For enhanced flows, show ALL frame types, not just enhanced ones
+        if flow.enhanced_analysis.decoder_type != "Standard":
+            frame_types_to_show = flow.frame_types
+            title = "Sub-Flow Summary (All Frame Types)"
+        else:
+            # For standard flows, only show enhanced frame types if any
+            frame_types_to_show = self._get_enhanced_frame_types(flow)
+            title = "Enhanced Sub-Flow Summary"
+
+        if not frame_types_to_show:
             return Text("No sub-flows available", style="dim")
 
         sections = []
-        sections.append(Text("Sub-Flow Summary", style="bold yellow"))
+        sections.append(Text(title, style="bold yellow"))
 
-        # Frame type breakdown table
+        # Frame type breakdown table for enhanced protocols only
         frame_table = Table(show_header=True, box=None)
         frame_table.add_column("Frame Type", style="blue")
         frame_table.add_column("Count", justify="right")
@@ -294,7 +315,7 @@ class SubFlowDetailsPanel(Vertical):
         total = flow.frame_count
 
         for frame_type, stats in sorted(
-            flow.frame_types.items(),
+            frame_types_to_show.items(),
             key=lambda x: x[1].count,
             reverse=True
         ):
@@ -315,6 +336,22 @@ class SubFlowDetailsPanel(Vertical):
         sections.append(frame_table)
         return Group(*sections)
 
+    def _get_enhanced_frame_types(self, flow: 'FlowStats') -> Dict[str, 'FrameTypeStats']:
+        """Get only frame types that belong to enhanced protocols"""
+        enhanced_protocols = {'CHAPTER10', 'CH10', 'PTP', 'IENA'}
+        enhanced_frame_types = {}
+
+        for frame_type, stats in flow.frame_types.items():
+            # Check if this frame type belongs to an enhanced protocol
+            if any(enhanced_proto in frame_type for enhanced_proto in enhanced_protocols):
+                enhanced_frame_types[frame_type] = stats
+            elif frame_type.startswith(('CH10-', 'PTP-', 'IENA-')):
+                enhanced_frame_types[frame_type] = stats
+            elif frame_type in ('TMATS', 'TMATS-Data'):  # TMATS is part of Chapter 10
+                enhanced_frame_types[frame_type] = stats
+
+        return enhanced_frame_types
+
     def _format_bytes(self, bytes_count: int) -> str:
         """Format byte count with units"""
         if bytes_count >= 1_000_000_000:
diff --git a/data_summary.md b/data_summary.md
new file mode 100644
index 0000000..0838ead
--- /dev/null
+++ b/data_summary.md
@@ -0,0 +1,461 @@
+# StreamLens Data Flow Architecture
+
+## Overview
+
+StreamLens is a real-time network traffic analyzer that processes packets, groups them into flows, detects protocols, and provides enhanced analysis for specialized protocols like Chapter 10, PTP, and IENA.
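+To ground the pipeline described below, here is a minimal usage sketch built only from pieces documented in this file (`EthernetAnalyzer`, `analyze_pcap()`, and the `flows` map keyed by `(src_ip, dst_ip)`). Treat it as an illustration of the intended call path rather than a verbatim API reference; the capture file name is a placeholder.
+
+```python
+# Hypothetical end-to-end run; "capture.pcap" is a placeholder file name.
+from analyzer.analysis.core import EthernetAnalyzer
+
+analyzer = EthernetAnalyzer(enable_realtime=False, outlier_threshold_sigma=3.0)
+analyzer.analyze_pcap("capture.pcap")  # Scapy read + per-packet processing
+
+for (src_ip, dst_ip), flow in analyzer.flows.items():
+    # FlowStats carries per-flow counters and timing aggregates
+    print(f"{src_ip} -> {dst_ip}: {flow.frame_count} frames, "
+          f"avg dT {flow.avg_inter_arrival * 1000:.3f} ms")
+```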
+## Complete Data Flow
+
+### 1. Packet Input Layer
+```python
+# analyzer/analysis/core.py:36-43
+def analyze_pcap(self, pcap_file: str):
+    packets = rdpcap(pcap_file)       # Scapy reads PCAP
+    self._process_packets(packets)    # Process each packet
+```
+
+**Sources**:
+- PCAP/PCAPNG files
+- Live network interfaces via Scapy
+
+### 2. Packet Processing Engine
+```python
+# analyzer/analysis/core.py:65-67
+def _process_packets(self, packets):
+    for i, packet in enumerate(packets):
+        self.dissector.dissect_packet(packet, i + 1)
+```
+
+**Per-packet operations**:
+- Frame number assignment
+- Timestamp extraction
+- Initial protocol detection
+
+### 3. Flow Management & Classification
+```python
+# analyzer/analysis/flow_manager.py:38-58
+def process_packet(self, packet, frame_num):
+    # Extract network identifiers
+    src_ip, dst_ip = ip_layer.src, ip_layer.dst
+    transport_info = self._extract_transport_info(packet)  # ports, protocol
+
+    # Create unique flow key
+    flow_key = (src_ip, dst_ip)
+
+    # Initialize new flow or use existing
+    if flow_key not in self.flows:
+        self.flows[flow_key] = FlowStats(...)
+```
+
+**Flow Bucketing**:
+- Packets grouped by `(src_ip, dst_ip)` pairs
+- Each flow tracked in a `FlowStats` object
+- O(1) flow lookup via hash map
+
+### 4. Protocol Detection Pipeline
+```python
+# analyzer/analysis/flow_manager.py:89-96
+# Basic protocols (UDP, TCP, ICMP)
+protocols = self._detect_basic_protocols(packet)
+
+# Enhanced protocol detection
+dissection_results = self._dissect_packet(packet, frame_num)
+enhanced_protocols = self._extract_enhanced_protocols(dissection_results)
+flow.detected_protocol_types.update(enhanced_protocols)
+
+# Fallback detection
+fallback_protocols = self._detect_fallback_protocols(packet, dissection_results)
+```
+
+**Protocol Hierarchy**:
+1. **Basic**: Transport layer (UDP/TCP/ICMP)
+2. **Enhanced**: Application layer (CH10/PTP/IENA) via specialized dissectors
+3. **Fallback**: Port-based heuristics for unidentified protocols
+
+### 5. Frame Type Classification
+```python
+# analyzer/analysis/flow_manager.py:98-100
+frame_type = self._classify_frame_type(packet, dissection_results)
+self._update_frame_type_stats(flow, frame_type, frame_num, timestamp, packet_size)
+```
+
+**Sub-Flow Binning**:
+- Each packet classified into specific frame types
+- Enhanced protocols: `CH10-Data`, `CH10-ACTTS`, `PTP-Sync`, `IENA-Control`
+- Standard protocols: `UDP-Data`, `TCP-Stream`
+
+### 6. Statistical Analysis Engine
+```python
+# analyzer/analysis/flow_manager.py:105-112
+if len(flow.timestamps) > 1:
+    inter_arrival = timestamp - flow.timestamps[-2]
+    flow.inter_arrival_times.append(inter_arrival)
+
+    # Real-time statistics if enabled
+    if self.statistics_engine and self.statistics_engine.enable_realtime:
+        self.statistics_engine.update_realtime_statistics(flow_key, flow)
+```
+
+**Timing Calculations**:
+- **Flow-level**: Aggregate timing across all packets
+- **Frame-type level**: Per-subtype timing in `FrameTypeStats`
+- **Outlier detection**: Based on configurable sigma thresholds
+- **Real-time updates**: Live statistical calculations during capture
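+The outlier rule above is a plain sigma test over inter-arrival times. The following standalone sketch re-implements the idea for illustration; it is not the `StatisticsEngine` source:
+
+```python
+from statistics import mean, stdev
+
+def find_timing_outliers(timestamps, threshold_sigma=3.0):
+    """Return (index-of-later-packet, gap) pairs whose gap deviates by more
+    than threshold_sigma standard deviations from the mean inter-arrival."""
+    deltas = [b - a for a, b in zip(timestamps, timestamps[1:])]
+    if len(deltas) < 2:
+        return []
+    mu, sigma = mean(deltas), stdev(deltas)
+    return [(i + 1, dt) for i, dt in enumerate(deltas)
+            if sigma > 0 and abs(dt - mu) > threshold_sigma * sigma]
+
+# A steady 10 ms stream with one 60 ms gap; 2-sigma is used here because a
+# single outlier inflates sigma in a short trace.
+ts = [0.00, 0.01, 0.02, 0.03, 0.04, 0.10, 0.11, 0.12, 0.13, 0.14, 0.15]
+print(find_timing_outliers(ts, threshold_sigma=2.0))  # flags the gap ending at index 5
+```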
+### 7. Enhanced Protocol Analysis
+```python
+# analyzer/analysis/flow_manager.py:102-103
+self._perform_enhanced_analysis(packet, flow, frame_num, transport_info)
+```
+
+**Specialized Decoders**:
+- **Chapter 10**: Timing analysis, sequence validation, TMATS parsing
+- **PTP**: Clock synchronization analysis, message type classification
+- **IENA**: Packet structure validation, timing quality assessment
+
+### 8. TUI Presentation Layer
+```python
+# analyzer/tui/textual/widgets/flow_table_v2.py:143-153
+if self._should_show_subrows(flow):
+    enhanced_frame_types = self._get_enhanced_frame_types(flow)
+    combinations = self._get_enhanced_protocol_frame_combinations(flow, enhanced_frame_types)
+
+    for extended_proto, frame_type, count, percentage in combinations:
+        sub_key = table.add_row(*sub_row, key=f"flow_{i}_sub_{j}")
+        self.row_to_subflow_map[sub_key] = (i, frame_type)
+```
+
+**Presentation Logic**:
+- **Main Rows**: Flow-level aggregates (all packets in flow)
+- **Sub-Rows**: Only enhanced protocols (`CH10-*`, `PTP-*`, `IENA-*`)
+- **Standard protocols**: Aggregated into main flow timing
+- **Timing display**: ΔT (avg inter-arrival), σ (std deviation), outlier count
+
+## Core Data Types
+
+### FlowStats
+Main container for flow-level statistics:
+
+```python
+@dataclass
+class FlowStats:
+    # Network identifiers
+    src_ip: str
+    dst_ip: str
+    src_port: int = 0                        # 0 if not applicable
+    dst_port: int = 0
+    transport_protocol: str = "Unknown"      # TCP, UDP, ICMP, IGMP, etc.
+    traffic_classification: str = "Unknown"  # Unicast, Multicast, Broadcast
+
+    # Aggregate statistics
+    frame_count: int = 0
+    total_bytes: int = 0
+    first_seen: float = 0.0                  # Timestamp of first frame
+    last_seen: float = 0.0                   # Timestamp of last frame
+    duration: float = 0.0                    # Flow duration in seconds
+
+    # Timing data
+    timestamps: List[float] = field(default_factory=list)
+    frame_numbers: List[int] = field(default_factory=list)
+    inter_arrival_times: List[float] = field(default_factory=list)
+    avg_inter_arrival: float = 0.0
+    std_inter_arrival: float = 0.0
+    jitter: float = 0.0
+
+    # Outlier tracking
+    outlier_frames: List[int] = field(default_factory=list)
+    outlier_details: List[Tuple[int, float]] = field(default_factory=list)  # (frame_number, time_delta)
+
+    # Protocol classification
+    protocols: Set[str] = field(default_factory=set)                # Basic protocols seen
+    detected_protocol_types: Set[str] = field(default_factory=set)  # Enhanced protocols (CH10, PTP, IENA)
+
+    # Sub-flow bins
+    frame_types: Dict[str, FrameTypeStats] = field(default_factory=dict)
+
+    # Enhanced analysis results
+    enhanced_analysis: EnhancedAnalysisData = field(default_factory=EnhancedAnalysisData)
+```
+
+### FrameTypeStats
+Statistics for specific frame types within a flow:
+
+```python
+@dataclass
+class FrameTypeStats:
+    # Frame type identifier
+    frame_type: str  # e.g., "CH10-Data", "PTP-Sync", "UDP-Data"
+
+    # Basic statistics
+    count: int = 0
+    total_bytes: int = 0
+
+    # Timing data
+    timestamps: List[float] = field(default_factory=list)
+    frame_numbers: List[int] = field(default_factory=list)
+    inter_arrival_times: List[float] = field(default_factory=list)
+    avg_inter_arrival: float = 0.0
+    std_inter_arrival: float = 0.0
+
+    # Outlier tracking
+    outlier_frames: List[int] = field(default_factory=list)
+    outlier_details: List[Tuple[int, float]] = field(default_factory=list)
+```
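+As a concrete illustration of how the two containers relate, this hypothetical helper (not part of the codebase) derives the per-frame-type breakdown that the TUI sub-rows display from a populated `FlowStats`:
+
+```python
+def frame_type_breakdown(flow):
+    """Yield (frame_type, count, percent-of-flow) rows, largest first."""
+    total = flow.frame_count or 1  # guard against an empty flow
+    for frame_type, stats in sorted(
+        flow.frame_types.items(), key=lambda kv: kv[1].count, reverse=True
+    ):
+        yield frame_type, stats.count, 100.0 * stats.count / total
+```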
+### Enhanced Analysis Data Models
+
+#### New Modular Structure
+The enhanced analysis data has been restructured into focused components for better organization and extensibility:
+
+```python
+@dataclass
+class TimingAnalysis:
+    """Timing analysis results for enhanced protocols"""
+    avg_clock_drift_ppm: float = 0.0
+    max_clock_drift_ppm: float = 0.0
+    min_clock_drift_ppm: float = 0.0
+    drift_variance: float = 0.0
+
+    quality: TimingQuality = TimingQuality.UNKNOWN        # EXCELLENT, GOOD, MODERATE, POOR
+    stability: TimingStability = TimingStability.UNKNOWN  # STABLE, VARIABLE, UNSTABLE
+
+    timing_accuracy_percent: float = 0.0
+    sync_errors: int = 0
+    timing_anomalies: int = 0
+    anomaly_rate_percent: float = 0.0
+
+    has_internal_timing: bool = False
+    rtc_sync_available: bool = False
+
+@dataclass
+class QualityMetrics:
+    """Quality metrics for enhanced protocol data"""
+    avg_frame_quality_percent: float = 0.0
+    frame_quality_samples: List[float] = field(default_factory=list)
+
+    avg_signal_quality_percent: float = 0.0
+    signal_quality_samples: List[float] = field(default_factory=list)
+
+    sequence_gaps: int = 0
+    format_errors: int = 0
+    overflow_errors: int = 0
+    checksum_errors: int = 0
+
+    avg_confidence_score: float = 0.0
+    confidence_samples: List[float] = field(default_factory=list)
+    low_confidence_frames: int = 0
+
+    corrupted_frames: int = 0
+    missing_frames: int = 0
+    duplicate_frames: int = 0
+
+@dataclass
+class DecodedData:
+    """Container for decoded protocol data"""
+    channel_count: int = 0
+    analog_channels: int = 0
+    pcm_channels: int = 0
+    discrete_channels: int = 0
+
+    primary_data_type: DataType = DataType.UNKNOWN  # ANALOG, PCM, DISCRETE, TIME, VIDEO, TMATS
+    secondary_data_types: Set[DataType] = field(default_factory=set)
+
+    sample_decoded_fields: Dict[str, Any] = field(default_factory=dict)
+    available_field_names: List[str] = field(default_factory=list)
+    field_count: int = 0
+    critical_fields: List[str] = field(default_factory=list)
+
+    frame_types: Set[str] = field(default_factory=set)
+    frame_type_distribution: Dict[str, int] = field(default_factory=dict)
+
+    tmats_frames: int = 0
+    setup_frames: int = 0
+    data_frames: int = 0
+
+    decoder_type: str = "Standard"
+    decoder_version: Optional[str] = None
+    decode_success_rate: float = 1.0
+
+@dataclass
+class EnhancedAnalysisData:
+    """Complete enhanced analysis data combining all analysis types"""
+    timing: TimingAnalysis = field(default_factory=TimingAnalysis)
+    quality: QualityMetrics = field(default_factory=QualityMetrics)
+    decoded: DecodedData = field(default_factory=DecodedData)
+
+    # Legacy compatibility fields maintained for backward compatibility
+    # (automatically synchronized with component data)
+```
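+The legacy-compatibility note above can be realized with simple delegating properties. A sketch of the idea (illustrative only; the class name is hypothetical and the actual synchronization mechanism is not shown in this document):
+
+```python
+class EnhancedAnalysisDataCompat:
+    """Hypothetical compatibility shim: expose an old flat attribute as a
+    read-only view over the new component objects."""
+
+    def __init__(self, timing, quality, decoded):
+        self.timing, self.quality, self.decoded = timing, quality, decoded
+
+    @property
+    def decoder_type(self) -> str:
+        # Old callers read flow.enhanced_analysis.decoder_type directly;
+        # the value now lives on the DecodedData component.
+        return self.decoded.decoder_type
+```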
+#### Protocol Information Models
+
+```python
+@dataclass
+class DecodedField:
+    """Represents a single decoded field from a protocol"""
+    name: str
+    value: Any
+    field_type: FieldType               # INTEGER, FLOAT, STRING, BOOLEAN, TIMESTAMP, etc.
+    description: Optional[str] = None
+    unit: Optional[str] = None          # e.g., "ms", "bytes", "ppm"
+    confidence: float = 1.0             # 0.0 to 1.0
+    is_critical: bool = False
+
+@dataclass
+class ProtocolInfo:
+    """Information about a detected protocol"""
+    protocol_type: ProtocolType
+    name: str
+    category: ProtocolCategory          # TRANSPORT, NETWORK, ENHANCED, TIMING, TELEMETRY
+    version: Optional[str] = None
+    confidence: float = 1.0
+
+    port: Optional[int] = None
+    subtype: Optional[str] = None       # e.g., "CH10-Data", "PTP-Sync"
+    vendor: Optional[str] = None
+
+@dataclass
+class ProtocolDecodeResult:
+    """Result of protocol decoding"""
+    protocol_info: ProtocolInfo
+    fields: List[DecodedField] = field(default_factory=list)
+    frame_type: Optional[str] = None
+    payload_size: int = 0
+    errors: List[str] = field(default_factory=list)
+    warnings: List[str] = field(default_factory=list)
+```
+
+## Key Processing Components
+
+### EthernetAnalyzer
+Top-level orchestrator with background processing capabilities:
+```python
+class EthernetAnalyzer:
+    def __init__(self, enable_realtime: bool = False, outlier_threshold_sigma: float = 3.0):
+        self.statistics_engine = StatisticsEngine(outlier_threshold_sigma, enable_realtime)
+        self.flow_manager = FlowManager(self.statistics_engine)
+        self.all_packets: List[Packet] = []
+        self.flows = self.flow_manager.flows  # Exposed for UI access
+        self.background_analyzer = None       # For large PCAP background processing
+```
+
+### BackgroundAnalyzer
+Thread-pool based PCAP processing for large files:
+```python
+class BackgroundAnalyzer:
+    def __init__(self, analyzer: EthernetAnalyzer,
+                 num_threads: int = 4,
+                 batch_size: int = 1000,
+                 progress_callback: Optional[Callable[[ParsingProgress], None]] = None,
+                 flow_update_callback: Optional[Callable[[], None]] = None):
+        self.analyzer = analyzer
+        self.executor = ThreadPoolExecutor(max_workers=num_threads)
+        self.batch_size = batch_size
+        self.progress_callback = progress_callback
+        self.flow_update_callback = flow_update_callback
+        self.is_running = False
+        self._shutdown_event = threading.Event()
+```
+
+### FlowManager
+Packet processing and flow creation with enhanced protocol support:
+```python
+class FlowManager:
+    def __init__(self, statistics_engine=None):
+        self.flows: Dict[Tuple[str, str], FlowStats] = {}
+        self.specialized_dissectors = {
+            'chapter10': Chapter10Dissector(),
+            'ptp': PTPDissector(),
+            'iena': IENADissector()
+        }
+        self.enhanced_ch10_decoder = EnhancedChapter10Decoder()
+        self.ch10_timing_plugin = Chapter10TimingAnalysisPlugin()
+        self.protocol_registry = ProtocolRegistry()  # New protocol management
+```
+
+### ProtocolRegistry
+Centralized protocol information management:
+```python
+class ProtocolRegistry:
+    def __init__(self):
+        self._protocols: Dict[ProtocolType, ProtocolInfo] = {}
+        self._register_standard_protocols()  # UDP, TCP, ICMP, IGMP
+        self._register_enhanced_protocols()  # CH10, PTP, IENA, NTP
+
+    def get_enhanced_protocols(self) -> List[ProtocolInfo]: ...
+    def is_enhanced_protocol(self, protocol_type: ProtocolType) -> bool: ...
+    def get_protocols_by_category(self, category: ProtocolCategory) -> List[ProtocolInfo]: ...
+```
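+A short sketch of how calling code might consult the registry (the method names come from the summary above; the enum members `ProtocolType.PTP` and `ProtocolCategory.TIMING` are assumptions based on the protocols and categories listed in this document):
+
+```python
+registry = ProtocolRegistry()
+
+# PTP is pre-registered by _register_enhanced_protocols()
+if registry.is_enhanced_protocol(ProtocolType.PTP):  # ProtocolType.PTP assumed
+    for info in registry.get_protocols_by_category(ProtocolCategory.TIMING):
+        # Each ProtocolInfo carries a name and a confidence score
+        print(f"{info.name} (confidence {info.confidence:.2f})")
+```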
+## Enhanced Protocol Definitions
+
+### Chapter 10 (CH10)
+IRIG 106 Chapter 10 data recording format:
+- Frame types: `CH10-Data`, `CH10-ACTTS`, `CH10-GPS`, `CH10-ACMI`, `CH10-Timing`
+- Enhanced timing analysis with clock drift detection
+- TMATS metadata parsing
+
+### Precision Time Protocol (PTP)
+IEEE 1588 precision clock synchronization:
+- Frame types: `PTP-Sync`, `PTP-Delay_Req`, `PTP-Follow_Up`
+- Clock synchronization quality analysis
+
+### IENA
+iNET-compatible Ethernet protocol:
+- Frame types: `IENA-Control`, `IENA-Data`
+- Packet structure validation
+
+## TUI Display Rules
+
+### Main Flow Row Shows:
+- Source/Destination IP:Port
+- Transport protocol (UDP/TCP)
+- Total packets and volume
+- **Full timing statistics** (ΔT, σ, outliers) if no enhanced sub-flows exist
+- **Basic timeline only** (duration, first/last seen) if enhanced sub-flows exist
+
+### Sub-Rows Show (Enhanced Protocols Only):
+- Frame type name (e.g., `CH10-Data`)
+- Packet count and percentage of flow
+- Individual timing statistics (ΔT, σ, outliers)
+- Protocol-specific enhanced analysis
+
+### Standard Protocols (UDP/TCP):
+- Aggregated into main flow statistics
+- No sub-rows created
+- Timing calculated across all standard packets in flow
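+These rules reduce to a single predicate. A condensed sketch of the decision, mirroring `_has_enhanced_subflows()` from the TUI change earlier in this diff:
+
+```python
+ENHANCED_MARKERS = ('CHAPTER10', 'CH10', 'PTP', 'IENA')
+
+def show_full_timing_on_main_row(flow) -> bool:
+    """The main row carries dT/sigma/outlier stats only when the flow has no
+    enhanced frame types; otherwise those stats move to the sub-rows."""
+    return not any(marker in frame_type
+                   for frame_type in flow.frame_types
+                   for marker in ENHANCED_MARKERS)
+```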
+## Processing Efficiency
+
+- **Streaming**: Packets processed one by one (memory efficient)
+- **Bucketing**: O(1) flow lookup via hash map
+- **Hierarchical**: Statistics calculated at multiple levels simultaneously
+- **Real-time**: Live updates during capture without full recalculation
+- **Selective display**: Only enhanced protocols get detailed sub-flow analysis
+- **Background processing**: Large PCAP files processed in thread pools with progressive UI updates
+- **Thread-safe updates**: Concurrent data access protected with locks and atomic operations
+- **Modular data structures**: Separated timing, quality, and decoded data for focused analysis
+
+## Recent Architecture Enhancements
+
+### Background Processing System
+- **ThreadPoolExecutor**: Multi-threaded PCAP processing for large files
+- **Progress callbacks**: Real-time parsing progress updates to the TUI
+- **Flow update callbacks**: Progressive flow data updates every 50 packets
+- **Thread-safe shutdown**: Proper cleanup and signal handling
+
+### Modular Data Models
+- **TimingAnalysis**: Dedicated timing metrics with quality classifications
+- **QualityMetrics**: Comprehensive error tracking and confidence scoring
+- **DecodedData**: Structured protocol field and channel information
+- **ProtocolRegistry**: Centralized protocol information management
+- **Legacy compatibility**: Maintains the existing API while enabling new features
+
+### Enhanced TUI Features
+- **Progressive loading**: TUI appears immediately while the PCAP loads in the background
+- **Real-time updates**: Flow table refreshes as new packets are processed
+- **Split panel layout**: Separate main-flow and sub-flow detail views
+- **Enhanced-only sub-rows**: Standard protocols aggregate into the main flow
+- **Timing column visibility**: Context-aware display based on sub-flow presence
+
+This architecture enables **real-time analysis of multi-protocol network traffic** with **enhanced timing analysis** for specialized protocols while maintaining **efficient memory usage**, **responsive UI updates**, and **scalable background processing** for large PCAP files.
\ No newline at end of file
diff --git a/requirements.txt b/requirements.txt
new file mode 100644
index 0000000..07ddb8c
--- /dev/null
+++ b/requirements.txt
@@ -0,0 +1,25 @@
+contourpy==1.3.2
+cycler==0.12.1
+fonttools==4.59.0
+kiwisolver==1.4.8
+linkify-it-py==2.0.3
+markdown-it-py==3.0.0
+matplotlib==3.10.3
+mdit-py-plugins==0.4.2
+mdurl==0.1.2
+numpy==2.3.2
+packaging==25.0
+pillow==11.3.2
+Pygments==2.19.2
+pyparsing==3.2.3
+PySide6==6.9.1
+PySide6_Addons==6.9.1
+PySide6_Essentials==6.9.1
+python-dateutil==2.9.0.post0
+rich==14.1.0
+scapy==2.6.1
+shiboken6==6.9.1
+six==1.17.0
+textual==5.0.1
+typing_extensions==4.14.1
+uc-micro-py==1.0.3
\ No newline at end of file