Files
StreamLens/example_enhanced_integration.py
noisedestroyers 5c2cb1a4ed Modern TUI with Enhanced Protocol Hierarchy Interface
Major Features:
- Complete modern TUI interface with three focused views
- Enhanced multi-column layout: Source | Proto | Destination | Extended | Frame Type | Metrics
- Simplified navigation with 1/2/3 hotkeys instead of F1/F2/F3
- Protocol hierarchy: Transport (TCP/UDP) → Extended (CH10/PTP) → Frame Types
- Classic TUI preserved with --classic flag

Views Implemented:
1. Flow Analysis View: Enhanced multi-column flow overview with protocol detection
2. Packet Decoder View: Three-panel deep inspection (Flows | Frames | Fields)
3. Statistical Analysis View: Four analysis modes with timing and quality metrics

Technical Improvements:
- Left-aligned text columns with IP:port precision
- Transport protocol separation from extended protocols
- Frame type identification (CH10-Data, TMATS, PTP Sync)
- Cross-view communication with persistent flow selection
- Context-sensitive help and status bars
- Comprehensive error handling with console fallback
2025-07-26 22:46:49 -04:00

239 lines
9.4 KiB
Python

"""
Example showing how the enhanced CH10 decoder integrates with the TUI.

This demonstrates the full data flow from packet to enhanced analysis display.
"""
from analyzer.protocols.enhanced_chapter10 import EnhancedChapter10Decoder
from analyzer.plugins.ch10_timing_analysis import Chapter10TimingAnalysisPlugin
from analyzer.models.flow_stats import FlowStats, EnhancedAnalysisData
import statistics
class EnhancedFlowProcessor:
    """Example processor showing integration of enhanced CH10 analysis.

    Packets are offered to the enhanced Chapter 10 decoder; frames it
    decodes are handed to the timing-analysis plugin, and both results are
    folded into per-flow statistics kept in ``self.flows``.

    This is demonstration code: the transport details and the flow
    endpoints are fixed example values, they are not parsed from the packet.
    """

    # Key and endpoint shared by every flow this example creates.
    _EXAMPLE_FLOW_KEY = "192.168.1.100->239.255.0.1"
    _EXAMPLE_ENDPOINT = {
        'src_ip': "192.168.1.100",
        'dst_ip': "239.255.0.1",
        'src_port': 4001,
        'dst_port': 4001,
        'transport_protocol': "UDP",
        'traffic_classification': "Multicast",
    }

    def __init__(self):
        """Create the CH10 decoder, the timing plugin and an empty flow table."""
        self.ch10_decoder = EnhancedChapter10Decoder()
        self.timing_plugin = Chapter10TimingAnalysisPlugin()
        self.flows = {}

    @staticmethod
    def _blend_average(current, sample):
        """Fold ``sample`` into a smoothed average and return the new value.

        A ``current`` of 0 is treated as "no sample recorded yet" and is
        replaced by ``sample``; otherwise the result is the midpoint of the
        two, so recent samples weigh more than old ones.
        """
        if current == 0:
            return sample
        return (current + sample) / 2

    def process_packet(self, packet, frame_num: int):
        """Process packet with enhanced analysis.

        Args:
            packet: Packet object handed to the decoder. When a frame is
                decoded it must also support ``len()`` and expose ``.time``.
            frame_num: Frame number of the packet. Accepted for interface
                compatibility; this example does not use it.

        The packet is ignored unless the decoder reports a confidence above
        0.5 and ``decode_frame`` returns a truthy frame.
        """
        # Extract transport info (simplified)
        transport_info = {'protocol': 'UDP', 'src_port': 4001, 'dst_port': 4001}

        # Check if CH10 decoder can handle this packet
        confidence = self.ch10_decoder.can_decode(packet, transport_info)
        if confidence > 0.5:
            # Decode frame with full field extraction
            frame_data = self.ch10_decoder.decode_frame(packet, transport_info)
            if frame_data:
                # Run timing analysis plugin (fixed example context)
                flow_context = {'flow_duration': 60.0, 'flow_key': 'example_flow'}
                timing_result = self.timing_plugin.analyze_frame(frame_data, flow_context)

                # Update or create flow with enhanced data
                self._update_flow_with_enhanced_data(frame_data, timing_result, packet)

    def _update_flow_with_enhanced_data(self, frame_data, timing_result, packet):
        """Update flow statistics with enhanced analysis data.

        Creates the example flow on first use, bumps its frame/byte counters,
        records ``packet.time`` as a float timestamp, then delegates the
        enhanced fields to ``_update_enhanced_analysis_data``.
        """
        flow_key = self._EXAMPLE_FLOW_KEY  # Example

        # Get or create flow
        if flow_key not in self.flows:
            self.flows[flow_key] = FlowStats(**self._EXAMPLE_ENDPOINT)
        flow = self.flows[flow_key]

        # Update basic flow stats
        flow.frame_count += 1
        flow.total_bytes += len(packet)
        flow.timestamps.append(float(packet.time))

        # Update enhanced analysis data
        self._update_enhanced_analysis_data(flow, frame_data, timing_result)

    def _update_enhanced_analysis_data(self, flow: FlowStats, frame_data, timing_result):
        """Update the enhanced analysis data structure.

        Args:
            flow: Flow whose ``enhanced_analysis`` is updated in place.
                ``flow.frame_count`` must already include the current frame.
            frame_data: Decoded frame exposing ``get_field(name, default)``.
            timing_result: Timing plugin result exposing
                ``internal_timestamp``, ``clock_drift_ppm``,
                ``timing_quality``, ``anomaly_detected`` and
                ``confidence_score``.
        """
        enhanced = flow.enhanced_analysis

        # Set decoder type
        enhanced.decoder_type = "Chapter10_Enhanced"

        # NOTE(review): the timing updates below are applied on every frame,
        # independent of the internal-timestamp check -- confirm this nesting.
        if timing_result.internal_timestamp is not None:
            enhanced.has_internal_timing = True

        # Smoothed clock drift plus the largest absolute drift seen
        drift = timing_result.clock_drift_ppm
        if drift is not None:
            enhanced.avg_clock_drift_ppm = self._blend_average(
                enhanced.avg_clock_drift_ppm, drift)
            enhanced.max_clock_drift_ppm = max(enhanced.max_clock_drift_ppm, abs(drift))

        # Update timing quality (use most recent)
        enhanced.timing_quality = timing_result.timing_quality

        # Anomaly rate is the incremental mean of a per-frame 0/1 flag.
        # Skipped for a flow with no frames so it cannot divide by zero.
        frames_seen = flow.frame_count
        if frames_seen > 0:
            anomaly_hit = 1 if timing_result.anomaly_detected else 0
            enhanced.anomaly_rate = (
                enhanced.anomaly_rate * (frames_seen - 1) + anomaly_hit
            ) / frames_seen

        # Confidence is seeded by the first sample (rather than being halved
        # against the initial value), like the other smoothed averages.
        confidence = timing_result.confidence_score
        if confidence is not None:
            enhanced.avg_confidence_score = self._blend_average(
                enhanced.avg_confidence_score, confidence)

        # Update frame quality (non-positive scores are ignored)
        frame_quality = frame_data.get_field('frame_quality_score', 0)
        if frame_quality > 0:
            enhanced.avg_frame_quality = self._blend_average(
                enhanced.avg_frame_quality, frame_quality)

        # Update error counts
        if frame_data.get_field('rtc_sync_error', False):
            enhanced.rtc_sync_errors += 1
        if frame_data.get_field('format_error', False):
            enhanced.format_errors += 1
        if frame_data.get_field('overflow_error', False):
            enhanced.overflow_errors += 1

        # Channel information: tracks the highest channel_id seen so far
        channel_id = frame_data.get_field('channel_id', 0)
        if channel_id > 0:
            enhanced.channel_count = max(enhanced.channel_count, channel_id)

        # Data type counters: analog/PCM only record "at least one seen"
        if frame_data.get_field('is_analog_data', False):
            enhanced.analog_channels = max(enhanced.analog_channels, 1)
        if frame_data.get_field('is_pcm_data', False):
            enhanced.pcm_channels = max(enhanced.pcm_channels, 1)
        if frame_data.get_field('is_tmats_data', False):
            enhanced.tmats_frames += 1

        # Set primary data type (first reported type wins)
        data_type_name = frame_data.get_field('data_type_name', 'Unknown')
        if enhanced.primary_data_type == "Unknown":
            enhanced.primary_data_type = data_type_name

    def get_example_enhanced_flow(self) -> FlowStats:
        """Generate an example flow with enhanced analysis for TUI demonstration.

        Returns:
            A new ``FlowStats`` populated with fixed sample values, with an
            ``EnhancedAnalysisData`` attached and "Chapter10" added to its
            detected protocol types. The flow is not stored in ``self.flows``.
        """
        # Create a sample flow with realistic enhanced data
        flow = FlowStats(
            **self._EXAMPLE_ENDPOINT,
            frame_count=1234,
            total_bytes=1048576,  # 1MB
            avg_inter_arrival=0.001,  # 1ms
        )

        # Enhanced analysis data
        enhanced = EnhancedAnalysisData(
            # Timing analysis
            avg_clock_drift_ppm=15.5,
            max_clock_drift_ppm=23.8,
            timing_quality="good",
            timing_stability="stable",
            anomaly_rate=0.023,  # 2.3%
            avg_confidence_score=0.87,
            # Frame quality
            avg_frame_quality=94.2,
            sequence_gaps=2,
            rtc_sync_errors=1,
            format_errors=0,
            overflow_errors=0,
            # Data analysis
            channel_count=4,
            analog_channels=3,
            pcm_channels=0,
            tmats_frames=5,
            # General
            has_internal_timing=True,
            primary_data_type="Analog Format 2",
            decoder_type="Chapter10_Enhanced",
        )
        flow.enhanced_analysis = enhanced
        flow.detected_protocol_types.add("Chapter10")
        return flow
def demonstrate_tui_integration():
    """Demonstrate how enhanced data appears in TUI.

    Prints a fixed mock-up of the main flow table and of the enhanced detail
    panel, then previews up to five decoder fields for each of three keyword
    groups (matched against the lower-cased field name) and reports how many
    of the decoder's supported fields were not previewed.
    """
    processor = EnhancedFlowProcessor()
    # Building the sample flow exercises the example constructor; the rows
    # printed below are literal strings carrying the same values it assigns.
    processor.get_example_enhanced_flow()

    print("=== Enhanced TUI Display Example ===")
    print()

    # Show how data appears in main flow table
    print("Main Flow Table:")
    print("Src:Port Dst:Port Proto Cast #Frames Bytes Encoding Quality Drift ΔT Avg")
    print("192.168.1.100:4001 239.255.0.1:4001 UDP Mult 1234 1.0M Chapter10 94% 15.5ppm 1.0ms")
    print()

    # Show enhanced detail panel
    print("Enhanced Detail Panel:")
    print("FLOW DETAILS: 192.168.1.100 -> 239.255.0.1")
    print()
    print("Packets: 1234 | Bytes: 1,048,576")
    print()
    print("Enhanced Analysis (Chapter10_Enhanced):")
    print(" Clock Drift: 15.50 PPM (max: 23.80)")
    print(" Timing Quality: Good | Stability: Stable")
    print(" Anomaly Rate: 2.3% | Confidence: 0.87")
    print(" Frame Quality: 94.2%")
    print(" Errors: Seq: 2 | RTC: 1")
    print(" Channels: 4 (Analog: 3) (TMATS: 5)")
    print(" Primary Type: Analog Format 2")
    print()

    # Show available enhanced fields
    print("Available Enhanced Fields for Analysis:")
    supported_fields = processor.ch10_decoder.supported_fields

    def fields_matching(*keywords):
        """Return supported fields whose lower-cased name contains any keyword."""
        return [
            f for f in supported_fields
            if any(keyword in f.name.lower() for keyword in keywords)
        ]

    previews = (
        (" Timing Fields:", fields_matching('time', 'drift')),
        (" Quality Fields:", fields_matching('quality', 'error')),
        (" Data Analysis Fields:", fields_matching('analog', 'channel')),
    )

    # Track which field objects were actually printed: a group may hold fewer
    # than five entries and one field can match several groups, so the number
    # shown is not necessarily 15.
    shown_ids = set()
    for heading, group in previews:
        print(heading)
        for field in group[:5]:  # Show first 5
            print(f" - {field.name}: {field.description}")
            shown_ids.add(id(field))

    remaining = max(0, len(supported_fields) - len(shown_ids))
    print(f" ... and {remaining} more fields available for custom analysis")
# Run the demonstration only when this file is executed as a script,
# not when it is imported as a module.
if __name__ == "__main__":
    demonstrate_tui_integration()