Major Features:
- Complete modern TUI interface with three focused views
- Enhanced multi-column layout: Source | Proto | Destination | Extended | Frame Type | Metrics
- Simplified navigation with 1/2/3 hotkeys instead of F1/F2/F3
- Protocol hierarchy: Transport (TCP/UDP) → Extended (CH10/PTP) → Frame Types
- Classic TUI preserved with --classic flag

Views Implemented:
1. Flow Analysis View: Enhanced multi-column flow overview with protocol detection
2. Packet Decoder View: Three-panel deep inspection (Flows | Frames | Fields)
3. Statistical Analysis View: Four analysis modes with timing and quality metrics

Technical Improvements:
- Left-aligned text columns with IP:port precision
- Transport protocol separation from extended protocols
- Frame type identification (CH10-Data, TMATS, PTP Sync)
- Cross-view communication with persistent flow selection
- Context-sensitive help and status bars
- Comprehensive error handling with console fallback
20 KiB
20 KiB
Modular Decoder Framework with Extensible Analysis
Core Concept
Create a framework where decoders expose structured frame data, and separate analysis plugins can process this data for domain-specific insights without modifying the core software.
Architecture Overview
Packet → Decoder → StructuredFrameData → Analysis Plugins → Enhanced FlowStats
1. Enhanced Decoder Interface
Base Decoder with Data Exposure
import struct
from abc import ABC, abstractmethod
from dataclasses import dataclass, field
from typing import Any, Callable, Dict, List, Optional, Tuple, Union
@dataclass
class FieldDefinition:
    """Defines a field that can be extracted from decoded data."""

    # Key under which the decoded value is stored in a frame's ``fields`` dict.
    name: str
    # Human-readable explanation of the field.
    description: str
    # Python type of the decoded value: int, float, str, bytes, etc.
    data_type: type
    # Optional unit label: "seconds", "nanoseconds", "MHz", etc.
    unit: Optional[str] = None
    # Optional predicate called with the decoded value; a falsy result marks
    # the value as invalid.  Annotated with typing.Callable because the
    # builtin ``callable`` is a function, not a type.
    validator: Optional[Callable[[Any], bool]] = None
@dataclass
class StructuredFrameData:
    """Container for decoded frame data with metadata.

    ``fields`` holds the decoded values keyed by field name; ``metadata``
    holds side information about the frame.  Metadata keys ending in
    ``_timestamp_field`` are treated as pointers whose *value* is the name of
    a timestamp-bearing entry in ``fields``.
    """

    decoder_name: str
    packet_timestamp: float  # Original packet timestamp
    raw_data: bytes
    fields: Dict[str, Any] = field(default_factory=dict)
    metadata: Dict[str, Any] = field(default_factory=dict)

    def get_field(self, name: str, default=None):
        """Return the decoded value stored under ``name``, or ``default`` if absent."""
        return self.fields.get(name, default)

    def has_field(self, name: str) -> bool:
        """Return True if ``name`` is a key of ``fields``."""
        return name in self.fields

    def get_timestamp_fields(self) -> List[str]:
        """Return the names of the fields that represent timestamps.

        The names are taken from the values of every metadata entry whose key
        ends with ``_timestamp_field`` (e.g. ``'primary_timestamp_field':
        'internal_timestamp'`` contributes ``'internal_timestamp'``).
        Returning the metadata keys themselves would hand callers names that
        do not exist in ``fields``.
        """
        return [
            field_name
            for marker, field_name in self.metadata.items()
            if marker.endswith('_timestamp_field')
        ]
class EnhancedDecoder(ABC):
    """Abstract base for decoders that publish their output as structured data.

    Concrete decoders name themselves, declare the fields they can produce,
    score how confident they are that a packet is theirs, and decode it into
    a ``StructuredFrameData``.
    """

    @property
    @abstractmethod
    def decoder_name(self) -> str:
        """Name identifying this decoder."""

    @property
    @abstractmethod
    def supported_fields(self) -> List[FieldDefinition]:
        """Return list of fields this decoder can extract."""

    @abstractmethod
    def can_decode(self, packet, transport_info: Dict) -> float:
        """Return confidence score 0.0-1.0."""

    @abstractmethod
    def decode_frame(self, packet, transport_info: Dict) -> Optional[StructuredFrameData]:
        """Decode packet and return structured data."""

    def validate_fields(self, frame_data: StructuredFrameData) -> List[str]:
        """Run each declared validator against the matching decoded value.

        Only fields that are both declared in ``supported_fields`` and present
        in ``frame_data.fields`` are checked, and only when the declaration
        carries a validator.

        Returns:
            One message per field whose validator returned a falsy result, in
            ``supported_fields`` order; an empty list when everything passed.
        """
        decoded = frame_data.fields
        return [
            f"Field {spec.name} failed validation"
            for spec in self.supported_fields
            if spec.name in decoded
            and spec.validator
            and not spec.validator(decoded[spec.name])
        ]
2. Chapter 10 Enhanced Decoder Example
class Chapter10EnhancedDecoder(EnhancedDecoder):
    """Enhanced Chapter 10 decoder with structured data exposure.

    The 24-byte primary packet header is unpacked using the IRIG 106
    Chapter 10 layout (little-endian):

        offset  size  field
        0       2     sync pattern (0xEB25)
        2       2     channel ID
        4       4     packet length
        8       4     data length
        12      1     header (data type) version
        13      1     sequence number
        14      1     packet flags
        15      1     data type
        16      6     relative time counter
        22      2     header checksum

    Reading the header as twelve 16-bit words would merge or split every
    field after the channel ID (for example ``data_type`` would come out as
    ``packet_flags | data_type << 8``), so the per-field widths matter.
    """

    _SYNC_PATTERN = 0xEB25
    _HEADER_SIZE = 24          # Minimum header size
    _TIMING_END = 32           # Header plus the 8 timing bytes read below
    # NOTE(review): data-type codes are carried over unchanged from the
    # original implementation; confirm them against the IRIG 106 data type
    # table before relying on the analog/PCM dispatch.
    _ANALOG_DATA_TYPES = (0x72, 0x73, 0x74, 0x75)
    _PCM_DATA_TYPES = (0x04, 0x08)

    @property
    def decoder_name(self) -> str:
        """Name reported in every ``StructuredFrameData`` this decoder emits."""
        return "Chapter10"

    @property
    def supported_fields(self) -> List[FieldDefinition]:
        """Return the definitions of every field ``decode_frame`` may populate."""
        return [
            FieldDefinition("sync_pattern", "Sync pattern (0xEB25)", int),
            FieldDefinition("channel_id", "Channel identifier", int),
            FieldDefinition("packet_length", "Total packet length", int, "bytes"),
            FieldDefinition("data_length", "Data payload length", int, "bytes"),
            FieldDefinition("header_version", "Header version", int),
            FieldDefinition("sequence_number", "Packet sequence number", int),
            FieldDefinition("packet_flags", "Packet flags", int),
            FieldDefinition("data_type", "Data type identifier", int),
            FieldDefinition("relative_time_counter", "RTC value", int, "counts"),
            FieldDefinition("checksum", "Header checksum", int),
            # Timing fields
            FieldDefinition("internal_seconds", "CH10 internal seconds", int, "seconds"),
            FieldDefinition("internal_nanoseconds", "CH10 internal nanoseconds", int, "nanoseconds"),
            FieldDefinition("internal_timestamp", "Combined internal timestamp", float, "seconds"),
            # Data type specific fields
            FieldDefinition("analog_channels", "Number of analog channels", int),
            FieldDefinition("sample_rate", "Sampling rate", float, "Hz"),
            FieldDefinition("pcm_format", "PCM format identifier", int),
        ]

    @staticmethod
    def _raw_payload(packet) -> Optional[bytes]:
        """Return the bytes of the packet's Raw layer, or None if it has none."""
        if not hasattr(packet, 'haslayer') or not packet.haslayer('Raw'):
            return None
        # Imported lazily so the module can be loaded without scapy installed.
        from scapy.all import Raw
        return bytes(packet[Raw])

    def can_decode(self, packet, transport_info: Dict) -> float:
        """Check for the Chapter 10 sync pattern at the start of the payload.

        Returns:
            1.0 when the first two payload bytes are 0xEB25 (little-endian),
            otherwise 0.0 (including when the packet has no Raw layer).
        """
        raw_data = self._raw_payload(packet)
        if raw_data is None or len(raw_data) < 2:
            return 0.0
        (sync,) = struct.unpack_from('<H', raw_data, 0)
        return 1.0 if sync == self._SYNC_PATTERN else 0.0

    def decode_frame(self, packet, transport_info: Dict) -> Optional[StructuredFrameData]:
        """Decode a Chapter 10 frame with full field extraction.

        Args:
            packet: Object exposing ``haslayer``, ``__getitem__`` for the Raw
                layer and a ``time`` attribute (a scapy packet).
            transport_info: Transport details for the packet; not used here.

        Returns:
            A populated ``StructuredFrameData``, or None when the packet has
            no Raw layer, the payload is shorter than the 24-byte header, or
            unpacking fails.
        """
        raw_data = self._raw_payload(packet)
        if raw_data is None or len(raw_data) < self._HEADER_SIZE:
            return None

        try:
            # Primary header, field widths per the layout in the class docstring.
            (sync_pattern, channel_id, packet_length, data_length,
             header_version, sequence_number, packet_flags,
             data_type) = struct.unpack_from('<HHIIBBBB', raw_data, 0)
            # The RTC is 48 bits wide, which struct has no format code for.
            relative_time_counter = int.from_bytes(raw_data[16:22], 'little')
            (checksum,) = struct.unpack_from('<H', raw_data, 22)

            frame_data = StructuredFrameData(
                decoder_name=self.decoder_name,
                packet_timestamp=float(packet.time),
                raw_data=raw_data,
            )

            # Basic header fields
            frame_data.fields.update({
                'sync_pattern': sync_pattern,
                'channel_id': channel_id,
                'packet_length': packet_length,
                'data_length': data_length,
                'header_version': header_version,
                'sequence_number': sequence_number,
                'packet_flags': packet_flags,
                'data_type': data_type,
                'relative_time_counter': relative_time_counter,
                'checksum': checksum,
            })

            # Extract timing information if present.
            # NOTE(review): presence is inferred purely from payload length,
            # as in the original; confirm whether the packet flags should
            # gate this instead.
            if len(raw_data) >= self._TIMING_END:
                seconds, nanoseconds = struct.unpack_from('<II', raw_data, self._HEADER_SIZE)
                frame_data.fields.update({
                    'internal_seconds': seconds,
                    'internal_nanoseconds': nanoseconds,
                    'internal_timestamp': seconds + (nanoseconds / 1_000_000_000),
                })
                # Mark timestamp fields for analysis plugins
                frame_data.metadata.update({
                    'primary_timestamp_field': 'internal_timestamp',
                    'seconds_timestamp_field': 'internal_seconds',
                    'nanoseconds_timestamp_field': 'internal_nanoseconds',
                })

            # Data type specific parsing; the type-specific header is read
            # from byte 32 onwards regardless of the timing branch above.
            if data_type in self._ANALOG_DATA_TYPES:
                frame_data.fields.update(self._parse_analog_header(raw_data[self._TIMING_END:]))
            elif data_type in self._PCM_DATA_TYPES:
                frame_data.fields.update(self._parse_pcm_header(raw_data[self._TIMING_END:]))

            return frame_data
        except (struct.error, IndexError):
            return None

    def _parse_analog_header(self, data: bytes) -> Dict[str, Any]:
        """Parse the analog format specific header.

        Reads ``<HHI`` from the first 8 bytes: the first value is the channel
        count and the third is divided by 1000 to give the sample rate in Hz.
        Returns an empty dict when fewer than 8 bytes are available.
        """
        if len(data) < 8:
            return {}
        try:
            channels, _unused, raw_rate = struct.unpack_from('<HHI', data, 0)
        except struct.error:
            return {}
        return {
            'analog_channels': channels,
            'sample_rate': raw_rate / 1000.0,  # Convert to Hz
        }

    def _parse_pcm_header(self, data: bytes) -> Dict[str, Any]:
        """Parse the PCM format specific header.

        Reads ``<HH`` from the first 4 bytes and reports the first value as
        ``pcm_format``.  Returns an empty dict when fewer than 4 bytes are
        available.
        """
        if len(data) < 4:
            return {}
        try:
            pcm_format, _unused = struct.unpack_from('<HH', data, 0)
        except struct.error:
            return {}
        return {'pcm_format': pcm_format}
3. Analysis Plugin Framework
class AnalysisPlugin(ABC):
    """Abstract base for plugins that derive insights from decoded frames.

    A plugin declares which decoders it understands and which decoded fields
    it needs from each, then implements per-frame and per-flow analysis.
    """

    @property
    @abstractmethod
    def plugin_name(self) -> str:
        """Name identifying this plugin."""

    @property
    @abstractmethod
    def required_decoders(self) -> List[str]:
        """List of decoder names this plugin requires."""

    @property
    @abstractmethod
    def required_fields(self) -> Dict[str, List[str]]:
        """Dict of decoder_name -> list of required fields."""

    @abstractmethod
    def analyze_frame(self, frame_data: StructuredFrameData, flow_context: Dict) -> Dict[str, Any]:
        """Analyze a single frame, return analysis results."""

    @abstractmethod
    def analyze_flow(self, frame_data_list: List[StructuredFrameData]) -> Dict[str, Any]:
        """Analyze a complete flow, return aggregated results."""

    def can_analyze(self, frame_data: StructuredFrameData) -> bool:
        """Report whether ``frame_data`` satisfies this plugin's requirements.

        The frame must come from one of ``required_decoders`` and must carry
        every field listed for that decoder in ``required_fields`` (a decoder
        with no entry there needs no particular fields).
        """
        if frame_data.decoder_name not in self.required_decoders:
            return False
        needed_names = self.required_fields.get(frame_data.decoder_name, [])
        for needed in needed_names:
            if not frame_data.has_field(needed):
                return False
        return True
class Chapter10TimingAnalysisPlugin(AnalysisPlugin):
    """Analysis plugin for Chapter 10 timing comparisons.

    Compares the capture timestamp of each packet with the timestamp decoded
    from inside the Chapter 10 frame, per frame and aggregated per flow.
    """

    # |delta| above this many seconds is flagged as an anomaly (1 ms).
    _ANOMALY_THRESHOLD_S = 0.001
    # |delta| above this many seconds upgrades the anomaly to 'high'.
    _HIGH_SEVERITY_THRESHOLD_S = 0.1
    # Half-to-half change in mean delta below this is reported as 'stable'.
    _TREND_TOLERANCE_S = 0.0001

    @property
    def plugin_name(self) -> str:
        """Name under which this plugin is registered."""
        return "CH10_Timing_Analysis"

    @property
    def required_decoders(self) -> List[str]:
        """Decoders whose frames this plugin can analyze."""
        return ["Chapter10"]

    @property
    def required_fields(self) -> Dict[str, List[str]]:
        """Fields that must be present in a Chapter10 frame."""
        return {
            "Chapter10": ["internal_timestamp", "internal_seconds", "internal_nanoseconds"]
        }

    def analyze_frame(self, frame_data: StructuredFrameData, flow_context: Dict) -> Dict[str, Any]:
        """Compare packet timestamp vs internal Chapter 10 timestamp.

        Args:
            frame_data: Decoded frame; its ``internal_timestamp`` field is used.
            flow_context: Per-flow context; ``flow_duration`` is read from it
                for the drift calculation when present.

        Returns:
            A dict with both timestamps, their difference in seconds and
            microseconds, a drift figure and the timestamp source.  The keys
            ``timing_anomaly`` and ``anomaly_severity`` are added only when
            the absolute difference exceeds 1 ms.  If the frame has no
            internal timestamp, ``{'error': ...}`` is returned instead.
        """
        packet_time = frame_data.packet_timestamp
        internal_time = frame_data.get_field('internal_timestamp')
        if internal_time is None:
            return {'error': 'No internal timestamp available'}

        time_delta = packet_time - internal_time
        # Calculate clock drift metrics
        analysis = {
            'packet_timestamp': packet_time,
            'internal_timestamp': internal_time,
            'time_delta_seconds': time_delta,
            'time_delta_microseconds': time_delta * 1_000_000,
            'clock_drift_ppm': self._calculate_drift_ppm(time_delta, flow_context),
            'timestamp_source': 'CH10_internal',
        }

        # Flag significant timing discrepancies
        magnitude = abs(time_delta)
        if magnitude > self._ANOMALY_THRESHOLD_S:
            analysis['timing_anomaly'] = True
            analysis['anomaly_severity'] = (
                'high' if magnitude > self._HIGH_SEVERITY_THRESHOLD_S else 'medium'
            )
        return analysis

    def analyze_flow(self, frame_data_list: List[StructuredFrameData]) -> Dict[str, Any]:
        """Analyze timing patterns across an entire flow.

        Frames this plugin cannot analyze, or whose internal timestamp is
        None, are skipped.  A timestamp of exactly 0.0 is valid data and is
        kept (testing truthiness would silently drop it).

        Returns:
            ``{}`` for an empty input, ``{'error': ...}`` when no frame
            yielded timing data, otherwise summary statistics of the
            packet-vs-internal deltas plus, when at least two usable frames
            exist, interval averages and their correlation.
        """
        import statistics

        if not frame_data_list:
            return {}

        timing_deltas = []
        internal_intervals = []
        packet_intervals = []
        prev_packet_time = None
        prev_internal_time = None

        for frame_data in frame_data_list:
            if not self.can_analyze(frame_data):
                continue
            packet_time = frame_data.packet_timestamp
            internal_time = frame_data.get_field('internal_timestamp')
            if internal_time is None:
                continue

            # Calculate timing delta for this frame
            timing_deltas.append(packet_time - internal_time)

            # Calculate intervals relative to the previous usable frame
            if prev_internal_time is not None:
                packet_intervals.append(packet_time - prev_packet_time)
                internal_intervals.append(internal_time - prev_internal_time)
            prev_packet_time, prev_internal_time = packet_time, internal_time

        if not timing_deltas:
            return {'error': 'No valid timing data found'}

        # Statistical analysis; stdev needs at least two samples.
        flow_analysis = {
            'frame_count': len(timing_deltas),
            'avg_timing_delta': statistics.mean(timing_deltas),
            'std_timing_delta': statistics.stdev(timing_deltas) if len(timing_deltas) > 1 else 0,
            'min_timing_delta': min(timing_deltas),
            'max_timing_delta': max(timing_deltas),
            'timing_drift_trend': self._calculate_drift_trend(timing_deltas),
        }
        if packet_intervals and internal_intervals:
            flow_analysis.update({
                'avg_packet_interval': statistics.mean(packet_intervals),
                'avg_internal_interval': statistics.mean(internal_intervals),
                'interval_correlation': self._calculate_correlation(
                    packet_intervals, internal_intervals
                ),
            })
        return flow_analysis

    def _calculate_drift_ppm(self, time_delta: float, flow_context: Dict) -> float:
        """Calculate clock drift in parts per million.

        Simplified: the delta scaled by 1e6 and divided by the flow duration.
        A missing, zero or negative ``flow_duration`` falls back to 1.0 so the
        first packet of a flow cannot trigger a division by zero.
        """
        duration = flow_context.get('flow_duration', 1.0)
        if not duration or duration <= 0:
            duration = 1.0
        return (time_delta * 1_000_000) / duration

    def _calculate_drift_trend(self, timing_deltas: List[float]) -> str:
        """Determine if timing is drifting in a particular direction.

        Compares the mean of the first half of the deltas with the mean of
        the second half.

        Returns:
            'insufficient_data' for fewer than three samples, otherwise
            'increasing_drift', 'decreasing_drift' or 'stable'.
        """
        if len(timing_deltas) < 3:
            return 'insufficient_data'

        midpoint = len(timing_deltas) // 2
        first_half = timing_deltas[:midpoint]
        second_half = timing_deltas[midpoint:]
        first_avg = sum(first_half) / len(first_half)
        second_avg = sum(second_half) / len(second_half)

        if second_avg > first_avg + self._TREND_TOLERANCE_S:
            return 'increasing_drift'
        if second_avg < first_avg - self._TREND_TOLERANCE_S:
            return 'decreasing_drift'
        return 'stable'

    def _calculate_correlation(self, list1: List[float], list2: List[float]) -> float:
        """Calculate the correlation coefficient between two timing sequences.

        Returns 0.0 when the sequences differ in length, have fewer than two
        samples, or when either has zero variance (denominator of zero).
        """
        if len(list1) != len(list2) or len(list1) < 2:
            return 0.0

        import statistics
        mean1 = statistics.mean(list1)
        mean2 = statistics.mean(list2)
        numerator = sum((x - mean1) * (y - mean2) for x, y in zip(list1, list2))
        spread1 = sum((x - mean1) ** 2 for x in list1)
        spread2 = sum((y - mean2) ** 2 for y in list2)
        denominator = (spread1 * spread2) ** 0.5
        return numerator / denominator if denominator != 0 else 0.0
4. Plugin Registry and Integration
class AnalysisPluginRegistry:
    """Registry for managing analysis plugins.

    Plugins are indexed by their ``plugin_name`` and, separately, by each
    decoder name they list in ``required_decoders``.
    """

    def __init__(self):
        """Create an empty registry."""
        self.plugins: Dict[str, AnalysisPlugin] = {}
        self.decoder_plugin_map: Dict[str, List[AnalysisPlugin]] = {}

    def register_plugin(self, plugin: AnalysisPlugin):
        """Register an analysis plugin.

        Registering a plugin under a name that is already taken replaces the
        earlier instance everywhere.  Without this, the per-decoder lists
        would keep the stale instance and the same analysis would run twice
        for every frame.
        """
        name = plugin.plugin_name
        previous = self.plugins.get(name)
        if previous is not None:
            for registered in self.decoder_plugin_map.values():
                registered[:] = [p for p in registered if p is not previous]
        self.plugins[name] = plugin

        # Map decoders to plugins; dict.fromkeys drops repeated decoder names
        # while keeping their order.
        for decoder_name in dict.fromkeys(plugin.required_decoders):
            self.decoder_plugin_map.setdefault(decoder_name, []).append(plugin)

    def get_plugins_for_decoder(self, decoder_name: str) -> List[AnalysisPlugin]:
        """Get all plugins that can analyze data from a specific decoder.

        Returns an empty list when no plugin is registered for the decoder.
        """
        return self.decoder_plugin_map.get(decoder_name, [])

    def get_plugin(self, plugin_name: str) -> Optional[AnalysisPlugin]:
        """Get a specific plugin by name, or None if it is not registered."""
        return self.plugins.get(plugin_name)
class EnhancedFlowManager:
    """Enhanced flow manager with plugin support.

    Holds the registered decoders and analysis plugins and routes each packet
    through the first decoder that both claims it with enough confidence and
    decodes it successfully.

    NOTE(review): ``_extract_transport_info`` and ``_update_enhanced_flow``
    are called by ``process_packet`` but are not defined in the visible
    source, and ``EnhancedFlowStats`` is referenced only in an annotation.
    They must be supplied elsewhere for ``process_packet`` to run.
    """

    def __init__(self, confidence_threshold: float = 0.5):
        """Create a manager with no decoders, plugins or flows.

        Args:
            confidence_threshold: A decoder is tried only when its
                ``can_decode`` score is strictly greater than this value.
                Defaults to 0.5, the previously hard-coded threshold.
        """
        self.confidence_threshold = confidence_threshold
        self.decoders: List[EnhancedDecoder] = []
        self.plugin_registry = AnalysisPluginRegistry()
        self.flows: Dict[Tuple[str, str], "EnhancedFlowStats"] = {}

    def register_decoder(self, decoder: EnhancedDecoder):
        """Register a decoder; decoders are tried in registration order."""
        self.decoders.append(decoder)

    def register_plugin(self, plugin: AnalysisPlugin):
        """Register an analysis plugin with the internal registry."""
        self.plugin_registry.register_plugin(plugin)

    def process_packet(self, packet, frame_num: int):
        """Process packet with enhanced decoder and plugin analysis.

        Args:
            packet: The captured packet to decode.
            frame_num: Position of the packet in the capture; accepted for
                interface compatibility and not used here.
        """
        # Extract transport info
        transport_info = self._extract_transport_info(packet)

        # Try decoders in registration order.
        for decoder in self.decoders:
            confidence = decoder.can_decode(packet, transport_info)
            if confidence <= self.confidence_threshold:
                continue
            frame_data = decoder.decode_frame(packet, transport_info)
            if frame_data is None:
                # The decoder claimed the packet but could not decode it;
                # let the remaining decoders have a go.
                continue
            # Apply analysis plugins
            self._apply_plugins(frame_data, transport_info)
            # Update flow with enhanced data
            self._update_enhanced_flow(frame_data, transport_info)
            break

    def _apply_plugins(self, frame_data: StructuredFrameData, transport_info: Dict):
        """Apply relevant analysis plugins to frame data.

        Each applicable plugin's result is stored in ``frame_data.metadata``
        under the key ``plugin_<plugin_name>``.
        """
        plugins = self.plugin_registry.get_plugins_for_decoder(frame_data.decoder_name)
        for plugin in plugins:
            if not plugin.can_analyze(frame_data):
                continue
            analysis_result = plugin.analyze_frame(
                frame_data, {'transport_info': transport_info}
            )
            # Store plugin results in frame metadata
            frame_data.metadata[f'plugin_{plugin.plugin_name}'] = analysis_result
5. Usage Example
# Setup enhanced system
flow_manager = EnhancedFlowManager()

# Register enhanced decoders
ch10_decoder = Chapter10EnhancedDecoder()
flow_manager.register_decoder(ch10_decoder)

# Register analysis plugins
timing_plugin = Chapter10TimingAnalysisPlugin()
flow_manager.register_plugin(timing_plugin)

# Process packets.  `packets` is whatever iterable of captured packets the
# caller has loaded.  The frame number is derived here because no
# `frame_num` variable exists otherwise.
# NOTE(review): numbering starts at 1 by assumption; confirm the convention
# expected by process_packet.
for frame_num, packet in enumerate(packets, start=1):
    flow_manager.process_packet(packet, frame_num)

# Access enhanced analysis results
for flow_key, flow_stats in flow_manager.flows.items():
    timing_analysis = flow_stats.get_plugin_results('CH10_Timing_Analysis')
    # The flow analysis may instead be an {'error': ...} dict, so check for
    # the key before formatting it.
    if timing_analysis and 'avg_timing_delta' in timing_analysis:
        print(f"Flow {flow_key}: Clock drift = {timing_analysis['avg_timing_delta']:.6f}s")
This modular framework allows you to:
- Extract any field from decoded frames without modifying core code
- Create custom analysis plugins for domain-specific insights
- Compare internal vs external timestamps for clock drift analysis
- Extend analysis without touching the main analyzer code
- Chain multiple plugins for complex analysis workflows
Questions for implementation:
- Which specific Chapter 10 timing analysis would be most valuable first?
- Should we create plugins for PTP timestamp analysis as well?
- What other modular analysis capabilities would be useful?