Files
StreamLens/test_decode_data.py
noisedestroyers 5c2cb1a4ed Modern TUI with Enhanced Protocol Hierarchy Interface
Major Features:
- Complete modern TUI interface with three focused views
- Enhanced multi-column layout: Source | Proto | Destination | Extended | Frame Type | Metrics
- Simplified navigation with 1/2/3 hotkeys instead of F1/F2/F3
- Protocol hierarchy: Transport (TCP/UDP) → Extended (CH10/PTP) → Frame Types
- Classic TUI preserved with --classic flag

Views Implemented:
1. Flow Analysis View: Enhanced multi-column flow overview with protocol detection
2. Packet Decoder View: Three-panel deep inspection (Flows | Frames | Fields)
3. Statistical Analysis View: Four analysis modes with timing and quality metrics

Technical Improvements:
- Left-aligned text columns with IP:port precision
- Transport protocol separation from extended protocols
- Frame type identification (CH10-Data, TMATS, PTP Sync)
- Cross-view communication with persistent flow selection
- Context-sensitive help and status bars
- Comprehensive error handling with console fallback
2025-07-26 22:46:49 -04:00

73 lines
3.0 KiB
Python

#!/usr/bin/env python3
"""
Test script to verify enhanced CH10 decode data is being captured
"""
import sys
sys.path.append('.')
from analyzer.analysis import EthernetAnalyzer
from analyzer.utils import PCAPLoader
def test_decode_data():
"""Test that decoded frame data is being captured"""
# Create analyzer
analyzer = EthernetAnalyzer(enable_realtime=False, outlier_threshold_sigma=3.0)
# Load and process PCAP
print("Loading and processing PCAP...")
pcap_loader = PCAPLoader("1 PTPGM.pcapng")
packets = pcap_loader.load_all()
# Process packets through flow manager
for i, packet in enumerate(packets):
analyzer.flow_manager.process_packet(packet, i + 1)
# Calculate statistics
analyzer.statistics_engine.calculate_flow_statistics(analyzer.flow_manager.flows)
print("\n=== ENHANCED DECODE DATA TEST ===")
# Debug: Show all flows and their decoder types
print(f"Total flows found: {len(analyzer.flow_manager.flows)}")
for flow_key, flow in analyzer.flow_manager.flows.items():
print(f"Flow {flow_key}: {flow.enhanced_analysis.decoder_type}, protocols: {flow.detected_protocol_types}")
# Find CH10 flows
ch10_flows = []
for flow in analyzer.flow_manager.flows.values():
if flow.enhanced_analysis.decoder_type == "Chapter10_Enhanced":
ch10_flows.append(flow)
print(f"Found {len(ch10_flows)} flows with enhanced CH10 decoding")
for i, flow in enumerate(ch10_flows):
print(f"\nFlow {i+1}: {flow.src_ip} -> {flow.dst_ip}")
enhanced = flow.enhanced_analysis
print(f" Decoder Type: {enhanced.decoder_type}")
print(f" Primary Data Type: {enhanced.primary_data_type}")
print(f" Available Fields: {len(enhanced.available_field_names)}")
print(f" Sample Frames Stored: {len(enhanced.sample_decoded_fields)}")
if enhanced.sample_decoded_fields:
print(" Sample Decoded Data:")
for frame_key, frame_data in enhanced.sample_decoded_fields.items():
print(f" {frame_key}: {len(frame_data)} fields")
# Show first few fields
for j, (field_name, value) in enumerate(list(frame_data.items())[:5]):
print(f" {field_name}: {value}")
if len(frame_data) > 5:
print(f" ... and {len(frame_data) - 5} more fields")
else:
print(" No decoded frame data stored!")
if enhanced.available_field_names:
print(f" Field Categories: Timing: {sum(1 for f in enhanced.available_field_names if 'time' in f.lower())}, "
f"Quality: {sum(1 for f in enhanced.available_field_names if any(k in f.lower() for k in ['quality', 'error']))}, "
f"Data: {sum(1 for f in enhanced.available_field_names if any(k in f.lower() for k in ['data', 'analog', 'channel']))}")
if __name__ == "__main__":
test_decode_data()