#!/usr/bin/env python3 """ TMATS Packet class for IRIG106 Chapter 10 TMATS frame parsing """ import struct from typing import Dict, Optional, List try: from scapy.layers.inet import IP, UDP except ImportError: print("Error: scapy library not found. Install with: pip install scapy") exit(1) class TMATSPacket: """Represents an IRIG106 Chapter 10 TMATS packet""" def __init__(self, packet, original_frame_num: Optional[int] = None): """ Initialize TMATS packet from raw scapy packet Args: packet: Raw scapy packet original_frame_num: Original frame number in PCAP file """ self.raw_packet = packet self.original_frame_num: Optional[int] = original_frame_num # Extract basic packet info self.timestamp = float(packet.time) self.packet_size = len(packet) # Extract IP/UDP info if available if packet.haslayer(IP) and packet.haslayer(UDP): ip_layer = packet[IP] udp_layer = packet[UDP] self.src_ip = ip_layer.src self.dst_ip = ip_layer.dst self.src_port = udp_layer.sport self.dst_port = udp_layer.dport self.payload = bytes(udp_layer.payload) else: self.src_ip = "" self.dst_ip = "" self.src_port = 0 self.dst_port = 0 self.payload = bytes() # Parse TMATS content self.tmats_info = self._parse_tmats_content() self.is_tmats = self.tmats_info is not None def _parse_tmats_content(self) -> Optional[Dict]: """Parse TMATS content from payload""" if len(self.payload) < 12: return None try: # Look for Chapter 10 sync pattern ch10_offset = None for offset in range(min(16, len(self.payload) - 24)): if len(self.payload) >= offset + 2: sync_pattern = struct.unpack('= len(self.payload): return None # Extract TMATS ASCII data tmats_data = self.payload[data_start:] # Try to decode as ASCII try: ascii_content = tmats_data.decode('ascii', errors='ignore') # Check if this looks like TMATS data tmats_patterns = ['\\', 'R-1\\', 'G\\', 'T-', 'P-', 'COMMENT:', 'DSI', 'DST'] if any(pattern in ascii_content for pattern in tmats_patterns): return { 'raw_data': tmats_data, 'ascii_content': ascii_content, 'data_start_offset': data_start, 'data_length': len(tmats_data), 'has_ch10_header': ch10_offset is not None, 'is_continuation': ch10_offset is None } except Exception: pass return None except (struct.error, IndexError): return None def _parse_ch10_header(self, offset: int) -> None: """Parse Chapter 10 header if present""" try: base = offset sync_pattern = struct.unpack(' Optional[int]: """Find where ASCII TMATS data starts in continuation frames""" # Look for start of ASCII data by scanning for printable characters for i in range(min(32, len(self.payload))): # Check if we have a sequence of printable ASCII characters if i + 10 < len(self.payload): sample = self.payload[i:i+10] try: decoded = sample.decode('ascii') # If we can decode it and it contains TMATS-like characters if any(c in decoded for c in ['R', 'G', 'T', 'P', '\\', '-', ':']): return i except: continue # Fallback - assume data starts after a simple header (based on observation) return 12 if len(self.payload) > 12 else None def get_ascii_content(self) -> str: """Get the ASCII content of the TMATS data""" if self.tmats_info: return self.tmats_info['ascii_content'] return "" def is_continuation_frame(self) -> bool: """Check if this is a TMATS continuation frame (without Ch10 header)""" if self.tmats_info: return self.tmats_info['is_continuation'] return False def has_chapter10_header(self) -> bool: """Check if this frame has a full Chapter 10 header""" if self.tmats_info: return self.tmats_info['has_ch10_header'] return False class TMATSAssembler: """Assembles TMATS data from multiple frames""" def __init__(self): self.tmats_frames: List[TMATSPacket] = [] self.assembled_content = "" self.tmats_files: List[str] = [] def add_frame(self, tmats_packet: TMATSPacket) -> None: """Add a TMATS frame to the assembler""" if tmats_packet.is_tmats: self.tmats_frames.append(tmats_packet) def assemble(self) -> str: """Assemble TMATS frames into complete TMATS files, stopping at END markers""" if not self.tmats_frames: return "" # Sort frames by timestamp to ensure correct order sorted_frames = sorted(self.tmats_frames, key=lambda x: x.timestamp) # Assemble TMATS content, detecting file boundaries current_tmats = [] self.tmats_files = [] for frame in sorted_frames: content = frame.get_ascii_content() if not content: continue current_tmats.append(content) # Check if this frame contains a TMATS END marker if self._contains_tmats_end(content): # Complete TMATS file found complete_tmats = ''.join(current_tmats) self.tmats_files.append(complete_tmats) current_tmats = [] # Start new TMATS file # Handle any remaining partial TMATS content if current_tmats: partial_tmats = ''.join(current_tmats) self.tmats_files.append(partial_tmats) # Return the first complete TMATS file, or all if multiple unique files if self.tmats_files: # Check if we have multiple unique TMATS files unique_files = self._get_unique_tmats_files() if len(unique_files) == 1: self.assembled_content = unique_files[0] else: # Multiple unique TMATS files - show all with separators self.assembled_content = self._format_multiple_tmats_files(unique_files) else: self.assembled_content = "" return self.assembled_content def _contains_tmats_end(self, content: str) -> bool: """Check if content contains a TMATS END marker""" end_patterns = [ 'TMATS END', 'TMATS_END', '-----END', 'END----' ] return any(pattern in content for pattern in end_patterns) def _get_unique_tmats_files(self) -> List[str]: """Get unique TMATS files, removing duplicates""" unique_files = [] for tmats_file in self.tmats_files: # Clean the content for comparison cleaned = self._clean_tmats_content(tmats_file) # Check if this is a duplicate of an existing file is_duplicate = False for existing_file in unique_files: existing_cleaned = self._clean_tmats_content(existing_file) if self._are_tmats_equivalent(cleaned, existing_cleaned): is_duplicate = True break if not is_duplicate and cleaned.strip(): unique_files.append(tmats_file) return unique_files def _clean_tmats_content(self, content: str) -> str: """Clean TMATS content for comparison by removing junk characters""" # Remove non-printable characters except newlines cleaned = ''.join(c if c.isprintable() or c == '\n' else '' for c in content) # Remove leading junk characters that might vary between transmissions lines = cleaned.split('\n') clean_lines = [] for line in lines: # Skip lines that are mostly junk characters if len(line.strip()) < 3: continue # Look for lines that start with valid TMATS patterns stripped = line.strip() if any(stripped.startswith(pattern) for pattern in ['G\\', 'R-', 'V-', 'T-', 'P-', 'COMMENT:']): clean_lines.append(stripped) elif any(pattern in stripped for pattern in ['TMATS END', '----']): clean_lines.append(stripped) return '\n'.join(clean_lines) def _are_tmats_equivalent(self, content1: str, content2: str) -> bool: """Check if two TMATS contents are equivalent (accounting for minor differences)""" # Simple comparison - if they're more than 80% similar, consider them equivalent lines1 = set(line.strip() for line in content1.split('\n') if line.strip()) lines2 = set(line.strip() for line in content2.split('\n') if line.strip()) if not lines1 or not lines2: return False # Calculate similarity intersection = lines1.intersection(lines2) union = lines1.union(lines2) similarity = len(intersection) / len(union) if union else 0 return similarity > 0.8 def _format_multiple_tmats_files(self, tmats_files: List[str]) -> str: """Format multiple TMATS files with separators""" if not tmats_files: return "" if len(tmats_files) == 1: return self._clean_tmats_content(tmats_files[0]) # Multiple unique files - show with separators formatted_parts = [] for i, tmats_file in enumerate(tmats_files): if i > 0: formatted_parts.append(f"\n{'='*60}\nTMATS FILE #{i+1}\n{'='*60}\n") formatted_parts.append(self._clean_tmats_content(tmats_file)) return ''.join(formatted_parts) def get_frame_count(self) -> int: """Get the number of TMATS frames""" return len(self.tmats_frames) def get_file_count(self) -> int: """Get the number of unique TMATS files found""" return len(self.tmats_files) def get_total_length(self) -> int: """Get the total length of assembled TMATS data""" return len(self.assembled_content)