Major Features: - Complete modern TUI interface with three focused views - Enhanced multi-column layout: Source | Proto | Destination | Extended | Frame Type | Metrics - Simplified navigation with 1/2/3 hotkeys instead of F1/F2/F3 - Protocol hierarchy: Transport (TCP/UDP) → Extended (CH10/PTP) → Frame Types - Classic TUI preserved with --classic flag Views Implemented: 1. Flow Analysis View: Enhanced multi-column flow overview with protocol detection 2. Packet Decoder View: Three-panel deep inspection (Flows | Frames | Fields) 3. Statistical Analysis View: Four analysis modes with timing and quality metrics Technical Improvements: - Left-aligned text columns with IP:port precision - Transport protocol separation from extended protocols - Frame type identification (CH10-Data, TMATS, PTP Sync) - Cross-view communication with persistent flow selection - Context-sensitive help and status bars - Comprehensive error handling with console fallback
239 lines
9.4 KiB
Python
239 lines
9.4 KiB
Python
"""
|
|
Example showing how the enhanced CH10 decoder integrates with the TUI
|
|
This demonstrates the full data flow from packet to enhanced analysis display
|
|
"""
|
|
|
|
from analyzer.protocols.enhanced_chapter10 import EnhancedChapter10Decoder
|
|
from analyzer.plugins.ch10_timing_analysis import Chapter10TimingAnalysisPlugin
|
|
from analyzer.models.flow_stats import FlowStats, EnhancedAnalysisData
|
|
import statistics
|
|
|
|
class EnhancedFlowProcessor:
|
|
"""Example processor showing integration of enhanced CH10 analysis"""
|
|
|
|
def __init__(self):
|
|
self.ch10_decoder = EnhancedChapter10Decoder()
|
|
self.timing_plugin = Chapter10TimingAnalysisPlugin()
|
|
self.flows = {}
|
|
|
|
def process_packet(self, packet, frame_num: int):
|
|
"""Process packet with enhanced analysis"""
|
|
|
|
# Extract transport info (simplified)
|
|
transport_info = {'protocol': 'UDP', 'src_port': 4001, 'dst_port': 4001}
|
|
|
|
# Check if CH10 decoder can handle this packet
|
|
confidence = self.ch10_decoder.can_decode(packet, transport_info)
|
|
|
|
if confidence > 0.5:
|
|
# Decode frame with full field extraction
|
|
frame_data = self.ch10_decoder.decode_frame(packet, transport_info)
|
|
|
|
if frame_data:
|
|
# Run timing analysis plugin
|
|
flow_context = {'flow_duration': 60.0, 'flow_key': 'example_flow'}
|
|
timing_result = self.timing_plugin.analyze_frame(frame_data, flow_context)
|
|
|
|
# Update or create flow with enhanced data
|
|
self._update_flow_with_enhanced_data(frame_data, timing_result, packet)
|
|
|
|
def _update_flow_with_enhanced_data(self, frame_data, timing_result, packet):
|
|
"""Update flow statistics with enhanced analysis data"""
|
|
|
|
# Create flow key
|
|
flow_key = "192.168.1.100->239.255.0.1" # Example
|
|
|
|
# Get or create flow
|
|
if flow_key not in self.flows:
|
|
self.flows[flow_key] = FlowStats(
|
|
src_ip="192.168.1.100",
|
|
dst_ip="239.255.0.1",
|
|
src_port=4001,
|
|
dst_port=4001,
|
|
transport_protocol="UDP",
|
|
traffic_classification="Multicast"
|
|
)
|
|
|
|
flow = self.flows[flow_key]
|
|
|
|
# Update basic flow stats
|
|
flow.frame_count += 1
|
|
flow.total_bytes += len(packet)
|
|
flow.timestamps.append(float(packet.time))
|
|
|
|
# Update enhanced analysis data
|
|
self._update_enhanced_analysis_data(flow, frame_data, timing_result)
|
|
|
|
def _update_enhanced_analysis_data(self, flow: FlowStats, frame_data, timing_result):
|
|
"""Update the enhanced analysis data structure"""
|
|
|
|
enhanced = flow.enhanced_analysis
|
|
|
|
# Set decoder type
|
|
enhanced.decoder_type = "Chapter10_Enhanced"
|
|
|
|
# Update timing analysis
|
|
if timing_result.internal_timestamp is not None:
|
|
enhanced.has_internal_timing = True
|
|
|
|
# Update running averages for timing
|
|
if timing_result.clock_drift_ppm is not None:
|
|
if enhanced.avg_clock_drift_ppm == 0:
|
|
enhanced.avg_clock_drift_ppm = timing_result.clock_drift_ppm
|
|
else:
|
|
# Simple running average
|
|
enhanced.avg_clock_drift_ppm = (enhanced.avg_clock_drift_ppm + timing_result.clock_drift_ppm) / 2
|
|
|
|
enhanced.max_clock_drift_ppm = max(enhanced.max_clock_drift_ppm, abs(timing_result.clock_drift_ppm))
|
|
|
|
# Update timing quality (use most recent)
|
|
enhanced.timing_quality = timing_result.timing_quality
|
|
|
|
# Update anomaly rate
|
|
if timing_result.anomaly_detected:
|
|
enhanced.anomaly_rate = (enhanced.anomaly_rate * (flow.frame_count - 1) + 1) / flow.frame_count
|
|
else:
|
|
enhanced.anomaly_rate = (enhanced.anomaly_rate * (flow.frame_count - 1)) / flow.frame_count
|
|
|
|
# Update confidence score
|
|
enhanced.avg_confidence_score = (enhanced.avg_confidence_score + timing_result.confidence_score) / 2
|
|
|
|
# Update frame quality
|
|
frame_quality = frame_data.get_field('frame_quality_score', 0)
|
|
if frame_quality > 0:
|
|
if enhanced.avg_frame_quality == 0:
|
|
enhanced.avg_frame_quality = frame_quality
|
|
else:
|
|
enhanced.avg_frame_quality = (enhanced.avg_frame_quality + frame_quality) / 2
|
|
|
|
# Update error counts
|
|
if frame_data.get_field('rtc_sync_error', False):
|
|
enhanced.rtc_sync_errors += 1
|
|
if frame_data.get_field('format_error', False):
|
|
enhanced.format_errors += 1
|
|
if frame_data.get_field('overflow_error', False):
|
|
enhanced.overflow_errors += 1
|
|
|
|
# Update channel information
|
|
channel_id = frame_data.get_field('channel_id', 0)
|
|
if channel_id > 0:
|
|
enhanced.channel_count = max(enhanced.channel_count, channel_id)
|
|
|
|
# Update data type counters
|
|
if frame_data.get_field('is_analog_data', False):
|
|
enhanced.analog_channels = max(enhanced.analog_channels, 1)
|
|
if frame_data.get_field('is_pcm_data', False):
|
|
enhanced.pcm_channels = max(enhanced.pcm_channels, 1)
|
|
if frame_data.get_field('is_tmats_data', False):
|
|
enhanced.tmats_frames += 1
|
|
|
|
# Set primary data type
|
|
data_type_name = frame_data.get_field('data_type_name', 'Unknown')
|
|
if enhanced.primary_data_type == "Unknown":
|
|
enhanced.primary_data_type = data_type_name
|
|
|
|
def get_example_enhanced_flow(self) -> FlowStats:
|
|
"""Generate an example flow with enhanced analysis for TUI demonstration"""
|
|
|
|
# Create a sample flow with realistic enhanced data
|
|
flow = FlowStats(
|
|
src_ip="192.168.1.100",
|
|
dst_ip="239.255.0.1",
|
|
src_port=4001,
|
|
dst_port=4001,
|
|
transport_protocol="UDP",
|
|
traffic_classification="Multicast",
|
|
frame_count=1234,
|
|
total_bytes=1048576, # 1MB
|
|
avg_inter_arrival=0.001 # 1ms
|
|
)
|
|
|
|
# Enhanced analysis data
|
|
enhanced = EnhancedAnalysisData(
|
|
# Timing analysis
|
|
avg_clock_drift_ppm=15.5,
|
|
max_clock_drift_ppm=23.8,
|
|
timing_quality="good",
|
|
timing_stability="stable",
|
|
anomaly_rate=0.023, # 2.3%
|
|
avg_confidence_score=0.87,
|
|
|
|
# Frame quality
|
|
avg_frame_quality=94.2,
|
|
sequence_gaps=2,
|
|
rtc_sync_errors=1,
|
|
format_errors=0,
|
|
overflow_errors=0,
|
|
|
|
# Data analysis
|
|
channel_count=4,
|
|
analog_channels=3,
|
|
pcm_channels=0,
|
|
tmats_frames=5,
|
|
|
|
# General
|
|
has_internal_timing=True,
|
|
primary_data_type="Analog Format 2",
|
|
decoder_type="Chapter10_Enhanced"
|
|
)
|
|
|
|
flow.enhanced_analysis = enhanced
|
|
flow.detected_protocol_types.add("Chapter10")
|
|
|
|
return flow
|
|
|
|
def demonstrate_tui_integration():
|
|
"""Demonstrate how enhanced data appears in TUI"""
|
|
|
|
processor = EnhancedFlowProcessor()
|
|
example_flow = processor.get_example_enhanced_flow()
|
|
|
|
print("=== Enhanced TUI Display Example ===")
|
|
print()
|
|
|
|
# Show how data appears in main flow table
|
|
print("Main Flow Table:")
|
|
print("Src:Port Dst:Port Proto Cast #Frames Bytes Encoding Quality Drift ΔT Avg")
|
|
print("192.168.1.100:4001 239.255.0.1:4001 UDP Mult 1234 1.0M Chapter10 94% 15.5ppm 1.0ms")
|
|
print()
|
|
|
|
# Show enhanced detail panel
|
|
print("Enhanced Detail Panel:")
|
|
print("FLOW DETAILS: 192.168.1.100 -> 239.255.0.1")
|
|
print()
|
|
print("Packets: 1234 | Bytes: 1,048,576")
|
|
print()
|
|
print("Enhanced Analysis (Chapter10_Enhanced):")
|
|
print(" Clock Drift: 15.50 PPM (max: 23.80)")
|
|
print(" Timing Quality: Good | Stability: Stable")
|
|
print(" Anomaly Rate: 2.3% | Confidence: 0.87")
|
|
print(" Frame Quality: 94.2%")
|
|
print(" Errors: Seq: 2 | RTC: 1")
|
|
print(" Channels: 4 (Analog: 3) (TMATS: 5)")
|
|
print(" Primary Type: Analog Format 2")
|
|
print()
|
|
|
|
# Show available enhanced fields
|
|
print("Available Enhanced Fields for Analysis:")
|
|
supported_fields = processor.ch10_decoder.supported_fields
|
|
|
|
timing_fields = [f for f in supported_fields if 'time' in f.name.lower() or 'drift' in f.name.lower()]
|
|
quality_fields = [f for f in supported_fields if 'quality' in f.name.lower() or 'error' in f.name.lower()]
|
|
data_fields = [f for f in supported_fields if 'analog' in f.name.lower() or 'channel' in f.name.lower()]
|
|
|
|
print(" Timing Fields:")
|
|
for field in timing_fields[:5]: # Show first 5
|
|
print(f" - {field.name}: {field.description}")
|
|
|
|
print(" Quality Fields:")
|
|
for field in quality_fields[:5]: # Show first 5
|
|
print(f" - {field.name}: {field.description}")
|
|
|
|
print(" Data Analysis Fields:")
|
|
for field in data_fields[:5]: # Show first 5
|
|
print(f" - {field.name}: {field.description}")
|
|
|
|
print(f" ... and {len(supported_fields) - 15} more fields available for custom analysis")
|
|
|
|
if __name__ == "__main__":
|
|
demonstrate_tui_integration() |