diff --git a/Glossary.md b/Glossary.md index 5353260..82b0703 100644 --- a/Glossary.md +++ b/Glossary.md @@ -83,12 +83,12 @@ Quantitative measures describing flow characteristics: - **Standard Deviation**: Measure of timing variability ### **Frame Type** -Classification of packets within a flow based on content or protocol structure (e.g., "CH10-Data", "TMATS", "PTP-Sync"). +Classification of packets within a flow based on content or protocol structure (e.g., "Ch10-Data", "Ch10-TMATS", "PTP-Sync"). ### **Traffic Classification** Categorization of network traffic by destination address: - **Unicast**: Point-to-point communication -- **Multicast**: One-to-many distribution +- **Multicast**: One-to-many distribution. [IPv4](https://en.wikipedia.org/wiki/IPv4 "IPv4") multicast addresses are defined by the [most-significant bit](https://en.wikipedia.org/wiki/Most-significant_bit "Most-significant bit") pattern of _1110_. - **Broadcast**: One-to-all transmission ### **Enhanced Analysis** diff --git a/analyzer/analysis/flow_manager.py b/analyzer/analysis/flow_manager.py index 1de21aa..08cba9e 100644 --- a/analyzer/analysis/flow_manager.py +++ b/analyzer/analysis/flow_manager.py @@ -240,6 +240,39 @@ class FlowManager: if self._is_tmats_frame(packet, ch10_info): return 'TMATS' else: + # Use enhanced decoder information if available + if 'decoded_payload' in ch10_info: + decoded = ch10_info['decoded_payload'] + data_type_name = decoded.get('data_type_name', 'CH10-Data') + + # Simplify timing frame names for display + if 'ACTTS' in data_type_name: + return 'CH10-ACTTS' + elif 'GPS NMEA' in data_type_name: + return 'CH10-GPS' + elif 'EAG ACMI' in data_type_name: + return 'CH10-ACMI' + elif 'Custom' in data_type_name and 'Timing' in data_type_name: + # Extract variant for custom timing + if 'Variant 0x04' in data_type_name: + return 'CH10-ACTTS' + elif 'Extended Timing' in data_type_name: + return 'CH10-ExtTiming' + else: + return 'CH10-Timing' + elif 'Ethernet' in data_type_name: + return 'CH10-Ethernet' + elif 'Image' in data_type_name: + return 'CH10-Image' + elif 'UART' in data_type_name: + return 'CH10-UART' + elif 'CAN' in data_type_name: + return 'CH10-CAN' + elif 'Unknown' not in data_type_name: + # Extract first word for other known types + first_word = data_type_name.split()[0] + return f'CH10-{first_word}' + return 'CH10-Data' # Check for other specialized protocols diff --git a/analyzer/protocols/chapter10.py b/analyzer/protocols/chapter10.py index 6158a29..b41b6d5 100644 --- a/analyzer/protocols/chapter10.py +++ b/analyzer/protocols/chapter10.py @@ -22,6 +22,7 @@ except ImportError: sys.exit(1) from .base import ProtocolDissector, DissectionResult, ProtocolType +from .decoders.registry import decoder_registry class Chapter10Dissector(ProtocolDissector): @@ -117,7 +118,18 @@ class Chapter10Dissector(ProtocolDissector): if packet_length > 24 and payload_start + (packet_length - 24) <= len(raw_data): result.payload = raw_data[payload_start:payload_start + (packet_length - 24)] - # Try to parse specific data formats + # Use new decoder framework for payload parsing + decoded_payload = decoder_registry.decode_payload(result.payload, header) + if decoded_payload: + result.fields['decoded_payload'] = { + 'data_type_name': decoded_payload.data_type_name, + 'format_version': decoded_payload.format_version, + 'decoded_data': decoded_payload.decoded_data, + 'decoder_errors': decoded_payload.errors, + 'decoder_metadata': decoded_payload.metadata + } + + # Legacy Ethernet Format 0 parsing (for backwards compatibility) data_type = header.get('data_type', 0) if data_type == 0x40: # Ethernet Format 0 eth_data = self._parse_ethernet_fmt0(result.payload) diff --git a/analyzer/protocols/decoders/__init__.py b/analyzer/protocols/decoders/__init__.py new file mode 100644 index 0000000..1f6b659 --- /dev/null +++ b/analyzer/protocols/decoders/__init__.py @@ -0,0 +1,34 @@ +""" +Chapter 10 Data Type Decoders +Modular decoder framework for IRIG-106 Chapter 10 data types +""" + +from .base import DataTypeDecoder, DecodedPayload +from .registry import DecoderRegistry +from .tspi_cts import TSPICTSDecoder, ACTTSDecoder, GPSNMEADecoder, EAGACMIDecoder +from .image import ImageDecoder +from .uart import UARTDecoder +from .ieee1394 import IEEE1394Decoder +from .parallel import ParallelDecoder +from .ethernet import EthernetDecoder +from .can_bus import CANBusDecoder +from .fibre_channel import FibreChannelDecoder +from .custom_timing import CustomTimingDecoder + +__all__ = [ + 'DataTypeDecoder', + 'DecodedPayload', + 'DecoderRegistry', + 'TSPICTSDecoder', + 'ACTTSDecoder', + 'GPSNMEADecoder', + 'EAGACMIDecoder', + 'ImageDecoder', + 'UARTDecoder', + 'IEEE1394Decoder', + 'ParallelDecoder', + 'EthernetDecoder', + 'CANBusDecoder', + 'FibreChannelDecoder', + 'CustomTimingDecoder' +] \ No newline at end of file diff --git a/analyzer/protocols/decoders/base.py b/analyzer/protocols/decoders/base.py new file mode 100644 index 0000000..db51932 --- /dev/null +++ b/analyzer/protocols/decoders/base.py @@ -0,0 +1,162 @@ +""" +Base classes for Chapter 10 data type decoders +""" + +from abc import ABC, abstractmethod +from typing import Dict, Any, Optional, List, Union +from dataclasses import dataclass +import struct + + +@dataclass +class DecodedPayload: + """Container for decoded Chapter 10 payload data""" + data_type: int + data_type_name: str + format_version: int + decoded_data: Dict[str, Any] + raw_payload: bytes + errors: List[str] + metadata: Dict[str, Any] + + def __post_init__(self): + if self.errors is None: + self.errors = [] + if self.metadata is None: + self.metadata = {} + + +class DataTypeDecoder(ABC): + """Abstract base class for Chapter 10 data type decoders""" + + def __init__(self): + self.supported_formats: List[int] = [] + self.data_type_base: int = 0x00 + self.data_type_name: str = "Unknown" + + @abstractmethod + def can_decode(self, data_type: int) -> bool: + """Check if this decoder can handle the given data type""" + pass + + @abstractmethod + def decode(self, payload: bytes, ch10_header: Dict[str, Any]) -> Optional[DecodedPayload]: + """Decode the payload data""" + pass + + def get_data_type_name(self, data_type: int) -> str: + """Get human-readable name for data type""" + return f"{self.data_type_name} Format {data_type & 0x0F}" + + def _parse_intra_packet_header(self, payload: bytes, offset: int = 0) -> Optional[Dict[str, Any]]: + """Parse common intra-packet header (IPH)""" + if len(payload) < offset + 8: + return None + + try: + # Standard IPH format + iph_time = struct.unpack(' Optional[tuple]: + """Safely unpack binary data with error handling""" + try: + size = struct.calcsize(format_str) + if len(data) < offset + size: + return None + return struct.unpack(format_str, data[offset:offset+size]) + except struct.error: + return None + + +class ContainerDecoder(DataTypeDecoder): + """Decoder for containerized data formats""" + + def decode_container(self, payload: bytes, ch10_header: Dict[str, Any]) -> List[DecodedPayload]: + """Decode multiple embedded packets from container""" + decoded_packets = [] + offset = 0 + + while offset < len(payload): + # Look for embedded CH-10 sync pattern + sync_offset = self._find_sync_pattern(payload, offset) + if sync_offset is None: + break + + # Parse embedded header + embedded_header = self._parse_embedded_header(payload, sync_offset) + if not embedded_header: + break + + # Extract embedded payload + embedded_payload = self._extract_embedded_payload(payload, sync_offset, embedded_header) + if embedded_payload: + # Recursively decode embedded packet + decoded = self.decode(embedded_payload, embedded_header) + if decoded: + decoded_packets.append(decoded) + + # Move to next packet + offset = sync_offset + embedded_header.get('packet_length', 24) + + return decoded_packets + + def _find_sync_pattern(self, data: bytes, start_offset: int = 0) -> Optional[int]: + """Find CH-10 sync pattern in data""" + sync_pattern = 0xEB25 + for offset in range(start_offset, len(data) - 1): + if offset + 2 <= len(data): + word = struct.unpack(' Optional[Dict[str, Any]]: + """Parse embedded CH-10 header""" + if len(data) < offset + 24: + return None + + try: + sync_pattern = struct.unpack(' Optional[bytes]: + """Extract payload from embedded packet""" + payload_start = header_offset + 24 + payload_length = header.get('data_length', 0) + + if payload_start + payload_length > len(data): + return None + + return data[payload_start:payload_start + payload_length] \ No newline at end of file diff --git a/analyzer/protocols/decoders/can_bus.py b/analyzer/protocols/decoders/can_bus.py new file mode 100644 index 0000000..b38a219 --- /dev/null +++ b/analyzer/protocols/decoders/can_bus.py @@ -0,0 +1,105 @@ +""" +CAN Bus decoder for Chapter 10 data types +Supports Controller Area Network Bus (0x78) +""" + +import struct +from typing import Dict, Any, Optional +from .base import DataTypeDecoder, DecodedPayload + + +class CANBusDecoder(DataTypeDecoder): + """Decoder for CAN Bus type (0x78)""" + + def __init__(self): + super().__init__() + self.data_type_base = 0x78 + self.data_type_name = "CAN Bus" + self.supported_formats = [0x78] + + def can_decode(self, data_type: int) -> bool: + return data_type == 0x78 + + def get_data_type_name(self, data_type: int) -> str: + return "Controller Area Network Bus" + + def decode(self, payload: bytes, ch10_header: Dict[str, Any]) -> Optional[DecodedPayload]: + """Decode CAN Bus payload""" + decoded_data = {} + errors = [] + + # Parse IPH + iph = self._parse_intra_packet_header(payload) + if iph: + decoded_data.update(iph) + data_start = iph['data_start'] + else: + data_start = 0 + errors.append("Failed to parse intra-packet header") + + # Parse CAN messages + messages = [] + offset = data_start + + while offset + 16 <= len(payload): + can_header = self._safe_unpack('= 100: # Limit output size + break + + decoded_data['can_messages'] = messages + decoded_data['message_count'] = len(messages) + + # Statistics + if messages: + extended_count = sum(1 for msg in messages if msg['extended_id']) + remote_count = sum(1 for msg in messages if msg['remote_frame']) + error_count = sum(1 for msg in messages if msg['error_frame']) + + decoded_data['statistics'] = { + 'extended_frames': extended_count, + 'remote_frames': remote_count, + 'error_frames': error_count, + 'standard_frames': len(messages) - extended_count + } + + return DecodedPayload( + data_type=0x78, + data_type_name="Controller Area Network Bus", + format_version=0, + decoded_data=decoded_data, + raw_payload=payload, + errors=errors, + metadata={'decoder': 'CANBusDecoder'} + ) \ No newline at end of file diff --git a/analyzer/protocols/decoders/custom_timing.py b/analyzer/protocols/decoders/custom_timing.py new file mode 100644 index 0000000..7d55173 --- /dev/null +++ b/analyzer/protocols/decoders/custom_timing.py @@ -0,0 +1,279 @@ +""" +Custom timing decoder for proprietary Chapter 10 timing frames +Handles the 0x72xx-0x78xx timing sequence found in ACTTS-like systems +""" + +import struct +from typing import Dict, Any, Optional, List +from .base import DataTypeDecoder, DecodedPayload + + +class CustomTimingDecoder(DataTypeDecoder): + """Decoder for custom timing frames (0x7200-0x7899)""" + + def __init__(self): + super().__init__() + self.data_type_base = 0x72 + self.data_type_name = "Custom Timing" + self.supported_formats = [] + # Support all 0x72xx - 0x78xx variants + for base in range(0x72, 0x79): + for variant in range(0x00, 0x100): + self.supported_formats.append((base << 8) | variant) + + def can_decode(self, data_type: int) -> bool: + # Check if data type is in the 0x72xx-0x78xx range + return 0x7200 <= data_type <= 0x78FF + + def get_data_type_name(self, data_type: int) -> str: + base = (data_type >> 8) & 0xFF + variant = data_type & 0xFF + + timing_types = { + 0x72: "Custom ACTTS Timing", + 0x73: "Extended Timing Format", + 0x74: "Sync Timing Format", + 0x75: "Clock Reference Format", + 0x76: "Time Correlation Format", + 0x77: "Timing Validation Format", + 0x78: "Multi-Source Timing" + } + + base_name = timing_types.get(base, f"Timing Format 0x{base:02x}") + return f"{base_name} (Variant 0x{variant:02x})" + + def decode(self, payload: bytes, ch10_header: Dict[str, Any]) -> Optional[DecodedPayload]: + """Decode custom timing payload""" + data_type = ch10_header.get('data_type', 0) + + if not self.can_decode(data_type): + return None + + decoded_data = {} + errors = [] + + # Parse IPH if present + iph = self._parse_intra_packet_header(payload) + if iph: + decoded_data.update(iph) + data_start = iph['data_start'] + else: + data_start = 0 + # For custom timing, missing IPH might be normal + + # Analyze timing data structure + timing_analysis = self._analyze_timing_structure(payload, data_start, data_type) + decoded_data.update(timing_analysis) + + # Extract CH-10 timing information from header + ch10_time = ch10_header.get('relative_time_counter', 0) + decoded_data['ch10_time_counter'] = ch10_time + decoded_data['ch10_sequence'] = ch10_header.get('sequence_number', 0) + decoded_data['ch10_channel'] = ch10_header.get('channel_id', 0) + + # Calculate timing metrics + if 'timing_samples' in decoded_data and decoded_data['timing_samples']: + timing_metrics = self._calculate_timing_metrics(decoded_data['timing_samples']) + decoded_data['timing_metrics'] = timing_metrics + + return DecodedPayload( + data_type=data_type, + data_type_name=self.get_data_type_name(data_type), + format_version=(data_type >> 8) & 0x0F, + decoded_data=decoded_data, + raw_payload=payload, + errors=errors, + metadata={'decoder': 'CustomTimingDecoder', 'timing_type': 'proprietary'} + ) + + def _analyze_timing_structure(self, payload: bytes, data_start: int, data_type: int) -> Dict[str, Any]: + """Analyze the structure of timing data""" + analysis = {} + + if data_start >= len(payload): + return {'error': 'No timing data available'} + + timing_data = payload[data_start:] + analysis['timing_data_length'] = len(timing_data) + + # Look for timing patterns + timing_samples = [] + timestamps = [] + + # Try different word sizes for timing data + for word_size in [4, 8]: + if len(timing_data) >= word_size: + samples = self._extract_timing_words(timing_data, word_size) + if samples: + timing_samples.extend(samples[:50]) # Limit to first 50 samples + + analysis['timing_samples'] = timing_samples + analysis['sample_count'] = len(timing_samples) + + # Look for embedded timing markers + timing_markers = self._find_timing_markers(timing_data) + if timing_markers: + analysis['timing_markers'] = timing_markers + + # Detect timing format based on data type + base_type = (data_type >> 8) & 0xFF + if base_type == 0x72: + # ACTTS-style timing + actts_analysis = self._analyze_actts_timing(timing_data) + analysis.update(actts_analysis) + elif base_type in [0x73, 0x74, 0x75, 0x76, 0x77]: + # Extended timing formats + extended_analysis = self._analyze_extended_timing(timing_data, base_type) + analysis.update(extended_analysis) + elif base_type == 0x78: + # Multi-source timing + multi_analysis = self._analyze_multi_source_timing(timing_data) + analysis.update(multi_analysis) + + return analysis + + def _extract_timing_words(self, data: bytes, word_size: int) -> List[int]: + """Extract timing words from binary data""" + words = [] + format_str = '= 100: # Limit extraction + break + + return words + + def _find_timing_markers(self, data: bytes) -> List[Dict[str, Any]]: + """Find timing synchronization markers in data""" + markers = [] + + # Common timing sync patterns + sync_patterns = [ + b'\x81\x81\x81\x81', # Potential sync pattern + b'\x82\x82\x82\x82', # Another potential pattern + b'\xa9\xa9\xa9\xa9', # Observed in your data + ] + + for pattern in sync_patterns: + offset = 0 + while True: + pos = data.find(pattern, offset) + if pos == -1: + break + + markers.append({ + 'pattern': pattern.hex(), + 'offset': pos, + 'description': f'Sync pattern at offset {pos}' + }) + + offset = pos + 1 + if len(markers) >= 20: # Limit markers + break + + return markers + + def _analyze_actts_timing(self, data: bytes) -> Dict[str, Any]: + """Analyze ACTTS-style timing data""" + analysis = {'timing_format': 'ACTTS-style'} + + if len(data) >= 16: + # Look for ACTTS-like header + try: + header = struct.unpack(' Dict[str, Any]: + """Analyze extended timing formats (0x73-0x77)""" + analysis = {'timing_format': f'Extended Format 0x{base_type:02x}'} + + # Look for timing sequences + if len(data) >= 8: + try: + seq_data = struct.unpack(' Dict[str, Any]: + """Analyze multi-source timing data (0x78)""" + analysis = {'timing_format': 'Multi-source timing'} + + # Look for multiple timing sources + sources = [] + offset = 0 + + while offset + 8 <= len(data): + try: + source_data = struct.unpack('= 10: # Limit sources + break + + analysis['timing_sources'] = sources + analysis['source_count'] = len(sources) + + return analysis + + def _calculate_timing_metrics(self, samples: List[int]) -> Dict[str, Any]: + """Calculate timing statistics from samples""" + if not samples or len(samples) < 2: + return {} + + # Calculate deltas + deltas = [samples[i+1] - samples[i] for i in range(len(samples)-1)] + + # Basic statistics + metrics = { + 'sample_count': len(samples), + 'min_value': min(samples), + 'max_value': max(samples), + 'range': max(samples) - min(samples), + 'first_sample': samples[0], + 'last_sample': samples[-1] + } + + if deltas: + metrics.update({ + 'min_delta': min(deltas), + 'max_delta': max(deltas), + 'avg_delta': sum(deltas) / len(deltas), + 'zero_deltas': deltas.count(0), + 'constant_rate': len(set(deltas)) == 1 + }) + + return metrics \ No newline at end of file diff --git a/analyzer/protocols/decoders/ethernet.py b/analyzer/protocols/decoders/ethernet.py new file mode 100644 index 0000000..4358fbc --- /dev/null +++ b/analyzer/protocols/decoders/ethernet.py @@ -0,0 +1,316 @@ +""" +Ethernet Data decoders for Chapter 10 data types +Supports Ethernet Data Formats 0-1 (0x68-0x69) +""" + +import struct +from typing import Dict, Any, Optional +from .base import DataTypeDecoder, DecodedPayload + + +class EthernetDecoder(DataTypeDecoder): + """Decoder for Ethernet Data types (0x68-0x69)""" + + def __init__(self): + super().__init__() + self.data_type_base = 0x68 + self.data_type_name = "Ethernet Data" + self.supported_formats = [0x68, 0x69] + + def can_decode(self, data_type: int) -> bool: + return data_type in [0x68, 0x69] + + def get_data_type_name(self, data_type: int) -> str: + format_names = { + 0x68: "Ethernet Data Format 0", + 0x69: "Ethernet UDP Payload" + } + return format_names.get(data_type, f"Ethernet Format {data_type & 0x0F}") + + def decode(self, payload: bytes, ch10_header: Dict[str, Any]) -> Optional[DecodedPayload]: + """Decode Ethernet payload""" + data_type = ch10_header.get('data_type', 0) + + if not self.can_decode(data_type): + return None + + if data_type == 0x68: + return self._decode_ethernet_format0(payload, ch10_header) + elif data_type == 0x69: + return self._decode_ethernet_udp_payload(payload, ch10_header) + + return None + + def _decode_ethernet_format0(self, payload: bytes, ch10_header: Dict[str, Any]) -> DecodedPayload: + """Decode Ethernet Format 0 (Full Ethernet Frame)""" + decoded_data = {} + errors = [] + + # Parse IPH + iph = self._parse_intra_packet_header(payload) + if iph: + decoded_data.update(iph) + data_start = iph['data_start'] + else: + data_start = 0 + errors.append("Failed to parse intra-packet header") + + # Parse Ethernet Format 0 header + if data_start + 12 <= len(payload): + eth_header = self._safe_unpack('> 28) & 0x3 + }) + + # Decode content type + content_types = { + 0: "Full MAC frame", + 1: "Payload only", + 2: "Reserved", + 3: "Reserved" + } + decoded_data['content_type_description'] = content_types.get( + decoded_data['content_type'], "Unknown" + ) + + # Extract Ethernet frame data + frame_data_start = data_start + 12 + frame_length = decoded_data['frame_length'] + + if frame_data_start + frame_length <= len(payload): + frame_data = payload[frame_data_start:frame_data_start + frame_length] + + # Parse Ethernet header if full MAC frame + if decoded_data['content_type'] == 0 and len(frame_data) >= 14: + eth_parsed = self._parse_ethernet_header(frame_data) + decoded_data.update(eth_parsed) + else: + decoded_data['raw_frame_data'] = frame_data[:64].hex() # First 64 bytes + + decoded_data['actual_frame_length'] = len(frame_data) + else: + errors.append("Frame data extends beyond payload") + else: + errors.append("Failed to parse Ethernet Format 0 header") + else: + errors.append("Insufficient data for Ethernet header") + + return DecodedPayload( + data_type=0x68, + data_type_name="Ethernet Data Format 0", + format_version=0, + decoded_data=decoded_data, + raw_payload=payload, + errors=errors, + metadata={'decoder': 'EthernetDecoder'} + ) + + def _decode_ethernet_udp_payload(self, payload: bytes, ch10_header: Dict[str, Any]) -> DecodedPayload: + """Decode Ethernet UDP Payload (Format 1)""" + decoded_data = {} + errors = [] + + # Parse IPH + iph = self._parse_intra_packet_header(payload) + if iph: + decoded_data.update(iph) + data_start = iph['data_start'] + else: + data_start = 0 + errors.append("Failed to parse intra-packet header") + + # Parse UDP payload header + if data_start + 16 <= len(payload): + udp_header = self._safe_unpack(' Dict[str, Any]: + """Parse Ethernet MAC header""" + if len(frame_data) < 14: + return {'eth_parse_error': 'Insufficient data for Ethernet header'} + + # Parse MAC addresses and EtherType + dst_mac = frame_data[0:6] + src_mac = frame_data[6:12] + ethertype = struct.unpack('>H', frame_data[12:14])[0] + + eth_data = { + 'dst_mac': ':'.join(f'{b:02x}' for b in dst_mac), + 'src_mac': ':'.join(f'{b:02x}' for b in src_mac), + 'ethertype': f'0x{ethertype:04x}', + 'ethertype_description': self._decode_ethertype(ethertype) + } + + # Parse payload based on EtherType + if ethertype == 0x0800 and len(frame_data) >= 34: # IPv4 + ip_data = self._parse_ip_header(frame_data[14:]) + eth_data.update(ip_data) + elif ethertype == 0x0806 and len(frame_data) >= 42: # ARP + arp_data = self._parse_arp_header(frame_data[14:]) + eth_data.update(arp_data) + + return eth_data + + def _parse_ip_header(self, ip_data: bytes) -> Dict[str, Any]: + """Parse IPv4 header""" + if len(ip_data) < 20: + return {'ip_parse_error': 'Insufficient data for IP header'} + + version_ihl = ip_data[0] + version = (version_ihl >> 4) & 0x0F + ihl = version_ihl & 0x0F + + if version != 4: + return {'ip_parse_error': f'Unsupported IP version: {version}'} + + tos, total_length, identification, flags_fragment = struct.unpack('>BHHH', ip_data[1:9]) + ttl, protocol, checksum = struct.unpack('>BBH', ip_data[8:12]) + src_ip = struct.unpack('>I', ip_data[12:16])[0] + dst_ip = struct.unpack('>I', ip_data[16:20])[0] + + return { + 'ip_version': version, + 'ip_header_length': ihl * 4, + 'ip_tos': tos, + 'ip_total_length': total_length, + 'ip_id': identification, + 'ip_ttl': ttl, + 'ip_protocol': protocol, + 'ip_src': self._ip_to_string(src_ip), + 'ip_dst': self._ip_to_string(dst_ip), + 'ip_protocol_name': self._decode_ip_protocol(protocol) + } + + def _parse_arp_header(self, arp_data: bytes) -> Dict[str, Any]: + """Parse ARP header""" + if len(arp_data) < 28: + return {'arp_parse_error': 'Insufficient data for ARP header'} + + hw_type, proto_type, hw_len, proto_len, opcode = struct.unpack('>HHBBH', arp_data[0:8]) + sender_hw = arp_data[8:14] + sender_proto = struct.unpack('>I', arp_data[14:18])[0] + target_hw = arp_data[18:24] + target_proto = struct.unpack('>I', arp_data[24:28])[0] + + return { + 'arp_hw_type': hw_type, + 'arp_proto_type': f'0x{proto_type:04x}', + 'arp_opcode': opcode, + 'arp_opcode_description': 'Request' if opcode == 1 else 'Reply' if opcode == 2 else f'Unknown ({opcode})', + 'arp_sender_hw': ':'.join(f'{b:02x}' for b in sender_hw), + 'arp_sender_ip': self._ip_to_string(sender_proto), + 'arp_target_hw': ':'.join(f'{b:02x}' for b in target_hw), + 'arp_target_ip': self._ip_to_string(target_proto) + } + + def _analyze_udp_payload(self, payload: bytes) -> Dict[str, Any]: + """Analyze UDP payload content""" + analysis = {} + + if len(payload) == 0: + return {'payload_analysis': 'Empty payload'} + + # Check for common protocols + if len(payload) >= 4: + # Check for DNS (port 53 patterns) + if payload[0:2] in [b'\x00\x01', b'\x81\x80', b'\x01\x00']: + analysis['possible_protocol'] = 'DNS' + # Check for DHCP magic cookie + elif payload[:4] == b'\x63\x82\x53\x63': + analysis['possible_protocol'] = 'DHCP' + # Check for RTP (version 2) + elif (payload[0] & 0xC0) == 0x80: + analysis['possible_protocol'] = 'RTP' + else: + analysis['possible_protocol'] = 'Unknown' + + # Basic statistics + analysis['payload_entropy'] = self._calculate_entropy(payload[:256]) # First 256 bytes + analysis['null_bytes'] = payload.count(0) + analysis['printable_chars'] = sum(1 for b in payload[:256] if 32 <= b <= 126) + + return analysis + + def _calculate_entropy(self, data: bytes) -> float: + """Calculate Shannon entropy of data""" + if not data: + return 0.0 + + counts = [0] * 256 + for byte in data: + counts[byte] += 1 + + entropy = 0.0 + length = len(data) + for count in counts: + if count > 0: + p = count / length + entropy -= p * (p.bit_length() - 1) # log2(p) + + return entropy + + def _ip_to_string(self, ip_int: int) -> str: + """Convert 32-bit integer to IP address string""" + return f"{(ip_int >> 24) & 0xFF}.{(ip_int >> 16) & 0xFF}.{(ip_int >> 8) & 0xFF}.{ip_int & 0xFF}" + + def _decode_ethertype(self, ethertype: int) -> str: + """Decode EtherType field""" + types = { + 0x0800: "IPv4", + 0x0806: "ARP", + 0x86DD: "IPv6", + 0x8100: "VLAN", + 0x88F7: "PTP" + } + return types.get(ethertype, f"Unknown (0x{ethertype:04x})") + + def _decode_ip_protocol(self, protocol: int) -> str: + """Decode IP protocol field""" + protocols = { + 1: "ICMP", + 6: "TCP", + 17: "UDP", + 89: "OSPF", + 132: "SCTP" + } + return protocols.get(protocol, f"Unknown ({protocol})") \ No newline at end of file diff --git a/analyzer/protocols/decoders/fibre_channel.py b/analyzer/protocols/decoders/fibre_channel.py new file mode 100644 index 0000000..56706ae --- /dev/null +++ b/analyzer/protocols/decoders/fibre_channel.py @@ -0,0 +1,166 @@ +""" +Fibre Channel Data decoder for Chapter 10 data types +Supports Fibre Channel Format 0 (0x79) +""" + +import struct +from typing import Dict, Any, Optional +from .base import DataTypeDecoder, DecodedPayload + + +class FibreChannelDecoder(DataTypeDecoder): + """Decoder for Fibre Channel Data type (0x79)""" + + def __init__(self): + super().__init__() + self.data_type_base = 0x79 + self.data_type_name = "Fibre Channel Data" + self.supported_formats = [0x79] + + def can_decode(self, data_type: int) -> bool: + return data_type == 0x79 + + def get_data_type_name(self, data_type: int) -> str: + return "Fibre Channel Data Format 0" + + def decode(self, payload: bytes, ch10_header: Dict[str, Any]) -> Optional[DecodedPayload]: + """Decode Fibre Channel payload""" + decoded_data = {} + errors = [] + + # Parse IPH + iph = self._parse_intra_packet_header(payload) + if iph: + decoded_data.update(iph) + data_start = iph['data_start'] + else: + data_start = 0 + errors.append("Failed to parse intra-packet header") + + # Parse FC frame header + if data_start + 24 <= len(payload): + # FC frame header (simplified) + fc_header = self._safe_unpack('> 24) & 0xFF, + 'fc_d_id': fc_header[2] & 0xFFFFFF, + 'fc_cs_ctl': (fc_header[3] >> 24) & 0xFF, + 'fc_s_id': fc_header[3] & 0xFFFFFF, + 'fc_type': (fc_header[4] >> 24) & 0xFF, + 'fc_f_ctl': fc_header[4] & 0xFFFFFF, + 'fc_seq_id': (fc_header[5] >> 24) & 0xFF, + 'fc_df_ctl': (fc_header[5] >> 16) & 0xFF, + 'fc_seq_cnt': fc_header[5] & 0xFFFF + }) + + # Decode R_CTL field + r_ctl = decoded_data['fc_r_ctl'] + decoded_data['fc_r_ctl_description'] = self._decode_r_ctl(r_ctl) + + # Decode Type field + fc_type = decoded_data['fc_type'] + decoded_data['fc_type_description'] = self._decode_fc_type(fc_type) + + # Extract payload + fc_payload_start = data_start + 24 + frame_length = decoded_data['fc_frame_length'] + if fc_payload_start + frame_length <= len(payload): + fc_payload = payload[fc_payload_start:fc_payload_start + frame_length] + decoded_data['fc_payload_length'] = len(fc_payload) + decoded_data['fc_payload_preview'] = fc_payload[:64].hex() + + # Analyze payload based on type + if fc_type == 0x08: # SCSI FCP + scsi_data = self._parse_scsi_fcp(fc_payload) + decoded_data.update(scsi_data) + else: + errors.append("FC payload extends beyond packet") + else: + errors.append("Failed to parse FC header") + + return DecodedPayload( + data_type=0x79, + data_type_name="Fibre Channel Data Format 0", + format_version=0, + decoded_data=decoded_data, + raw_payload=payload, + errors=errors, + metadata={'decoder': 'FibreChannelDecoder'} + ) + + def _decode_r_ctl(self, r_ctl: int) -> str: + """Decode R_CTL field""" + r_ctl_types = { + 0x00: "Device Data", + 0x01: "Extended Link Data", + 0x02: "FC-4 Link Data", + 0x03: "Video Data", + 0x20: "Basic Link Data", + 0x21: "ACK_1", + 0x22: "ACK_0", + 0x23: "P_RJT", + 0x24: "F_RJT", + 0x25: "P_BSY", + 0x26: "F_BSY" + } + return r_ctl_types.get(r_ctl, f"Unknown (0x{r_ctl:02x})") + + def _decode_fc_type(self, fc_type: int) -> str: + """Decode FC Type field""" + fc_types = { + 0x00: "Basic Link Service", + 0x01: "Extended Link Service", + 0x04: "IP over FC", + 0x05: "ATM over FC", + 0x08: "SCSI FCP", + 0x09: "SCSI GPP", + 0x0A: "IPI-3 Master", + 0x0B: "IPI-3 Slave", + 0x0C: "IPI-3 Peer" + } + return fc_types.get(fc_type, f"Unknown (0x{fc_type:02x})") + + def _parse_scsi_fcp(self, payload: bytes) -> Dict[str, Any]: + """Parse SCSI FCP payload""" + scsi_data = {} + + if len(payload) >= 32: + # FCP_CMND structure + lun = payload[0:8] + task_codes = payload[8] + task_mgmt = payload[9] + add_cdb_len = payload[10] + rddata = bool(payload[11] & 0x02) + wrdata = bool(payload[11] & 0x01) + + scsi_data.update({ + 'scsi_lun': lun.hex(), + 'scsi_task_codes': task_codes, + 'scsi_task_mgmt': task_mgmt, + 'scsi_rddata': rddata, + 'scsi_wrdata': wrdata + }) + + # CDB starts at offset 12 + if len(payload) >= 16: + cdb = payload[12:16] + scsi_data['scsi_cdb'] = cdb.hex() + if cdb[0] in [0x12, 0x00, 0x28, 0x2A]: # Common SCSI commands + scsi_data['scsi_command'] = self._decode_scsi_command(cdb[0]) + + return scsi_data + + def _decode_scsi_command(self, opcode: int) -> str: + """Decode SCSI command opcode""" + commands = { + 0x00: "TEST UNIT READY", + 0x12: "INQUIRY", + 0x28: "READ(10)", + 0x2A: "WRITE(10)", + 0x35: "SYNCHRONIZE CACHE", + 0x3C: "READ BUFFER" + } + return commands.get(opcode, f"Unknown (0x{opcode:02x})") diff --git a/analyzer/protocols/decoders/ieee1394.py b/analyzer/protocols/decoders/ieee1394.py new file mode 100644 index 0000000..3fe98e4 --- /dev/null +++ b/analyzer/protocols/decoders/ieee1394.py @@ -0,0 +1,142 @@ +""" +IEEE 1394 Data decoders for Chapter 10 data types +Supports IEEE 1394 Formats 0-1 (0x58-0x59) +""" + +import struct +from typing import Dict, Any, Optional +from .base import DataTypeDecoder, DecodedPayload + + +class IEEE1394Decoder(DataTypeDecoder): + """Decoder for IEEE 1394 Data types (0x58-0x59)""" + + def __init__(self): + super().__init__() + self.data_type_base = 0x58 + self.data_type_name = "IEEE 1394 Data" + self.supported_formats = [0x58, 0x59] + + def can_decode(self, data_type: int) -> bool: + return data_type in [0x58, 0x59] + + def get_data_type_name(self, data_type: int) -> str: + format_names = { + 0x58: "IEEE 1394 Transaction", + 0x59: "IEEE 1394 Physical Layer" + } + return format_names.get(data_type, f"IEEE 1394 Format {data_type & 0x0F}") + + def decode(self, payload: bytes, ch10_header: Dict[str, Any]) -> Optional[DecodedPayload]: + """Decode IEEE 1394 payload""" + data_type = ch10_header.get('data_type', 0) + + if not self.can_decode(data_type): + return None + + if data_type == 0x58: + return self._decode_transaction(payload, ch10_header) + elif data_type == 0x59: + return self._decode_physical_layer(payload, ch10_header) + + return None + + def _decode_transaction(self, payload: bytes, ch10_header: Dict[str, Any]) -> DecodedPayload: + """Decode IEEE 1394 Transaction data""" + decoded_data = {} + errors = [] + + # Parse IPH + iph = self._parse_intra_packet_header(payload) + if iph: + decoded_data.update(iph) + data_start = iph['data_start'] + else: + data_start = 0 + errors.append("Failed to parse intra-packet header") + + # Parse 1394 transaction header + if data_start + 16 <= len(payload): + tx_header = self._safe_unpack('> 16) & 0xFFFF, + 'destination_offset': tx_header[2], + 'data_length': tx_header[3] + }) + + # Decode transaction code + tx_codes = { + 0: "Write Request", + 1: "Write Response", + 4: "Read Request", + 5: "Read Response", + 6: "Lock Request", + 7: "Lock Response" + } + decoded_data['transaction_type'] = tx_codes.get( + decoded_data['transaction_code'], + f"Unknown ({decoded_data['transaction_code']})" + ) + + # Extract transaction data + tx_data_start = data_start + 16 + tx_data_length = decoded_data['data_length'] + if tx_data_start + tx_data_length <= len(payload): + tx_data = payload[tx_data_start:tx_data_start + tx_data_length] + decoded_data['transaction_data'] = tx_data[:64].hex() + else: + errors.append("Failed to parse 1394 transaction header") + + return DecodedPayload( + data_type=0x58, + data_type_name="IEEE 1394 Transaction", + format_version=0, + decoded_data=decoded_data, + raw_payload=payload, + errors=errors, + metadata={'decoder': 'IEEE1394Decoder'} + ) + + def _decode_physical_layer(self, payload: bytes, ch10_header: Dict[str, Any]) -> DecodedPayload: + """Decode IEEE 1394 Physical Layer data""" + decoded_data = {} + errors = [] + + # Parse IPH + iph = self._parse_intra_packet_header(payload) + if iph: + decoded_data.update(iph) + data_start = iph['data_start'] + else: + data_start = 0 + errors.append("Failed to parse intra-packet header") + + # Parse physical layer data + if data_start < len(payload): + phy_data = payload[data_start:] + decoded_data['phy_data_length'] = len(phy_data) + decoded_data['phy_data_hex'] = phy_data[:64].hex() + + # Basic PHY packet analysis + if len(phy_data) >= 4: + phy_header = struct.unpack('> 28) & 0x0F + decoded_data['phy_speed'] = (phy_header >> 26) & 0x03 + + speed_names = {0: "100 Mbps", 1: "200 Mbps", 2: "400 Mbps", 3: "Reserved"} + decoded_data['phy_speed_description'] = speed_names.get( + decoded_data['phy_speed'], "Unknown" + ) + + return DecodedPayload( + data_type=0x59, + data_type_name="IEEE 1394 Physical Layer", + format_version=1, + decoded_data=decoded_data, + raw_payload=payload, + errors=errors, + metadata={'decoder': 'IEEE1394Decoder'} + ) \ No newline at end of file diff --git a/analyzer/protocols/decoders/image.py b/analyzer/protocols/decoders/image.py new file mode 100644 index 0000000..76715ca --- /dev/null +++ b/analyzer/protocols/decoders/image.py @@ -0,0 +1,186 @@ +""" +Image Data decoders for Chapter 10 data types +Supports Image Data Formats 2-7 (0x4A-0x4F) +""" + +import struct +from typing import Dict, Any, Optional, Tuple +from .base import DataTypeDecoder, DecodedPayload + + +class ImageDecoder(DataTypeDecoder): + """Decoder for Image Data types (0x4A-0x4F)""" + + def __init__(self): + super().__init__() + self.data_type_base = 0x4A + self.data_type_name = "Image Data" + self.supported_formats = list(range(0x4A, 0x50)) + + def can_decode(self, data_type: int) -> bool: + return 0x4A <= data_type <= 0x4F + + def get_data_type_name(self, data_type: int) -> str: + format_names = { + 0x4A: "Image Data Format 2 (Dynamic Imagery)", + 0x4B: "Image Data Format 3", + 0x4C: "Image Data Format 4", + 0x4D: "Image Data Format 5", + 0x4E: "Image Data Format 6", + 0x4F: "Image Data Format 7" + } + return format_names.get(data_type, f"Image Data Format {data_type & 0x0F}") + + def decode(self, payload: bytes, ch10_header: Dict[str, Any]) -> Optional[DecodedPayload]: + """Decode Image Data payload""" + data_type = ch10_header.get('data_type', 0) + + if not self.can_decode(data_type): + return None + + if data_type == 0x4A: + return self._decode_dynamic_imagery(payload, ch10_header) + else: + return self._decode_generic_image(payload, ch10_header) + + def _decode_dynamic_imagery(self, payload: bytes, ch10_header: Dict[str, Any]) -> DecodedPayload: + """Decode Dynamic Imagery (Format 2)""" + decoded_data = {} + errors = [] + + # Parse IPH + iph = self._parse_intra_packet_header(payload) + if iph: + decoded_data.update(iph) + data_start = iph['data_start'] + else: + data_start = 0 + errors.append("Failed to parse intra-packet header") + + # Parse image header + if data_start + 32 <= len(payload): + img_header = self._safe_unpack(' DecodedPayload: + """Decode generic image data (Formats 3-7)""" + data_type = ch10_header.get('data_type', 0) + decoded_data = {} + errors = [] + + # Parse IPH + iph = self._parse_intra_packet_header(payload) + if iph: + decoded_data.update(iph) + data_start = iph['data_start'] + else: + data_start = 0 + errors.append("Failed to parse intra-packet header") + + # Generic image data parsing + if data_start < len(payload): + image_data = payload[data_start:] + decoded_data['image_data_length'] = len(image_data) + decoded_data['image_data_hash'] = hash(image_data) & 0xFFFFFFFF + decoded_data['header_bytes'] = image_data[:32].hex() if len(image_data) >= 32 else image_data.hex() + + # Try to identify image format from magic bytes + format_info = self._identify_image_format(image_data) + decoded_data.update(format_info) + + return DecodedPayload( + data_type=data_type, + data_type_name=self.get_data_type_name(data_type), + format_version=data_type & 0x0F, + decoded_data=decoded_data, + raw_payload=payload, + errors=errors, + metadata={'decoder': 'ImageDecoder'} + ) + + def _decode_image_format(self, format_code: int) -> str: + """Decode image format code""" + formats = { + 0: "Monochrome", + 1: "RGB", + 2: "YUV 4:2:2", + 3: "YUV 4:2:0", + 4: "RGBA", + 5: "Bayer Pattern" + } + return formats.get(format_code, f"Unknown ({format_code})") + + def _decode_compression(self, compression_code: int) -> str: + """Decode compression type""" + compressions = { + 0: "Uncompressed", + 1: "JPEG", + 2: "H.264", + 3: "MPEG-2", + 4: "PNG", + 5: "Lossless" + } + return compressions.get(compression_code, f"Unknown ({compression_code})") + + def _identify_image_format(self, data: bytes) -> Dict[str, Any]: + """Identify image format from magic bytes""" + if len(data) < 8: + return {'detected_format': 'Unknown (insufficient data)'} + + # Check common image formats + if data[:2] == b'\xFF\xD8': + return {'detected_format': 'JPEG', 'magic_bytes': data[:4].hex()} + elif data[:8] == b'\x89PNG\r\n\x1a\n': + return {'detected_format': 'PNG', 'magic_bytes': data[:8].hex()} + elif data[:4] in [b'RIFF', b'AVI ']: + return {'detected_format': 'AVI/RIFF', 'magic_bytes': data[:4].hex()} + elif data[:4] == b'\x00\x00\x00\x20' or data[:4] == b'\x00\x00\x00\x18': + return {'detected_format': 'AVIF/HEIF', 'magic_bytes': data[:4].hex()} + else: + return {'detected_format': 'Unknown/Raw', 'magic_bytes': data[:8].hex()} \ No newline at end of file diff --git a/analyzer/protocols/decoders/parallel.py b/analyzer/protocols/decoders/parallel.py new file mode 100644 index 0000000..be4e68f --- /dev/null +++ b/analyzer/protocols/decoders/parallel.py @@ -0,0 +1,83 @@ +""" +Parallel Data decoder for Chapter 10 data types +Supports Parallel Data Format 0 (0x60) +""" + +import struct +from typing import Dict, Any, Optional +from .base import DataTypeDecoder, DecodedPayload + + +class ParallelDecoder(DataTypeDecoder): + """Decoder for Parallel Data type (0x60)""" + + def __init__(self): + super().__init__() + self.data_type_base = 0x60 + self.data_type_name = "Parallel Data" + self.supported_formats = [0x60] + + def can_decode(self, data_type: int) -> bool: + return data_type == 0x60 + + def get_data_type_name(self, data_type: int) -> str: + return "Parallel Data Format 0" + + def decode(self, payload: bytes, ch10_header: Dict[str, Any]) -> Optional[DecodedPayload]: + """Decode Parallel Data payload""" + decoded_data = {} + errors = [] + + # Parse IPH + iph = self._parse_intra_packet_header(payload) + if iph: + decoded_data.update(iph) + data_start = iph['data_start'] + else: + data_start = 0 + errors.append("Failed to parse intra-packet header") + + # Parse parallel data header + if data_start + 8 <= len(payload): + par_header = self._safe_unpack('= 4: + # Sample first few words + words = [] + for i in range(0, min(len(par_data), 32), 4): + if i + 4 <= len(par_data): + word = struct.unpack(' Optional[DataTypeDecoder]: + """Get decoder for specific data type""" + return self._decoders.get(data_type) + + def decode_payload(self, payload: bytes, ch10_header: Dict[str, any]) -> Optional[DecodedPayload]: + """Decode payload using appropriate decoder""" + data_type = ch10_header.get('data_type', 0) + decoder = self.get_decoder(data_type) + + if decoder: + return decoder.decode(payload, ch10_header) + + # Return basic decoded payload if no specific decoder found + return DecodedPayload( + data_type=data_type, + data_type_name=f"Unknown (0x{data_type:02X})", + format_version=data_type & 0x0F, + decoded_data={'raw_data': payload.hex()}, + raw_payload=payload, + errors=[f"No decoder available for data type 0x{data_type:02X}"], + metadata={'decoder': 'fallback'} + ) + + def list_supported_types(self) -> List[int]: + """List all supported data types""" + return sorted(self._decoders.keys()) + + def get_decoder_info(self) -> Dict[int, str]: + """Get information about registered decoders""" + return { + data_type: decoder.__class__.__name__ + for data_type, decoder in self._decoders.items() + } + + def _register_default_decoders(self): + """Register default decoders for known data types""" + from .tspi_cts import TSPICTSDecoder, ACTTSDecoder, GPSNMEADecoder, EAGACMIDecoder + from .image import ImageDecoder + from .uart import UARTDecoder + from .ieee1394 import IEEE1394Decoder + from .parallel import ParallelDecoder + from .ethernet import EthernetDecoder + from .can_bus import CANBusDecoder + from .fibre_channel import FibreChannelDecoder + from .custom_timing import CustomTimingDecoder + + # Register custom timing decoder for proprietary formats + custom_timing_decoder = CustomTimingDecoder() + # Register for all 0x72xx-0x78xx variants + custom_timing_types = [] + for base in range(0x72, 0x79): + for variant in range(0x00, 0x100): + custom_timing_types.append((base << 8) | variant) + self.register_decoder(custom_timing_decoder, custom_timing_types) + + # Register TSPI/CTS decoders + tspi_decoder = TSPICTSDecoder() + self.register_decoder(tspi_decoder, list(range(0x70, 0x78))) + + # Register specific TSPI/CTS decoders for better handling + self.register_decoder(GPSNMEADecoder(), [0x70]) + self.register_decoder(EAGACMIDecoder(), [0x71]) + self.register_decoder(ACTTSDecoder(), [0x72]) + + # Register Image decoders + image_decoder = ImageDecoder() + self.register_decoder(image_decoder, list(range(0x4A, 0x50))) + + # Register other decoders + self.register_decoder(UARTDecoder(), [0x50]) + + ieee1394_decoder = IEEE1394Decoder() + self.register_decoder(ieee1394_decoder, [0x58, 0x59]) + + self.register_decoder(ParallelDecoder(), [0x60]) + + ethernet_decoder = EthernetDecoder() + self.register_decoder(ethernet_decoder, [0x68, 0x69]) + + self.register_decoder(CANBusDecoder(), [0x78]) + self.register_decoder(FibreChannelDecoder(), [0x79]) + + # Register decoder classes for factory pattern + self._decoder_classes.update({ + 'TSPICTSDecoder': TSPICTSDecoder, + 'ACTTSDecoder': ACTTSDecoder, + 'GPSNMEADecoder': GPSNMEADecoder, + 'EAGACMIDecoder': EAGACMIDecoder, + 'ImageDecoder': ImageDecoder, + 'UARTDecoder': UARTDecoder, + 'IEEE1394Decoder': IEEE1394Decoder, + 'ParallelDecoder': ParallelDecoder, + 'EthernetDecoder': EthernetDecoder, + 'CANBusDecoder': CANBusDecoder, + 'FibreChannelDecoder': FibreChannelDecoder, + 'CustomTimingDecoder': CustomTimingDecoder + }) + + +# Global decoder registry instance +decoder_registry = DecoderRegistry() \ No newline at end of file diff --git a/analyzer/protocols/decoders/tspi_cts.py b/analyzer/protocols/decoders/tspi_cts.py new file mode 100644 index 0000000..ef7908f --- /dev/null +++ b/analyzer/protocols/decoders/tspi_cts.py @@ -0,0 +1,315 @@ +""" +TSPI/CTS (Time Space Position Information/Common Test System) data decoders +Supports ACTTS, GPS NMEA-RTCM, and EAG ACMI formats +""" + +import struct +from typing import Dict, Any, Optional, List +from .base import DataTypeDecoder, DecodedPayload + + +class TSPICTSDecoder(DataTypeDecoder): + """Base decoder for TSPI/CTS data types (0x70-0x77)""" + + def __init__(self): + super().__init__() + self.data_type_base = 0x70 + self.data_type_name = "TSPI/CTS" + self.supported_formats = list(range(0x70, 0x78)) + + def can_decode(self, data_type: int) -> bool: + return 0x70 <= data_type <= 0x77 + + def get_data_type_name(self, data_type: int) -> str: + format_names = { + 0x70: "GPS NMEA-RTCM", + 0x71: "EAG ACMI", + 0x72: "ACTTS", + 0x73: "TSPI/CTS Format 3", + 0x74: "TSPI/CTS Format 4", + 0x75: "TSPI/CTS Format 5", + 0x76: "TSPI/CTS Format 6", + 0x77: "TSPI/CTS Format 7" + } + return format_names.get(data_type, f"TSPI/CTS Format {data_type & 0x0F}") + + def decode(self, payload: bytes, ch10_header: Dict[str, Any]) -> Optional[DecodedPayload]: + """Decode TSPI/CTS payload""" + data_type = ch10_header.get('data_type', 0) + + if not self.can_decode(data_type): + return None + + # Parse based on specific format + if data_type == 0x70: + return self._decode_gps_nmea(payload, ch10_header) + elif data_type == 0x71: + return self._decode_eag_acmi(payload, ch10_header) + elif data_type == 0x72: + return self._decode_actts(payload, ch10_header) + else: + return self._decode_generic_tspi(payload, ch10_header) + + def _decode_gps_nmea(self, payload: bytes, ch10_header: Dict[str, Any]) -> DecodedPayload: + """Decode GPS NMEA-RTCM data (Format 0)""" + decoded_data = {} + errors = [] + + # Parse IPH + iph = self._parse_intra_packet_header(payload) + if iph: + decoded_data.update(iph) + data_start = iph['data_start'] + else: + data_start = 0 + errors.append("Failed to parse intra-packet header") + + # NMEA messages are typically ASCII text + if data_start < len(payload): + nmea_data = payload[data_start:] + + # Try to decode as ASCII + try: + nmea_text = nmea_data.decode('ascii').strip() + decoded_data['nmea_messages'] = nmea_text.split('\n') + decoded_data['message_count'] = len(decoded_data['nmea_messages']) + + # Parse individual NMEA sentences + parsed_sentences = [] + for sentence in decoded_data['nmea_messages']: + if sentence.startswith('$'): + parsed_sentences.append(self._parse_nmea_sentence(sentence)) + + decoded_data['parsed_sentences'] = parsed_sentences + + except UnicodeDecodeError: + decoded_data['raw_nmea_data'] = nmea_data.hex() + errors.append("Failed to decode NMEA data as ASCII") + + return DecodedPayload( + data_type=0x70, + data_type_name="GPS NMEA-RTCM", + format_version=0, + decoded_data=decoded_data, + raw_payload=payload, + errors=errors, + metadata={'decoder': 'TSPICTSDecoder'} + ) + + def _decode_eag_acmi(self, payload: bytes, ch10_header: Dict[str, Any]) -> DecodedPayload: + """Decode EAG ACMI data (Format 1)""" + decoded_data = {} + errors = [] + + # Parse IPH + iph = self._parse_intra_packet_header(payload) + if iph: + decoded_data.update(iph) + data_start = iph['data_start'] + else: + data_start = 0 + errors.append("Failed to parse intra-packet header") + + # EAG ACMI format parsing + if data_start + 16 <= len(payload): + # ACMI header structure (simplified) + acmi_header = self._safe_unpack(' DecodedPayload: + """Decode ACTTS (Advanced Common Time & Test System) data (Format 2)""" + decoded_data = {} + errors = [] + + # Parse IPH + iph = self._parse_intra_packet_header(payload) + if iph: + decoded_data.update(iph) + data_start = iph['data_start'] + else: + data_start = 0 + errors.append("Failed to parse intra-packet header") + + # ACTTS timing format + if data_start + 24 <= len(payload): + # ACTTS header structure + actts_data = self._safe_unpack(' DecodedPayload: + """Decode generic TSPI/CTS data (Formats 3-7)""" + data_type = ch10_header.get('data_type', 0) + decoded_data = {} + errors = [] + + # Parse IPH + iph = self._parse_intra_packet_header(payload) + if iph: + decoded_data.update(iph) + data_start = iph['data_start'] + else: + data_start = 0 + errors.append("Failed to parse intra-packet header") + + # Generic parsing for unknown formats + if data_start < len(payload): + remaining_data = payload[data_start:] + decoded_data['raw_data'] = remaining_data.hex() + decoded_data['data_length'] = len(remaining_data) + + # Try to identify patterns + if len(remaining_data) >= 4: + # Check for timing patterns + potential_timestamps = [] + for i in range(0, min(len(remaining_data) - 4, 64), 4): + timestamp = struct.unpack(' Dict[str, Any]: + """Parse individual NMEA sentence""" + if not sentence.startswith('$') or '*' not in sentence: + return {'raw': sentence, 'valid': False} + + # Split sentence and checksum + parts = sentence.split('*') + if len(parts) != 2: + return {'raw': sentence, 'valid': False} + + data_part = parts[0][1:] # Remove '$' + checksum = parts[1] + + # Parse data fields + fields = data_part.split(',') + sentence_type = fields[0] if fields else '' + + return { + 'raw': sentence, + 'sentence_type': sentence_type, + 'fields': fields, + 'checksum': checksum, + 'valid': True + } + + def _decode_time_format(self, time_format: int) -> str: + """Decode ACTTS time format field""" + formats = { + 0: "UTC", + 1: "GPS Time", + 2: "Local Time", + 3: "Mission Time", + 4: "Relative Time" + } + return formats.get(time_format, f"Unknown ({time_format})") + + def _decode_sync_status(self, sync_status: int) -> str: + """Decode ACTTS synchronization status""" + status_bits = { + 0x01: "Time Valid", + 0x02: "Sync Locked", + 0x04: "External Reference", + 0x08: "High Accuracy", + 0x10: "Leap Second Pending" + } + + active_flags = [] + for bit, description in status_bits.items(): + if sync_status & bit: + active_flags.append(description) + + return ", ".join(active_flags) if active_flags else "No Status" + + +# Specific decoder instances +class ACTTSDecoder(TSPICTSDecoder): + """Dedicated ACTTS decoder""" + + def can_decode(self, data_type: int) -> bool: + return data_type == 0x72 + + +class GPSNMEADecoder(TSPICTSDecoder): + """Dedicated GPS NMEA decoder""" + + def can_decode(self, data_type: int) -> bool: + return data_type == 0x70 + + +class EAGACMIDecoder(TSPICTSDecoder): + """Dedicated EAG ACMI decoder""" + + def can_decode(self, data_type: int) -> bool: + return data_type == 0x71 \ No newline at end of file diff --git a/analyzer/protocols/decoders/uart.py b/analyzer/protocols/decoders/uart.py new file mode 100644 index 0000000..5429f1a --- /dev/null +++ b/analyzer/protocols/decoders/uart.py @@ -0,0 +1,69 @@ +""" +UART Data decoder for Chapter 10 data types +Supports UART Data Format 0 (0x50) +""" + +import struct +from typing import Dict, Any, Optional +from .base import DataTypeDecoder, DecodedPayload + + +class UARTDecoder(DataTypeDecoder): + """Decoder for UART Data type (0x50)""" + + def __init__(self): + super().__init__() + self.data_type_base = 0x50 + self.data_type_name = "UART Data" + self.supported_formats = [0x50] + + def can_decode(self, data_type: int) -> bool: + return data_type == 0x50 + + def get_data_type_name(self, data_type: int) -> str: + return "UART Data Format 0" + + def decode(self, payload: bytes, ch10_header: Dict[str, Any]) -> Optional[DecodedPayload]: + """Decode UART payload""" + decoded_data = {} + errors = [] + + # Parse IPH + iph = self._parse_intra_packet_header(payload) + if iph: + decoded_data.update(iph) + data_start = iph['data_start'] + else: + data_start = 0 + errors.append("Failed to parse intra-packet header") + + # Parse UART data + if data_start < len(payload): + uart_data = payload[data_start:] + decoded_data['uart_data_length'] = len(uart_data) + + # Try to decode as text + try: + text_data = uart_data.decode('ascii', errors='ignore') + decoded_data['ascii_data'] = text_data + decoded_data['printable_chars'] = sum(1 for c in text_data if c.isprintable()) + except: + decoded_data['ascii_decode_failed'] = True + + # Raw hex representation + decoded_data['raw_hex'] = uart_data[:256].hex() # First 256 bytes + + # Basic statistics + decoded_data['null_count'] = uart_data.count(0) + decoded_data['cr_count'] = uart_data.count(ord('\r')) + decoded_data['lf_count'] = uart_data.count(ord('\n')) + + return DecodedPayload( + data_type=0x50, + data_type_name="UART Data Format 0", + format_version=0, + decoded_data=decoded_data, + raw_payload=payload, + errors=errors, + metadata={'decoder': 'UARTDecoder'} + ) \ No newline at end of file diff --git a/analyzer/tui/textual/app_v2.py b/analyzer/tui/textual/app_v2.py index 57023e9..3239342 100644 --- a/analyzer/tui/textual/app_v2.py +++ b/analyzer/tui/textual/app_v2.py @@ -8,6 +8,7 @@ from textual.containers import Container, Horizontal, Vertical, ScrollableContai from textual.widgets import Header, Footer, Static, DataTable, Label from textual.reactive import reactive from textual.timer import Timer +from textual.events import MouseDown, MouseMove from typing import TYPE_CHECKING from rich.text import Text from rich.console import Group @@ -37,6 +38,8 @@ class StreamLensAppV2(App): """ CSS_PATH = "styles/streamlens_v2.tcss" + ENABLE_COMMAND_PALETTE = False + AUTO_FOCUS = None BINDINGS = [ ("q", "quit", "Quit"), @@ -79,40 +82,13 @@ class StreamLensAppV2(App): yield Header() with Container(id="main-container"): - # Top metrics bar - compact like TipTop + # Ultra-compact metrics bar with Horizontal(id="metrics-bar"): - yield MetricCard( - "Flows", - f"{self.total_flows}", - trend="stable", - id="flows-metric" - ) - yield MetricCard( - "Packets/s", - f"{self.packets_per_sec:.1f}", - trend="up", - sparkline=True, - id="packets-metric" - ) - yield MetricCard( - "Volume/s", - self._format_bytes_per_sec(self.bytes_per_sec), - trend="stable", - sparkline=True, - id="volume-metric" - ) - yield MetricCard( - "Enhanced", - f"{self.enhanced_flows}", - color="success", - id="enhanced-metric" - ) - yield MetricCard( - "Outliers", - f"{self.outlier_count}", - color="warning" if self.outlier_count > 0 else "normal", - id="outliers-metric" - ) + yield MetricCard("Flows", f"{self.total_flows}", id="flows-metric") + yield MetricCard("Pkts/s", f"{self.packets_per_sec:.0f}", id="packets-metric") + yield MetricCard("Vol/s", self._format_bytes_per_sec(self.bytes_per_sec), id="volume-metric") + yield MetricCard("Enhanced", f"{self.enhanced_flows}", color="success", id="enhanced-metric") + yield MetricCard("Outliers", f"{self.outlier_count}", color="warning" if self.outlier_count > 0 else "normal", id="outliers-metric") # Main content area with horizontal split with Horizontal(id="content-area"): @@ -132,7 +108,7 @@ class StreamLensAppV2(App): yield Footer() def on_mount(self) -> None: - """Initialize the application with TipTop-style updates""" + """Initialize the application with TipTop-style updates""" self.update_metrics() # Set up update intervals like TipTop @@ -141,7 +117,20 @@ class StreamLensAppV2(App): # Initialize sparkline history self._initialize_history() + + # Set initial focus to the flow table for immediate keyboard navigation + self.call_after_refresh(self._set_initial_focus) + def _set_initial_focus(self): + """Set initial focus to the flow table after widgets are ready""" + try: + flow_table = self.query_one("#flow-table", EnhancedFlowTable) + data_table = flow_table.query_one("#flows-data-table", DataTable) + data_table.focus() + except Exception: + # If table isn't ready yet, try again after a short delay + self.set_timer(0.1, self._set_initial_focus) + def _initialize_history(self): """Initialize metrics history arrays""" current_time = time.time() @@ -281,4 +270,12 @@ class StreamLensAppV2(App): def action_show_details(self) -> None: """Show detailed view for selected flow""" # TODO: Implement detailed flow modal - pass \ No newline at end of file + pass + + def on_mouse_down(self, event: MouseDown) -> None: + """Prevent default mouse down behavior to disable mouse interaction.""" + event.prevent_default() + + def on_mouse_move(self, event: MouseMove) -> None: + """Prevent default mouse move behavior to disable mouse interaction.""" + event.prevent_default() \ No newline at end of file diff --git a/analyzer/tui/textual/styles/streamlens_v2.tcss b/analyzer/tui/textual/styles/streamlens_v2.tcss index a3b61f3..74ae106 100644 --- a/analyzer/tui/textual/styles/streamlens_v2.tcss +++ b/analyzer/tui/textual/styles/streamlens_v2.tcss @@ -1,88 +1,72 @@ -/* StreamLens V2 - TipTop-Inspired Styling */ - -/* Color Scheme - Dark theme with vibrant accents */ -$primary: #0080ff; -$primary-lighten-1: #3399ff; -$primary-lighten-2: #66b3ff; -$primary-lighten-3: #99ccff; - -$accent: #00ffcc; -$success: #00ff88; -$warning: #ffcc00; -$error: #ff3366; - -$surface: #1a1a1a; -$surface-lighten-1: #262626; -$surface-lighten-2: #333333; -$background: #0d0d0d; -$text: #ffffff; -$text-muted: #999999; +/* StreamLens V2 - Compact Styling */ /* Main Application Layout */ Screen { - background: $background; + background: #0d0d0d; } #main-container { height: 1fr; - background: $background; + background: #0d0d0d; } -/* Metrics Bar - Horizontal compact display at top */ +/* Metrics Bar - Ultra compact display at top */ #metrics-bar { - height: 7; - padding: 1; - background: $surface; - border-bottom: thick $primary; + height: 3; + padding: 0; + background: #1a1a1a; + border-bottom: solid #0080ff; align: center middle; } MetricCard { width: 1fr; - height: 5; - margin: 0 1; - max-width: 20; - border: tall $primary-lighten-2; - padding: 0 1; + height: 3; + margin: 0; + max-width: 18; + border: none; + padding: 0; align: center middle; } -/* Content Area - Three column layout */ +/* Content Area - Maximized for grid */ #content-area { height: 1fr; - padding: 1; + padding: 0; } -/* Panel Styling */ +/* Panel Styling - Minimal borders */ .panel { - border: solid $primary-lighten-3; - padding: 1; - margin: 0 1; + border: solid #99ccff; + padding: 0; + margin: 0; } .panel-wide { - border: solid $primary-lighten-3; - padding: 1; - margin: 0 1; + border: solid #99ccff; + padding: 0; + margin: 0; } .panel-header { text-align: center; text-style: bold; - color: $accent; + color: #00ffcc; margin-bottom: 1; } -/* Left Panel - Main Flow Table (expanded) */ +/* Left Panel - Main Flow Table (maximized) */ #left-panel { - width: 70%; - background: $surface; + width: 75%; + background: #1a1a1a; + padding: 0; } -/* Right Panel - Details */ +/* Right Panel - Details (compact) */ #right-panel { - width: 30%; - background: $surface; + width: 25%; + background: #1a1a1a; + padding: 0; } /* Sparkline Charts */ @@ -95,114 +79,118 @@ SparklineWidget { /* Enhanced Flow Table */ #flows-data-table { height: 1fr; - scrollbar-background: $surface-lighten-1; - scrollbar-color: $primary; + scrollbar-background: #262626; + scrollbar-color: #0080ff; scrollbar-size: 1 1; } #flows-data-table > .datatable--header { - background: $surface-lighten-2; - color: $accent; + background: #333333; + color: #00ffcc; text-style: bold; } #flows-data-table > .datatable--cursor { - background: $primary 30%; - color: $text; + background: #0080ff 30%; + color: #ffffff; } #flows-data-table > .datatable--hover { - background: $primary 20%; + background: #0080ff 20%; } #flows-data-table > .datatable--odd-row { - background: $surface; + background: #1a1a1a; } #flows-data-table > .datatable--even-row { - background: $surface-lighten-1; + background: #262626; } -/* Flow Details Panel */ +/* Flow Details Panel - Compact */ FlowDetailsPanel { - padding: 1; + padding: 0; } FlowDetailsPanel Panel { - margin-bottom: 1; + margin-bottom: 0; } /* Status Colors */ .status-normal { - color: $success; + color: #00ff88; } .status-warning { - color: $warning; + color: #ffcc00; } .status-error { - color: $error; + color: #ff3366; } .status-enhanced { - color: $accent; + color: #00ffcc; text-style: bold; } /* Quality Indicators */ .quality-high { - color: $success; + color: #00ff88; } .quality-medium { - color: $warning; + color: #ffcc00; } .quality-low { - color: $error; + color: #ff3366; } /* Animations and Transitions */ .updating { - background: $primary 10%; + background: #0080ff 10%; transition: background 200ms; } -/* Header and Footer */ +/* Header and Footer - Ultra compact */ Header { - background: $surface; - color: $text; - border-bottom: solid $primary; + background: #1a1a1a; + color: #ffffff; + border-bottom: solid #0080ff; + height: 1; + padding: 0; } Footer { - background: $surface; - color: $text-muted; - border-top: solid $primary; + background: #1a1a1a; + color: #999999; + border-top: solid #0080ff; + height: 1; + padding: 0; } /* Scrollbars */ Vertical { scrollbar-size: 1 1; - scrollbar-background: $surface-lighten-1; - scrollbar-color: $primary; + scrollbar-background: #262626; + scrollbar-color: #0080ff; } Horizontal { scrollbar-size: 1 1; - scrollbar-background: $surface-lighten-1; - scrollbar-color: $primary; + scrollbar-background: #262626; + scrollbar-color: #0080ff; } /* Focus States */ DataTable:focus { - border: solid $accent; + border: solid #00ffcc; } /* Panel Borders */ Static { - border: round $primary; + border: round #0080ff; } /* End of styles */ \ No newline at end of file diff --git a/analyzer/tui/textual/widgets/flow_table.py b/analyzer/tui/textual/widgets/flow_table.py index 44f17f4..ef5efdf 100644 --- a/analyzer/tui/textual/widgets/flow_table.py +++ b/analyzer/tui/textual/widgets/flow_table.py @@ -69,9 +69,11 @@ class FlowAnalysisWidget(Vertical): if not self.flow_table: return - # Preserve cursor position + # Preserve cursor and scroll positions cursor_row = self.flow_table.cursor_row cursor_column = self.flow_table.cursor_column + scroll_x = self.flow_table.scroll_x + scroll_y = self.flow_table.scroll_y selected_row_key = None if self.flow_table.rows and cursor_row < len(self.flow_table.rows): selected_row_key = list(self.flow_table.rows.keys())[cursor_row] @@ -108,6 +110,9 @@ class FlowAnalysisWidget(Vertical): # If original selection not found, try to maintain row position new_row = min(cursor_row, self.flow_table.row_count - 1) self.flow_table.move_cursor(row=new_row, column=cursor_column, animate=False) + + # Restore scroll position + self.flow_table.scroll_to(x=scroll_x, y=scroll_y, animate=False) def _create_flow_row(self, flow_num: int, flow: 'FlowStats') -> List[Text]: """Create main flow row with rich text formatting""" diff --git a/analyzer/tui/textual/widgets/flow_table_v2.py b/analyzer/tui/textual/widgets/flow_table_v2.py index bab2d38..d38efdb 100644 --- a/analyzer/tui/textual/widgets/flow_table_v2.py +++ b/analyzer/tui/textual/widgets/flow_table_v2.py @@ -29,11 +29,15 @@ class EnhancedFlowTable(Vertical): DEFAULT_CSS = """ EnhancedFlowTable { height: 1fr; + padding: 0; + margin: 0; } EnhancedFlowTable DataTable { height: 1fr; scrollbar-gutter: stable; + padding: 0; + margin: 0; } """ @@ -62,17 +66,15 @@ class EnhancedFlowTable(Vertical): """Initialize the table""" table = self.query_one("#flows-data-table", DataTable) - # Add columns with explicit keys to avoid auto-generated keys - table.add_column("#", width=3, key="num") - table.add_column("Source", width=22, key="source") - table.add_column("Proto", width=6, key="proto") - table.add_column("Destination", width=22, key="dest") - table.add_column("Extended", width=10, key="extended") - table.add_column("Frame Type", width=12, key="frame_type") - table.add_column("Rate", width=12, key="rate") - table.add_column("Volume", width=12, key="volume") - table.add_column("Quality", width=12, key="quality") - table.add_column("Status", width=8, key="status") + # Compact columns optimized for data density + table.add_column("#", width=2, key="num") + table.add_column("Source", width=18, key="source") + table.add_column("Proto", width=4, key="proto") + table.add_column("Destination", width=18, key="dest") + table.add_column("Extended", width=8, key="extended") + table.add_column("Frame Type", width=10, key="frame_type") + table.add_column("Pkts", width=6, key="rate") + table.add_column("Size", width=8, key="volume") self.refresh_data() @@ -80,9 +82,11 @@ class EnhancedFlowTable(Vertical): """Refresh flow table with enhanced visualizations""" table = self.query_one("#flows-data-table", DataTable) - # Preserve cursor position + # Preserve cursor and scroll positions cursor_row = table.cursor_row cursor_column = table.cursor_column + scroll_x = table.scroll_x + scroll_y = table.scroll_y selected_row_key = None if table.rows and cursor_row < len(table.rows): selected_row_key = list(table.rows.keys())[cursor_row] @@ -148,6 +152,9 @@ class EnhancedFlowTable(Vertical): # If original selection not found, try to maintain row position new_row = min(cursor_row, table.row_count - 1) table.move_cursor(row=new_row, column=cursor_column, animate=False) + + # Restore scroll position + table.scroll_to(x=scroll_x, y=scroll_y, animate=False) def _create_enhanced_row(self, num: int, flow: 'FlowStats', metrics: dict) -> List[Text]: """Create enhanced row with inline visualizations""" @@ -177,10 +184,9 @@ class EnhancedFlowTable(Vertical): rate_spark = self._create_rate_sparkline(metrics['rate_history']) rate_text = Text(f"{metrics['rate_history'][-1]:.0f} {rate_spark}") - # Volume with bar chart - volume_bar = self._create_volume_bar(flow.total_bytes) - volume_value = self._format_bytes(flow.total_bytes) - volume_text = Text(f"{volume_value:>6} {volume_bar}") + # Size with actual value + size_value = self._format_bytes(flow.total_bytes) + size_text = Text(f"{size_value:>8}") # Quality with bar chart and color quality_bar, quality_color = self._create_quality_bar(flow) @@ -199,8 +205,7 @@ class EnhancedFlowTable(Vertical): return [ num_text, source_text, proto_text, dest_text, - extended_text, frame_text, rate_text, volume_text, - quality_text, status_text + extended_text, frame_text, rate_text, size_text ] def _create_rate_sparkline(self, history: List[float]) -> str: @@ -308,12 +313,10 @@ class EnhancedFlowTable(Vertical): Text(""), # Empty source Text(""), # Empty protocol Text(""), # Empty destination - Text(f" └─ {extended_proto}", style="dim yellow"), + Text(f" {extended_proto}", style="dim yellow"), Text(frame_type, style="dim blue"), Text(f"{count}", style="dim", justify="right"), - Text(f"{percentage:.0f}%", style="dim"), - Text(""), # Empty quality - Text("") # Empty status + Text(f"{self._format_bytes(count * (flow.total_bytes // flow.frame_count) if flow.frame_count > 0 else 0):>8}", style="dim") ] subrows.append(subrow) diff --git a/analyzer/tui/textual/widgets/metric_card.py b/analyzer/tui/textual/widgets/metric_card.py index dac2050..69dbcbc 100644 --- a/analyzer/tui/textual/widgets/metric_card.py +++ b/analyzer/tui/textual/widgets/metric_card.py @@ -29,19 +29,23 @@ class MetricCard(Widget): MetricCard { width: 1fr; height: 3; - margin: 0 1; + margin: 0; + padding: 0; } MetricCard.success { - border: solid $success; + border: none; + color: #00ff88; } MetricCard.warning { - border: solid $warning; + border: none; + color: #ffcc00; } MetricCard.error { - border: solid $error; + border: none; + color: #ff3366; } """ @@ -106,18 +110,15 @@ class MetricCard(Widget): if self.sparkline and self.spark_data: spark_str = " " + self._create_mini_spark() - # Format content + # Ultra compact - single line format content = Text() - content.append(f"{self.title}\n", style="dim") + content.append(f"{self.title}: ", style="dim") content.append(f"{self.value}", style=f"bold {style}") content.append(trend_icon, style=style) content.append(spark_str, style="dim cyan") - return Panel( - content, - height=3, - border_style=style if self.color != "normal" else "dim" - ) + # Super compact - no panel, just text + return content def _create_mini_spark(self) -> str: """Create mini sparkline for inline display""" diff --git a/docs/specifications/irig106/chapter10.pdf b/docs/specifications/irig106/chapter10.pdf new file mode 100644 index 0000000..533e284 Binary files /dev/null and b/docs/specifications/irig106/chapter10.pdf differ