diff --git a/COLUMN_LAYOUT_EXAMPLE.md b/COLUMN_LAYOUT_EXAMPLE.md new file mode 100644 index 0000000..d6fadca --- /dev/null +++ b/COLUMN_LAYOUT_EXAMPLE.md @@ -0,0 +1,37 @@ +# Modern TUI Column Layout Example + +## Flow Analysis View (View 1) + +The new column layout separates transport and extended protocols for clearer flow analysis: + +``` + # Source Proto Destination Extended Frame Type Pkts Volume Timing Quality + 1 192.168.4.89:1024 UDP 239.1.2.10:8080 CH10 CH10-Data 1452 1.9MB 77.8ms 95% + 2 11.59.19.204:319 UDP 224.0.1.129:319 PTP PTP Sync 297 26.8KB 378.4ms Normal + 3 11.59.19.202:4001 UDP 239.0.1.133:4001 - UDP 113 17.4KB 999.4ms Normal + 4 192.168.43.111:68 UDP 255.255.255.255:67 - UDP 46 3.8KB 2.3s Normal + 5 11.59.19.204:80 OTHER 224.0.0.22:80 - IGMP 6 360B 13.9s Normal +``` + +## Key Improvements + +1. **Transport Protocol Clarity**: Proto column shows TCP, UDP, ICMP, IGMP, OTHER +2. **Extended Protocol Support**: Separate column for specialized protocols (CH10, PTP, IENA, NTP) +3. **Frame Type Detail**: Shows the most common frame type for detailed analysis +4. **Distinct Source/Destination**: Clear separation with IP:port format +5. **Left-Aligned Text**: Source, destination, and protocol columns for better readability +6. **Comprehensive Flow Info**: Transport → Extended → Frame type hierarchy + +## Column Widths + +- Source: 20 characters (left-aligned) - IP:port format +- Proto: 6 characters (left-aligned) - Transport protocol (UDP, TCP, etc.) +- Destination: 20 characters (left-aligned) - IP:port format +- Extended: 10 characters (left-aligned) - Specialized protocol (CH10, PTP, etc.) +- Frame Type: 12 characters (left-aligned) - Most common frame type +- Pkts: 6 characters (right-aligned) - Packet count +- Volume: 8 characters (right-aligned) - Data volume with units +- Timing: 8 characters (right-aligned) - Average inter-arrival time +- Quality: 8 characters (right-aligned) - Quality percentage or status + +This layout provides clear protocol hierarchy from transport layer through specialized protocols to specific frame types. \ No newline at end of file diff --git a/Glossary.md b/Glossary.md new file mode 100644 index 0000000..5353260 --- /dev/null +++ b/Glossary.md @@ -0,0 +1,147 @@ +# StreamLens Ethernet Traffic Analyzer - Glossary + +## Core Networking Terms + +### **Flow** +A logical grouping of network packets between two endpoints (source IP:port → destination IP:port). In StreamLens, a flow represents all packets traveling in one direction between specific network addresses, allowing analysis of communication patterns, timing characteristics, and protocol behavior. + +### **Socket** +A network endpoint combining an IP address and port number (e.g., 192.168.1.100:4001). Sockets define the communication endpoints for network flows. + +### **Packet** +An individual unit of data transmitted over a network, containing headers (IP, UDP/TCP) and payload data. StreamLens analyzes packet timing, size, and content to understand traffic patterns. + +### **Frame** +In the context of specialized protocols like Chapter 10, a frame refers to a structured data unit within the packet payload that contains telemetry, timing, or measurement data. + +## Protocol Analysis Terms + +### **Protocol** +A standardized set of rules for data communication. StreamLens categorizes protocols into: +- **Transport Protocols**: UDP, TCP, ICMP, IGMP +- **Application Protocols**: HTTP, DNS, NTP, DHCP +- **Specialized Protocols**: Chapter 10 (IRIG 106), PTP (Precision Time Protocol), IENA + +### **Decoder** +A software component that interprets and extracts structured information from packet payloads according to specific protocol specifications. StreamLens uses: +- **Basic Decoders**: Identify protocol types and extract header information +- **Enhanced Decoders**: Perform deep packet inspection with field-level extraction + +### **Encoding** +The method used to structure and format data within packets. Common encodings include: +- **Chapter 10**: Telemetry data encoding standard (IRIG 106) +- **PTP**: Precision Time Protocol for network synchronization +- **IENA**: Enhanced Network Access protocol for flight test + +### **Dissector** +A protocol-specific analyzer that breaks down packet contents into constituent fields and interprets their meaning. Similar to decoders but focused on protocol structure analysis. + +## Timing and Quality Analysis + +### **Inter-arrival Time** +The time interval between consecutive packets in a flow. Critical for analyzing: +- Network jitter and latency +- Data streaming consistency +- Protocol timing compliance + +### **Outlier** +A packet whose inter-arrival time deviates significantly from the expected pattern (typically >3 standard deviations). Outliers indicate: +- Network congestion +- Timing violations +- Equipment malfunctions + +### **Clock Drift** +The gradual divergence between different timing sources, measured in parts per million (PPM). Important for synchronized systems and telemetry applications. + +### **Jitter** +Variation in packet arrival times, indicating network instability or inconsistent data generation. + +## Telemetry and Specialized Data + +### **Chapter 10 (CH10)** +IRIG 106 Chapter 10 standard for flight test telemetry data recording and transmission. Contains: +- **Time stamps**: Internal timing information +- **Channel data**: Multi-channel analog and digital measurements +- **Quality indicators**: Signal quality and synchronization status + +### **TMATS (Telemetry Metadata Transfer Standard)** +Configuration and setup information transmitted alongside telemetry data, describing data formats, channel assignments, and measurement parameters. + +### **PTP (Precision Time Protocol)** +IEEE 1588 standard for high-precision clock synchronization across networks, essential for distributed measurement systems. + +### **IENA (Integrated Enhanced Network Access)** +Protocol for real-time telemetry data transmission over Ethernet networks, commonly used in flight test and aerospace applications. + +## Data Analysis Terms + +### **Flow Statistics** +Quantitative measures describing flow characteristics: +- **Frame Count**: Total packets in flow +- **Total Bytes**: Cumulative data volume +- **Average Inter-arrival**: Mean time between packets +- **Standard Deviation**: Measure of timing variability + +### **Frame Type** +Classification of packets within a flow based on content or protocol structure (e.g., "CH10-Data", "TMATS", "PTP-Sync"). + +### **Traffic Classification** +Categorization of network traffic by destination address: +- **Unicast**: Point-to-point communication +- **Multicast**: One-to-many distribution +- **Broadcast**: One-to-all transmission + +### **Enhanced Analysis** +Deep inspection and field-level extraction from specialized protocols, providing: +- Decoded frame fields +- Quality metrics +- Timing analysis +- Protocol compliance checking + +## User Interface Terms + +### **TUI (Text User Interface)** +Command-line interface using curses library for interactive navigation and real-time data display. + +### **Flow List Panel** +Left panel showing all detected flows with summary statistics, timing information, and enhanced analysis indicators. + +### **Detail Panel** +Right panel with tabbed interface: +- **Info Tab**: Flow statistics, frame types, timing analysis +- **Decode Tab**: Hierarchical display of decoded protocol fields + +### **Frame Type Breakdown** +Sub-classification of packets within a flow showing distribution of different data types and their individual timing characteristics. + +## Technical Implementation + +### **Confidence Score** +Numerical indicator (0.0-1.0) representing decoder certainty in protocol identification and field extraction accuracy. + +### **Field Extraction** +Process of parsing packet payloads to extract individual data elements according to protocol specifications. + +### **Real-time Statistics** +Live calculation and display of flow metrics during active packet capture, enabling immediate analysis of ongoing network activity. + +### **Outlier Detection** +Statistical analysis using sigma thresholds to identify packets with anomalous timing characteristics. + +## Use Cases and Applications + +### **Flight Test Telemetry** +Primary application for analyzing real-time telemetry data streams from aircraft and test equipment, ensuring data integrity and timing compliance. + +### **Network Performance Analysis** +General-purpose tool for identifying network issues, bandwidth utilization, and communication patterns. + +### **Protocol Development** +Development and debugging tool for custom network protocols, providing detailed inspection capabilities. + +### **Quality Assurance** +Verification of network equipment and protocol implementations against timing and performance specifications. + +--- + +*This glossary provides the foundation for understanding StreamLens capabilities and serves as reference for both users and developers working with network traffic analysis and telemetry systems.* \ No newline at end of file diff --git a/PROJECT_STATUS.md b/PROJECT_STATUS.md new file mode 100644 index 0000000..51b623f --- /dev/null +++ b/PROJECT_STATUS.md @@ -0,0 +1,155 @@ +# StreamLens Project Status - Bookmark + +## 📊 Current Status: Modern TUI with Enhanced Column Layout Complete + +**Date**: Session Complete +**Version**: Modern TUI with Protocol Hierarchy Interface + +## ✅ Major Accomplishments This Session + +### 1. **Modern TUI Interface Implementation** +- **Three-View Architecture**: Flow Analysis (1), Packet Decoder (2), Statistical Analysis (3) +- **View Navigation**: Simplified hotkeys from F1/F2/F3 to 1/2/3 for better accessibility +- **Context-Sensitive Interface**: Each view optimized for specific analysis tasks +- **Cross-View Communication**: Selected flows persist across view switches + +### 2. **Enhanced Column Layout Design** +- **Protocol Hierarchy**: Clear separation of transport vs. extended protocols +- **Multi-Column Structure**: + ``` + Source | Proto | Destination | Extended | Frame Type | Metrics + ``` +- **Transport Protocol Clarity**: TCP, UDP, ICMP, IGMP in dedicated column +- **Extended Protocol Support**: CH10, PTP, IENA, NTP in separate column +- **Frame Type Detail**: Most common frame type per flow (CH10-Data, TMATS, PTP Sync) + +### 3. **Comprehensive Interface Features** +- **Flow Analysis View (1)**: + - Enhanced multi-column layout with IP:port precision + - Left-aligned text columns for better readability + - Performance ranking by packet count and outliers + - Visual indicators for enhanced protocols + +- **Packet Decoder View (2)**: + - Three-panel layout: Enhanced Flows | Frame Analysis | Field Inspector + - Tab-based navigation for deep packet inspection + - Real-time decoded field display with tree-view + +- **Statistical Analysis View (3)**: + - Four analysis modes: Overview, Outlier Analysis, Quality Metrics, Timing Analysis + - Performance ranking with health metrics + - Detailed sigma deviation calculations + +### 4. **Integration and Compatibility** +- **Classic Interface Preservation**: `--classic` flag maintains original TUI +- **Modern Interface Default**: New interface is primary user experience +- **Consistent Data Display**: All views show comprehensive flow information +- **Error Handling**: Graceful fallback to console mode when curses unavailable + +## 🏗️ Technical Architecture + +### File Structure +``` +analyzer/ +├── tui/ +│ ├── interface.py # Classic TUI controller +│ ├── modern_interface.py # Modern TUI with three-view interface +│ ├── navigation.py # Navigation handling +│ ├── modern_views/ # Modern TUI view controllers +│ │ ├── flow_analysis.py # Flow Analysis View (1) +│ │ ├── packet_decoder.py # Packet Decoder View (2) +│ │ └── statistical_analysis.py # Statistical Analysis View (3) +│ └── panels/ # Classic TUI panel components +│ ├── flow_list.py # Flow list panel +│ ├── detail_panel.py # Flow details panel +│ └── timeline.py # Timeline visualization panel +``` + +### Key Classes +- **ModernTUIInterface**: Main modern interface controller +- **FlowAnalysisView**: Enhanced multi-column flow display +- **PacketDecoderView**: Three-panel deep inspection +- **StatisticalAnalysisView**: Comprehensive timing and quality analysis + +## 🎯 Feature Highlights + +### Protocol Analysis Hierarchy +1. **Transport Layer**: TCP, UDP, ICMP, IGMP identification +2. **Extended Protocols**: CH10, PTP, IENA, NTP specialized analysis +3. **Frame Types**: Specific frame analysis (CH10-Data, TMATS, PTP Sync) + +### User Experience Improvements +- **Simplified Navigation**: 1/2/3 hotkeys instead of function keys +- **Context Help**: Comprehensive help overlay with H key +- **Visual Clarity**: Left-aligned text with IP:port format +- **Performance Focus**: Flows ranked by relevance and quality + +### Data Presentation +- **Source/Destination Separation**: Clear endpoint identification +- **Protocol Visibility**: Immediate transport and extended protocol recognition +- **Frame Type Awareness**: Most common frame type per flow +- **Quality Indicators**: Performance and timing quality metrics + +## 🔧 Technical Implementation + +### Column Layout Specifications +- **Source**: 20 chars (left-aligned) - IP:port format +- **Proto**: 6 chars (left-aligned) - Transport protocol +- **Destination**: 20 chars (left-aligned) - IP:port format +- **Extended**: 10 chars (left-aligned) - Specialized protocol +- **Frame Type**: 12 chars (left-aligned) - Most common frame type +- **Metrics**: Right-aligned numeric columns (Pkts, Volume, Timing, Quality) + +### Helper Methods +- `_get_extended_protocol()`: Extracts specialized protocols +- `_get_primary_frame_type()`: Finds most common frame type +- Protocol hierarchy logic with proper fallbacks + +## 📈 Quality Assurance + +### Testing Completed +- ✅ Modern TUI integration with main application +- ✅ Classic TUI preservation with `--classic` flag +- ✅ Column layout formatting and alignment +- ✅ Protocol hierarchy extraction from flow data +- ✅ Graceful fallback to console mode +- ✅ View switching with 1/2/3 hotkeys + +### Validation Results +- Console mode shows proper flow analysis with transport protocols +- Modern interface properly initializes (tested with fallback) +- Column formatting maintains readability and information density +- Protocol detection works with Chapter 10, PTP, and standard UDP flows + +## 🎨 Design Philosophy + +The modern TUI interface represents a fundamental shift from generic flow analysis to **protocol-aware traffic analysis**: + +1. **Hierarchical Protocol Understanding**: Transport → Extended → Frame Type +2. **Visual Information Architecture**: Source | Protocol | Destination flow +3. **Specialized Analysis Views**: Each view optimized for specific tasks +4. **Accessibility**: Simple numeric hotkeys instead of function keys +5. **Professional Layout**: Left-aligned text, right-aligned metrics + +## 🚀 Ready for Production + +The StreamLens modern TUI interface is now complete with: +- ✅ Enhanced protocol hierarchy visualization +- ✅ Three specialized analysis views +- ✅ Simplified navigation (1/2/3 hotkeys) +- ✅ Comprehensive column layout +- ✅ Backward compatibility with classic interface +- ✅ Robust error handling and fallbacks + +This represents a mature, professional-grade network traffic analyzer specifically designed for aviation and industrial telemetry analysis with intuitive protocol-aware visualization. + +--- + +## 📝 Next Potential Enhancements + +*Future considerations for continued development:* +- Export functionality for decoded fields +- Visual flow diagrams in Flow Analysis view +- Real-time filtering capabilities +- Enhanced signal visualization integration +- Protocol-specific analysis plugins \ No newline at end of file diff --git a/README.md b/README.md index f02cfe3..f984724 100644 --- a/README.md +++ b/README.md @@ -20,9 +20,12 @@ python streamlens.py --gui --pcap file.pcap # GUI mode only (then open file via File menu) python streamlens.py --gui -# Analyze pcap file with TUI (flows sorted by largest sigma outliers) +# Analyze pcap file with modern TUI (Flow Analysis, Packet Decoder, Statistical Analysis views) python streamlens.py --pcap file.pcap +# Use classic TUI interface instead of modern (preserves original layout) +python streamlens.py --pcap file.pcap --classic + # Live capture with real-time statistics python streamlens.py --live --interface eth0 @@ -62,13 +65,37 @@ python streamlens.py --live --filter "port 319 or port 320" - **Threading Safety**: Main-thread plot creation eliminates Qt threading violations - **No Floating Windows**: All plots stay embedded in the grid interface -### Enhanced TUI Interface -- **Three-Panel Layout**: Flows list (top-left), flow details (top-right), timing visualization (bottom) +### 🖥️ Modern TUI Interface (Default) with Three Focused Views +- **1: Flow Analysis View**: Enhanced multi-column flow overview with protocol hierarchy + - **Source | Proto | Destination | Extended | Frame Type | Metrics** layout + - Transport protocols (TCP, UDP, ICMP, IGMP) clearly separated from extended protocols + - Extended protocol column for specialized protocols (CH10, PTP, IENA, NTP) + - Frame type column showing most common frame type per flow (CH10-Data, TMATS, PTP Sync) + - Left-aligned text columns with IP:port format for precise endpoint identification + - Performance rankings by packet count, outliers, and enhanced decoder availability +- **2: Packet Decoder View**: Deep protocol inspection and field extraction + - Three-panel layout: Enhanced Flows | Frame Analysis | Field Inspector + - Real-time decoded field display with tree-view navigation + - Tab-based interface switching with comprehensive field value inspection +- **3: Statistical Analysis View**: Timing analysis, outliers, and quality metrics + - Four analysis modes: Overview, Outlier Analysis, Quality Metrics, Timing Analysis + - Performance ranking with health metrics and network consistency indicators + - Detailed outlier breakdown with sigma deviation calculations +- **Modern Navigation**: 1/2/3 view switching with context-sensitive help and status bars +- **Enhanced Protocol Support**: Specialized views for Chapter 10, PTP, IENA with quality indicators +- **Cross-View Communication**: Selected flows persist across view switches for comprehensive analysis + +### 📊 Classic TUI Interface (--classic flag) with Professional Table Layout +- **Optimized Three-Panel Layout**: Flows list (70% width), flow details (30% width), optional timeline (bottom) +- **Professional Table Formatting**: Right-aligned numeric columns (#Frames, Bytes, ΔT Avg) with proper spacing +- **Comprehensive Flow Display**: Shows Src:Port, Dst:Port, Transport Protocol, Traffic Classification, and Encoding +- **Transport Layer Analysis**: Displays TCP, UDP, ICMP, IGMP protocols with port information +- **Traffic Classification**: Identifies Unicast, Multicast, and Broadcast traffic patterns +- **Hierarchical Frame Types**: Expandable tree view showing packet type breakdowns with aligned sub-rows +- **Magnitude Indicators**: Consistent byte formatting (1.2M, 428K, 1234B) with right alignment - **Sigma-Based Flow Sorting**: Flows automatically sorted by largest outlier sigma deviation - **Real-time Navigation**: Arrow keys to navigate between flows with instant detail updates -- **Protocol-aware Display**: Shows detected protocols in flow list and details - **Smart Protocol Detection**: Prioritizes specialized protocols (Chapter 10, PTP, IENA) over generic ones -- **Detailed Outlier Analysis**: Individual rows showing frame numbers and exact time deltas for outlier packets - **Visual Timeline**: ASCII timeline showing frame timing deviations with outlier highlighting - **Live Statistics**: Real-time running averages and outlier detection during capture @@ -140,19 +167,55 @@ Generate detailed outlier reports with `--report` flag showing frame-by-frame si - **Status Bar**: Loading progress and operation feedback ### Workflow -1. **Launch GUI with PCAP**: `python streamlens.py --gui --pcap file.pcap` (recommended) -2. **Alternative Launch**: `python streamlens.py --gui`, then File → Open PCAP... -3. **Immediate Analysis**: Flow table displays instantly with all flow data and wide embedded plots -4. **Optimized Display**: Content-fitted columns, 25% taller rows, and full-width utilization -5. **Wide Plot Visualization**: Chapter 10 flows show detailed signal plots with minimal margins -6. **Browse Flows**: View flows in the dark-themed table (Chapter 10 flows highlighted in modern blue) -7. **Analyze Details**: Select flows to view detailed information in the dark-themed bottom panel -8. **Adjust Threshold**: Use toolbar spinner to change outlier detection sensitivity -9. **Multi-Flow Comparison**: Compare signals across different flows in the same optimized view + +#### GUI Mode (Recommended) +1. **Launch GUI with PCAP**: `python streamlens.py --gui --pcap file.pcap` +2. **Immediate Analysis**: Flow table displays instantly with all flow data and wide embedded plots +3. **Optimized Display**: Content-fitted columns, 25% taller rows, and full-width utilization +4. **Wide Plot Visualization**: Chapter 10 flows show detailed signal plots with minimal margins +5. **Browse Flows**: View flows in the dark-themed table (Chapter 10 flows highlighted in modern blue) +6. **Analyze Details**: Select flows to view detailed information in the dark-themed bottom panel +7. **Adjust Threshold**: Use toolbar spinner to change outlier detection sensitivity + +#### Modern TUI Mode (Default) +1. **Launch Modern TUI**: `python streamlens.py --pcap file.pcap` +2. **Flow Analysis View (1)**: Visual flow overview with protocol detection and performance ranking +3. **Packet Decoder View (2)**: Deep packet inspection with three-panel layout for field analysis +4. **Statistical Analysis View (3)**: Comprehensive timing analysis and outlier detection +5. **View Navigation**: Use 1/2/3 to switch between analysis perspectives +6. **Context-Sensitive Help**: Press H for detailed help overlay with all controls +7. **Enhanced Protocol Analysis**: Specialized displays for Chapter 10, PTP, IENA protocols + +#### Classic TUI Mode (--classic flag) +1. **Launch Classic TUI**: `python streamlens.py --pcap file.pcap --classic` +2. **Professional Table View**: Right-aligned numeric columns with transport protocol and classification +3. **Navigate Flows**: Use ↑↓ to browse flows sorted by sigma deviation +4. **Expand Details**: Use → to show frame type breakdowns with hierarchical display +5. **Signal Visualization**: Press 'v' on Chapter 10 flows to generate signal plot files +6. **Timeline Analysis**: Press 't' to toggle timing visualization panel +7. **Live Monitoring**: Real-time statistics updates during network capture ## TUI Controls -- **↑↓**: Navigate between flows in main view +### Modern TUI Controls (Default) +- **1**: Switch to Flow Analysis View (enhanced multi-column layout) +- **2**: Switch to Packet Decoder View (three-panel inspection) +- **3**: Switch to Statistical Analysis View (timing and quality analysis) +- **H**: Toggle comprehensive help overlay +- **↑↓**: Navigate items in current view +- **Enter**: Select flow/packet for detailed analysis +- **Tab**: Switch panels (when available) +- **V**: Visualize signals (Flow Analysis) +- **D**: Deep decode selected flow +- **E**: Export decoded data +- **R**: Refresh statistics +- **O**: Show outlier details +- **Q**: Quit application + +### Classic TUI Controls (--classic flag) +- **↑↓**: Navigate between flows and frame types in main view +- **→**: Expand flow to show frame type breakdowns +- **←**: Collapse flow details - **v**: Visualize Chapter 10 signals for selected flow (saves plot files) - **t**: Toggle timeline panel on/off - **d**: Switch to frame dissection view @@ -194,9 +257,14 @@ streamlens/ │ │ ├── main_window.py # PySide6 main window with docking system │ │ └── dock_panels.py # Dockable panel implementations (flow list, plots, details) │ ├── tui/ # Text User Interface -│ │ ├── interface.py # Main TUI controller +│ │ ├── interface.py # Classic TUI controller +│ │ ├── modern_interface.py # Modern TUI with three-view interface │ │ ├── navigation.py # Navigation handling -│ │ └── panels/ # UI panel components +│ │ ├── modern_views/ # Modern TUI view controllers +│ │ │ ├── flow_analysis.py # Flow Analysis View (F1) +│ │ │ ├── packet_decoder.py # Packet Decoder View (F2) +│ │ │ └── statistical_analysis.py # Statistical Analysis View (F3) +│ │ └── panels/ # Classic TUI panel components │ │ ├── flow_list.py # Flow list panel │ │ ├── detail_panel.py # Flow details panel │ │ └── timeline.py # Timeline visualization panel diff --git a/ai.comprehensive_replay.md b/ai.comprehensive_replay.md index 9d15dd0..9f13c23 100644 --- a/ai.comprehensive_replay.md +++ b/ai.comprehensive_replay.md @@ -508,13 +508,16 @@ The project includes comprehensive test suites: ### 🚀 Recommended Usage ```bash -# Primary recommended workflow +# Primary recommended workflow - Professional GUI with embedded plots python streamlens.py --gui --pcap file.pcap -# For terminal-based analysis +# Enhanced terminal-based analysis with professional table layout python streamlens.py --pcap file.pcap -# For comprehensive reporting +# Live network monitoring with real-time TUI updates +python streamlens.py --live --interface eth0 + +# Comprehensive reporting with sigma-based analysis python streamlens.py --pcap file.pcap --report ``` diff --git a/analyzer/analysis/flow_manager.py b/analyzer/analysis/flow_manager.py index bacb982..1de21aa 100644 --- a/analyzer/analysis/flow_manager.py +++ b/analyzer/analysis/flow_manager.py @@ -5,6 +5,8 @@ Flow tracking and management from typing import Dict, Set, Tuple from ..models import FlowStats, FrameTypeStats from ..protocols import Chapter10Dissector, PTPDissector, IENADissector, StandardProtocolDissectors +from ..protocols.enhanced_chapter10 import EnhancedChapter10Decoder +from ..plugins.ch10_timing_analysis import Chapter10TimingAnalysisPlugin try: from scapy.all import Packet, IP, UDP, TCP @@ -28,6 +30,10 @@ class FlowManager: 'iena': IENADissector() } self.standard_dissectors = StandardProtocolDissectors() + + # Enhanced protocol decoders + self.enhanced_ch10_decoder = EnhancedChapter10Decoder() + self.ch10_timing_plugin = Chapter10TimingAnalysisPlugin() def process_packet(self, packet: Packet, frame_num: int) -> None: """Process a single packet and update flow statistics""" @@ -93,6 +99,9 @@ class FlowManager: frame_type = self._classify_frame_type(packet, dissection_results) self._update_frame_type_stats(flow, frame_type, frame_num, timestamp, packet_size) + # Enhanced analysis for Chapter 10 flows + self._perform_enhanced_analysis(packet, flow, frame_num, transport_info) + # Calculate inter-arrival time if len(flow.timestamps) > 1: inter_arrival = timestamp - flow.timestamps[-2] @@ -354,6 +363,122 @@ class FlowManager: return transport_info + def _perform_enhanced_analysis(self, packet: Packet, flow: FlowStats, frame_num: int, transport_info: Dict): + """Perform enhanced analysis using specialized decoders""" + + # Check if this packet can be analyzed by enhanced CH10 decoder + # Use basic dissector detection as a trigger for enhanced analysis + if "CHAPTER10" in flow.detected_protocol_types or "CH10" in flow.detected_protocol_types: + confidence = 1.0 # High confidence since basic dissector already detected it + else: + confidence = self.enhanced_ch10_decoder.can_decode(packet, transport_info) + + if confidence > 0.5: + # Decode frame with full field extraction + frame_data = self.enhanced_ch10_decoder.decode_frame(packet, transport_info) + + if frame_data: + # Update flow with enhanced decoder type + if flow.enhanced_analysis.decoder_type == "Standard": + flow.enhanced_analysis.decoder_type = "Chapter10_Enhanced" + flow.detected_protocol_types.add("Chapter10") + + # Run timing analysis plugin + flow_duration = (flow.timestamps[-1] - flow.timestamps[0]) if len(flow.timestamps) > 1 else 1.0 + flow_context = { + 'flow_duration': flow_duration, + 'flow_key': f"{flow.src_ip}->{flow.dst_ip}" + } + + timing_result = self.ch10_timing_plugin.analyze_frame(frame_data, flow_context) + + # Update enhanced analysis data + self._update_enhanced_analysis_data(flow, frame_data, timing_result) + + def _update_enhanced_analysis_data(self, flow: FlowStats, frame_data, timing_result): + """Update the enhanced analysis data structure""" + + enhanced = flow.enhanced_analysis + + # Store sample decoded field data for display (keep first few samples) + if len(enhanced.sample_decoded_fields) < 5: # Store up to 5 sample frames + frame_sample = {} + # Get all available fields from this frame + for field_name in self.enhanced_ch10_decoder.supported_fields: + field_value = frame_data.get_field(field_name.name) + if field_value is not None: + frame_sample[field_name.name] = field_value + + if frame_sample: # Only store if we got some data + enhanced.sample_decoded_fields[f"frame_{len(enhanced.sample_decoded_fields)}"] = frame_sample + + # Update available field names list + if not enhanced.available_field_names: + enhanced.available_field_names = [field.name for field in self.enhanced_ch10_decoder.supported_fields] + + # Update timing analysis + if timing_result.internal_timestamp is not None: + enhanced.has_internal_timing = True + + # Update running averages for timing + if timing_result.clock_drift_ppm is not None: + if enhanced.avg_clock_drift_ppm == 0: + enhanced.avg_clock_drift_ppm = timing_result.clock_drift_ppm + else: + # Simple running average + enhanced.avg_clock_drift_ppm = (enhanced.avg_clock_drift_ppm + timing_result.clock_drift_ppm) / 2 + + enhanced.max_clock_drift_ppm = max(enhanced.max_clock_drift_ppm, abs(timing_result.clock_drift_ppm)) + + # Update timing quality (use most recent) + enhanced.timing_quality = timing_result.timing_quality + + # Update anomaly rate + if timing_result.anomaly_detected: + enhanced.anomaly_rate = (enhanced.anomaly_rate * (flow.frame_count - 1) + 1) / flow.frame_count + else: + enhanced.anomaly_rate = (enhanced.anomaly_rate * (flow.frame_count - 1)) / flow.frame_count + + # Update confidence score + if enhanced.avg_confidence_score == 0: + enhanced.avg_confidence_score = timing_result.confidence_score + else: + enhanced.avg_confidence_score = (enhanced.avg_confidence_score + timing_result.confidence_score) / 2 + + # Update frame quality + frame_quality = frame_data.get_field('frame_quality_score', 0) + if frame_quality > 0: + if enhanced.avg_frame_quality == 0: + enhanced.avg_frame_quality = frame_quality + else: + enhanced.avg_frame_quality = (enhanced.avg_frame_quality + frame_quality) / 2 + + # Update error counts + if frame_data.get_field('rtc_sync_error', False): + enhanced.rtc_sync_errors += 1 + if frame_data.get_field('format_error', False): + enhanced.format_errors += 1 + if frame_data.get_field('overflow_error', False): + enhanced.overflow_errors += 1 + + # Update channel information + channel_id = frame_data.get_field('channel_id', 0) + if channel_id > 0: + enhanced.channel_count = max(enhanced.channel_count, channel_id) + + # Update data type counters + if frame_data.get_field('is_analog_data', False): + enhanced.analog_channels = max(enhanced.analog_channels, 1) + if frame_data.get_field('is_pcm_data', False): + enhanced.pcm_channels = max(enhanced.pcm_channels, 1) + if frame_data.get_field('is_tmats_data', False): + enhanced.tmats_frames += 1 + + # Set primary data type + data_type_name = frame_data.get_field('data_type_name', 'Unknown') + if enhanced.primary_data_type == "Unknown": + enhanced.primary_data_type = data_type_name + def _classify_traffic(self, dst_ip: str) -> str: """Classify traffic as Unicast, Multicast, or Broadcast based on destination IP""" try: diff --git a/analyzer/main.py b/analyzer/main.py index 2b910c0..eea4543 100644 --- a/analyzer/main.py +++ b/analyzer/main.py @@ -10,6 +10,7 @@ import curses from .analysis import EthernetAnalyzer from .tui import TUIInterface +from .tui.modern_interface import ModernTUIInterface from .utils import PCAPLoader, LiveCapture @@ -28,6 +29,8 @@ def main(): help='Generate comprehensive outlier report and exit (no TUI)') parser.add_argument('--gui', action='store_true', help='Launch GUI mode (requires PySide6)') + parser.add_argument('--classic', action='store_true', + help='Use classic TUI interface') args = parser.parse_args() @@ -98,8 +101,11 @@ def main(): generate_outlier_report(analyzer, args.outlier_threshold) return - # TUI mode - tui = TUIInterface(analyzer) + # TUI mode - choose between classic and modern interface + if args.classic: + tui = TUIInterface(analyzer) + else: + tui = ModernTUIInterface(analyzer) if args.live: # Start live capture diff --git a/analyzer/models/flow_stats.py b/analyzer/models/flow_stats.py index b97337c..91615fe 100644 --- a/analyzer/models/flow_stats.py +++ b/analyzer/models/flow_stats.py @@ -21,6 +21,39 @@ class FrameTypeStats: outlier_details: List[Tuple[int, float]] = field(default_factory=list) +@dataclass +class EnhancedAnalysisData: + """Enhanced analysis data from specialized decoders""" + # CH10 Timing Analysis + avg_clock_drift_ppm: float = 0.0 + max_clock_drift_ppm: float = 0.0 + timing_quality: str = "Unknown" # excellent, good, moderate, poor + timing_stability: str = "Unknown" # stable, variable + anomaly_rate: float = 0.0 # Percentage of frames with timing anomalies + avg_confidence_score: float = 0.0 + + # CH10 Frame Quality + avg_frame_quality: float = 0.0 + sequence_gaps: int = 0 + rtc_sync_errors: int = 0 + format_errors: int = 0 + overflow_errors: int = 0 + + # CH10 Data Analysis + channel_count: int = 0 + analog_channels: int = 0 + pcm_channels: int = 0 + tmats_frames: int = 0 + + # General Enhanced Data + has_internal_timing: bool = False + primary_data_type: str = "Unknown" + decoder_type: str = "Standard" # Standard, Chapter10_Enhanced, PTP_Enhanced, etc. + + # Decoded Frame Data Storage + sample_decoded_fields: Dict[str, any] = field(default_factory=dict) # Sample of actual decoded fields for display + available_field_names: List[str] = field(default_factory=list) # List of all available field names from decoder + @dataclass class FlowStats: """Statistics for a source-destination IP pair""" @@ -41,4 +74,5 @@ class FlowStats: total_bytes: int = 0 protocols: Set[str] = field(default_factory=set) detected_protocol_types: Set[str] = field(default_factory=set) # Enhanced protocol detection (CH10, PTP, IENA, etc) - frame_types: Dict[str, FrameTypeStats] = field(default_factory=dict) # Per-frame-type statistics \ No newline at end of file + frame_types: Dict[str, FrameTypeStats] = field(default_factory=dict) # Per-frame-type statistics + enhanced_analysis: EnhancedAnalysisData = field(default_factory=EnhancedAnalysisData) # Enhanced decoder analysis \ No newline at end of file diff --git a/analyzer/plugins/ch10_timing_analysis.py b/analyzer/plugins/ch10_timing_analysis.py new file mode 100644 index 0000000..c8e436d --- /dev/null +++ b/analyzer/plugins/ch10_timing_analysis.py @@ -0,0 +1,424 @@ +""" +Chapter 10 Timing Analysis Plugin +Demonstrates how to use exposed CH10 frame variables for advanced analysis +""" + +from typing import Dict, Any, List, Optional +from dataclasses import dataclass +import statistics + +@dataclass +class TimingAnalysisResult: + """Results from timing analysis""" + packet_timestamp: float + internal_timestamp: Optional[float] + time_delta: Optional[float] + clock_drift_ppm: Optional[float] + timing_quality: str + anomaly_detected: bool + confidence_score: float + +class Chapter10TimingAnalysisPlugin: + """Analysis plugin demonstrating use of exposed CH10 variables""" + + def __init__(self): + self.flow_timing_history: Dict[str, List[TimingAnalysisResult]] = {} + self.reference_time_base: Optional[float] = None + + @property + def plugin_name(self) -> str: + return "CH10_Advanced_Timing_Analysis" + + @property + def required_fields(self) -> List[str]: + """Fields required from the enhanced CH10 decoder""" + return [ + 'packet_timestamp', # From packet capture + 'internal_timestamp', # From CH10 internal time + 'internal_seconds', # CH10 seconds component + 'internal_nanoseconds', # CH10 nanoseconds component + 'channel_id', # Channel identifier + 'sequence_number', # Packet sequence + 'relative_time_counter', # RTC value + 'data_type', # Data type for context + 'rtc_sync_error', # RTC sync status + 'time_source_confidence', # Decoder's confidence assessment + 'frame_quality_score' # Overall frame quality + ] + + def analyze_frame(self, frame_data, flow_context: Dict[str, Any]) -> TimingAnalysisResult: + """Analyze timing for a single CH10 frame""" + + # Extract timing information + packet_time = frame_data.get_field('packet_timestamp') + internal_time = frame_data.get_field('internal_timestamp') + + if internal_time is None: + return TimingAnalysisResult( + packet_timestamp=packet_time, + internal_timestamp=None, + time_delta=None, + clock_drift_ppm=None, + timing_quality="no_internal_time", + anomaly_detected=False, + confidence_score=0.0 + ) + + # Calculate basic time delta + time_delta = packet_time - internal_time + + # Analyze timing quality + timing_quality = self._assess_timing_quality(frame_data, time_delta) + + # Calculate clock drift in PPM + clock_drift_ppm = self._calculate_clock_drift_ppm( + time_delta, + flow_context.get('flow_duration', 1.0) + ) + + # Detect anomalies + anomaly_detected = self._detect_timing_anomaly(frame_data, time_delta) + + # Calculate confidence score + confidence_score = self._calculate_confidence_score(frame_data, time_delta) + + result = TimingAnalysisResult( + packet_timestamp=packet_time, + internal_timestamp=internal_time, + time_delta=time_delta, + clock_drift_ppm=clock_drift_ppm, + timing_quality=timing_quality, + anomaly_detected=anomaly_detected, + confidence_score=confidence_score + ) + + # Store in flow history for trend analysis + flow_key = flow_context.get('flow_key', 'unknown') + if flow_key not in self.flow_timing_history: + self.flow_timing_history[flow_key] = [] + self.flow_timing_history[flow_key].append(result) + + return result + + def analyze_flow_timing_trends(self, flow_key: str) -> Dict[str, Any]: + """Analyze timing trends across an entire flow""" + + if flow_key not in self.flow_timing_history: + return {'error': 'No timing data available for flow'} + + timing_results = self.flow_timing_history[flow_key] + valid_results = [r for r in timing_results if r.time_delta is not None] + + if not valid_results: + return {'error': 'No valid timing data in flow'} + + # Extract time deltas for statistical analysis + time_deltas = [r.time_delta for r in valid_results] + drift_values = [r.clock_drift_ppm for r in valid_results if r.clock_drift_ppm is not None] + + # Calculate comprehensive statistics + analysis = { + 'frame_count': len(valid_results), + 'timing_statistics': { + 'mean_delta': statistics.mean(time_deltas), + 'median_delta': statistics.median(time_deltas), + 'std_deviation': statistics.stdev(time_deltas) if len(time_deltas) > 1 else 0.0, + 'min_delta': min(time_deltas), + 'max_delta': max(time_deltas), + 'delta_range': max(time_deltas) - min(time_deltas) + }, + 'drift_analysis': {}, + 'quality_metrics': { + 'anomaly_rate': sum(1 for r in valid_results if r.anomaly_detected) / len(valid_results), + 'avg_confidence': statistics.mean([r.confidence_score for r in valid_results]), + 'quality_trend': self._analyze_quality_trend(valid_results) + }, + 'timing_stability': self._assess_timing_stability(time_deltas), + 'recommendations': [] + } + + # Drift analysis + if drift_values: + analysis['drift_analysis'] = { + 'mean_drift_ppm': statistics.mean(drift_values), + 'drift_stability': statistics.stdev(drift_values) if len(drift_values) > 1 else 0.0, + 'drift_trend': self._calculate_drift_trend(drift_values), + 'max_drift_ppm': max(abs(d) for d in drift_values) + } + + # Generate recommendations + analysis['recommendations'] = self._generate_recommendations(analysis) + + return analysis + + def compare_channel_timing(self, flow_data: Dict[str, List]) -> Dict[str, Any]: + """Compare timing across multiple CH10 channels""" + + channel_analysis = {} + + for flow_key, timing_results in self.flow_timing_history.items(): + # Extract channel ID from first result (simplified) + if timing_results: + # Would need to extract channel_id from frame_data + # This is a simplified example + channel_analysis[flow_key] = { + 'timing_variance': self._calculate_timing_variance(timing_results), + 'sync_quality': self._assess_sync_quality(timing_results), + 'relative_drift': self._calculate_relative_drift(timing_results) + } + + # Cross-channel correlation analysis + correlation_matrix = self._calculate_channel_correlations(channel_analysis) + + return { + 'channel_individual_analysis': channel_analysis, + 'cross_channel_correlations': correlation_matrix, + 'sync_status': self._assess_overall_sync_status(channel_analysis), + 'timing_reference_quality': self._assess_reference_quality() + } + + def _assess_timing_quality(self, frame_data, time_delta: float) -> str: + """Assess the quality of timing data""" + + # Check for RTC sync errors + if frame_data.get_field('rtc_sync_error', False): + return "rtc_sync_error" + + # Check frame quality + frame_quality = frame_data.get_field('frame_quality_score', 0) + if frame_quality < 50: + return "poor_frame_quality" + + # Check time delta magnitude + abs_delta = abs(time_delta) + if abs_delta > 1.0: # More than 1 second difference + return "large_time_discrepancy" + elif abs_delta > 0.1: # More than 100ms difference + return "moderate_time_discrepancy" + elif abs_delta > 0.001: # More than 1ms difference + return "small_time_discrepancy" + else: + return "excellent" + + def _calculate_clock_drift_ppm(self, time_delta: float, flow_duration: float) -> float: + """Calculate clock drift in parts per million""" + if flow_duration <= 0: + return 0.0 + + # Simplified drift calculation + # In practice, would need more sophisticated analysis + drift_fraction = time_delta / flow_duration + return drift_fraction * 1_000_000 # Convert to PPM + + def _detect_timing_anomaly(self, frame_data, time_delta: float) -> bool: + """Detect timing anomalies in this frame""" + + # Multiple criteria for anomaly detection + anomaly_indicators = [ + abs(time_delta) > 0.1, # Large time discrepancy + frame_data.get_field('rtc_sync_error', False), # RTC sync error + frame_data.get_field('format_error', False), # Format error + frame_data.get_field('overflow_error', False), # Overflow error + frame_data.get_field('frame_quality_score', 100) < 70 # Poor frame quality + ] + + return any(anomaly_indicators) + + def _calculate_confidence_score(self, frame_data, time_delta: float) -> float: + """Calculate confidence in timing analysis""" + + confidence_factors = [] + + # Frame quality factor + frame_quality = frame_data.get_field('frame_quality_score', 0) / 100.0 + confidence_factors.append(frame_quality) + + # Time source confidence from decoder + time_confidence = frame_data.get_field('time_source_confidence', 0.5) + confidence_factors.append(time_confidence) + + # Time delta reasonableness + abs_delta = abs(time_delta) + delta_confidence = max(0.0, 1.0 - (abs_delta / 10.0)) # Decreases with larger deltas + confidence_factors.append(delta_confidence) + + # Error flags (reduce confidence) + error_penalty = 0.0 + if frame_data.get_field('rtc_sync_error', False): + error_penalty += 0.3 + if frame_data.get_field('format_error', False): + error_penalty += 0.2 + if frame_data.get_field('overflow_error', False): + error_penalty += 0.2 + + base_confidence = statistics.mean(confidence_factors) + final_confidence = max(0.0, base_confidence - error_penalty) + + return final_confidence + + def _analyze_quality_trend(self, timing_results: List[TimingAnalysisResult]) -> str: + """Analyze trend in timing quality over time""" + + if len(timing_results) < 10: + return "insufficient_data" + + # Split into segments and compare + segment_size = len(timing_results) // 3 + early_segment = timing_results[:segment_size] + late_segment = timing_results[-segment_size:] + + early_confidence = statistics.mean([r.confidence_score for r in early_segment]) + late_confidence = statistics.mean([r.confidence_score for r in late_segment]) + + if late_confidence > early_confidence + 0.1: + return "improving" + elif late_confidence < early_confidence - 0.1: + return "degrading" + else: + return "stable" + + def _assess_timing_stability(self, time_deltas: List[float]) -> Dict[str, Any]: + """Assess the stability of timing measurements""" + + if len(time_deltas) < 2: + return {'stability': 'insufficient_data'} + + std_dev = statistics.stdev(time_deltas) + mean_abs_dev = statistics.mean([abs(d) for d in time_deltas]) + + # Coefficient of variation + mean_delta = statistics.mean(time_deltas) + cv = std_dev / abs(mean_delta) if mean_delta != 0 else float('inf') + + # Stability classification + if std_dev < 0.001: # Less than 1ms standard deviation + stability = "excellent" + elif std_dev < 0.01: # Less than 10ms standard deviation + stability = "good" + elif std_dev < 0.1: # Less than 100ms standard deviation + stability = "moderate" + else: + stability = "poor" + + return { + 'stability': stability, + 'standard_deviation': std_dev, + 'mean_absolute_deviation': mean_abs_dev, + 'coefficient_of_variation': cv, + 'drift_consistency': 'stable' if cv < 0.1 else 'variable' + } + + def _calculate_drift_trend(self, drift_values: List[float]) -> str: + """Calculate the trend in clock drift""" + + if len(drift_values) < 5: + return "insufficient_data" + + # Simple linear trend analysis + x = list(range(len(drift_values))) + y = drift_values + + # Calculate correlation coefficient as trend indicator + n = len(x) + sum_x = sum(x) + sum_y = sum(y) + sum_xy = sum(x[i] * y[i] for i in range(n)) + sum_x2 = sum(x[i] * x[i] for i in range(n)) + + denominator = (n * sum_x2 - sum_x * sum_x) + if denominator == 0: + return "stable" + + slope = (n * sum_xy - sum_x * sum_y) / denominator + + if slope > 1.0: # Increasing drift + return "increasing" + elif slope < -1.0: # Decreasing drift + return "decreasing" + else: + return "stable" + + def _generate_recommendations(self, analysis: Dict[str, Any]) -> List[str]: + """Generate recommendations based on timing analysis""" + + recommendations = [] + + # Check anomaly rate + anomaly_rate = analysis.get('quality_metrics', {}).get('anomaly_rate', 0) + if anomaly_rate > 0.1: # More than 10% anomalies + recommendations.append("High anomaly rate detected - check timing source stability") + + # Check drift + drift_analysis = analysis.get('drift_analysis', {}) + max_drift = drift_analysis.get('max_drift_ppm', 0) + if max_drift > 100: # More than 100 PPM drift + recommendations.append("Significant clock drift detected - calibrate timing reference") + + # Check timing stability + stability_info = analysis.get('timing_stability', {}) + if stability_info.get('stability') in ['poor', 'moderate']: + recommendations.append("Timing instability detected - investigate noise sources") + + # Check confidence + avg_confidence = analysis.get('quality_metrics', {}).get('avg_confidence', 1.0) + if avg_confidence < 0.7: + recommendations.append("Low timing confidence - verify CH10 frame integrity") + + if not recommendations: + recommendations.append("Timing analysis shows good quality - no issues detected") + + return recommendations + + def _calculate_timing_variance(self, timing_results: List[TimingAnalysisResult]) -> float: + """Calculate timing variance for a channel""" + deltas = [r.time_delta for r in timing_results if r.time_delta is not None] + return statistics.variance(deltas) if len(deltas) > 1 else 0.0 + + def _assess_sync_quality(self, timing_results: List[TimingAnalysisResult]) -> str: + """Assess synchronization quality for a channel""" + if not timing_results: + return "no_data" + + avg_confidence = statistics.mean([r.confidence_score for r in timing_results]) + + if avg_confidence > 0.9: + return "excellent" + elif avg_confidence > 0.7: + return "good" + elif avg_confidence > 0.5: + return "moderate" + else: + return "poor" + + def _calculate_relative_drift(self, timing_results: List[TimingAnalysisResult]) -> float: + """Calculate relative drift compared to reference""" + # Simplified calculation - would compare against reference channel + deltas = [r.time_delta for r in timing_results if r.time_delta is not None] + return statistics.stdev(deltas) if len(deltas) > 1 else 0.0 + + def _calculate_channel_correlations(self, channel_analysis: Dict) -> Dict[str, float]: + """Calculate correlations between channels""" + # Placeholder for cross-channel correlation analysis + return {"correlation_analysis": "not_implemented_in_demo"} + + def _assess_overall_sync_status(self, channel_analysis: Dict) -> str: + """Assess overall synchronization status across channels""" + sync_qualities = [info.get('sync_quality', 'poor') for info in channel_analysis.values()] + + excellent_count = sync_qualities.count('excellent') + good_count = sync_qualities.count('good') + total_count = len(sync_qualities) + + if total_count == 0: + return "no_data" + elif excellent_count / total_count > 0.8: + return "excellent" + elif (excellent_count + good_count) / total_count > 0.6: + return "good" + else: + return "poor" + + def _assess_reference_quality(self) -> str: + """Assess the quality of the timing reference""" + # Placeholder for reference quality assessment + return "analysis_pending" \ No newline at end of file diff --git a/analyzer/protocols/enhanced_chapter10.py b/analyzer/protocols/enhanced_chapter10.py new file mode 100644 index 0000000..c0f17b9 --- /dev/null +++ b/analyzer/protocols/enhanced_chapter10.py @@ -0,0 +1,655 @@ +""" +Enhanced Chapter 10 (IRIG 106) decoder with comprehensive field extraction +Exposes all CH10 frame variables for modular analysis +""" + +import struct +from typing import Dict, Any, List, Optional, Union +from dataclasses import dataclass, field +from abc import ABC, abstractmethod + +# Import the modular framework components +@dataclass +class FieldDefinition: + """Defines a field that can be extracted from decoded data""" + name: str + description: str + data_type: type + unit: Optional[str] = None + validator: Optional[callable] = None + +@dataclass +class StructuredFrameData: + """Container for decoded frame data with metadata""" + decoder_name: str + packet_timestamp: float + raw_data: bytes + fields: Dict[str, Any] = field(default_factory=dict) + metadata: Dict[str, Any] = field(default_factory=dict) + + def get_field(self, name: str, default=None): + return self.fields.get(name, default) + + def has_field(self, name: str) -> bool: + return name in self.fields + +class EnhancedChapter10Decoder: + """Comprehensive Chapter 10 decoder exposing all frame variables""" + + # Chapter 10 sync pattern + SYNC_PATTERN = 0xEB25 + + # Data type definitions from IRIG 106-17 + DATA_TYPES = { + 0x00: "Computer Generated Data", + 0x01: "TMATS", + 0x02: "Computer Generated Data - Format 2", + 0x03: "Computer Generated Data - Format 3", + 0x04: "PCM Format 1", + 0x05: "Time Data - Format 1", + 0x06: "Time Data - Format 2", + 0x07: "Computer Generated Data - Format 4", + 0x08: "PCM Format 2", + 0x09: "IRIG Time", + 0x0A: "Computer Generated Data - Format 5", + 0x0B: "Computer Generated Data - Format 6", + 0x11: "1553 Format 1", + 0x19: "1553 Format 2", + 0x21: "Analog Format 1", + 0x29: "Discrete Format 1", + 0x30: "Message Data", + 0x31: "ARINC 429 Format 1", + 0x38: "Video Format 0", + 0x39: "Video Format 1", + 0x3A: "Video Format 2", + 0x40: "Image Format 0", + 0x41: "Image Format 1", + 0x48: "UART Format 0", + 0x50: "IEEE 1394 Format 0", + 0x51: "IEEE 1394 Format 1", + 0x58: "Parallel Format 0", + 0x59: "Parallel Format 1", + 0x60: "Ethernet Format 0", + 0x61: "Ethernet Format 1", + 0x68: "TSPI/CTS Format 0", + 0x69: "TSPI/CTS Format 1", + 0x70: "CAN Bus", + 0x71: "Fibre Channel Format 0", + 0x72: "Analog Format 2", + 0x73: "Analog Format 3", + 0x74: "Analog Format 4", + 0x75: "Analog Format 5", + 0x76: "Analog Format 6", + 0x77: "Analog Format 7", + 0x78: "Analog Format 8" + } + + # Packet flags bit definitions + PACKET_FLAGS = { + 0: "Secondary Header Time Source", + 1: "Format Error", + 2: "RTC Sync Error", + 3: "IPH Time Source", + 4: "Secondary Header Present", + 5: "Optional Data Present", + 6: "Reserved", + 7: "Overflow Error" + } + + @property + def decoder_name(self) -> str: + return "Chapter10_Enhanced" + + @property + def supported_fields(self) -> List[FieldDefinition]: + """All fields that can be extracted from CH10 frames""" + return [ + # Primary header fields (24 bytes) + FieldDefinition("sync_pattern", "Sync pattern (should be 0xEB25)", int), + FieldDefinition("channel_id", "Channel identifier", int), + FieldDefinition("packet_length", "Total packet length including header", int, "bytes"), + FieldDefinition("data_length", "Data payload length", int, "bytes"), + FieldDefinition("header_version", "Header version number", int), + FieldDefinition("sequence_number", "Packet sequence number", int), + FieldDefinition("packet_flags", "Packet flags byte", int), + FieldDefinition("data_type", "Data type identifier", int), + FieldDefinition("relative_time_counter", "RTC value (6 bytes)", int, "counts"), + FieldDefinition("header_checksum", "Header checksum", int), + + # Decoded packet flags + FieldDefinition("secondary_header_time_source", "Time source from secondary header", bool), + FieldDefinition("format_error", "Format error flag", bool), + FieldDefinition("rtc_sync_error", "RTC synchronization error", bool), + FieldDefinition("iph_time_source", "IPH time source flag", bool), + FieldDefinition("secondary_header_present", "Secondary header present", bool), + FieldDefinition("optional_data_present", "Optional data present", bool), + FieldDefinition("overflow_error", "Data overflow error", bool), + + # Data type information + FieldDefinition("data_type_name", "Human readable data type", str), + FieldDefinition("is_analog_data", "True if analog format", bool), + FieldDefinition("is_pcm_data", "True if PCM format", bool), + FieldDefinition("is_tmats_data", "True if TMATS data", bool), + FieldDefinition("is_time_data", "True if time format", bool), + + # Secondary header fields (if present) + FieldDefinition("secondary_header_time", "Secondary header timestamp", int, "nanoseconds"), + FieldDefinition("internal_seconds", "Internal time seconds component", int, "seconds"), + FieldDefinition("internal_nanoseconds", "Internal time nanoseconds component", int, "nanoseconds"), + FieldDefinition("internal_timestamp", "Combined internal timestamp", float, "seconds"), + + # Analog format specific fields + FieldDefinition("analog_minor_frame_count", "Number of minor frames", int), + FieldDefinition("analog_scan_count", "Scans per minor frame", int), + FieldDefinition("analog_channel_count", "Number of analog channels", int), + FieldDefinition("analog_sample_rate", "Sampling rate", float, "Hz"), + FieldDefinition("analog_bit_depth", "Bits per sample", int, "bits"), + FieldDefinition("analog_format_factor", "Format factor", int), + FieldDefinition("analog_measurement_list", "Channel measurement data", dict), + + # PCM format specific fields + FieldDefinition("pcm_minor_frame_sync", "Minor frame sync pattern", int), + FieldDefinition("pcm_major_frame_sync", "Major frame sync pattern", int), + FieldDefinition("pcm_minor_frame_length", "Minor frame length", int, "bits"), + FieldDefinition("pcm_major_frame_length", "Major frame length", int, "minor_frames"), + FieldDefinition("pcm_bits_per_second", "PCM bit rate", int, "bps"), + + # TMATS specific fields + FieldDefinition("tmats_version", "TMATS version", str), + FieldDefinition("tmats_channel_configs", "Parsed channel configurations", dict), + FieldDefinition("tmats_data_source_id", "Data source identifier", str), + FieldDefinition("tmats_recording_date", "Recording date/time", str), + + # General payload analysis + FieldDefinition("payload_entropy", "Data randomness measure", float), + FieldDefinition("payload_patterns", "Detected data patterns", list), + FieldDefinition("has_embedded_timestamps", "Contains timestamp data", bool), + + # Frame quality metrics + FieldDefinition("header_checksum_valid", "Header checksum validation", bool), + FieldDefinition("frame_completeness", "Percentage of expected data present", float, "percent"), + FieldDefinition("data_continuity_errors", "Number of continuity errors", int), + + # Timing analysis fields + FieldDefinition("rtc_time_base", "RTC time base frequency", float, "Hz"), + FieldDefinition("time_source_confidence", "Confidence in time source", float), + FieldDefinition("clock_drift_indicators", "Indicators of clock drift", dict) + ] + + def can_decode(self, packet, transport_info: Dict) -> float: + """Check if packet contains Chapter 10 data""" + if not hasattr(packet, 'haslayer') or not packet.haslayer('Raw'): + return 0.0 + + from scapy.all import Raw + raw_data = bytes(packet[Raw]) + + # Must have at least primary header + if len(raw_data) < 24: + return 0.0 + + try: + # Check for sync pattern + sync = struct.unpack(' packet_length - 24: + return 0.5 # Might be CH10 but malformed + + # Check if packet length matches actual data + if packet_length <= len(raw_data): + return 1.0 + else: + return 0.8 # Truncated but probably CH10 + + except (struct.error, IndexError): + return 0.0 + + def decode_frame(self, packet, transport_info: Dict) -> Optional[StructuredFrameData]: + """Comprehensive Chapter 10 frame decoding""" + if not hasattr(packet, 'haslayer') or not packet.haslayer('Raw'): + return None + + from scapy.all import Raw + raw_data = bytes(packet[Raw]) + + if len(raw_data) < 24: + return None + + try: + frame_data = StructuredFrameData( + decoder_name=self.decoder_name, + packet_timestamp=float(packet.time), + raw_data=raw_data + ) + + # Parse primary header + self._parse_primary_header(raw_data, frame_data) + + # Parse secondary header if present + header_size = 24 + if frame_data.get_field('secondary_header_present'): + header_size += self._parse_secondary_header(raw_data[24:], frame_data) + + # Parse data payload based on type + payload_start = header_size + data_length = frame_data.get_field('data_length', 0) + if payload_start < len(raw_data) and data_length > 0: + payload_end = min(payload_start + data_length, len(raw_data)) + payload = raw_data[payload_start:payload_end] + self._parse_payload(payload, frame_data) + + # Calculate derived fields + self._calculate_derived_fields(frame_data) + + # Validate frame integrity + self._validate_frame(frame_data) + + return frame_data + + except Exception as e: + # Return partial data even if parsing fails + if 'frame_data' in locals(): + frame_data.metadata['parsing_error'] = str(e) + return frame_data + return None + + def _parse_primary_header(self, raw_data: bytes, frame_data: StructuredFrameData): + """Parse the 24-byte primary header""" + # Unpack primary header fields + header = struct.unpack(' int: + """Parse secondary header (variable length, typically 8 bytes for time)""" + if len(data) < 8: + return 0 + + try: + # Parse time format secondary header (most common) + time_data = struct.unpack('> 16) & 0xFFFF + }) + + # Parse measurement data if present + if len(payload) > 12: + self._parse_analog_measurements(payload[12:], frame_data) + + elif data_type in [0x73, 0x74, 0x75]: # Other analog formats + # Simplified parsing for other formats + if len(payload) >= 4: + basic_info = struct.unpack(' 0: + for channel in range(min(channel_count, 16)): # Limit to prevent excessive processing + channel_data = [] + for sample in range(min(samples_per_channel, 100)): # Limit samples + offset = (sample * channel_count + channel) * 2 + if offset + 1 < len(data): + value = struct.unpack(' Dict[str, Any]: + """Parse TMATS text format""" + tmats_data = { + 'tmats_version': 'Unknown', + 'tmats_channel_configs': {}, + 'tmats_data_source_id': 'Unknown', + 'tmats_recording_date': 'Unknown' + } + + try: + # Split on backslashes (TMATS line separators) + lines = text.split('\\') + + channel_configs = {} + for line in lines: + line = line.strip() + if not line: + continue + + # Parse key-value pairs + if ':' in line: + key, value = line.split(':', 1) + elif ';' in line: + key, value = line.split(';', 1) + else: + continue + + key = key.strip() + value = value.strip().rstrip(';') + + # Parse specific TMATS parameters + if key.startswith('G\\VER'): + tmats_data['tmats_version'] = value + elif key.startswith('G\\DSI'): + tmats_data['tmats_data_source_id'] = value + elif key.startswith('G\\RD'): + tmats_data['tmats_recording_date'] = value + elif key.startswith('R-'): + # Channel configuration + self._parse_tmats_channel_config(key, value, channel_configs) + + tmats_data['tmats_channel_configs'] = channel_configs + + except Exception: + pass + + return tmats_data + + def _parse_tmats_channel_config(self, key: str, value: str, configs: Dict): + """Parse TMATS channel configuration parameters""" + # Extract channel ID from key like "R-1\\G" or "R-CH1\\N" + parts = key.split('\\') + if len(parts) < 2: + return + + channel_part = parts[0] # e.g., "R-1" + param_part = parts[1] # e.g., "G", "N", "EU" + + if channel_part.startswith('R-'): + channel_id = channel_part[2:] + + if channel_id not in configs: + configs[channel_id] = {} + + # Map parameter codes + param_map = { + 'G': 'gain', + 'N': 'name', + 'EU': 'units', + 'MN': 'min_value', + 'MX': 'max_value', + 'OF': 'offset', + 'FS': 'full_scale', + 'SN': 'sample_rate' + } + + param_name = param_map.get(param_part, param_part.lower()) + + # Try to convert numeric values + try: + if param_name in ['gain', 'min_value', 'max_value', 'offset', 'full_scale', 'sample_rate']: + value = float(value) + except ValueError: + pass + + configs[channel_id][param_name] = value + + def _parse_time_payload(self, payload: bytes, frame_data: StructuredFrameData): + """Parse time format payload""" + if len(payload) < 8: + return + + try: + # Parse time data (format depends on data type) + time_info = struct.unpack(' float: + """Calculate Shannon entropy of data""" + if len(data) == 0: + return 0.0 + + # Count byte frequencies + frequencies = [0] * 256 + for byte in data: + frequencies[byte] += 1 + + # Calculate entropy + entropy = 0.0 + for freq in frequencies: + if freq > 0: + probability = freq / len(data) + entropy -= probability * (probability.bit_length() - 1) + + return entropy / 8.0 # Normalize to 0-1 range + + def _detect_patterns(self, data: bytes) -> List[str]: + """Detect common patterns in data""" + patterns = [] + + if len(data) < 4: + return patterns + + # Check for repeated patterns + if data[:4] == data[4:8]: + patterns.append("repeated_4byte_pattern") + + # Check for incrementing patterns + if len(data) >= 8: + values = struct.unpack(f'<{len(data)//4}I', data[:len(data)//4*4]) + if all(values[i] < values[i+1] for i in range(len(values)-1)): + patterns.append("incrementing_sequence") + + # Check for zero padding + if data.count(0) > len(data) * 0.8: + patterns.append("mostly_zeros") + + return patterns + + def _calculate_derived_fields(self, frame_data: StructuredFrameData): + """Calculate derived fields from extracted data""" + # Calculate frame completeness + expected_length = frame_data.get_field('packet_length', 0) + actual_length = len(frame_data.raw_data) + if expected_length > 0: + completeness = min(100.0, (actual_length / expected_length) * 100.0) + frame_data.fields['frame_completeness'] = completeness + + # Analyze timing if internal timestamp is available + if frame_data.has_field('internal_timestamp'): + packet_time = frame_data.packet_timestamp + internal_time = frame_data.get_field('internal_timestamp') + + time_delta = packet_time - internal_time + frame_data.fields['packet_internal_time_delta'] = time_delta + + # Simple confidence measure + confidence = 1.0 - min(1.0, abs(time_delta) / 60.0) # Confidence decreases with time delta + frame_data.fields['time_source_confidence'] = confidence + + def _validate_frame(self, frame_data: StructuredFrameData): + """Validate frame integrity""" + # Validate sync pattern + sync = frame_data.get_field('sync_pattern', 0) + frame_data.fields['sync_pattern_valid'] = (sync == self.SYNC_PATTERN) + + # Simple checksum validation (would need proper implementation) + # For now, just check if checksum field exists + has_checksum = frame_data.has_field('header_checksum') + frame_data.fields['header_checksum_valid'] = has_checksum + + # Check for continuity errors (simplified) + seq_num = frame_data.get_field('sequence_number', 0) + frame_data.fields['data_continuity_errors'] = 0 # Would need sequence tracking + + # Overall frame quality score + quality_factors = [ + frame_data.get_field('sync_pattern_valid', False), + frame_data.get_field('header_checksum_valid', False), + frame_data.get_field('frame_completeness', 0) > 95.0, + not frame_data.get_field('format_error', True), + not frame_data.get_field('overflow_error', True) + ] + + quality_score = sum(quality_factors) / len(quality_factors) * 100.0 + frame_data.fields['frame_quality_score'] = quality_score \ No newline at end of file diff --git a/analyzer/tui/interface.py b/analyzer/tui/interface.py index f3f21bb..74a2bb3 100644 --- a/analyzer/tui/interface.py +++ b/analyzer/tui/interface.py @@ -64,6 +64,8 @@ class TUIInterface: break elif action == 'visualize': self._handle_visualization() + elif action == 'switch_tab': + self.detail_panel.switch_tab() def _draw_main_view(self, stdscr): """Draw three-panel main view: flows list, details, and timeline""" diff --git a/analyzer/tui/modern_interface.py b/analyzer/tui/modern_interface.py new file mode 100644 index 0000000..9109688 --- /dev/null +++ b/analyzer/tui/modern_interface.py @@ -0,0 +1,272 @@ +""" +Modern TUI Interface for StreamLens +Focused on Flow Analysis, Packet Decoding, and Statistical Analysis +""" + +import curses +from typing import TYPE_CHECKING, List, Optional +from enum import Enum + +from .navigation import NavigationHandler +from .modern_views import FlowAnalysisView, PacketDecoderView, StatisticalAnalysisView +from ..utils.signal_visualizer import signal_visualizer + +if TYPE_CHECKING: + from ..analysis.core import EthernetAnalyzer + + +class ViewMode(Enum): + FLOW_ANALYSIS = "flow" + PACKET_DECODER = "decode" + STATISTICAL_ANALYSIS = "stats" + + +class ModernTUIInterface: + """ + Modern StreamLens TUI Interface + + Three primary views accessed via 1/2/3: + - 1: Flow Analysis - Visual flow overview with enhanced protocol detection + - 2: Packet Decoder - Deep protocol inspection and field extraction + - 3: Statistical Analysis - Timing analysis, outliers, and quality metrics + """ + + def __init__(self, analyzer: 'EthernetAnalyzer'): + self.analyzer = analyzer + self.navigation = NavigationHandler() + + # Current view mode + self.current_view = ViewMode.FLOW_ANALYSIS + + # Initialize view controllers + self.flow_view = FlowAnalysisView(analyzer) + self.decoder_view = PacketDecoderView(analyzer) + self.stats_view = StatisticalAnalysisView(analyzer) + + # Global state + self.selected_flow_key = None + self.show_help = False + + def run(self, stdscr): + """Main TUI loop for modern interface""" + curses.curs_set(0) # Hide cursor + stdscr.keypad(True) + + # Set timeout based on live mode + if self.analyzer.is_live: + stdscr.timeout(500) # 0.5 second for live updates + else: + stdscr.timeout(1000) # 1 second for static analysis + + while True: + try: + stdscr.clear() + + # Draw header with view indicators + self._draw_header(stdscr) + + # Draw main content based on current view + if self.current_view == ViewMode.FLOW_ANALYSIS: + self.flow_view.draw(stdscr, self.selected_flow_key) + elif self.current_view == ViewMode.PACKET_DECODER: + self.decoder_view.draw(stdscr, self.selected_flow_key) + elif self.current_view == ViewMode.STATISTICAL_ANALYSIS: + self.stats_view.draw(stdscr, self.selected_flow_key) + + # Draw status bar + self._draw_status_bar(stdscr) + + # Overlay help if requested + if self.show_help: + self._draw_help_overlay(stdscr) + + stdscr.refresh() + + # Handle input + key = stdscr.getch() + + # Handle timeout (no key pressed) - refresh for live capture + if key == -1 and self.analyzer.is_live: + continue + + action = self._handle_input(key) + + if action == 'quit': + if self.analyzer.is_live: + self.analyzer.stop_capture = True + break + + except curses.error: + # Handle terminal resize or other curses errors + pass + + def _draw_header(self, stdscr): + """Draw the header with application title and view tabs""" + height, width = stdscr.getmaxyx() + + # Title + title = "StreamLens - Ethernet Traffic Analyzer" + stdscr.addstr(0, 2, title, curses.A_BOLD) + + # Live indicator + if self.analyzer.is_live: + live_text = "[LIVE]" + stdscr.addstr(0, width - len(live_text) - 2, live_text, + curses.A_BOLD | curses.A_BLINK) + + # View tabs + tab_line = 1 + tab_x = 2 + + # 1: Flow Analysis + if self.current_view == ViewMode.FLOW_ANALYSIS: + stdscr.addstr(tab_line, tab_x, "[1: Flow Analysis]", curses.A_REVERSE) + else: + stdscr.addstr(tab_line, tab_x, " 1: Flow Analysis ", curses.A_DIM) + tab_x += 19 + + # 2: Packet Decoder + if self.current_view == ViewMode.PACKET_DECODER: + stdscr.addstr(tab_line, tab_x, "[2: Packet Decoder]", curses.A_REVERSE) + else: + stdscr.addstr(tab_line, tab_x, " 2: Packet Decoder ", curses.A_DIM) + tab_x += 20 + + # 3: Statistical Analysis + if self.current_view == ViewMode.STATISTICAL_ANALYSIS: + stdscr.addstr(tab_line, tab_x, "[3: Statistical Analysis]", curses.A_REVERSE) + else: + stdscr.addstr(tab_line, tab_x, " 3: Statistical Analysis ", curses.A_DIM) + + # Draw separator line + stdscr.addstr(2, 0, "─" * width) + + def _draw_status_bar(self, stdscr): + """Draw status bar with context-sensitive help""" + height, width = stdscr.getmaxyx() + status_y = height - 1 + + # Base controls + status_text = "[1-3]Views [↑↓]Navigate [Enter]Select [H]Help [Q]Quit" + + # Add view-specific controls + if self.current_view == ViewMode.FLOW_ANALYSIS: + status_text += " [V]Visualize [D]Decode" + elif self.current_view == ViewMode.PACKET_DECODER: + status_text += " [E]Export [C]Copy" + elif self.current_view == ViewMode.STATISTICAL_ANALYSIS: + status_text += " [R]Refresh [O]Outliers" + + # Add live capture controls + if self.analyzer.is_live: + status_text += " [P]Pause" + + stdscr.addstr(status_y, 0, status_text[:width-1], curses.A_REVERSE) + + def _draw_help_overlay(self, stdscr): + """Draw help overlay with comprehensive controls""" + height, width = stdscr.getmaxyx() + + # Calculate overlay size + overlay_height = min(20, height - 4) + overlay_width = min(80, width - 4) + start_y = (height - overlay_height) // 2 + start_x = (width - overlay_width) // 2 + + # Create help window + help_lines = [ + "StreamLens - Help", + "", + "VIEWS:", + " 1 - Flow Analysis: Visual flow overview and protocol detection", + " 2 - Packet Decoder: Deep packet inspection and field extraction", + " 3 - Statistical Analysis: Timing analysis and quality metrics", + "", + "NAVIGATION:", + " ↑/↓ - Navigate items", + " Enter - Select flow/packet", + " Tab - Switch panels (when available)", + " PgUp/PgDn - Scroll large lists", + "", + "ANALYSIS:", + " V - Visualize signals (Flow Analysis)", + " D - Deep decode selected flow", + " E - Export decoded data", + " R - Refresh statistics", + " O - Show outlier details", + "", + "GENERAL:", + " H - Toggle this help", + " Q - Quit application", + "", + "Press any key to close help..." + ] + + # Draw background + for y in range(overlay_height): + stdscr.addstr(start_y + y, start_x, " " * overlay_width, curses.A_REVERSE) + + # Draw help content + for i, line in enumerate(help_lines[:overlay_height-1]): + if start_y + i < height - 1: + display_line = line[:overlay_width-2] + attr = curses.A_REVERSE | curses.A_BOLD if i == 0 else curses.A_REVERSE + stdscr.addstr(start_y + i, start_x + 1, display_line, attr) + + def _handle_input(self, key: int) -> str: + """Handle keyboard input with view-specific actions""" + + # Global controls + if key == ord('q') or key == ord('Q'): + return 'quit' + elif key == ord('h') or key == ord('H'): + self.show_help = not self.show_help + return 'help_toggle' + elif self.show_help: + # Any key closes help + self.show_help = False + return 'help_close' + + # View switching + elif key == ord('1'): + self.current_view = ViewMode.FLOW_ANALYSIS + return 'view_change' + elif key == ord('2'): + self.current_view = ViewMode.PACKET_DECODER + return 'view_change' + elif key == ord('3'): + self.current_view = ViewMode.STATISTICAL_ANALYSIS + return 'view_change' + + # Delegate to current view + elif self.current_view == ViewMode.FLOW_ANALYSIS: + return self.flow_view.handle_input(key, self._get_flows_list()) + elif self.current_view == ViewMode.PACKET_DECODER: + return self.decoder_view.handle_input(key, self._get_flows_list()) + elif self.current_view == ViewMode.STATISTICAL_ANALYSIS: + return self.stats_view.handle_input(key, self._get_flows_list()) + + return 'none' + + def _get_flows_list(self): + """Get prioritized list of flows for analysis""" + flows_list = list(self.analyzer.flows.values()) + + # Sort by relevance: enhanced flows first, then by packet count + flows_list.sort(key=lambda x: ( + x.enhanced_analysis.decoder_type != "Standard", # Enhanced first + self.analyzer.statistics_engine.get_max_sigma_deviation(x), # High outliers + x.frame_count # Packet count + ), reverse=True) + + return flows_list + + def get_selected_flow(self): + """Get currently selected flow for cross-view communication""" + if self.selected_flow_key: + return self.analyzer.flows.get(self.selected_flow_key) + return None + + def set_selected_flow(self, flow_key): + """Set selected flow for cross-view communication""" + self.selected_flow_key = flow_key \ No newline at end of file diff --git a/analyzer/tui/modern_views/__init__.py b/analyzer/tui/modern_views/__init__.py new file mode 100644 index 0000000..3f5d99e --- /dev/null +++ b/analyzer/tui/modern_views/__init__.py @@ -0,0 +1,9 @@ +""" +Modern TUI Views for StreamLens +""" + +from .flow_analysis import FlowAnalysisView +from .packet_decoder import PacketDecoderView +from .statistical_analysis import StatisticalAnalysisView + +__all__ = ['FlowAnalysisView', 'PacketDecoderView', 'StatisticalAnalysisView'] \ No newline at end of file diff --git a/analyzer/tui/modern_views/flow_analysis.py b/analyzer/tui/modern_views/flow_analysis.py new file mode 100644 index 0000000..f31e35e --- /dev/null +++ b/analyzer/tui/modern_views/flow_analysis.py @@ -0,0 +1,318 @@ +""" +Flow Analysis View - Visual flow overview with protocol detection +Focuses on understanding communication patterns and flow characteristics +""" + +import curses +from typing import TYPE_CHECKING, List, Optional, Tuple +from ...models import FlowStats + +if TYPE_CHECKING: + from ...analysis.core import EthernetAnalyzer + + +class FlowAnalysisView: + """ + Flow Analysis View - F1 + + Primary view for understanding network flows: + - Flow overview with visual indicators + - Protocol detection and classification + - Traffic patterns and volume analysis + - Enhanced decoder availability + """ + + def __init__(self, analyzer: 'EthernetAnalyzer'): + self.analyzer = analyzer + self.selected_flow = 0 + self.scroll_offset = 0 + self.show_frame_types = True + + def draw(self, stdscr, selected_flow_key: Optional[str]): + """Draw the Flow Analysis view""" + height, width = stdscr.getmaxyx() + start_y = 3 # After header + max_height = height - 2 # Reserve for status bar + + flows_list = self._get_flows_list() + + if not flows_list: + stdscr.addstr(start_y + 2, 4, "No network flows detected", curses.A_DIM) + stdscr.addstr(start_y + 3, 4, "Waiting for packets...", curses.A_DIM) + return + + # Flow summary header + summary = self.analyzer.get_summary() + summary_text = (f"Flows: {summary['unique_flows']} | " + f"Packets: {summary['total_packets']} | " + f"Endpoints: {summary['unique_ips']}") + + if self.analyzer.is_live: + rt_summary = self.analyzer.statistics_engine.get_realtime_summary() + summary_text += f" | Outliers: {rt_summary.get('total_outliers', 0)}" + + stdscr.addstr(start_y, 4, summary_text, curses.A_BOLD) + + # Flow analysis area + flow_start_y = start_y + 2 + self._draw_flow_overview(stdscr, flow_start_y, width, max_height - flow_start_y, flows_list) + + def _draw_flow_overview(self, stdscr, start_y: int, width: int, max_height: int, flows_list: List[FlowStats]): + """Draw comprehensive flow overview""" + + # Header + stdscr.addstr(start_y, 4, "FLOW ANALYSIS", curses.A_BOLD | curses.A_UNDERLINE) + current_y = start_y + 2 + + # Column headers with visual indicators + headers = ( + f"{'#':>2} " + f"{'Source':20} " + f"{'Proto':6} " + f"{'Destination':20} " + f"{'Extended':10} " + f"{'Frame Type':12} " + f"{'Pkts':>6} " + f"{'Volume':>8} " + f"{'Timing':>8} " + f"{'Quality':>8}" + ) + stdscr.addstr(current_y, 4, headers, curses.A_UNDERLINE) + current_y += 1 + + # Calculate visible range + visible_flows = max_height - (current_y - start_y) - 2 + start_idx = self.scroll_offset + end_idx = min(start_idx + visible_flows, len(flows_list)) + + # Draw flows + for i in range(start_idx, end_idx): + flow = flows_list[i] + display_idx = i - start_idx + + # Flow selection + is_selected = (i == self.selected_flow) + attr = curses.A_REVERSE if is_selected else curses.A_NORMAL + + # Flow line + flow_line = self._format_flow_line(i + 1, flow) + stdscr.addstr(current_y + display_idx, 4, flow_line[:width-8], attr) + + # Enhanced indicator + if flow.enhanced_analysis.decoder_type != "Standard": + stdscr.addstr(current_y + display_idx, 2, "●", curses.A_BOLD | curses.color_pair(1)) + + # Frame types sub-display (if selected and enabled) + if is_selected and self.show_frame_types and flow.frame_types: + sub_y = current_y + display_idx + 1 + if sub_y < current_y + visible_flows: + self._draw_frame_types_compact(stdscr, sub_y, width, flow) + + # Scroll indicators + if start_idx > 0: + stdscr.addstr(current_y, width - 10, "↑ More", curses.A_DIM) + if end_idx < len(flows_list): + stdscr.addstr(current_y + visible_flows - 1, width - 10, "↓ More", curses.A_DIM) + + def _format_flow_line(self, flow_num: int, flow: FlowStats) -> str: + """Format a single flow line with comprehensive information""" + + # Source with port (left-aligned) + source = f"{flow.src_ip}:{flow.src_port}" + if len(source) > 18: + source = f"{flow.src_ip[:10]}…:{flow.src_port}" + + # Transport protocol (TCP, UDP, ICMP, IGMP, etc.) + protocol = flow.transport_protocol + + # Destination with port (left-aligned) + destination = f"{flow.dst_ip}:{flow.dst_port}" + if len(destination) > 18: + destination = f"{flow.dst_ip[:10]}…:{flow.dst_port}" + + # Extended protocol (Chapter 10, PTP, IENA, etc.) + extended_protocol = self._get_extended_protocol(flow) + + # Frame type (most common frame type in this flow) + frame_type = self._get_primary_frame_type(flow) + + # Packet count + pkt_count = f"{flow.frame_count}" + + # Volume with units + volume = self._format_bytes(flow.total_bytes) + + # Timing quality + if flow.avg_inter_arrival > 0: + timing = f"{flow.avg_inter_arrival*1000:.1f}ms" + else: + timing = "N/A" + + # Quality score + if flow.enhanced_analysis.decoder_type != "Standard": + if flow.enhanced_analysis.avg_frame_quality > 0: + quality = f"{flow.enhanced_analysis.avg_frame_quality:.0f}%" + else: + quality = "Enhanced" + else: + # Check for outliers + outlier_pct = len(flow.outlier_frames) / flow.frame_count * 100 if flow.frame_count > 0 else 0 + if outlier_pct > 5: + quality = f"{outlier_pct:.0f}% Out" + else: + quality = "Normal" + + return (f"{flow_num:>2} " + f"{source:20} " + f"{protocol:6} " + f"{destination:20} " + f"{extended_protocol:10} " + f"{frame_type:12} " + f"{pkt_count:>6} " + f"{volume:>8} " + f"{timing:>8} " + f"{quality:>8}") + + def _draw_frame_types_compact(self, stdscr, y: int, width: int, flow: FlowStats): + """Draw compact frame type breakdown for selected flow""" + frame_types = sorted(flow.frame_types.items(), key=lambda x: x[1].count, reverse=True) + + # Compact frame type display + type_summary = [] + for frame_type, ft_stats in frame_types[:4]: # Show top 4 + type_summary.append(f"{frame_type}({ft_stats.count})") + + if len(frame_types) > 4: + type_summary.append(f"+{len(frame_types)-4} more") + + frame_line = f" └─ Frame Types: {', '.join(type_summary)}" + stdscr.addstr(y, 4, frame_line[:width-8], curses.A_DIM) + + def _get_primary_protocol(self, flow: FlowStats) -> str: + """Get the most relevant protocol for display""" + # Prioritize enhanced protocols + if flow.detected_protocol_types: + enhanced_protocols = {'CHAPTER10', 'PTP', 'IENA', 'CH10'} + found_enhanced = flow.detected_protocol_types & enhanced_protocols + if found_enhanced: + return list(found_enhanced)[0] + + # Use first detected protocol + return list(flow.detected_protocol_types)[0] + + # Fallback to transport protocol + return flow.transport_protocol + + def _get_extended_protocol(self, flow: FlowStats) -> str: + """Get extended protocol (Chapter 10, PTP, IENA, etc.)""" + if flow.detected_protocol_types: + # Look for specialized protocols + enhanced_protocols = {'CHAPTER10', 'CH10', 'PTP', 'IENA'} + found_enhanced = flow.detected_protocol_types & enhanced_protocols + if found_enhanced: + protocol = list(found_enhanced)[0] + # Simplify display names + if protocol in ['CHAPTER10', 'CH10']: + return 'CH10' + return protocol + + # Check for other common protocols + if flow.detected_protocol_types and 'NTP' in flow.detected_protocol_types: + return 'NTP' + + return '-' + + def _get_primary_frame_type(self, flow: FlowStats) -> str: + """Get the most common frame type in this flow""" + if not flow.frame_types: + return '-' + + # Find the frame type with the most packets + most_common = max(flow.frame_types.items(), key=lambda x: x[1].count) + frame_type = most_common[0] + + # Simplify frame type names for display + if frame_type == 'CH10-Data': + return 'CH10-Data' + elif frame_type == 'TMATS': + return 'TMATS' + elif frame_type.startswith('PTP-'): + return frame_type.replace('PTP-', 'PTP ')[:11] # PTP Sync, PTP Signal + elif frame_type == 'UDP': + return 'UDP' + elif frame_type == 'IGMP': + return 'IGMP' + else: + return frame_type[:11] # Truncate to fit column + + def _format_bytes(self, bytes_count: int) -> str: + """Format byte count with appropriate units""" + if bytes_count >= 1_000_000_000: + return f"{bytes_count / 1_000_000_000:.1f}GB" + elif bytes_count >= 1_000_000: + return f"{bytes_count / 1_000_000:.1f}MB" + elif bytes_count >= 1_000: + return f"{bytes_count / 1_000:.1f}KB" + else: + return f"{bytes_count}B" + + def _get_flows_list(self) -> List[FlowStats]: + """Get flows sorted by importance for flow analysis""" + flows_list = list(self.analyzer.flows.values()) + + # Sort by: Enhanced protocols first, then outliers, then packet count + flows_list.sort(key=lambda x: ( + x.enhanced_analysis.decoder_type != "Standard", + len(x.outlier_frames), + x.frame_count + ), reverse=True) + + return flows_list + + def handle_input(self, key: int, flows_list: List[FlowStats]) -> str: + """Handle input for Flow Analysis view""" + if key == curses.KEY_UP: + self.selected_flow = max(0, self.selected_flow - 1) + self._adjust_scroll() + return 'selection_change' + elif key == curses.KEY_DOWN: + self.selected_flow = min(len(flows_list) - 1, self.selected_flow + 1) + self._adjust_scroll() + return 'selection_change' + elif key == curses.KEY_PPAGE: # Page Up + self.selected_flow = max(0, self.selected_flow - 10) + self._adjust_scroll() + return 'selection_change' + elif key == curses.KEY_NPAGE: # Page Down + self.selected_flow = min(len(flows_list) - 1, self.selected_flow + 10) + self._adjust_scroll() + return 'selection_change' + elif key == ord('\n') or key == curses.KEY_ENTER: + # Select flow for detailed analysis + if flows_list and self.selected_flow < len(flows_list): + selected_flow = flows_list[self.selected_flow] + # Signal flow selection for other views + return 'flow_selected' + elif key == ord('v') or key == ord('V'): + # Visualize selected flow + return 'visualize' + elif key == ord('d') or key == ord('D'): + # Switch to decoder view for selected flow + return 'decode_flow' + elif key == ord('t') or key == ord('T'): + # Toggle frame types display + self.show_frame_types = not self.show_frame_types + return 'toggle_frame_types' + + return 'none' + + def _adjust_scroll(self): + """Adjust scroll position to keep selected item visible""" + # This would be implemented based on visible area calculations + pass + + def get_selected_flow(self, flows_list: List[FlowStats]) -> Optional[FlowStats]: + """Get currently selected flow""" + if flows_list and 0 <= self.selected_flow < len(flows_list): + return flows_list[self.selected_flow] + return None \ No newline at end of file diff --git a/analyzer/tui/modern_views/packet_decoder.py b/analyzer/tui/modern_views/packet_decoder.py new file mode 100644 index 0000000..9d27bec --- /dev/null +++ b/analyzer/tui/modern_views/packet_decoder.py @@ -0,0 +1,307 @@ +""" +Packet Decoder View - Deep protocol inspection and field extraction +Focuses on understanding packet contents and protocol compliance +""" + +import curses +from typing import TYPE_CHECKING, List, Optional, Dict, Any +from ...models import FlowStats + +if TYPE_CHECKING: + from ...analysis.core import EthernetAnalyzer + + +class PacketDecoderView: + """ + Packet Decoder View - F2 + + Deep packet inspection interface: + - Protocol field extraction and display + - Frame-by-frame analysis + - Enhanced decoder output + - Field value inspection + """ + + def __init__(self, analyzer: 'EthernetAnalyzer'): + self.analyzer = analyzer + self.selected_flow = 0 + self.selected_frame = 0 + self.selected_field = 0 + self.scroll_offset = 0 + self.field_scroll_offset = 0 + self.current_panel = 0 # 0=flows, 1=frames, 2=fields + + def draw(self, stdscr, selected_flow_key: Optional[str]): + """Draw the Packet Decoder view""" + height, width = stdscr.getmaxyx() + start_y = 3 + max_height = height - 2 + + flows_list = self._get_enhanced_flows() + + if not flows_list: + stdscr.addstr(start_y + 2, 4, "No enhanced decodable flows detected", curses.A_DIM) + stdscr.addstr(start_y + 3, 4, "Switch to Flow Analysis (F1) to see all flows", curses.A_DIM) + return + + # Decoder view header + stdscr.addstr(start_y, 4, "PACKET DECODER - Enhanced Protocol Analysis", curses.A_BOLD) + + # Three-panel layout: Flows | Frames | Fields + panel_width = width // 3 + + # Panel 1: Enhanced Flows (left) + self._draw_flows_panel(stdscr, start_y + 2, 4, panel_width - 2, max_height - start_y - 2, flows_list) + + # Separator + for y in range(start_y + 2, max_height): + stdscr.addstr(y, panel_width, "│", curses.A_DIM) + + # Panel 2: Frame Details (center) + selected_flow = flows_list[self.selected_flow] if flows_list else None + self._draw_frames_panel(stdscr, start_y + 2, panel_width + 2, panel_width - 2, max_height - start_y - 2, selected_flow) + + # Separator + for y in range(start_y + 2, max_height): + stdscr.addstr(y, 2 * panel_width, "│", curses.A_DIM) + + # Panel 3: Field Inspection (right) + self._draw_fields_panel(stdscr, start_y + 2, 2 * panel_width + 2, panel_width - 2, max_height - start_y - 2, selected_flow) + + def _draw_flows_panel(self, stdscr, start_y: int, start_x: int, width: int, height: int, flows_list: List[FlowStats]): + """Draw enhanced flows panel""" + panel_attr = curses.A_BOLD if self.current_panel == 0 else curses.A_NORMAL + + # Panel header + header = "Enhanced Flows" + stdscr.addstr(start_y, start_x, header, panel_attr | curses.A_UNDERLINE) + + current_y = start_y + 2 + available_height = height - 2 + + # Flow list + for i, flow in enumerate(flows_list[:available_height]): + is_selected = (i == self.selected_flow and self.current_panel == 0) + attr = curses.A_REVERSE if is_selected else curses.A_NORMAL + + # Flow summary line + flow_line = self._format_flow_summary(flow, width - 2) + stdscr.addstr(current_y + i, start_x, flow_line, attr) + + # Decoder indicator + decoder_indicator = "●" if flow.enhanced_analysis.decoder_type != "Standard" else "○" + stdscr.addstr(current_y + i, start_x + width - 3, decoder_indicator, + curses.A_BOLD if flow.enhanced_analysis.decoder_type != "Standard" else curses.A_DIM) + + def _draw_frames_panel(self, stdscr, start_y: int, start_x: int, width: int, height: int, flow: Optional[FlowStats]): + """Draw frame details panel""" + panel_attr = curses.A_BOLD if self.current_panel == 1 else curses.A_NORMAL + + # Panel header + header = "Frame Analysis" + stdscr.addstr(start_y, start_x, header, panel_attr | curses.A_UNDERLINE) + + if not flow: + stdscr.addstr(start_y + 2, start_x, "No flow selected", curses.A_DIM) + return + + current_y = start_y + 2 + + # Flow details + stdscr.addstr(current_y, start_x, f"Flow: {flow.src_ip} → {flow.dst_ip}", curses.A_BOLD) + current_y += 1 + stdscr.addstr(current_y, start_x, f"Decoder: {flow.enhanced_analysis.decoder_type}") + current_y += 2 + + # Sample frames + if flow.enhanced_analysis.sample_decoded_fields: + stdscr.addstr(current_y, start_x, "Decoded Frames:", curses.A_UNDERLINE) + current_y += 1 + + for i, (frame_key, frame_data) in enumerate(flow.enhanced_analysis.sample_decoded_fields.items()): + is_selected = (i == self.selected_frame and self.current_panel == 1) + attr = curses.A_REVERSE if is_selected else curses.A_NORMAL + + frame_line = f"{frame_key}: {len(frame_data)} fields" + stdscr.addstr(current_y + i, start_x, frame_line[:width-2], attr) + else: + stdscr.addstr(current_y, start_x, "No decoded frame data available", curses.A_DIM) + current_y += 1 + stdscr.addstr(current_y, start_x, "Decoder may not be implemented", curses.A_DIM) + + def _draw_fields_panel(self, stdscr, start_y: int, start_x: int, width: int, height: int, flow: Optional[FlowStats]): + """Draw field inspection panel""" + panel_attr = curses.A_BOLD if self.current_panel == 2 else curses.A_NORMAL + + # Panel header + header = "Field Inspector" + stdscr.addstr(start_y, start_x, header, panel_attr | curses.A_UNDERLINE) + + if not flow or not flow.enhanced_analysis.sample_decoded_fields: + stdscr.addstr(start_y + 2, start_x, "No field data available", curses.A_DIM) + return + + current_y = start_y + 2 + + # Get selected frame data + frame_items = list(flow.enhanced_analysis.sample_decoded_fields.items()) + if self.selected_frame < len(frame_items): + frame_key, frame_data = frame_items[self.selected_frame] + + stdscr.addstr(current_y, start_x, f"Frame: {frame_key}", curses.A_BOLD) + current_y += 2 + + # Field list with values + available_height = height - 4 + field_items = list(frame_data.items()) + + start_field = self.field_scroll_offset + end_field = min(start_field + available_height, len(field_items)) + + for i in range(start_field, end_field): + field_name, field_value = field_items[i] + display_idx = i - start_field + + is_selected = (i == self.selected_field and self.current_panel == 2) + attr = curses.A_REVERSE if is_selected else curses.A_NORMAL + + # Format field line + field_line = self._format_field_line(field_name, field_value, width - 2) + stdscr.addstr(current_y + display_idx, start_x, field_line, attr) + + # Scroll indicators + if start_field > 0: + stdscr.addstr(current_y, start_x + width - 5, "↑", curses.A_DIM) + if end_field < len(field_items): + stdscr.addstr(current_y + available_height - 1, start_x + width - 5, "↓", curses.A_DIM) + + def _format_flow_summary(self, flow: FlowStats, max_width: int) -> str: + """Format flow summary for flows panel""" + # Format as "src:port → dst:port | protocol" + source = f"{flow.src_ip}:{flow.src_port}" + destination = f"{flow.dst_ip}:{flow.dst_port}" + + protocol = flow.enhanced_analysis.decoder_type + if protocol == "Standard": + protocol = self._get_primary_protocol(flow) + + # Calculate available space for src and dst + protocol_space = len(protocol) + 3 # " | " + protocol + available_space = max_width - protocol_space + src_space = available_space // 2 - 2 # Account for " → " + dst_space = available_space - src_space - 3 # " → " + + if len(source) > src_space: + source = f"{flow.src_ip[:src_space-6]}…:{flow.src_port}" + if len(destination) > dst_space: + destination = f"{flow.dst_ip[:dst_space-6]}…:{flow.dst_port}" + + return f"{source} → {destination} | {protocol}"[:max_width] + + def _format_field_line(self, field_name: str, field_value: Any, max_width: int) -> str: + """Format field name and value for display""" + # Clean up field name + display_name = field_name.replace('_', ' ').title() + + # Format value based on type + if isinstance(field_value, bool): + display_value = "Yes" if field_value else "No" + elif isinstance(field_value, float): + if "timestamp" in field_name.lower(): + display_value = f"{field_value:.6f}s" + else: + display_value = f"{field_value:.3f}" + elif field_value is None: + display_value = "N/A" + else: + display_value = str(field_value) + + # Truncate if needed + available_name_width = max_width // 2 + available_value_width = max_width - available_name_width - 3 + + if len(display_name) > available_name_width: + display_name = display_name[:available_name_width-1] + "…" + if len(display_value) > available_value_width: + display_value = display_value[:available_value_width-1] + "…" + + return f"{display_name:<{available_name_width}} : {display_value}" + + def _get_enhanced_flows(self) -> List[FlowStats]: + """Get flows with enhanced decoders available""" + flows_list = [] + for flow in self.analyzer.flows.values(): + if (flow.enhanced_analysis.decoder_type != "Standard" or + "CHAPTER10" in flow.detected_protocol_types or + "PTP" in flow.detected_protocol_types or + "IENA" in flow.detected_protocol_types): + flows_list.append(flow) + + # Sort by decoder quality and packet count + flows_list.sort(key=lambda x: ( + x.enhanced_analysis.decoder_type != "Standard", + len(x.enhanced_analysis.sample_decoded_fields), + x.frame_count + ), reverse=True) + + return flows_list + + def _get_primary_protocol(self, flow: FlowStats) -> str: + """Get primary protocol for display""" + if flow.detected_protocol_types: + enhanced_protocols = {'CHAPTER10', 'PTP', 'IENA', 'CH10'} + found_enhanced = flow.detected_protocol_types & enhanced_protocols + if found_enhanced: + return list(found_enhanced)[0] + return list(flow.detected_protocol_types)[0] + return flow.transport_protocol + + def handle_input(self, key: int, flows_list: List[FlowStats]) -> str: + """Handle input for Packet Decoder view""" + enhanced_flows = self._get_enhanced_flows() + + if key == ord('\t'): # Tab to switch panels + self.current_panel = (self.current_panel + 1) % 3 + return 'panel_switch' + elif key == curses.KEY_UP: + if self.current_panel == 0: # Flows panel + self.selected_flow = max(0, self.selected_flow - 1) + elif self.current_panel == 1: # Frames panel + self.selected_frame = max(0, self.selected_frame - 1) + elif self.current_panel == 2: # Fields panel + self.selected_field = max(0, self.selected_field - 1) + self._adjust_field_scroll() + return 'selection_change' + elif key == curses.KEY_DOWN: + if self.current_panel == 0: # Flows panel + self.selected_flow = min(len(enhanced_flows) - 1, self.selected_flow + 1) + elif self.current_panel == 1: # Frames panel + if enhanced_flows and self.selected_flow < len(enhanced_flows): + flow = enhanced_flows[self.selected_flow] + max_frames = len(flow.enhanced_analysis.sample_decoded_fields) - 1 + self.selected_frame = min(max_frames, self.selected_frame + 1) + elif self.current_panel == 2: # Fields panel + if enhanced_flows and self.selected_flow < len(enhanced_flows): + flow = enhanced_flows[self.selected_flow] + if flow.enhanced_analysis.sample_decoded_fields: + frame_items = list(flow.enhanced_analysis.sample_decoded_fields.items()) + if self.selected_frame < len(frame_items): + frame_data = frame_items[self.selected_frame][1] + max_fields = len(frame_data) - 1 + self.selected_field = min(max_fields, self.selected_field + 1) + self._adjust_field_scroll() + return 'selection_change' + elif key == ord('e') or key == ord('E'): + return 'export_fields' + elif key == ord('c') or key == ord('C'): + return 'copy_field' + + return 'none' + + def _adjust_field_scroll(self): + """Adjust field scroll to keep selected field visible""" + # Simple scroll adjustment - could be enhanced + if self.selected_field < self.field_scroll_offset: + self.field_scroll_offset = self.selected_field + elif self.selected_field >= self.field_scroll_offset + 10: # Assume 10 visible fields + self.field_scroll_offset = self.selected_field - 9 \ No newline at end of file diff --git a/analyzer/tui/modern_views/statistical_analysis.py b/analyzer/tui/modern_views/statistical_analysis.py new file mode 100644 index 0000000..6453f5d --- /dev/null +++ b/analyzer/tui/modern_views/statistical_analysis.py @@ -0,0 +1,432 @@ +""" +Statistical Analysis View - Timing analysis, outliers, and quality metrics +Focuses on understanding network performance and data quality +""" + +import curses +import statistics +from typing import TYPE_CHECKING, List, Optional, Dict, Tuple +from ...models import FlowStats + +if TYPE_CHECKING: + from ...analysis.core import EthernetAnalyzer + + +class StatisticalAnalysisView: + """ + Statistical Analysis View - F3 + + Performance and quality analysis interface: + - Timing statistics and outlier detection + - Quality metrics and trends + - Performance indicators + - Network health assessment + """ + + def __init__(self, analyzer: 'EthernetAnalyzer'): + self.analyzer = analyzer + self.selected_flow = 0 + self.analysis_mode = 0 # 0=overview, 1=outliers, 2=quality, 3=timing + self.scroll_offset = 0 + + def draw(self, stdscr, selected_flow_key: Optional[str]): + """Draw the Statistical Analysis view""" + height, width = stdscr.getmaxyx() + start_y = 3 + max_height = height - 2 + + flows_list = self._get_flows_list() + + if not flows_list: + stdscr.addstr(start_y + 2, 4, "No flows available for statistical analysis", curses.A_DIM) + return + + # Statistical analysis header + mode_names = ["Overview", "Outlier Analysis", "Quality Metrics", "Timing Analysis"] + current_mode = mode_names[self.analysis_mode] + stdscr.addstr(start_y, 4, f"STATISTICAL ANALYSIS - {current_mode}", curses.A_BOLD) + + # Mode selector + mode_line = start_y + 1 + for i, mode_name in enumerate(mode_names): + x_pos = 4 + i * 20 + if i == self.analysis_mode: + stdscr.addstr(mode_line, x_pos, f"[{mode_name}]", curses.A_REVERSE) + else: + stdscr.addstr(mode_line, x_pos, f" {mode_name} ", curses.A_DIM) + + # Analysis content area + content_y = start_y + 3 + content_height = max_height - content_y + + if self.analysis_mode == 0: + self._draw_overview(stdscr, content_y, width, content_height, flows_list) + elif self.analysis_mode == 1: + self._draw_outlier_analysis(stdscr, content_y, width, content_height, flows_list) + elif self.analysis_mode == 2: + self._draw_quality_metrics(stdscr, content_y, width, content_height, flows_list) + elif self.analysis_mode == 3: + self._draw_timing_analysis(stdscr, content_y, width, content_height, flows_list) + + def _draw_overview(self, stdscr, start_y: int, width: int, height: int, flows_list: List[FlowStats]): + """Draw statistical overview""" + current_y = start_y + + # Overall statistics + total_packets = sum(flow.frame_count for flow in flows_list) + total_outliers = sum(len(flow.outlier_frames) for flow in flows_list) + outlier_percentage = (total_outliers / total_packets * 100) if total_packets > 0 else 0 + + stdscr.addstr(current_y, 4, "NETWORK PERFORMANCE SUMMARY", curses.A_UNDERLINE) + current_y += 2 + + # Key metrics + metrics = [ + ("Total Flows", str(len(flows_list))), + ("Total Packets", f"{total_packets:,}"), + ("Total Outliers", f"{total_outliers:,} ({outlier_percentage:.2f}%)"), + ("Enhanced Flows", str(sum(1 for f in flows_list if f.enhanced_analysis.decoder_type != "Standard"))), + ] + + for metric_name, metric_value in metrics: + stdscr.addstr(current_y, 4, f"{metric_name:20}: {metric_value}") + current_y += 1 + + current_y += 1 + + # Flow performance table + stdscr.addstr(current_y, 4, "FLOW PERFORMANCE RANKING", curses.A_UNDERLINE) + current_y += 2 + + # Table header + header = f"{'Rank':>4} {'Flow':30} {'Packets':>8} {'Outliers':>9} {'Avg Δt':>10} {'Jitter':>8} {'Score':>6}" + stdscr.addstr(current_y, 4, header, curses.A_BOLD) + current_y += 1 + + # Rank flows by performance + ranked_flows = self._rank_flows_by_performance(flows_list) + + visible_flows = min(height - (current_y - start_y) - 2, len(ranked_flows)) + for i in range(visible_flows): + flow, score = ranked_flows[i] + + is_selected = (i == self.selected_flow) + attr = curses.A_REVERSE if is_selected else curses.A_NORMAL + + # Format flow line + flow_desc = f"{flow.src_ip}:{flow.src_port} → {flow.dst_ip}:{flow.dst_port}" + if len(flow_desc) > 28: + flow_desc = f"{flow.src_ip[:8]}…:{flow.src_port} → {flow.dst_ip[:8]}…:{flow.dst_port}" + + outliers = len(flow.outlier_frames) + outlier_pct = f"{outliers/flow.frame_count*100:.1f}%" if flow.frame_count > 0 else "0%" + + avg_timing = f"{flow.avg_inter_arrival*1000:.1f}ms" if flow.avg_inter_arrival > 0 else "N/A" + jitter = f"{flow.std_inter_arrival*1000:.1f}ms" if flow.std_inter_arrival > 0 else "N/A" + + line = f"{i+1:>4} {flow_desc:30} {flow.frame_count:>8} {outlier_pct:>9} {avg_timing:>10} {jitter:>8} {score:>6.1f}" + stdscr.addstr(current_y + i, 4, line[:width-8], attr) + + def _draw_outlier_analysis(self, stdscr, start_y: int, width: int, height: int, flows_list: List[FlowStats]): + """Draw detailed outlier analysis""" + current_y = start_y + + stdscr.addstr(current_y, 4, "OUTLIER ANALYSIS", curses.A_UNDERLINE) + current_y += 2 + + # Find flows with outliers + outlier_flows = [(flow, len(flow.outlier_frames)) for flow in flows_list if flow.outlier_frames] + outlier_flows.sort(key=lambda x: x[1], reverse=True) + + if not outlier_flows: + stdscr.addstr(current_y, 4, "No outliers detected in any flows", curses.A_DIM) + stdscr.addstr(current_y + 1, 4, "All packet timing appears normal", curses.A_DIM) + return + + # Outlier summary + total_outliers = sum(count for _, count in outlier_flows) + stdscr.addstr(current_y, 4, f"Flows with outliers: {len(outlier_flows)}") + current_y += 1 + stdscr.addstr(current_y, 4, f"Total outlier packets: {total_outliers}") + current_y += 2 + + # Detailed outlier breakdown + stdscr.addstr(current_y, 4, "OUTLIER DETAILS", curses.A_BOLD) + current_y += 1 + + header = f"{'Flow':35} {'Outliers':>9} {'Rate':>8} {'Max Σ':>8} {'Timing':>12}" + stdscr.addstr(current_y, 4, header, curses.A_UNDERLINE) + current_y += 1 + + visible_flows = min(height - (current_y - start_y) - 2, len(outlier_flows)) + for i in range(visible_flows): + flow, outlier_count = outlier_flows[i] + + is_selected = (i == self.selected_flow) + attr = curses.A_REVERSE if is_selected else curses.A_NORMAL + + flow_desc = f"{flow.src_ip}:{flow.src_port} → {flow.dst_ip}:{flow.dst_port}" + if len(flow_desc) > 33: + flow_desc = f"{flow.src_ip[:10]}…:{flow.src_port} → {flow.dst_ip[:10]}…:{flow.dst_port}" + + outlier_rate = f"{outlier_count/flow.frame_count*100:.1f}%" if flow.frame_count > 0 else "0%" + max_sigma = self.analyzer.statistics_engine.get_max_sigma_deviation(flow) + timing_info = f"{flow.avg_inter_arrival*1000:.1f}±{flow.std_inter_arrival*1000:.1f}ms" + + line = f"{flow_desc:35} {outlier_count:>9} {outlier_rate:>8} {max_sigma:>7.1f}σ {timing_info:>12}" + stdscr.addstr(current_y + i, 4, line[:width-8], attr) + + # Selected flow outlier details + if outlier_flows and self.selected_flow < len(outlier_flows): + selected_flow, _ = outlier_flows[self.selected_flow] + self._draw_selected_flow_outliers(stdscr, current_y + visible_flows + 1, width, + height - (current_y + visible_flows + 1 - start_y), selected_flow) + + def _draw_quality_metrics(self, stdscr, start_y: int, width: int, height: int, flows_list: List[FlowStats]): + """Draw quality metrics analysis""" + current_y = start_y + + stdscr.addstr(current_y, 4, "QUALITY METRICS", curses.A_UNDERLINE) + current_y += 2 + + # Enhanced flows quality + enhanced_flows = [f for f in flows_list if f.enhanced_analysis.decoder_type != "Standard"] + + if enhanced_flows: + stdscr.addstr(current_y, 4, "ENHANCED DECODER QUALITY", curses.A_BOLD) + current_y += 1 + + header = f"{'Flow':30} {'Decoder':15} {'Quality':>8} {'Drift':>10} {'Errors':>8}" + stdscr.addstr(current_y, 4, header, curses.A_UNDERLINE) + current_y += 1 + + for i, flow in enumerate(enhanced_flows[:height - (current_y - start_y) - 5]): + is_selected = (i == self.selected_flow) + attr = curses.A_REVERSE if is_selected else curses.A_NORMAL + + flow_desc = f"{flow.src_ip}:{flow.src_port} → {flow.dst_ip}:{flow.dst_port}" + if len(flow_desc) > 28: + flow_desc = f"{flow.src_ip[:8]}…:{flow.src_port} → {flow.dst_ip[:8]}…:{flow.dst_port}" + + enhanced = flow.enhanced_analysis + decoder_type = enhanced.decoder_type.replace("_Enhanced", "") + quality = f"{enhanced.avg_frame_quality:.1f}%" if enhanced.avg_frame_quality > 0 else "N/A" + drift = f"{enhanced.avg_clock_drift_ppm:.1f}ppm" if enhanced.avg_clock_drift_ppm != 0 else "N/A" + + error_count = (enhanced.rtc_sync_errors + enhanced.format_errors + + enhanced.overflow_errors + enhanced.sequence_gaps) + + line = f"{flow_desc:30} {decoder_type:15} {quality:>8} {drift:>10} {error_count:>8}" + stdscr.addstr(current_y + i, 4, line[:width-8], attr) + + current_y += len(enhanced_flows) + 2 + + # General quality indicators + stdscr.addstr(current_y, 4, "GENERAL QUALITY INDICATORS", curses.A_BOLD) + current_y += 1 + + # Calculate network health metrics + health_metrics = self._calculate_health_metrics(flows_list) + + for metric_name, metric_value, status in health_metrics: + status_color = curses.A_BOLD if status == "GOOD" else curses.A_DIM if status == "WARNING" else curses.A_REVERSE + stdscr.addstr(current_y, 4, f"{metric_name:25}: {metric_value:15} [{status}]", status_color) + current_y += 1 + + def _draw_timing_analysis(self, stdscr, start_y: int, width: int, height: int, flows_list: List[FlowStats]): + """Draw detailed timing analysis""" + current_y = start_y + + stdscr.addstr(current_y, 4, "TIMING ANALYSIS", curses.A_UNDERLINE) + current_y += 2 + + # Timing distribution summary + all_inter_arrivals = [] + for flow in flows_list: + all_inter_arrivals.extend(flow.inter_arrival_times) + + if all_inter_arrivals: + mean_timing = statistics.mean(all_inter_arrivals) + median_timing = statistics.median(all_inter_arrivals) + std_timing = statistics.stdev(all_inter_arrivals) if len(all_inter_arrivals) > 1 else 0 + + stdscr.addstr(current_y, 4, "NETWORK TIMING DISTRIBUTION", curses.A_BOLD) + current_y += 1 + + timing_stats = [ + ("Mean Inter-arrival", f"{mean_timing*1000:.3f} ms"), + ("Median Inter-arrival", f"{median_timing*1000:.3f} ms"), + ("Standard Deviation", f"{std_timing*1000:.3f} ms"), + ("Coefficient of Variation", f"{std_timing/mean_timing:.3f}" if mean_timing > 0 else "N/A"), + ] + + for stat_name, stat_value in timing_stats: + stdscr.addstr(current_y, 4, f"{stat_name:25}: {stat_value}") + current_y += 1 + + current_y += 1 + + # Per-flow timing details + stdscr.addstr(current_y, 4, "PER-FLOW TIMING ANALYSIS", curses.A_BOLD) + current_y += 1 + + header = f"{'Flow':30} {'Mean':>10} {'Std Dev':>10} {'CV':>8} {'Range':>12}" + stdscr.addstr(current_y, 4, header, curses.A_UNDERLINE) + current_y += 1 + + # Sort flows by timing variability + timing_flows = [(flow, flow.std_inter_arrival / flow.avg_inter_arrival if flow.avg_inter_arrival > 0 else 0) + for flow in flows_list if flow.inter_arrival_times] + timing_flows.sort(key=lambda x: x[1], reverse=True) + + visible_flows = min(height - (current_y - start_y) - 2, len(timing_flows)) + for i in range(visible_flows): + flow, cv = timing_flows[i] + + is_selected = (i == self.selected_flow) + attr = curses.A_REVERSE if is_selected else curses.A_NORMAL + + flow_desc = f"{flow.src_ip}:{flow.src_port} → {flow.dst_ip}:{flow.dst_port}" + if len(flow_desc) > 28: + flow_desc = f"{flow.src_ip[:8]}…:{flow.src_port} → {flow.dst_ip[:8]}…:{flow.dst_port}" + + mean_ms = f"{flow.avg_inter_arrival*1000:.1f}ms" + std_ms = f"{flow.std_inter_arrival*1000:.1f}ms" + cv_str = f"{cv:.3f}" + + if flow.inter_arrival_times: + range_ms = f"{(max(flow.inter_arrival_times) - min(flow.inter_arrival_times))*1000:.1f}ms" + else: + range_ms = "N/A" + + line = f"{flow_desc:30} {mean_ms:>10} {std_ms:>10} {cv_str:>8} {range_ms:>12}" + stdscr.addstr(current_y + i, 4, line[:width-8], attr) + + def _rank_flows_by_performance(self, flows_list: List[FlowStats]) -> List[Tuple[FlowStats, float]]: + """Rank flows by performance score (lower is better)""" + ranked = [] + + for flow in flows_list: + score = 0.0 + + # Outlier penalty (higher percentage = higher score) + if flow.frame_count > 0: + outlier_rate = len(flow.outlier_frames) / flow.frame_count + score += outlier_rate * 100 # 0-100 points + + # Timing variability penalty + if flow.avg_inter_arrival > 0: + cv = flow.std_inter_arrival / flow.avg_inter_arrival + score += cv * 50 # 0-50+ points + + # Enhanced decoder bonus (negative score) + if flow.enhanced_analysis.decoder_type != "Standard": + score -= 10 + if flow.enhanced_analysis.avg_frame_quality > 80: + score -= 5 # Good quality bonus + + ranked.append((flow, score)) + + ranked.sort(key=lambda x: x[1]) # Lower scores first (better performance) + return ranked + + def _calculate_health_metrics(self, flows_list: List[FlowStats]) -> List[Tuple[str, str, str]]: + """Calculate network health metrics""" + metrics = [] + + # Overall outlier rate + total_packets = sum(flow.frame_count for flow in flows_list) + total_outliers = sum(len(flow.outlier_frames) for flow in flows_list) + outlier_rate = (total_outliers / total_packets * 100) if total_packets > 0 else 0 + + outlier_status = "GOOD" if outlier_rate < 1.0 else "WARNING" if outlier_rate < 5.0 else "CRITICAL" + metrics.append(("Network Outlier Rate", f"{outlier_rate:.2f}%", outlier_status)) + + # Enhanced decoder coverage + enhanced_count = sum(1 for f in flows_list if f.enhanced_analysis.decoder_type != "Standard") + coverage = (enhanced_count / len(flows_list) * 100) if flows_list else 0 + coverage_status = "GOOD" if coverage > 50 else "WARNING" if coverage > 0 else "NONE" + metrics.append(("Enhanced Coverage", f"{coverage:.1f}%", coverage_status)) + + # Timing consistency + all_cvs = [] + for flow in flows_list: + if flow.avg_inter_arrival > 0: + cv = flow.std_inter_arrival / flow.avg_inter_arrival + all_cvs.append(cv) + + if all_cvs: + avg_cv = statistics.mean(all_cvs) + timing_status = "GOOD" if avg_cv < 0.1 else "WARNING" if avg_cv < 0.5 else "CRITICAL" + metrics.append(("Timing Consistency", f"CV={avg_cv:.3f}", timing_status)) + + return metrics + + def _draw_selected_flow_outliers(self, stdscr, start_y: int, width: int, height: int, flow: FlowStats): + """Draw outlier details for selected flow""" + if height < 3: + return + + stdscr.addstr(start_y, 4, f"OUTLIER DETAILS: {flow.src_ip}:{flow.src_port} → {flow.dst_ip}:{flow.dst_port}", curses.A_BOLD) + current_y = start_y + 1 + + if flow.outlier_details: + header = f"{'Frame#':>8} {'Inter-arrival':>15} {'Deviation':>12}" + stdscr.addstr(current_y, 4, header, curses.A_UNDERLINE) + current_y += 1 + + visible_outliers = min(height - 3, len(flow.outlier_details)) + for i in range(visible_outliers): + frame_num, timing = flow.outlier_details[i] + + # Calculate sigma deviation + if flow.avg_inter_arrival > 0 and flow.std_inter_arrival > 0: + sigma = abs(timing - flow.avg_inter_arrival) / flow.std_inter_arrival + deviation = f"{sigma:.1f}σ" + else: + deviation = "N/A" + + outlier_line = f"{frame_num:>8} {timing*1000:>12.3f}ms {deviation:>12}" + stdscr.addstr(current_y + i, 4, outlier_line) + + def _get_flows_list(self) -> List[FlowStats]: + """Get flows sorted for statistical analysis""" + flows_list = list(self.analyzer.flows.values()) + + # Sort by statistical interest: outliers first, then enhanced, then packet count + flows_list.sort(key=lambda x: ( + len(x.outlier_frames), + x.enhanced_analysis.decoder_type != "Standard", + x.frame_count + ), reverse=True) + + return flows_list + + def handle_input(self, key: int, flows_list: List[FlowStats]) -> str: + """Handle input for Statistical Analysis view""" + if key == curses.KEY_UP: + self.selected_flow = max(0, self.selected_flow - 1) + return 'selection_change' + elif key == curses.KEY_DOWN: + max_flows = len(flows_list) - 1 + self.selected_flow = min(max_flows, self.selected_flow + 1) + return 'selection_change' + elif key == curses.KEY_LEFT: + self.analysis_mode = max(0, self.analysis_mode - 1) + self.selected_flow = 0 # Reset selection when changing modes + return 'mode_change' + elif key == curses.KEY_RIGHT: + self.analysis_mode = min(3, self.analysis_mode + 1) + self.selected_flow = 0 # Reset selection when changing modes + return 'mode_change' + elif key >= ord('1') and key <= ord('4'): + self.analysis_mode = key - ord('1') + self.selected_flow = 0 + return 'mode_change' + elif key == ord('r') or key == ord('R'): + return 'refresh_stats' + elif key == ord('o') or key == ord('O'): + self.analysis_mode = 1 # Switch to outlier analysis + return 'show_outliers' + + return 'none' \ No newline at end of file diff --git a/analyzer/tui/navigation.py b/analyzer/tui/navigation.py index 296ad86..4869aa4 100644 --- a/analyzer/tui/navigation.py +++ b/analyzer/tui/navigation.py @@ -57,6 +57,8 @@ class NavigationHandler: return 'selection_change' elif key == ord('v') and self.current_view == 'main': # Visualize Chapter 10 signals return 'visualize' + elif key == ord('\t') and self.current_view == 'main': # Tab key to switch detail panel tabs + return 'switch_tab' return 'none' @@ -72,7 +74,7 @@ class NavigationHandler: """Get status bar text based on current view""" if self.current_view == 'main': timeline_status = "ON" if self.show_timeline else "OFF" - return f"[↑↓]navigate [PgUp/PgDn]scroll [t]imeline:{timeline_status} [v]isualize CH10 [d]issection [q]uit" + return f"[↑↓]navigate [Tab]switch tabs [PgUp/PgDn]scroll [t]imeline:{timeline_status} [v]isualize CH10 [d]issection [q]uit" elif self.current_view == 'dissection': return "[m]ain view [q]uit" else: diff --git a/analyzer/tui/panels/detail_panel.py b/analyzer/tui/panels/detail_panel.py index 4e50ff9..367952b 100644 --- a/analyzer/tui/panels/detail_panel.py +++ b/analyzer/tui/panels/detail_panel.py @@ -2,14 +2,18 @@ Right panel - Flow details with frame type table """ -from typing import List, Optional, Tuple +from typing import List, Optional, Tuple, Dict import curses from ...models import FlowStats, FrameTypeStats class DetailPanel: - """Right panel showing detailed flow information""" + """Right panel showing detailed flow information with tabs""" + + def __init__(self): + self.active_tab = 0 # 0 = Info, 1 = Decode + self.tabs = ["Info", "Decode"] def draw(self, stdscr, x_offset: int, y_offset: int, width: int, flows_list: List[FlowStats], selected_flow: int, max_height: Optional[int] = None): @@ -31,14 +35,58 @@ class DetailPanel: else: max_lines = y_offset + max_height + try: + # Draw tab bar + tab_y = y_offset + self._draw_tab_bar(stdscr, x_offset, tab_y, width) + y_offset += 2 # Space for tab bar + max_lines -= 2 + + # Draw content based on active tab + if self.active_tab == 0: + self._draw_info_tab(stdscr, x_offset, y_offset, width, max_lines, flow, selected_frame_type) + elif self.active_tab == 1: + self._draw_decode_tab(stdscr, x_offset, y_offset, width, max_lines, flow, selected_frame_type) + + except curses.error: + # Ignore curses errors from writing outside screen bounds + pass + + def _draw_tab_bar(self, stdscr, x_offset: int, y_offset: int, width: int): + """Draw the tab bar at the top of the panel""" + tab_line = "" + for i, tab_name in enumerate(self.tabs): + if i == self.active_tab: + tab_line += f"[{tab_name}]" + else: + tab_line += f" {tab_name} " + + if i < len(self.tabs) - 1: + tab_line += " " + + # Add tab navigation hint + tab_line += f" {' ' * (width - len(tab_line) - 15)}[Tab] to switch" + + stdscr.addstr(y_offset, x_offset, tab_line[:width-1], curses.A_BOLD) + stdscr.addstr(y_offset + 1, x_offset, "─" * min(width-1, 50)) + + def _draw_info_tab(self, stdscr, x_offset: int, y_offset: int, width: int, max_lines: int, + flow: FlowStats, selected_frame_type: Optional[str]): + """Draw the Info tab content (original detail panel content)""" try: # ALWAYS show flow details first stdscr.addstr(y_offset, x_offset, f"FLOW DETAILS: {flow.src_ip} -> {flow.dst_ip}", curses.A_BOLD) y_offset += 2 - + stdscr.addstr(y_offset, x_offset, f"Packets: {flow.frame_count} | Bytes: {flow.total_bytes:,}") y_offset += 1 + # Enhanced analysis information + if flow.enhanced_analysis.decoder_type != "Standard": + y_offset += 1 + self._draw_enhanced_analysis(stdscr, x_offset, y_offset, width, flow) + y_offset += self._get_enhanced_analysis_lines(flow) + # Frame types table if flow.frame_types and y_offset < max_lines: y_offset += 1 @@ -174,4 +222,259 @@ class DetailPanel: current_item += 1 # Fallback to first flow if selection is out of bounds - return flows_list[0] if flows_list else None, None \ No newline at end of file + return flows_list[0] if flows_list else None, None + + def _draw_enhanced_analysis(self, stdscr, x_offset: int, y_offset: int, width: int, flow: FlowStats): + """Draw enhanced analysis information""" + enhanced = flow.enhanced_analysis + + try: + # Enhanced analysis header + stdscr.addstr(y_offset, x_offset, f"Enhanced Analysis ({enhanced.decoder_type}):", curses.A_BOLD) + y_offset += 1 + + # Timing analysis for CH10 + if enhanced.has_internal_timing: + stdscr.addstr(y_offset, x_offset + 2, f"Clock Drift: {enhanced.avg_clock_drift_ppm:.2f} PPM (max: {enhanced.max_clock_drift_ppm:.2f})") + y_offset += 1 + + stdscr.addstr(y_offset, x_offset + 2, f"Timing Quality: {enhanced.timing_quality.title()} | Stability: {enhanced.timing_stability.title()}") + y_offset += 1 + + if enhanced.anomaly_rate > 0: + stdscr.addstr(y_offset, x_offset + 2, f"Anomaly Rate: {enhanced.anomaly_rate:.1%} | Confidence: {enhanced.avg_confidence_score:.2f}") + y_offset += 1 + + # Frame quality metrics + if enhanced.avg_frame_quality > 0: + stdscr.addstr(y_offset, x_offset + 2, f"Frame Quality: {enhanced.avg_frame_quality:.1f}%") + y_offset += 1 + + # Error counts + errors = [] + if enhanced.sequence_gaps > 0: + errors.append(f"Seq: {enhanced.sequence_gaps}") + if enhanced.rtc_sync_errors > 0: + errors.append(f"RTC: {enhanced.rtc_sync_errors}") + if enhanced.format_errors > 0: + errors.append(f"Fmt: {enhanced.format_errors}") + if enhanced.overflow_errors > 0: + errors.append(f"Ovf: {enhanced.overflow_errors}") + + if errors: + stdscr.addstr(y_offset, x_offset + 2, f"Errors: {' | '.join(errors)}") + y_offset += 1 + + # Data analysis + if enhanced.channel_count > 0: + channel_info = f"Channels: {enhanced.channel_count}" + if enhanced.analog_channels > 0: + channel_info += f" (Analog: {enhanced.analog_channels})" + if enhanced.pcm_channels > 0: + channel_info += f" (PCM: {enhanced.pcm_channels})" + if enhanced.tmats_frames > 0: + channel_info += f" (TMATS: {enhanced.tmats_frames})" + + stdscr.addstr(y_offset, x_offset + 2, channel_info) + y_offset += 1 + + # Primary data type + if enhanced.primary_data_type != "Unknown": + stdscr.addstr(y_offset, x_offset + 2, f"Primary Type: {enhanced.primary_data_type}") + y_offset += 1 + + except curses.error: + pass + + def _get_enhanced_analysis_lines(self, flow: FlowStats) -> int: + """Calculate how many lines the enhanced analysis will take""" + enhanced = flow.enhanced_analysis + lines = 1 # Header line + + if enhanced.has_internal_timing: + lines += 2 # Clock drift + timing quality + if enhanced.anomaly_rate > 0: + lines += 1 # Anomaly rate + + if enhanced.avg_frame_quality > 0: + lines += 1 # Frame quality + # Check if we have errors to display + if any([enhanced.sequence_gaps, enhanced.rtc_sync_errors, + enhanced.format_errors, enhanced.overflow_errors]): + lines += 1 + + if enhanced.channel_count > 0: + lines += 1 # Channel info + + if enhanced.primary_data_type != "Unknown": + lines += 1 # Primary data type + + return lines + + def _draw_decode_tab(self, stdscr, x_offset: int, y_offset: int, width: int, max_lines: int, + flow: FlowStats, selected_frame_type: Optional[str]): + """Draw the Decode tab content showing decoded frame information""" + + if flow.enhanced_analysis.decoder_type == "Standard": + stdscr.addstr(y_offset, x_offset, "No enhanced decoder available for this flow", curses.A_DIM) + return + + # Header for decode information + stdscr.addstr(y_offset, x_offset, f"DECODED DATA ({flow.enhanced_analysis.decoder_type}):", curses.A_BOLD) + y_offset += 2 + + # Tree view of decoded information + if flow.enhanced_analysis.decoder_type == "Chapter10_Enhanced": + self._draw_ch10_decode_tree(stdscr, x_offset, y_offset, width, max_lines, flow) + else: + stdscr.addstr(y_offset, x_offset, f"Decoder type '{flow.enhanced_analysis.decoder_type}' display not implemented") + + def _draw_ch10_decode_tree(self, stdscr, x_offset: int, y_offset: int, width: int, max_lines: int, flow: FlowStats): + """Draw Chapter 10 decoded information in tree format""" + enhanced = flow.enhanced_analysis + + tree_items = [ + ("📋 Header Information", None, 0), + (" Decoder Type", enhanced.decoder_type, 1), + (" Primary Data Type", enhanced.primary_data_type, 1), + (" Channel Count", str(enhanced.channel_count) if enhanced.channel_count > 0 else "Unknown", 1), + ("", None, 0), # Spacer + ("⏱️ Timing Analysis", None, 0), + (" Has Internal Timing", "Yes" if enhanced.has_internal_timing else "No", 1), + (" Timing Quality", enhanced.timing_quality.title(), 1), + (" Clock Drift (avg)", f"{enhanced.avg_clock_drift_ppm:.2f} PPM" if enhanced.avg_clock_drift_ppm != 0 else "N/A", 1), + (" Clock Drift (max)", f"{enhanced.max_clock_drift_ppm:.2f} PPM" if enhanced.max_clock_drift_ppm != 0 else "N/A", 1), + (" Timing Stability", enhanced.timing_stability.title(), 1), + (" Anomaly Rate", f"{enhanced.anomaly_rate:.1%}" if enhanced.anomaly_rate > 0 else "0%", 1), + (" Confidence Score", f"{enhanced.avg_confidence_score:.2f}" if enhanced.avg_confidence_score > 0 else "N/A", 1), + ("", None, 0), # Spacer + ("📊 Quality Metrics", None, 0), + (" Frame Quality (avg)", f"{enhanced.avg_frame_quality:.1f}%" if enhanced.avg_frame_quality > 0 else "N/A", 1), + (" Sequence Gaps", str(enhanced.sequence_gaps), 1), + (" RTC Sync Errors", str(enhanced.rtc_sync_errors), 1), + (" Format Errors", str(enhanced.format_errors), 1), + (" Overflow Errors", str(enhanced.overflow_errors), 1), + ("", None, 0), # Spacer + ("📡 Channel Information", None, 0), + (" Total Channels", str(enhanced.channel_count), 1), + (" Analog Channels", str(enhanced.analog_channels), 1), + (" PCM Channels", str(enhanced.pcm_channels), 1), + (" TMATS Frames", str(enhanced.tmats_frames), 1), + ] + + # Add decoded frame samples if available + if enhanced.sample_decoded_fields: + tree_items.extend([ + ("", None, 0), # Spacer + ("🔍 Decoded Frame Samples", None, 0), + ]) + + for frame_key, frame_data in enhanced.sample_decoded_fields.items(): + tree_items.append((f" {frame_key.replace('_', ' ').title()}", None, 1)) + + # Show important fields first + priority_fields = ['packet_timestamp', 'internal_timestamp', 'data_type_name', 'channel_id', + 'sequence_number', 'frame_quality_score', 'rtc_sync_error'] + + # Add priority fields + for field_name in priority_fields: + if field_name in frame_data: + display_value = self._format_field_value(field_name, frame_data[field_name]) + tree_items.append((f" {field_name.replace('_', ' ').title()}", display_value, 2)) + + # Add other fields (up to 5 more) + other_fields = [k for k in frame_data.keys() if k not in priority_fields] + for i, field_name in enumerate(other_fields[:5]): + display_value = self._format_field_value(field_name, frame_data[field_name]) + tree_items.append((f" {field_name.replace('_', ' ').title()}", display_value, 2)) + + if len(other_fields) > 5: + tree_items.append((f" ... and {len(other_fields) - 5} more fields", "", 2)) + + # Add available fields summary + if enhanced.available_field_names: + tree_items.extend([ + ("", None, 0), # Spacer + ("📝 Available Decoder Fields", None, 0), + (" Total Fields Available", str(len(enhanced.available_field_names)), 1), + (" Field Categories", self._categorize_fields(enhanced.available_field_names), 1), + ]) + + # Render tree items + current_y = y_offset + for label, value, indent_level in tree_items: + if current_y >= max_lines: + break + + if not label: # Spacer line + current_y += 1 + continue + + indent = " " * indent_level + + if value is None: # Category header + line = f"{indent}{label}" + stdscr.addstr(current_y, x_offset, line[:width-1], curses.A_BOLD) + else: # Key-value pair + line = f"{indent}{label}: {value}" + stdscr.addstr(current_y, x_offset, line[:width-1]) + + current_y += 1 + + # Add scrolling hint if content is cut off + if current_y >= max_lines and len(tree_items) > (max_lines - y_offset): + if max_lines > 0: + stdscr.addstr(max_lines - 1, x_offset, "... (content truncated)", curses.A_DIM) + + def switch_tab(self): + """Switch to the next tab""" + self.active_tab = (self.active_tab + 1) % len(self.tabs) + + def _format_field_value(self, field_name: str, value) -> str: + """Format field value for display""" + if value is None: + return "N/A" + + # Special formatting for different field types + if "timestamp" in field_name.lower(): + try: + return f"{float(value):.6f}s" + except: + return str(value) + elif "error" in field_name.lower() or field_name.endswith("_error"): + return "Yes" if value else "No" + elif "quality" in field_name.lower() and isinstance(value, (int, float)): + return f"{value:.1f}%" + elif isinstance(value, float): + return f"{value:.3f}" + elif isinstance(value, bool): + return "Yes" if value else "No" + else: + return str(value) + + def _categorize_fields(self, field_names: List[str]) -> str: + """Categorize available fields into groups""" + categories = { + "Timing": 0, + "Quality": 0, + "Data": 0, + "Header": 0, + "Other": 0 + } + + for field_name in field_names: + lower_name = field_name.lower() + if any(keyword in lower_name for keyword in ["time", "timestamp", "rtc", "drift"]): + categories["Timing"] += 1 + elif any(keyword in lower_name for keyword in ["quality", "error", "sync", "confidence"]): + categories["Quality"] += 1 + elif any(keyword in lower_name for keyword in ["data", "analog", "pcm", "channel", "sample"]): + categories["Data"] += 1 + elif any(keyword in lower_name for keyword in ["header", "type", "id", "sequence", "length"]): + categories["Header"] += 1 + else: + categories["Other"] += 1 + + # Format as compact string + active_categories = [f"{k}:{v}" for k, v in categories.items() if v > 0] + return ", ".join(active_categories) \ No newline at end of file diff --git a/analyzer/tui/panels/flow_list.py b/analyzer/tui/panels/flow_list.py index 567eb64..f3413a9 100644 --- a/analyzer/tui/panels/flow_list.py +++ b/analyzer/tui/panels/flow_list.py @@ -19,9 +19,9 @@ class FlowListPanel: flows_list: List[FlowStats], selected_flow: int): """Draw the flow list panel""" - # Draw flows table header with adjusted column widths for better alignment - stdscr.addstr(y_offset, x_offset, "FLOWS:", curses.A_BOLD) - headers = f"{'Src:Port':22} {'Dst:Port':22} {'Proto':6} {'Cast':5} {'#Frames':>7} {'Bytes':>7} {'Encoding':12} {'ΔT Avg':>9}" + # Draw flows table header with enhanced analysis columns + stdscr.addstr(y_offset, x_offset, "FLOWS (Enhanced Analysis):", curses.A_BOLD) + headers = f"{'Src:Port':22} {'Dst:Port':22} {'Proto':6} {'Cast':5} {'#Frames':>7} {'Bytes':>7} {'Encoding':12} {'Quality':>7} {'Drift':>8} {'ΔT Avg':>9}" stdscr.addstr(y_offset + 1, x_offset, headers[:width-1], curses.A_UNDERLINE) # Calculate scrolling parameters @@ -56,7 +56,11 @@ class FlowListPanel: # Abbreviate traffic classification cast_abbrev = flow.traffic_classification[:4] if flow.traffic_classification != "Unknown" else "Unk" - line = f"{src_endpoint:22} {dst_endpoint:22} {flow.transport_protocol:6} {cast_abbrev:5} {flow.frame_count:>7} {bytes_str:>7} {encoding_str:12} {avg_time:>9}" + # Enhanced analysis data + quality_str = self._format_quality_score(flow) + drift_str = self._format_drift_info(flow) + + line = f"{src_endpoint:22} {dst_endpoint:22} {flow.transport_protocol:6} {cast_abbrev:5} {flow.frame_count:>7} {bytes_str:>7} {encoding_str:12} {quality_str:>7} {drift_str:>8} {avg_time:>9}" if display_item == selected_flow: stdscr.addstr(current_row, x_offset, line[:width-1], curses.A_REVERSE) @@ -78,9 +82,14 @@ class FlowListPanel: ft_avg = f"{ft_stats.avg_inter_arrival:.3f}s" if ft_stats.avg_inter_arrival > 0 else "N/A" outlier_count = len(ft_stats.outlier_details) if ft_stats.outlier_details else 0 - # Create frame type line aligned with new column layout + # Create frame type line aligned with enhanced column layout bytes_str_ft = self._format_bytes(ft_stats.total_bytes) - ft_line = f" └─{frame_type:18} {'':22} {'':6} {'':5} {ft_stats.count:>7} {bytes_str_ft:>7} {'':12} {ft_avg:>9}" + + # Enhanced analysis for frame types (inherit from parent flow) + quality_str_ft = self._format_quality_score(flow) if frame_type.startswith('CH10') or frame_type == 'TMATS' else "" + drift_str_ft = self._format_drift_info(flow) if frame_type.startswith('CH10') or frame_type == 'TMATS' else "" + + ft_line = f" └─{frame_type:18} {'':22} {'':6} {'':5} {ft_stats.count:>7} {bytes_str_ft:>7} {'':12} {quality_str_ft:>7} {drift_str_ft:>8} {ft_avg:>9}" if display_item == selected_flow: stdscr.addstr(current_row, x_offset, ft_line[:width-1], curses.A_REVERSE) @@ -179,4 +188,38 @@ class FlowListPanel: if found_app: return list(found_app)[0] - return "Unknown" \ No newline at end of file + return "Unknown" + + def _format_quality_score(self, flow: FlowStats) -> str: + """Format quality score for display""" + enhanced = flow.enhanced_analysis + + if enhanced.decoder_type == "Standard" or enhanced.avg_frame_quality == 0: + return "N/A" + + # Format quality as percentage + quality = enhanced.avg_frame_quality + if quality >= 90: + return f"{quality:.0f}%" + elif quality >= 70: + return f"{quality:.0f}%" + else: + return f"{quality:.0f}%" + + def _format_drift_info(self, flow: FlowStats) -> str: + """Format clock drift information for display""" + enhanced = flow.enhanced_analysis + + if not enhanced.has_internal_timing or enhanced.avg_clock_drift_ppm == 0: + return "N/A" + + # Format drift in PPM + drift_ppm = abs(enhanced.avg_clock_drift_ppm) + if drift_ppm >= 1000: + return f"{drift_ppm/1000:.1f}K" # Show in thousands + elif drift_ppm >= 100: + return f"{drift_ppm:.0f}ppm" + elif drift_ppm >= 10: + return f"{drift_ppm:.1f}ppm" + else: + return f"{drift_ppm:.2f}ppm" \ No newline at end of file diff --git a/example_enhanced_integration.py b/example_enhanced_integration.py new file mode 100644 index 0000000..9b4224a --- /dev/null +++ b/example_enhanced_integration.py @@ -0,0 +1,239 @@ +""" +Example showing how the enhanced CH10 decoder integrates with the TUI +This demonstrates the full data flow from packet to enhanced analysis display +""" + +from analyzer.protocols.enhanced_chapter10 import EnhancedChapter10Decoder +from analyzer.plugins.ch10_timing_analysis import Chapter10TimingAnalysisPlugin +from analyzer.models.flow_stats import FlowStats, EnhancedAnalysisData +import statistics + +class EnhancedFlowProcessor: + """Example processor showing integration of enhanced CH10 analysis""" + + def __init__(self): + self.ch10_decoder = EnhancedChapter10Decoder() + self.timing_plugin = Chapter10TimingAnalysisPlugin() + self.flows = {} + + def process_packet(self, packet, frame_num: int): + """Process packet with enhanced analysis""" + + # Extract transport info (simplified) + transport_info = {'protocol': 'UDP', 'src_port': 4001, 'dst_port': 4001} + + # Check if CH10 decoder can handle this packet + confidence = self.ch10_decoder.can_decode(packet, transport_info) + + if confidence > 0.5: + # Decode frame with full field extraction + frame_data = self.ch10_decoder.decode_frame(packet, transport_info) + + if frame_data: + # Run timing analysis plugin + flow_context = {'flow_duration': 60.0, 'flow_key': 'example_flow'} + timing_result = self.timing_plugin.analyze_frame(frame_data, flow_context) + + # Update or create flow with enhanced data + self._update_flow_with_enhanced_data(frame_data, timing_result, packet) + + def _update_flow_with_enhanced_data(self, frame_data, timing_result, packet): + """Update flow statistics with enhanced analysis data""" + + # Create flow key + flow_key = "192.168.1.100->239.255.0.1" # Example + + # Get or create flow + if flow_key not in self.flows: + self.flows[flow_key] = FlowStats( + src_ip="192.168.1.100", + dst_ip="239.255.0.1", + src_port=4001, + dst_port=4001, + transport_protocol="UDP", + traffic_classification="Multicast" + ) + + flow = self.flows[flow_key] + + # Update basic flow stats + flow.frame_count += 1 + flow.total_bytes += len(packet) + flow.timestamps.append(float(packet.time)) + + # Update enhanced analysis data + self._update_enhanced_analysis_data(flow, frame_data, timing_result) + + def _update_enhanced_analysis_data(self, flow: FlowStats, frame_data, timing_result): + """Update the enhanced analysis data structure""" + + enhanced = flow.enhanced_analysis + + # Set decoder type + enhanced.decoder_type = "Chapter10_Enhanced" + + # Update timing analysis + if timing_result.internal_timestamp is not None: + enhanced.has_internal_timing = True + + # Update running averages for timing + if timing_result.clock_drift_ppm is not None: + if enhanced.avg_clock_drift_ppm == 0: + enhanced.avg_clock_drift_ppm = timing_result.clock_drift_ppm + else: + # Simple running average + enhanced.avg_clock_drift_ppm = (enhanced.avg_clock_drift_ppm + timing_result.clock_drift_ppm) / 2 + + enhanced.max_clock_drift_ppm = max(enhanced.max_clock_drift_ppm, abs(timing_result.clock_drift_ppm)) + + # Update timing quality (use most recent) + enhanced.timing_quality = timing_result.timing_quality + + # Update anomaly rate + if timing_result.anomaly_detected: + enhanced.anomaly_rate = (enhanced.anomaly_rate * (flow.frame_count - 1) + 1) / flow.frame_count + else: + enhanced.anomaly_rate = (enhanced.anomaly_rate * (flow.frame_count - 1)) / flow.frame_count + + # Update confidence score + enhanced.avg_confidence_score = (enhanced.avg_confidence_score + timing_result.confidence_score) / 2 + + # Update frame quality + frame_quality = frame_data.get_field('frame_quality_score', 0) + if frame_quality > 0: + if enhanced.avg_frame_quality == 0: + enhanced.avg_frame_quality = frame_quality + else: + enhanced.avg_frame_quality = (enhanced.avg_frame_quality + frame_quality) / 2 + + # Update error counts + if frame_data.get_field('rtc_sync_error', False): + enhanced.rtc_sync_errors += 1 + if frame_data.get_field('format_error', False): + enhanced.format_errors += 1 + if frame_data.get_field('overflow_error', False): + enhanced.overflow_errors += 1 + + # Update channel information + channel_id = frame_data.get_field('channel_id', 0) + if channel_id > 0: + enhanced.channel_count = max(enhanced.channel_count, channel_id) + + # Update data type counters + if frame_data.get_field('is_analog_data', False): + enhanced.analog_channels = max(enhanced.analog_channels, 1) + if frame_data.get_field('is_pcm_data', False): + enhanced.pcm_channels = max(enhanced.pcm_channels, 1) + if frame_data.get_field('is_tmats_data', False): + enhanced.tmats_frames += 1 + + # Set primary data type + data_type_name = frame_data.get_field('data_type_name', 'Unknown') + if enhanced.primary_data_type == "Unknown": + enhanced.primary_data_type = data_type_name + + def get_example_enhanced_flow(self) -> FlowStats: + """Generate an example flow with enhanced analysis for TUI demonstration""" + + # Create a sample flow with realistic enhanced data + flow = FlowStats( + src_ip="192.168.1.100", + dst_ip="239.255.0.1", + src_port=4001, + dst_port=4001, + transport_protocol="UDP", + traffic_classification="Multicast", + frame_count=1234, + total_bytes=1048576, # 1MB + avg_inter_arrival=0.001 # 1ms + ) + + # Enhanced analysis data + enhanced = EnhancedAnalysisData( + # Timing analysis + avg_clock_drift_ppm=15.5, + max_clock_drift_ppm=23.8, + timing_quality="good", + timing_stability="stable", + anomaly_rate=0.023, # 2.3% + avg_confidence_score=0.87, + + # Frame quality + avg_frame_quality=94.2, + sequence_gaps=2, + rtc_sync_errors=1, + format_errors=0, + overflow_errors=0, + + # Data analysis + channel_count=4, + analog_channels=3, + pcm_channels=0, + tmats_frames=5, + + # General + has_internal_timing=True, + primary_data_type="Analog Format 2", + decoder_type="Chapter10_Enhanced" + ) + + flow.enhanced_analysis = enhanced + flow.detected_protocol_types.add("Chapter10") + + return flow + +def demonstrate_tui_integration(): + """Demonstrate how enhanced data appears in TUI""" + + processor = EnhancedFlowProcessor() + example_flow = processor.get_example_enhanced_flow() + + print("=== Enhanced TUI Display Example ===") + print() + + # Show how data appears in main flow table + print("Main Flow Table:") + print("Src:Port Dst:Port Proto Cast #Frames Bytes Encoding Quality Drift ΔT Avg") + print("192.168.1.100:4001 239.255.0.1:4001 UDP Mult 1234 1.0M Chapter10 94% 15.5ppm 1.0ms") + print() + + # Show enhanced detail panel + print("Enhanced Detail Panel:") + print("FLOW DETAILS: 192.168.1.100 -> 239.255.0.1") + print() + print("Packets: 1234 | Bytes: 1,048,576") + print() + print("Enhanced Analysis (Chapter10_Enhanced):") + print(" Clock Drift: 15.50 PPM (max: 23.80)") + print(" Timing Quality: Good | Stability: Stable") + print(" Anomaly Rate: 2.3% | Confidence: 0.87") + print(" Frame Quality: 94.2%") + print(" Errors: Seq: 2 | RTC: 1") + print(" Channels: 4 (Analog: 3) (TMATS: 5)") + print(" Primary Type: Analog Format 2") + print() + + # Show available enhanced fields + print("Available Enhanced Fields for Analysis:") + supported_fields = processor.ch10_decoder.supported_fields + + timing_fields = [f for f in supported_fields if 'time' in f.name.lower() or 'drift' in f.name.lower()] + quality_fields = [f for f in supported_fields if 'quality' in f.name.lower() or 'error' in f.name.lower()] + data_fields = [f for f in supported_fields if 'analog' in f.name.lower() or 'channel' in f.name.lower()] + + print(" Timing Fields:") + for field in timing_fields[:5]: # Show first 5 + print(f" - {field.name}: {field.description}") + + print(" Quality Fields:") + for field in quality_fields[:5]: # Show first 5 + print(f" - {field.name}: {field.description}") + + print(" Data Analysis Fields:") + for field in data_fields[:5]: # Show first 5 + print(f" - {field.name}: {field.description}") + + print(f" ... and {len(supported_fields) - 15} more fields available for custom analysis") + +if __name__ == "__main__": + demonstrate_tui_integration() \ No newline at end of file diff --git a/modular_decoder_framework.md b/modular_decoder_framework.md new file mode 100644 index 0000000..a276c8c --- /dev/null +++ b/modular_decoder_framework.md @@ -0,0 +1,518 @@ +# Modular Decoder Framework with Extensible Analysis + +## Core Concept + +Create a framework where decoders expose structured frame data, and separate analysis plugins can process this data for domain-specific insights without modifying the core software. + +## Architecture Overview + +``` +Packet → Decoder → StructuredFrameData → Analysis Plugins → Enhanced FlowStats +``` + +## 1. Enhanced Decoder Interface + +### Base Decoder with Data Exposure +```python +from abc import ABC, abstractmethod +from dataclasses import dataclass, field +from typing import Dict, Any, List, Optional, Union +import struct + +@dataclass +class FieldDefinition: + """Defines a field that can be extracted from decoded data""" + name: str + description: str + data_type: type # int, float, str, bytes, etc. + unit: Optional[str] = None # "seconds", "nanoseconds", "MHz", etc. + validator: Optional[callable] = None # Function to validate field value + +@dataclass +class StructuredFrameData: + """Container for decoded frame data with metadata""" + decoder_name: str + packet_timestamp: float # Original packet timestamp + raw_data: bytes + fields: Dict[str, Any] = field(default_factory=dict) + metadata: Dict[str, Any] = field(default_factory=dict) + + def get_field(self, name: str, default=None): + """Get a field value with optional default""" + return self.fields.get(name, default) + + def has_field(self, name: str) -> bool: + """Check if field exists""" + return name in self.fields + + def get_timestamp_fields(self) -> List[str]: + """Get list of fields that represent timestamps""" + return [name for name, value in self.metadata.items() + if name.endswith('_timestamp_field')] + +class EnhancedDecoder(ABC): + """Base class for decoders that expose structured data""" + + @property + @abstractmethod + def decoder_name(self) -> str: + pass + + @property + @abstractmethod + def supported_fields(self) -> List[FieldDefinition]: + """Return list of fields this decoder can extract""" + pass + + @abstractmethod + def can_decode(self, packet, transport_info: Dict) -> float: + """Return confidence score 0.0-1.0""" + pass + + @abstractmethod + def decode_frame(self, packet, transport_info: Dict) -> Optional[StructuredFrameData]: + """Decode packet and return structured data""" + pass + + def validate_fields(self, frame_data: StructuredFrameData) -> List[str]: + """Validate extracted fields, return list of validation errors""" + errors = [] + for field_def in self.supported_fields: + if field_def.name in frame_data.fields: + value = frame_data.fields[field_def.name] + if field_def.validator and not field_def.validator(value): + errors.append(f"Field {field_def.name} failed validation") + return errors +``` + +## 2. Chapter 10 Enhanced Decoder Example + +```python +class Chapter10EnhancedDecoder(EnhancedDecoder): + """Enhanced Chapter 10 decoder with structured data exposure""" + + @property + def decoder_name(self) -> str: + return "Chapter10" + + @property + def supported_fields(self) -> List[FieldDefinition]: + return [ + FieldDefinition("sync_pattern", "Sync pattern (0xEB25)", int), + FieldDefinition("channel_id", "Channel identifier", int), + FieldDefinition("packet_length", "Total packet length", int, "bytes"), + FieldDefinition("data_length", "Data payload length", int, "bytes"), + FieldDefinition("header_version", "Header version", int), + FieldDefinition("sequence_number", "Packet sequence number", int), + FieldDefinition("packet_flags", "Packet flags", int), + FieldDefinition("data_type", "Data type identifier", int), + FieldDefinition("relative_time_counter", "RTC value", int, "counts"), + FieldDefinition("checksum", "Header checksum", int), + + # Timing fields + FieldDefinition("internal_seconds", "CH10 internal seconds", int, "seconds"), + FieldDefinition("internal_nanoseconds", "CH10 internal nanoseconds", int, "nanoseconds"), + FieldDefinition("internal_timestamp", "Combined internal timestamp", float, "seconds"), + + # Data type specific fields + FieldDefinition("analog_channels", "Number of analog channels", int), + FieldDefinition("sample_rate", "Sampling rate", float, "Hz"), + FieldDefinition("pcm_format", "PCM format identifier", int), + ] + + def can_decode(self, packet, transport_info: Dict) -> float: + """Check for Chapter 10 sync pattern""" + if not hasattr(packet, 'haslayer') or not packet.haslayer('Raw'): + return 0.0 + + from scapy.all import Raw + raw_data = bytes(packet[Raw]) + + # Look for sync pattern + if len(raw_data) >= 2: + sync = struct.unpack(' Optional[StructuredFrameData]: + """Decode Chapter 10 frame with full field extraction""" + if not hasattr(packet, 'haslayer') or not packet.haslayer('Raw'): + return None + + from scapy.all import Raw + raw_data = bytes(packet[Raw]) + + if len(raw_data) < 24: # Minimum header size + return None + + try: + # Extract header fields + header = struct.unpack('= 32: # Extended header with timing + timing_data = struct.unpack(' Dict[str, Any]: + """Parse analog format specific header""" + if len(data) < 8: + return {} + try: + analog_header = struct.unpack(' Dict[str, Any]: + """Parse PCM format specific header""" + if len(data) < 4: + return {} + try: + pcm_header = struct.unpack(' str: + pass + + @property + @abstractmethod + def required_decoders(self) -> List[str]: + """List of decoder names this plugin requires""" + pass + + @property + @abstractmethod + def required_fields(self) -> Dict[str, List[str]]: + """Dict of decoder_name -> list of required fields""" + pass + + @abstractmethod + def analyze_frame(self, frame_data: StructuredFrameData, flow_context: Dict) -> Dict[str, Any]: + """Analyze a single frame, return analysis results""" + pass + + @abstractmethod + def analyze_flow(self, frame_data_list: List[StructuredFrameData]) -> Dict[str, Any]: + """Analyze a complete flow, return aggregated results""" + pass + + def can_analyze(self, frame_data: StructuredFrameData) -> bool: + """Check if this plugin can analyze the given frame data""" + if frame_data.decoder_name not in self.required_decoders: + return False + + required_fields = self.required_fields.get(frame_data.decoder_name, []) + return all(frame_data.has_field(field) for field in required_fields) + +class Chapter10TimingAnalysisPlugin(AnalysisPlugin): + """Analysis plugin for Chapter 10 timing comparisons""" + + @property + def plugin_name(self) -> str: + return "CH10_Timing_Analysis" + + @property + def required_decoders(self) -> List[str]: + return ["Chapter10"] + + @property + def required_fields(self) -> Dict[str, List[str]]: + return { + "Chapter10": ["internal_timestamp", "internal_seconds", "internal_nanoseconds"] + } + + def analyze_frame(self, frame_data: StructuredFrameData, flow_context: Dict) -> Dict[str, Any]: + """Compare packet timestamp vs internal Chapter 10 timestamp""" + packet_time = frame_data.packet_timestamp + internal_time = frame_data.get_field('internal_timestamp') + + if internal_time is None: + return {'error': 'No internal timestamp available'} + + time_delta = packet_time - internal_time + + # Calculate clock drift metrics + analysis = { + 'packet_timestamp': packet_time, + 'internal_timestamp': internal_time, + 'time_delta_seconds': time_delta, + 'time_delta_microseconds': time_delta * 1_000_000, + 'clock_drift_ppm': self._calculate_drift_ppm(time_delta, flow_context), + 'timestamp_source': 'CH10_internal' + } + + # Flag significant timing discrepancies + if abs(time_delta) > 0.001: # 1ms threshold + analysis['timing_anomaly'] = True + analysis['anomaly_severity'] = 'high' if abs(time_delta) > 0.1 else 'medium' + + return analysis + + def analyze_flow(self, frame_data_list: List[StructuredFrameData]) -> Dict[str, Any]: + """Analyze timing patterns across entire flow""" + if not frame_data_list: + return {} + + timing_deltas = [] + internal_intervals = [] + packet_intervals = [] + + prev_frame = None + for frame_data in frame_data_list: + if not self.can_analyze(frame_data): + continue + + # Calculate timing delta for this frame + packet_time = frame_data.packet_timestamp + internal_time = frame_data.get_field('internal_timestamp') + if internal_time: + timing_deltas.append(packet_time - internal_time) + + # Calculate intervals if we have previous frame + if prev_frame: + packet_interval = packet_time - prev_frame.packet_timestamp + internal_interval = internal_time - prev_frame.get_field('internal_timestamp', 0) + + packet_intervals.append(packet_interval) + internal_intervals.append(internal_interval) + + prev_frame = frame_data + + if not timing_deltas: + return {'error': 'No valid timing data found'} + + # Statistical analysis + import statistics + + flow_analysis = { + 'frame_count': len(timing_deltas), + 'avg_timing_delta': statistics.mean(timing_deltas), + 'std_timing_delta': statistics.stdev(timing_deltas) if len(timing_deltas) > 1 else 0, + 'min_timing_delta': min(timing_deltas), + 'max_timing_delta': max(timing_deltas), + 'timing_drift_trend': self._calculate_drift_trend(timing_deltas), + } + + if packet_intervals and internal_intervals: + flow_analysis.update({ + 'avg_packet_interval': statistics.mean(packet_intervals), + 'avg_internal_interval': statistics.mean(internal_intervals), + 'interval_correlation': self._calculate_correlation(packet_intervals, internal_intervals) + }) + + return flow_analysis + + def _calculate_drift_ppm(self, time_delta: float, flow_context: Dict) -> float: + """Calculate clock drift in parts per million""" + # Simplified drift calculation - would be more sophisticated in practice + return (time_delta * 1_000_000) / flow_context.get('flow_duration', 1.0) + + def _calculate_drift_trend(self, timing_deltas: List[float]) -> str: + """Determine if timing is drifting in a particular direction""" + if len(timing_deltas) < 3: + return 'insufficient_data' + + # Simple trend analysis + first_half = timing_deltas[:len(timing_deltas)//2] + second_half = timing_deltas[len(timing_deltas)//2:] + + first_avg = sum(first_half) / len(first_half) + second_avg = sum(second_half) / len(second_half) + + if second_avg > first_avg + 0.0001: + return 'increasing_drift' + elif second_avg < first_avg - 0.0001: + return 'decreasing_drift' + else: + return 'stable' + + def _calculate_correlation(self, list1: List[float], list2: List[float]) -> float: + """Calculate correlation coefficient between two timing sequences""" + # Simplified correlation - would use proper statistical methods + if len(list1) != len(list2) or len(list1) < 2: + return 0.0 + + # Basic correlation calculation + import statistics + mean1 = statistics.mean(list1) + mean2 = statistics.mean(list2) + + numerator = sum((x - mean1) * (y - mean2) for x, y in zip(list1, list2)) + denominator = (sum((x - mean1)**2 for x in list1) * sum((y - mean2)**2 for y in list2))**0.5 + + return numerator / denominator if denominator != 0 else 0.0 +``` + +## 4. Plugin Registry and Integration + +```python +class AnalysisPluginRegistry: + """Registry for managing analysis plugins""" + + def __init__(self): + self.plugins: Dict[str, AnalysisPlugin] = {} + self.decoder_plugin_map: Dict[str, List[AnalysisPlugin]] = {} + + def register_plugin(self, plugin: AnalysisPlugin): + """Register an analysis plugin""" + self.plugins[plugin.plugin_name] = plugin + + # Map decoders to plugins + for decoder_name in plugin.required_decoders: + if decoder_name not in self.decoder_plugin_map: + self.decoder_plugin_map[decoder_name] = [] + self.decoder_plugin_map[decoder_name].append(plugin) + + def get_plugins_for_decoder(self, decoder_name: str) -> List[AnalysisPlugin]: + """Get all plugins that can analyze data from a specific decoder""" + return self.decoder_plugin_map.get(decoder_name, []) + + def get_plugin(self, plugin_name: str) -> Optional[AnalysisPlugin]: + """Get a specific plugin by name""" + return self.plugins.get(plugin_name) + +class EnhancedFlowManager: + """Enhanced flow manager with plugin support""" + + def __init__(self): + self.decoders: List[EnhancedDecoder] = [] + self.plugin_registry = AnalysisPluginRegistry() + self.flows: Dict[Tuple[str, str], EnhancedFlowStats] = {} + + def register_decoder(self, decoder: EnhancedDecoder): + """Register a decoder""" + self.decoders.append(decoder) + + def register_plugin(self, plugin: AnalysisPlugin): + """Register an analysis plugin""" + self.plugin_registry.register_plugin(plugin) + + def process_packet(self, packet, frame_num: int): + """Process packet with enhanced decoder and plugin analysis""" + # Extract transport info + transport_info = self._extract_transport_info(packet) + + # Try decoders + for decoder in self.decoders: + confidence = decoder.can_decode(packet, transport_info) + if confidence > 0.5: # Threshold for acceptance + frame_data = decoder.decode_frame(packet, transport_info) + if frame_data: + # Apply analysis plugins + self._apply_plugins(frame_data, transport_info) + + # Update flow with enhanced data + self._update_enhanced_flow(frame_data, transport_info) + break + + def _apply_plugins(self, frame_data: StructuredFrameData, transport_info: Dict): + """Apply relevant analysis plugins to frame data""" + plugins = self.plugin_registry.get_plugins_for_decoder(frame_data.decoder_name) + + for plugin in plugins: + if plugin.can_analyze(frame_data): + analysis_result = plugin.analyze_frame(frame_data, {'transport_info': transport_info}) + + # Store plugin results in frame metadata + frame_data.metadata[f'plugin_{plugin.plugin_name}'] = analysis_result +``` + +## 5. Usage Example + +```python +# Setup enhanced system +flow_manager = EnhancedFlowManager() + +# Register enhanced decoders +ch10_decoder = Chapter10EnhancedDecoder() +flow_manager.register_decoder(ch10_decoder) + +# Register analysis plugins +timing_plugin = Chapter10TimingAnalysisPlugin() +flow_manager.register_plugin(timing_plugin) + +# Process packets +for packet in packets: + flow_manager.process_packet(packet, frame_num) + +# Access enhanced analysis results +for flow_key, flow_stats in flow_manager.flows.items(): + timing_analysis = flow_stats.get_plugin_results('CH10_Timing_Analysis') + if timing_analysis: + print(f"Flow {flow_key}: Clock drift = {timing_analysis['avg_timing_delta']:.6f}s") +``` + +This modular framework allows you to: + +1. **Extract any field** from decoded frames without modifying core code +2. **Create custom analysis plugins** for domain-specific insights +3. **Compare internal vs external timestamps** for clock drift analysis +4. **Extend analysis** without touching the main analyzer code +5. **Chain multiple plugins** for complex analysis workflows + +**Questions for implementation:** +- Which specific Chapter 10 timing analysis would be most valuable first? +- Should we create plugins for PTP timestamp analysis as well? +- What other modular analysis capabilities would be useful? \ No newline at end of file diff --git a/test_decode_data.py b/test_decode_data.py new file mode 100644 index 0000000..f57ef57 --- /dev/null +++ b/test_decode_data.py @@ -0,0 +1,73 @@ +#!/usr/bin/env python3 +""" +Test script to verify enhanced CH10 decode data is being captured +""" + +import sys +sys.path.append('.') + +from analyzer.analysis import EthernetAnalyzer +from analyzer.utils import PCAPLoader + +def test_decode_data(): + """Test that decoded frame data is being captured""" + + # Create analyzer + analyzer = EthernetAnalyzer(enable_realtime=False, outlier_threshold_sigma=3.0) + + # Load and process PCAP + print("Loading and processing PCAP...") + pcap_loader = PCAPLoader("1 PTPGM.pcapng") + packets = pcap_loader.load_all() + + # Process packets through flow manager + for i, packet in enumerate(packets): + analyzer.flow_manager.process_packet(packet, i + 1) + + # Calculate statistics + analyzer.statistics_engine.calculate_flow_statistics(analyzer.flow_manager.flows) + + print("\n=== ENHANCED DECODE DATA TEST ===") + + # Debug: Show all flows and their decoder types + print(f"Total flows found: {len(analyzer.flow_manager.flows)}") + + for flow_key, flow in analyzer.flow_manager.flows.items(): + print(f"Flow {flow_key}: {flow.enhanced_analysis.decoder_type}, protocols: {flow.detected_protocol_types}") + + # Find CH10 flows + ch10_flows = [] + for flow in analyzer.flow_manager.flows.values(): + if flow.enhanced_analysis.decoder_type == "Chapter10_Enhanced": + ch10_flows.append(flow) + + print(f"Found {len(ch10_flows)} flows with enhanced CH10 decoding") + + for i, flow in enumerate(ch10_flows): + print(f"\nFlow {i+1}: {flow.src_ip} -> {flow.dst_ip}") + enhanced = flow.enhanced_analysis + + print(f" Decoder Type: {enhanced.decoder_type}") + print(f" Primary Data Type: {enhanced.primary_data_type}") + print(f" Available Fields: {len(enhanced.available_field_names)}") + print(f" Sample Frames Stored: {len(enhanced.sample_decoded_fields)}") + + if enhanced.sample_decoded_fields: + print(" Sample Decoded Data:") + for frame_key, frame_data in enhanced.sample_decoded_fields.items(): + print(f" {frame_key}: {len(frame_data)} fields") + # Show first few fields + for j, (field_name, value) in enumerate(list(frame_data.items())[:5]): + print(f" {field_name}: {value}") + if len(frame_data) > 5: + print(f" ... and {len(frame_data) - 5} more fields") + else: + print(" No decoded frame data stored!") + + if enhanced.available_field_names: + print(f" Field Categories: Timing: {sum(1 for f in enhanced.available_field_names if 'time' in f.lower())}, " + f"Quality: {sum(1 for f in enhanced.available_field_names if any(k in f.lower() for k in ['quality', 'error']))}, " + f"Data: {sum(1 for f in enhanced.available_field_names if any(k in f.lower() for k in ['data', 'analog', 'channel']))}") + +if __name__ == "__main__": + test_decode_data() \ No newline at end of file