"""
Enhanced Chapter 10 (IRIG 106) decoder with comprehensive field extraction.

Exposes all CH10 frame variables for modular analysis.

NOTE(review): the copy of this module that was reviewed had lost its line
structure and every span between a ``<`` and the following ``>`` (struct
format strings and the statements after them). Regions that were rebuilt from
surviving call sites and from ``supported_fields`` are marked ``NOTE(review)``
or ``TODO(review)`` below and must be checked against the original file.
"""

import math
import struct
from abc import ABC, abstractmethod
from collections import Counter
from dataclasses import dataclass, field
from typing import Any, Callable, Dict, List, Optional, Union


# Modular framework components
@dataclass
class FieldDefinition:
    """Defines a field that can be extracted from decoded data."""

    name: str
    description: str
    data_type: type
    unit: Optional[str] = None
    validator: Optional[Callable[..., Any]] = None


@dataclass
class StructuredFrameData:
    """Container for decoded frame data with metadata."""

    decoder_name: str
    packet_timestamp: float
    raw_data: bytes
    fields: Dict[str, Any] = field(default_factory=dict)
    metadata: Dict[str, Any] = field(default_factory=dict)

    def get_field(self, name: str, default=None):
        """Return the decoded field *name*, or *default* when it is absent."""
        return self.fields.get(name, default)

    def has_field(self, name: str) -> bool:
        """Return True when *name* has been decoded into ``fields``."""
        return name in self.fields


class EnhancedChapter10Decoder:
    """Comprehensive Chapter 10 decoder exposing all frame variables."""

    # Chapter 10 sync pattern
    SYNC_PATTERN = 0xEB25

    # Size of the primary packet header in bytes
    PRIMARY_HEADER_SIZE = 24

    # NOTE(review): the original format string was lost. This layout follows
    # the order and sizes of the primary-header entries in `supported_fields`
    # (sync, channel id, packet length, data length, header version, sequence
    # number, packet flags, data type, 6-byte RTC, header checksum = 24 bytes).
    _PRIMARY_HEADER = struct.Struct('<HHIIBBBB6sH')

    # Data type definitions from IRIG 106-17
    DATA_TYPES = {
        0x00: "Computer Generated Data",
        0x01: "TMATS",
        0x02: "Computer Generated Data - Format 2",
        0x03: "Computer Generated Data - Format 3",
        0x04: "PCM Format 1",
        0x05: "Time Data - Format 1",
        0x06: "Time Data - Format 2",
        0x07: "Computer Generated Data - Format 4",
        0x08: "PCM Format 2",
        0x09: "IRIG Time",
        0x0A: "Computer Generated Data - Format 5",
        0x0B: "Computer Generated Data - Format 6",
        0x11: "1553 Format 1",
        0x19: "1553 Format 2",
        0x21: "Analog Format 1",
        0x29: "Discrete Format 1",
        0x30: "Message Data",
        0x31: "ARINC 429 Format 1",
        0x38: "Video Format 0",
        0x39: "Video Format 1",
        0x3A: "Video Format 2",
        0x40: "Image Format 0",
        0x41: "Image Format 1",
        0x48: "UART Format 0",
        0x50: "IEEE 1394 Format 0",
        0x51: "IEEE 1394 Format 1",
        0x58: "Parallel Format 0",
        0x59: "Parallel Format 1",
        0x60: "Ethernet Format 0",
        0x61: "Ethernet Format 1",
        0x68: "TSPI/CTS Format 0",
        0x69: "TSPI/CTS Format 1",
        0x70: "CAN Bus",
        0x71: "Fibre Channel Format 0",
        0x72: "Analog Format 2",
        0x73: "Analog Format 3",
        0x74: "Analog Format 4",
        0x75: "Analog Format 5",
        0x76: "Analog Format 6",
        0x77: "Analog Format 7",
        0x78: "Analog Format 8",
    }

    # Packet flags bit definitions
    PACKET_FLAGS = {
        0: "Secondary Header Time Source",
        1: "Format Error",
        2: "RTC Sync Error",
        3: "IPH Time Source",
        4: "Secondary Header Present",
        5: "Optional Data Present",
        6: "Reserved",
        7: "Overflow Error",
    }

    # Field name published for each PACKET_FLAGS bit (bit 6 is reserved).
    # NOTE(review): this follows the PACKET_FLAGS table above; the bit
    # positions look different from the IRIG 106 packet-flag layout - confirm.
    _FLAG_FIELDS = {
        0: 'secondary_header_time_source',
        1: 'format_error',
        2: 'rtc_sync_error',
        3: 'iph_time_source',
        4: 'secondary_header_present',
        5: 'optional_data_present',
        7: 'overflow_error',
    }

    # NOTE(review): groupings derived from the names in DATA_TYPES; the
    # original classification code was lost.
    _TMATS_DATA_TYPE = 0x01
    _PCM_DATA_TYPES = frozenset({0x04, 0x08})
    _TIME_DATA_TYPES = frozenset({0x05, 0x06, 0x09})
    _ANALOG_DATA_TYPES = frozenset(
        {0x21, 0x72, 0x73, 0x74, 0x75, 0x76, 0x77, 0x78}
    )

    @property
    def decoder_name(self) -> str:
        """Name recorded on every frame produced by this decoder."""
        return "Chapter10_Enhanced"

    @property
    def supported_fields(self) -> List[FieldDefinition]:
        """All fields that can be extracted from CH10 frames."""
        return [
            # Primary header fields (24 bytes)
            FieldDefinition("sync_pattern", "Sync pattern (should be 0xEB25)", int),
            FieldDefinition("channel_id", "Channel identifier", int),
            FieldDefinition("packet_length", "Total packet length including header", int, "bytes"),
            FieldDefinition("data_length", "Data payload length", int, "bytes"),
            FieldDefinition("header_version", "Header version number", int),
            FieldDefinition("sequence_number", "Packet sequence number", int),
            FieldDefinition("packet_flags", "Packet flags byte", int),
            FieldDefinition("data_type", "Data type identifier", int),
            FieldDefinition("relative_time_counter", "RTC value (6 bytes)", int, "counts"),
            FieldDefinition("header_checksum", "Header checksum", int),

            # Decoded packet flags
            FieldDefinition("secondary_header_time_source", "Time source from secondary header", bool),
            FieldDefinition("format_error", "Format error flag", bool),
            FieldDefinition("rtc_sync_error", "RTC synchronization error", bool),
            FieldDefinition("iph_time_source", "IPH time source flag", bool),
            FieldDefinition("secondary_header_present", "Secondary header present", bool),
            FieldDefinition("optional_data_present", "Optional data present", bool),
            FieldDefinition("overflow_error", "Data overflow error", bool),

            # Data type information
            FieldDefinition("data_type_name", "Human readable data type", str),
            FieldDefinition("is_analog_data", "True if analog format", bool),
            FieldDefinition("is_pcm_data", "True if PCM format", bool),
            FieldDefinition("is_tmats_data", "True if TMATS data", bool),
            FieldDefinition("is_time_data", "True if time format", bool),

            # Secondary header fields (if present)
            FieldDefinition("secondary_header_time", "Secondary header timestamp", int, "nanoseconds"),
            FieldDefinition("internal_seconds", "Internal time seconds component", int, "seconds"),
            FieldDefinition("internal_nanoseconds", "Internal time nanoseconds component", int, "nanoseconds"),
            FieldDefinition("internal_timestamp", "Combined internal timestamp", float, "seconds"),

            # Analog format specific fields
            FieldDefinition("analog_minor_frame_count", "Number of minor frames", int),
            FieldDefinition("analog_scan_count", "Scans per minor frame", int),
            FieldDefinition("analog_channel_count", "Number of analog channels", int),
            FieldDefinition("analog_sample_rate", "Sampling rate", float, "Hz"),
            FieldDefinition("analog_bit_depth", "Bits per sample", int, "bits"),
            FieldDefinition("analog_format_factor", "Format factor", int),
            FieldDefinition("analog_measurement_list", "Channel measurement data", dict),

            # PCM format specific fields
            FieldDefinition("pcm_minor_frame_sync", "Minor frame sync pattern", int),
            FieldDefinition("pcm_major_frame_sync", "Major frame sync pattern", int),
            FieldDefinition("pcm_minor_frame_length", "Minor frame length", int, "bits"),
            FieldDefinition("pcm_major_frame_length", "Major frame length", int, "minor_frames"),
            FieldDefinition("pcm_bits_per_second", "PCM bit rate", int, "bps"),

            # TMATS specific fields
            FieldDefinition("tmats_version", "TMATS version", str),
            FieldDefinition("tmats_channel_configs", "Parsed channel configurations", dict),
            FieldDefinition("tmats_data_source_id", "Data source identifier", str),
            FieldDefinition("tmats_recording_date", "Recording date/time", str),

            # General payload analysis
            FieldDefinition("payload_entropy", "Data randomness measure", float),
            FieldDefinition("payload_patterns", "Detected data patterns", list),
            FieldDefinition("has_embedded_timestamps", "Contains timestamp data", bool),

            # Frame quality metrics
            FieldDefinition("header_checksum_valid", "Header checksum validation", bool),
            FieldDefinition("frame_completeness", "Percentage of expected data present", float, "percent"),
            FieldDefinition("data_continuity_errors", "Number of continuity errors", int),

            # Timing analysis fields
            FieldDefinition("rtc_time_base", "RTC time base frequency", float, "Hz"),
            FieldDefinition("time_source_confidence", "Confidence in time source", float),
            FieldDefinition("clock_drift_indicators", "Indicators of clock drift", dict),
        ]

    # ------------------------------------------------------------------
    # Public entry points
    # ------------------------------------------------------------------

    def can_decode(self, packet, transport_info: Dict) -> float:
        """Check if packet contains Chapter 10 data.

        Args:
            packet: a scapy-style packet; anything without a ``Raw`` layer
                scores 0.0.
            transport_info: accepted for interface compatibility; not read.

        Returns:
            Confidence in [0.0, 1.0] that the raw payload is a CH10 packet.
        """
        raw_data = self._extract_raw(packet)
        if raw_data is None:
            return 0.0
        return self._score_raw_data(raw_data)

    def decode_frame(self, packet, transport_info: Dict) -> Optional[StructuredFrameData]:
        """Comprehensive Chapter 10 frame decoding.

        Args:
            packet: a scapy-style packet carrying a ``Raw`` layer and a
                ``time`` attribute.
            transport_info: accepted for interface compatibility; not read.

        Returns:
            The decoded frame, or None when the packet has no ``Raw`` layer,
            is shorter than the primary header, or has no usable timestamp.
            When parsing fails part-way the partially filled frame is returned
            with ``metadata['parsing_error']`` set.
        """
        raw_data = self._extract_raw(packet)
        if raw_data is None or len(raw_data) < self.PRIMARY_HEADER_SIZE:
            return None
        try:
            timestamp = float(packet.time)
        except (AttributeError, TypeError, ValueError):
            return None
        return self._decode_raw_data(raw_data, timestamp)

    # ------------------------------------------------------------------
    # scapy-free cores of the entry points
    # ------------------------------------------------------------------

    @staticmethod
    def _extract_raw(packet) -> Optional[bytes]:
        """Return the bytes of the packet's Raw layer, or None if it has none."""
        if not hasattr(packet, 'haslayer') or not packet.haslayer('Raw'):
            return None
        from scapy.all import Raw
        return bytes(packet[Raw])

    def _score_raw_data(self, raw_data: bytes) -> float:
        """Score how likely *raw_data* is to start with a CH10 primary header."""
        # Must have at least primary header
        if len(raw_data) < self.PRIMARY_HEADER_SIZE:
            return 0.0
        try:
            # NOTE(review): the format strings and the left-hand side of the
            # length check were lost; rebuilt from the surviving
            # "... packet_length - 24: return 0.5" fragment.
            sync, _channel_id, packet_length, data_length = struct.unpack_from(
                '<HHII', raw_data
            )
        except (struct.error, IndexError):
            return 0.0

        if sync != self.SYNC_PATTERN:
            return 0.0

        if data_length > packet_length - self.PRIMARY_HEADER_SIZE:
            return 0.5  # Might be CH10 but malformed

        # Check if packet length matches actual data
        if packet_length <= len(raw_data):
            return 1.0
        return 0.8  # Truncated but probably CH10

    def _decode_raw_data(self, raw_data: bytes, timestamp: float) -> StructuredFrameData:
        """Decode *raw_data* into a frame, keeping partial results on error."""
        frame_data = StructuredFrameData(
            decoder_name=self.decoder_name,
            packet_timestamp=timestamp,
            raw_data=raw_data,
        )
        try:
            # Parse primary header
            self._parse_primary_header(raw_data, frame_data)

            # Parse secondary header if present
            header_size = self.PRIMARY_HEADER_SIZE
            if frame_data.get_field('secondary_header_present'):
                header_size += self._parse_secondary_header(
                    raw_data[self.PRIMARY_HEADER_SIZE:], frame_data
                )

            # Parse data payload based on type
            data_length = frame_data.get_field('data_length', 0)
            if header_size < len(raw_data) and data_length > 0:
                payload_end = min(header_size + data_length, len(raw_data))
                self._parse_payload(raw_data[header_size:payload_end], frame_data)

            # Calculate derived fields
            self._calculate_derived_fields(frame_data)

            # Validate frame integrity
            self._validate_frame(frame_data)
        except Exception as e:
            # Deliberate best-effort boundary: return partial data even if
            # parsing fails, and record why.
            frame_data.metadata['parsing_error'] = str(e)
        return frame_data

    # ------------------------------------------------------------------
    # Header parsing
    # ------------------------------------------------------------------

    def _parse_primary_header(self, raw_data: bytes, frame_data: StructuredFrameData):
        """Parse the 24-byte primary header into ``frame_data.fields``.

        Raises:
            struct.error: if *raw_data* is shorter than the primary header.
        """
        # NOTE(review): body rebuilt from `supported_fields`; see the note on
        # `_PRIMARY_HEADER` for the assumed layout.
        (sync, channel_id, packet_length, data_length, header_version,
         sequence_number, packet_flags, data_type, rtc_bytes,
         header_checksum) = self._PRIMARY_HEADER.unpack_from(raw_data)

        fields = frame_data.fields
        fields.update({
            'sync_pattern': sync,
            'channel_id': channel_id,
            'packet_length': packet_length,
            'data_length': data_length,
            'header_version': header_version,
            'sequence_number': sequence_number,
            'packet_flags': packet_flags,
            'data_type': data_type,
            'relative_time_counter': int.from_bytes(rtc_bytes, 'little'),
            'header_checksum': header_checksum,
        })

        # Decoded packet flags
        for bit, flag_name in self._FLAG_FIELDS.items():
            fields[flag_name] = bool(packet_flags & (1 << bit))

        # Data type information
        # NOTE(review): wording of the unknown-type fallback is a guess.
        fields['data_type_name'] = self.DATA_TYPES.get(
            data_type, f"Unknown (0x{data_type:02X})"
        )
        fields['is_analog_data'] = data_type in self._ANALOG_DATA_TYPES
        fields['is_pcm_data'] = data_type in self._PCM_DATA_TYPES
        fields['is_tmats_data'] = data_type == self._TMATS_DATA_TYPE
        fields['is_time_data'] = data_type in self._TIME_DATA_TYPES

    def _parse_secondary_header(self, data: bytes, frame_data: StructuredFrameData) -> int:
        """Parse secondary header (variable length, typically 8 bytes for time).

        Returns:
            Number of bytes consumed; 0 when the header cannot be parsed.
        """
        if len(data) < 8:
            return 0
        try:
            # Parse time format secondary header (most common)
            # NOTE(review): format string and field math were lost; assumed
            # to be two little-endian uint32 values (seconds, nanoseconds).
            # IRIG 106 appears to define a 12-byte secondary header
            # (time + reserved + checksum) - confirm the consumed length.
            seconds, nanoseconds = struct.unpack_from('<II', data)
        except struct.error:
            return 0

        frame_data.fields.update({
            'secondary_header_time': seconds * 1_000_000_000 + nanoseconds,
            'internal_seconds': seconds,
            'internal_nanoseconds': nanoseconds,
            'internal_timestamp': seconds + nanoseconds / 1e9,
        })
        return 8

    # ------------------------------------------------------------------
    # Payload parsing
    # ------------------------------------------------------------------

    def _parse_payload(self, payload: bytes, frame_data: StructuredFrameData):
        """Dispatch payload parsing on the decoded data type."""
        # NOTE(review): the original dispatcher was lost; rebuilt from the
        # parser methods that survive. No PCM parser survived, so the pcm_*
        # fields are not populated here.
        data_type = frame_data.get_field('data_type', 0)
        if data_type in self._ANALOG_DATA_TYPES:
            self._parse_analog_payload(payload, frame_data)
        elif data_type == self._TMATS_DATA_TYPE:
            self._parse_tmats_payload(payload, frame_data)
        elif data_type in self._TIME_DATA_TYPES:
            self._parse_time_payload(payload, frame_data)

        self._parse_generic_payload(payload, frame_data)

    def _parse_analog_payload(self, payload: bytes, frame_data: StructuredFrameData):
        """Parse an analog-format payload."""
        data_type = frame_data.get_field('data_type', 0)

        if data_type in (0x73, 0x74, 0x75):  # Other analog formats
            # TODO(review): the simplified 4-byte "basic_info" parsing for
            # these formats was lost and is not reconstructed.
            return

        # TODO(review): decoding of the 12-byte analog header (the analog_*
        # count/rate/depth fields, the last of which was computed as
        # "(x >> 16) & 0xFFFF") was lost and is not reconstructed. The
        # condition selecting this branch was lost as well.

        # Parse measurement data if present
        if len(payload) > 12:
            self._parse_analog_measurements(payload[12:], frame_data)

    def _parse_analog_measurements(self, data: bytes, frame_data: StructuredFrameData):
        """Extract interleaved 16-bit samples per analog channel."""
        # NOTE(review): how channel_count / samples_per_channel were derived,
        # the sample format and the result keys were lost; the loop bounds and
        # offset arithmetic below are from the surviving code.
        channel_count = frame_data.get_field('analog_channel_count', 0)
        if not channel_count or channel_count <= 0:
            return
        samples_per_channel = len(data) // (channel_count * 2)
        if samples_per_channel <= 0:
            return

        measurements = {}
        for channel in range(min(channel_count, 16)):  # Limit to prevent excessive processing
            channel_data = []
            for sample in range(min(samples_per_channel, 100)):  # Limit samples
                offset = (sample * channel_count + channel) * 2
                if offset + 1 < len(data):
                    channel_data.append(struct.unpack_from('<h', data, offset)[0])
            measurements[f'channel_{channel}'] = channel_data
        frame_data.fields['analog_measurement_list'] = measurements

    def _parse_tmats_payload(self, payload: bytes, frame_data: StructuredFrameData):
        """Decode a TMATS payload as text and publish the parsed attributes."""
        # NOTE(review): rebuilt; the original decode step (encoding, any
        # leading bytes skipped) was lost.
        text = payload.decode('ascii', errors='ignore')
        frame_data.fields.update(self._parse_tmats_text(text))

    def _parse_tmats_text(self, text: str) -> Dict[str, Any]:
        """Parse TMATS text format.

        TMATS statements have the form ``code:value;``. The text is split on
        the ``;`` terminator so that the backslash inside each code (for
        example ``G\\VER`` or ``R-1\\G``) is still present when the code is
        matched.

        Returns:
            Dict with ``tmats_version``, ``tmats_data_source_id``,
            ``tmats_recording_date`` (each ``'Unknown'`` when absent) and
            ``tmats_channel_configs``.
        """
        tmats_data: Dict[str, Any] = {
            'tmats_version': 'Unknown',
            'tmats_channel_configs': {},
            'tmats_data_source_id': 'Unknown',
            'tmats_recording_date': 'Unknown',
        }
        channel_configs: Dict[str, Dict[str, Any]] = {}

        for statement in text.split(';'):
            # Split on the first ':' only - values may contain colons (times).
            key, separator, value = statement.strip().partition(':')
            if not separator:
                continue
            key = key.strip()
            value = value.strip()

            # Parse specific TMATS parameters
            if key.startswith('G\\VER'):
                tmats_data['tmats_version'] = value
            elif key.startswith('G\\DSI'):
                tmats_data['tmats_data_source_id'] = value
            elif key.startswith('G\\RD'):
                tmats_data['tmats_recording_date'] = value
            elif key.startswith('R-'):
                # Channel configuration
                self._parse_tmats_channel_config(key, value, channel_configs)

        tmats_data['tmats_channel_configs'] = channel_configs
        return tmats_data

    def _parse_tmats_channel_config(self, key: str, value: str, configs: Dict):
        """Parse TMATS channel configuration parameters.

        Stores *value* in ``configs[channel_id][param_name]``; keys that are
        not of the form ``R-<id>\\<param>`` are ignored.
        """
        # Extract channel ID from key like "R-1\\G" or "R-CH1\\N"
        parts = key.split('\\')
        if len(parts) < 2:
            return

        channel_part = parts[0]  # e.g., "R-1"
        param_part = parts[1]    # e.g., "G", "N", "EU"
        if not channel_part.startswith('R-'):
            return
        channel_id = channel_part[2:]

        # Map parameter codes
        param_map = {
            'G': 'gain',
            'N': 'name',
            'EU': 'units',
            'MN': 'min_value',
            'MX': 'max_value',
            'OF': 'offset',
            'FS': 'full_scale',
            'SN': 'sample_rate',
        }
        numeric_params = {
            'gain', 'min_value', 'max_value', 'offset', 'full_scale',
            'sample_rate',
        }
        param_name = param_map.get(param_part, param_part.lower())

        # Try to convert numeric values; keep the text when it is not a number
        parsed_value: Any = value
        if param_name in numeric_params:
            try:
                parsed_value = float(value)
            except ValueError:
                pass

        configs.setdefault(channel_id, {})[param_name] = parsed_value

    def _parse_time_payload(self, payload: bytes, frame_data: StructuredFrameData):
        """Parse time format payload."""
        if len(payload) < 8:
            return
        # TODO(review): everything after the length guard (the struct format
        # and the fields it populated) was lost and is not reconstructed.

    def _parse_generic_payload(self, payload: bytes, frame_data: StructuredFrameData):
        """Publish payload-level statistics that apply to every data type."""
        # NOTE(review): rebuilt from the surviving helpers and the
        # payload_entropy / payload_patterns field definitions. The logic for
        # `has_embedded_timestamps` was lost, so that field is not set.
        frame_data.fields['payload_entropy'] = self._calculate_entropy(payload)
        frame_data.fields['payload_patterns'] = self._detect_patterns(payload)

    def _calculate_entropy(self, data: bytes) -> float:
        """Calculate Shannon entropy of data.

        Returns:
            Entropy in bits per byte divided by 8, i.e. 0.0 for constant data
            and 1.0 when all 256 byte values are equally frequent.
        """
        if not data:
            return 0.0

        total = len(data)
        entropy = 0.0
        for count in Counter(data).values():
            probability = count / total
            entropy -= probability * math.log2(probability)
        return entropy / 8.0  # Normalize to 0-1 range

    def _detect_patterns(self, data: bytes) -> List[str]:
        """Detect common patterns in data.

        Returns:
            Pattern labels, in the order repeated / incrementing / zeros.
        """
        patterns: List[str] = []
        if len(data) < 4:
            return patterns

        # Check for repeated patterns
        if data[:4] == data[4:8]:
            patterns.append("repeated_4byte_pattern")

        # Check for incrementing patterns
        if len(data) >= 8:
            word_count = len(data) // 4
            values = struct.unpack(f'<{word_count}I', data[:word_count * 4])
            if all(low < high for low, high in zip(values, values[1:])):
                patterns.append("incrementing_sequence")

        # Check for zero padding
        if data.count(0) > len(data) * 0.8:
            patterns.append("mostly_zeros")

        return patterns

    # ------------------------------------------------------------------
    # Derived fields and validation
    # ------------------------------------------------------------------

    def _calculate_derived_fields(self, frame_data: StructuredFrameData):
        """Calculate derived fields from extracted data."""
        # Calculate frame completeness
        expected_length = frame_data.get_field('packet_length', 0)
        actual_length = len(frame_data.raw_data)
        if expected_length > 0:
            completeness = min(100.0, (actual_length / expected_length) * 100.0)
            frame_data.fields['frame_completeness'] = completeness

        # Analyze timing if internal timestamp is available
        if frame_data.has_field('internal_timestamp'):
            internal_time = frame_data.get_field('internal_timestamp')
            time_delta = frame_data.packet_timestamp - internal_time
            frame_data.fields['packet_internal_time_delta'] = time_delta

            # Simple confidence measure: decreases with time delta, reaching
            # zero at 60 seconds.
            confidence = 1.0 - min(1.0, abs(time_delta) / 60.0)
            frame_data.fields['time_source_confidence'] = confidence

    @staticmethod
    def _compute_header_checksum(raw_data: bytes) -> Optional[int]:
        """Return the 16-bit sum of the first eleven header words.

        Returns None when *raw_data* is shorter than the primary header.
        """
        if len(raw_data) < 24:
            return None
        return sum(struct.unpack_from('<11H', raw_data)) & 0xFFFF

    def _validate_frame(self, frame_data: StructuredFrameData):
        """Validate frame integrity and publish a quality score."""
        fields = frame_data.fields

        # Validate sync pattern
        sync = frame_data.get_field('sync_pattern', 0)
        fields['sync_pattern_valid'] = (sync == self.SYNC_PATTERN)

        # Header checksum: arithmetic sum of the header's 16-bit words,
        # excluding the checksum word itself.
        computed = self._compute_header_checksum(frame_data.raw_data)
        fields['header_checksum_valid'] = (
            frame_data.has_field('header_checksum')
            and computed is not None
            and computed == frame_data.get_field('header_checksum')
        )

        # Check for continuity errors (simplified)
        fields['data_continuity_errors'] = 0  # Would need sequence tracking

        # Overall frame quality score. Missing flag fields count as errors.
        quality_factors = [
            frame_data.get_field('sync_pattern_valid', False),
            frame_data.get_field('header_checksum_valid', False),
            frame_data.get_field('frame_completeness', 0) > 95.0,
            not frame_data.get_field('format_error', True),
            not frame_data.get_field('overflow_error', True),
        ]
        quality_score = sum(quality_factors) / len(quality_factors) * 100.0
        fields['frame_quality_score'] = quality_score