diff --git a/1 PTPGM.pcapng b/1 PTPGM.pcapng new file mode 100644 index 0000000..80bc023 Binary files /dev/null and b/1 PTPGM.pcapng differ diff --git a/LuaDissectors b/LuaDissectors new file mode 160000 index 0000000..59979df --- /dev/null +++ b/LuaDissectors @@ -0,0 +1 @@ +Subproject commit 59979dfd4ae3916f20ebfff1988fe040721e6e37 diff --git a/README.md b/README.md index e69de29..b05394c 100644 --- a/README.md +++ b/README.md @@ -0,0 +1,106 @@ +# PCAP Analyzer for IRIG106 Chapter 10 and IEEE1588 PTP + +A Python tool for analyzing Wireshark PCAP files containing IRIG106 Chapter 10 streaming data and IEEE1588 PTP frames. + +## Features + +- **Chapter 10 Analysis**: Parses IRIG106 Chapter 10 headers and displays packet details in tabular format +- **PTP Analysis**: Analyzes IEEE1588 PTP messages (Sync, Announce, Delay_Req, etc.) +- **Statistical Analysis**: Provides timing statistics and detects intermittent issues such as: + - Timing outliers and jitter + - Sequence number gaps and duplicates + - Message distribution analysis + +## Installation + +1. Install dependencies: +```bash +python3 -m venv venv +source venv/bin/activate +pip install -r requirements.txt +``` + +## Usage + +### Basic Analysis +```bash +python3 pcap_analyzer.py "1 PTPGM.pcapng" +``` + +### Options +- `--ch10-only`: Show only Chapter 10 analysis +- `--ptp-only`: Show only PTP analysis +- `--stats-only`: Show only statistical analysis +- `--summary-only`: Show only summary information (no detailed tables or stats) +- `--no-tables`: Skip detailed packet tables (show summaries and stats only) +- `--tmats`: Display TMATS (Telemetry Attributes Transfer Standard) content +- `--tmats-only`: Show only TMATS content + +### Examples +```bash +# Analyze only PTP packets +python3 pcap_analyzer.py --ptp-only "1 PTPGM.pcapng" + +# Show only statistics +python3 pcap_analyzer.py --stats-only "1 PTPGM.pcapng" + +# Show only summaries (quick overview) +python3 pcap_analyzer.py --summary-only "1 PTPGM.pcapng" + +# Show summaries and statistics but skip detailed tables +python3 pcap_analyzer.py --no-tables "1 PTPGM.pcapng" + +# Display TMATS metadata content +python3 pcap_analyzer.py --tmats "1 PTPGM.pcapng" + +# Show only TMATS content +python3 pcap_analyzer.py --tmats-only "1 PTPGM.pcapng" +``` + +## Output + +The tool provides five types of analysis: + +1. **Protocol Summaries**: High-level overview with packet counts, time spans, and distribution statistics +2. **Detailed Packet Tables**: Complete packet-by-packet analysis (Chapter 10 and PTP) +3. **Statistical Analysis**: Timing statistics, outlier detection, and intermittent issue identification +4. **TMATS Content**: Assembled telemetry metadata and scaling information from Chapter 10 TMATS frames + +### Summary Output +- **Chapter 10 Summary**: Packet counts, channel distribution, data type distribution, size statistics, and data rates +- **PTP Summary**: Message type distribution, domain analysis, source IP breakdown, and timing rates +- **TMATS Output**: Complete assembled ASCII metadata with frame counts and statistics + +## Chapter 10 Header Fields + +- **Sync Pattern**: Should be 0xEB25 for valid Ch10 packets +- **Channel ID**: Identifies the data source +- **Sequence Number**: Packet sequence (0-255, wraps around) +- **Data Type**: Type of data payload +- **Packet/Data Length**: Size information +- **Flags**: Status and configuration flags + +## PTP Message Types + +- **Sync**: Master clock synchronization +- **Follow_Up**: Precise timing information +- **Delay_Req**: Slave delay measurement request +- **Delay_Resp**: Master delay measurement response +- **Announce**: Clock quality and hierarchy information + +## TMATS (Telemetry Attributes Transfer Standard) + +TMATS frames contain ASCII metadata that describes the telemetry setup, channel configurations, and scaling information: + +- **Automatic Assembly**: Combines multiple TMATS frames into complete metadata +- **ASCII Display**: Clean formatting of telemetry attributes +- **Statistics**: Frame counts, total length, attribute and comment line counts +- **Mixed Frame Support**: Handles both full Chapter 10 headers and continuation frames + +## Statistical Features + +- **Timing Analysis**: Interval statistics, jitter detection +- **Sequence Analysis**: Gap detection, duplicate identification +- **Distribution Analysis**: Message type frequency +- **Outlier Detection**: Identifies packets with unusual timing +- **Frame Number Reporting**: Shows actual PCAP frame numbers for easy Wireshark correlation \ No newline at end of file diff --git a/__pycache__/chapter10_packet.cpython-313.pyc b/__pycache__/chapter10_packet.cpython-313.pyc new file mode 100644 index 0000000..6334704 Binary files /dev/null and b/__pycache__/chapter10_packet.cpython-313.pyc differ diff --git a/__pycache__/pcap_analyzer.cpython-313.pyc b/__pycache__/pcap_analyzer.cpython-313.pyc new file mode 100644 index 0000000..bda6cee Binary files /dev/null and b/__pycache__/pcap_analyzer.cpython-313.pyc differ diff --git a/__pycache__/ptp_packet.cpython-313.pyc b/__pycache__/ptp_packet.cpython-313.pyc new file mode 100644 index 0000000..33e375e Binary files /dev/null and b/__pycache__/ptp_packet.cpython-313.pyc differ diff --git a/__pycache__/tmats_packet.cpython-313.pyc b/__pycache__/tmats_packet.cpython-313.pyc new file mode 100644 index 0000000..ae5e939 Binary files /dev/null and b/__pycache__/tmats_packet.cpython-313.pyc differ diff --git a/chapter10_packet.py b/chapter10_packet.py new file mode 100644 index 0000000..8fce0d3 --- /dev/null +++ b/chapter10_packet.py @@ -0,0 +1,104 @@ +#!/usr/bin/env python3 +""" +Chapter 10 Packet class for IRIG106 Chapter 10 frame parsing +""" + +import struct +from typing import Dict, Optional + +try: + from scapy.layers.inet import IP, UDP +except ImportError: + print("Error: scapy library not found. Install with: pip install scapy") + exit(1) + + +class Chapter10Packet: + """Represents an IRIG106 Chapter 10 packet""" + + def __init__(self, packet, original_frame_num: Optional[int] = None): + """ + Initialize Chapter 10 packet from raw scapy packet + + Args: + packet: Raw scapy packet + original_frame_num: Original frame number in PCAP file + """ + self.raw_packet = packet + self.original_frame_num: Optional[int] = original_frame_num + + # Extract basic packet info + self.timestamp = float(packet.time) + self.packet_size = len(packet) + + # Extract IP/UDP info if available + if packet.haslayer(IP) and packet.haslayer(UDP): + ip_layer = packet[IP] + udp_layer = packet[UDP] + + self.src_ip = ip_layer.src + self.dst_ip = ip_layer.dst + self.src_port = udp_layer.sport + self.dst_port = udp_layer.dport + self.payload = bytes(udp_layer.payload) + else: + self.src_ip = "" + self.dst_ip = "" + self.src_port = 0 + self.dst_port = 0 + self.payload = bytes() + + # Parse Chapter 10 header + self.ch10_header = self._parse_ch10_header() + + def _parse_ch10_header(self) -> Optional[Dict]: + """Parse Chapter 10 header from payload""" + if len(self.payload) < 28: # Minimum payload size (4-byte prefix + 24-byte Ch10 header) + return None + + try: + # Look for Ch10 sync pattern in first several bytes + ch10_offset = None + for offset in range(min(8, len(self.payload) - 24)): + sync_pattern = struct.unpack('= 28: + ch10_packet = Chapter10Packet(packet, original_frame_num) + if ch10_packet.ch10_header: + self.ch10_packets.append(ch10_packet) + + # Check for TMATS packets (can be smaller, includes continuation frames) + if len(payload) >= 12: + tmats_packet = TMATSPacket(packet, original_frame_num) + if tmats_packet.is_tmats: + self.tmats_assembler.add_frame(tmats_packet) + + def display_ch10_summary(self): + """Display Chapter 10 summary statistics""" + if not self.ch10_packets: + print("No Chapter 10 packets found") + return + + print("\n" + "="*80) + print("CHAPTER 10 SUMMARY") + print("="*80) + + # Basic counts + total_packets = len(self.ch10_packets) + print(f"Total Chapter 10 packets: {total_packets}") + + # Time span + if total_packets > 0: + start_time = min(pkt.timestamp for pkt in self.ch10_packets) + end_time = max(pkt.timestamp for pkt in self.ch10_packets) + duration = end_time - start_time + print(f"Time span: {duration:.3f} seconds") + print(f"Start time: {datetime.fromtimestamp(start_time).strftime('%H:%M:%S.%f')[:-3]}") + print(f"End time: {datetime.fromtimestamp(end_time).strftime('%H:%M:%S.%f')[:-3]}") + + # Channel distribution + channels = {} + data_types = {} + for pkt in self.ch10_packets: + if pkt.ch10_header is not None: + ch_id = pkt.ch10_header['channel_id'] + data_type = pkt.ch10_header['data_type'] + channels[ch_id] = channels.get(ch_id, 0) + 1 + data_types[data_type] = data_types.get(data_type, 0) + 1 + + print(f"\nChannel distribution:") + for ch_id in sorted(channels.keys()): + count = channels[ch_id] + percentage = (count / total_packets) * 100 + print(f" Channel {ch_id}: {count} packets ({percentage:.1f}%)") + + print(f"\nData type distribution:") + for data_type in sorted(data_types.keys()): + count = data_types[data_type] + percentage = (count / total_packets) * 100 + print(f" Type {data_type}: {count} packets ({percentage:.1f}%)") + + # Size statistics + sizes = [pkt.packet_size for pkt in self.ch10_packets] + data_lengths = [pkt.ch10_header['data_length'] for pkt in self.ch10_packets if pkt.ch10_header is not None] + + print(f"\nPacket size statistics:") + print(f" Average: {statistics.mean(sizes):.1f} bytes") + print(f" Min: {min(sizes)} bytes") + print(f" Max: {max(sizes)} bytes") + print(f" Total data: {sum(data_lengths):,} bytes") + + # Rate calculations + if duration > 0: + packet_rate = total_packets / duration + data_rate = sum(data_lengths) / duration + print(f"\nRate statistics:") + print(f" Packet rate: {packet_rate:.1f} packets/sec") + print(f" Data rate: {data_rate/1024:.1f} KB/sec") + + def display_ch10_table(self): + """Display Chapter 10 packets in table format""" + if not self.ch10_packets: + print("No Chapter 10 packets found") + return + + print("\n" + "="*120) + print("CHAPTER 10 PACKET ANALYSIS") + print("="*120) + + # Create DataFrame for better table display + data = [] + for i, pkt in enumerate(self.ch10_packets): + if pkt.ch10_header is not None: + header = pkt.ch10_header + data.append({ + 'Packet#': i+1, + 'Timestamp': datetime.fromtimestamp(pkt.timestamp).strftime('%H:%M:%S.%f')[:-3], + 'Src IP': pkt.src_ip, + 'Dst IP': pkt.dst_ip, + 'Src Port': pkt.src_port, + 'Dst Port': pkt.dst_port, + 'Channel ID': header['channel_id'], + 'Seq Num': header['sequence_number'], + 'Data Type': header['data_type'], + 'Pkt Length': header['packet_length'], + 'Data Length': header['data_length'], + 'Flags': header['packet_flags'], + 'Size': pkt.packet_size + }) + + df = pd.DataFrame(data) + print(df.to_string(index=False)) + + def display_ptp_summary(self): + """Display PTP summary statistics""" + if not self.ptp_packets: + print("No PTP packets found") + return + + print("\n" + "="*80) + print("IEEE1588 PTP SUMMARY") + print("="*80) + + # Basic counts + total_packets = len(self.ptp_packets) + print(f"Total PTP packets: {total_packets}") + + # Time span + if total_packets > 0: + start_time = min(pkt.timestamp for pkt in self.ptp_packets) + end_time = max(pkt.timestamp for pkt in self.ptp_packets) + duration = end_time - start_time + print(f"Time span: {duration:.3f} seconds") + print(f"Start time: {datetime.fromtimestamp(start_time).strftime('%H:%M:%S.%f')[:-3]}") + print(f"End time: {datetime.fromtimestamp(end_time).strftime('%H:%M:%S.%f')[:-3]}") + + # Message type distribution + msg_types = {} + domains = {} + sources = {} + + for pkt in self.ptp_packets: + if pkt.ptp_header is not None: + msg_type = pkt.ptp_header['message_type'] + domain = pkt.ptp_header['domain_number'] + source = pkt.src_ip + + msg_types[msg_type] = msg_types.get(msg_type, 0) + 1 + domains[domain] = domains.get(domain, 0) + 1 + sources[source] = sources.get(source, 0) + 1 + + print(f"\nMessage type distribution:") + for msg_type in sorted(msg_types.keys()): + count = msg_types[msg_type] + percentage = (count / total_packets) * 100 + print(f" {msg_type}: {count} packets ({percentage:.1f}%)") + + print(f"\nDomain distribution:") + for domain in sorted(domains.keys()): + count = domains[domain] + percentage = (count / total_packets) * 100 + print(f" Domain {domain}: {count} packets ({percentage:.1f}%)") + + print(f"\nSource IP distribution:") + for source in sorted(sources.keys()): + count = sources[source] + percentage = (count / total_packets) * 100 + print(f" {source}: {count} packets ({percentage:.1f}%)") + + # Rate calculations + if duration > 0: + packet_rate = total_packets / duration + print(f"\nRate statistics:") + print(f" Overall PTP rate: {packet_rate:.1f} packets/sec") + + # Sync message rate + sync_count = msg_types.get('Sync', 0) + if sync_count > 0: + sync_rate = sync_count / duration + print(f" Sync message rate: {sync_rate:.1f} packets/sec") + + def display_ptp_table(self): + """Display PTP packets in table format""" + if not self.ptp_packets: + print("No PTP packets found") + return + + print("\n" + "="*100) + print("IEEE1588 PTP PACKET ANALYSIS") + print("="*100) + + data = [] + for i, pkt in enumerate(self.ptp_packets): + if pkt.ptp_header is not None: + header = pkt.ptp_header + data.append({ + 'Packet#': i+1, + 'Timestamp': datetime.fromtimestamp(pkt.timestamp).strftime('%H:%M:%S.%f')[:-3], + 'Src IP': pkt.src_ip, + 'Dst IP': pkt.dst_ip, + 'Message Type': header['message_type'], + 'Domain': header['domain_number'], + 'Sequence ID': header['sequence_id'], + 'Flags': header['flags'], + 'Correction': header['correction_field'], + 'Interval': header['log_message_interval'] + }) + + df = pd.DataFrame(data) + print(df.to_string(index=False)) + + def statistical_analysis(self): + """Perform statistical analysis for intermittent issue detection""" + print("\n" + "="*80) + print("STATISTICAL ANALYSIS") + print("="*80) + + if self.ch10_packets: + self._analyze_ch10_statistics() + + if self.ptp_packets: + self._analyze_ptp_statistics() + + def _analyze_ch10_statistics(self): + """Analyze Chapter 10 packet statistics""" + print("\nChapter 10 Statistics:") + print("-" * 40) + + # Timing analysis + timestamps = [pkt.timestamp for pkt in self.ch10_packets] + if len(timestamps) > 1: + intervals = [timestamps[i+1] - timestamps[i] for i in range(len(timestamps)-1)] + + print(f"Packet count: {len(self.ch10_packets)}") + print(f"Time span: {timestamps[-1] - timestamps[0]:.3f} seconds") + print(f"Average interval: {statistics.mean(intervals)*1000:.3f} ms") + print(f"Min interval: {min(intervals)*1000:.3f} ms") + print(f"Max interval: {max(intervals)*1000:.3f} ms") + print(f"Std deviation: {statistics.stdev(intervals)*1000:.3f} ms") + + # Detect potential issues + mean_interval = statistics.mean(intervals) + std_interval = statistics.stdev(intervals) + outliers = [] + outlier_frames = [] + + for i, interval in enumerate(intervals): + if abs(interval - mean_interval) > 3 * std_interval: + outliers.append(interval * 1000) # Convert to ms + # Get the original frame number of the second packet in the interval + original_frame = self.ch10_packets[i + 1].original_frame_num + outlier_frames.append(original_frame) + + if outliers: + print(f"WARNING: {len(outliers)} timing outliers detected!") + print(f"Outlier details:") + for i, (frame_num, interval_ms) in enumerate(zip(outlier_frames, outliers)): + if i < 10: # Show first 10 outliers + print(f" Frame {frame_num}: {interval_ms:.3f} ms interval") + elif i == 10: + print(f" ... and {len(outliers) - 10} more outliers") + + # Channel ID analysis + channel_ids = [pkt.ch10_header['channel_id'] for pkt in self.ch10_packets if pkt.ch10_header is not None] + unique_channels = set(channel_ids) + print(f"Unique channels: {sorted(unique_channels)}") + + for ch_id in unique_channels: + count = channel_ids.count(ch_id) + print(f" Channel {ch_id}: {count} packets") + + # Sequence number analysis + seq_numbers = [pkt.ch10_header['sequence_number'] for pkt in self.ch10_packets if pkt.ch10_header is not None] + if len(set(seq_numbers)) < len(seq_numbers): + duplicates = len(seq_numbers) - len(set(seq_numbers)) + print(f"WARNING: {duplicates} duplicate sequence numbers detected!") + + # Check for sequence gaps + seq_gaps = [] + valid_packets = [pkt for pkt in self.ch10_packets if pkt.ch10_header is not None] + for i in range(1, len(seq_numbers)): + expected = (seq_numbers[i-1] + 1) % 256 + if seq_numbers[i] != expected: + original_frame = valid_packets[i].original_frame_num + seq_gaps.append((original_frame, seq_numbers[i-1], seq_numbers[i])) + + if seq_gaps: + print(f"WARNING: {len(seq_gaps)} sequence number gaps detected!") + print(f"Sequence gap details:") + for i, (frame_num, prev, curr) in enumerate(seq_gaps): + if i < 10: # Show first 10 gaps + print(f" Frame {frame_num}: expected {(prev + 1) % 256}, got {curr}") + elif i == 10: + print(f" ... and {len(seq_gaps) - 10} more gaps") + + def _analyze_ptp_statistics(self): + """Analyze PTP packet statistics""" + print("\nPTP Statistics:") + print("-" * 40) + + # Message type distribution + msg_types = [pkt.ptp_header['message_type'] for pkt in self.ptp_packets if pkt.ptp_header is not None] + unique_types = set(msg_types) + + print(f"Total PTP packets: {len(self.ptp_packets)}") + print("Message type distribution:") + for msg_type in unique_types: + count = msg_types.count(msg_type) + print(f" {msg_type}: {count} packets") + + # Timing analysis for Sync messages + sync_packets = [pkt for pkt in self.ptp_packets if pkt.ptp_header is not None and pkt.ptp_header['message_type'] == 'Sync'] + if len(sync_packets) > 1: + sync_times = [pkt.timestamp for pkt in sync_packets] + sync_intervals = [sync_times[i+1] - sync_times[i] for i in range(len(sync_times)-1)] + + print(f"\nSync message analysis:") + print(f" Count: {len(sync_packets)}") + print(f" Average interval: {statistics.mean(sync_intervals)*1000:.3f} ms") + print(f" Min interval: {min(sync_intervals)*1000:.3f} ms") + print(f" Max interval: {max(sync_intervals)*1000:.3f} ms") + print(f" Std deviation: {statistics.stdev(sync_intervals)*1000:.3f} ms") + + def display_tmats_content(self): + """Display assembled TMATS content""" + if self.tmats_assembler.get_frame_count() == 0: + print("No TMATS frames found") + return + + print("\n" + "="*80) + print("TMATS (TELEMETRY ATTRIBUTES TRANSFER STANDARD) CONTENT") + print("="*80) + + print(f"TMATS frames found: {self.tmats_assembler.get_frame_count()}") + + # Assemble the TMATS content + assembled_content = self.tmats_assembler.assemble() + + if assembled_content: + print(f"TMATS files found: {self.tmats_assembler.get_file_count()}") + print(f"Total TMATS length: {len(assembled_content)} characters") + print("\nTMATS Content:") + print("-" * 80) + + # The assembled content is already cleaned by the assembler + lines = assembled_content.split('\n') + for line in lines: + if line.strip(): # Only print non-empty lines + print(line) + + print("-" * 80) + + # Show some statistics + attribute_lines = [line for line in lines if '\\' in line and ':' in line] + comment_lines = [line for line in lines if line.strip().startswith('COMMENT:')] + + print(f"Total lines: {len([l for l in lines if l.strip()])}") + print(f"Attribute lines: {len(attribute_lines)}") + print(f"Comment lines: {len(comment_lines)}") + else: + print("No TMATS content could be assembled") + + +def main(): + parser = argparse.ArgumentParser(description='Analyze PCAP files with Chapter 10 and PTP data') + parser.add_argument('pcap_file', help='Path to PCAP file') + parser.add_argument('--ch10-only', action='store_true', help='Show only Chapter 10 analysis') + parser.add_argument('--ptp-only', action='store_true', help='Show only PTP analysis') + parser.add_argument('--stats-only', action='store_true', help='Show only statistical analysis') + parser.add_argument('--summary-only', action='store_true', help='Show only summary information') + parser.add_argument('--no-tables', action='store_true', help='Skip detailed packet tables') + parser.add_argument('--tmats', action='store_true', help='Display TMATS (Telemetry Attributes Transfer Standard) content') + parser.add_argument('--tmats-only', action='store_true', help='Show only TMATS content') + + args = parser.parse_args() + + analyzer = PcapAnalyzer(args.pcap_file) + analyzer.analyze() + + # Handle TMATS-only mode + if args.tmats_only: + analyzer.display_tmats_content() + return + + # Show summaries first + if not args.stats_only: + if not args.ptp_only: + analyzer.display_ch10_summary() + if not args.ch10_only: + analyzer.display_ptp_summary() + + # Show detailed tables unless suppressed + if not args.stats_only and not args.summary_only and not args.no_tables: + if not args.ptp_only: + analyzer.display_ch10_table() + if not args.ch10_only: + analyzer.display_ptp_table() + + # Show TMATS content if requested + if args.tmats: + analyzer.display_tmats_content() + + # Show statistical analysis + if not args.summary_only: + analyzer.statistical_analysis() + + +if __name__ == '__main__': + main() \ No newline at end of file diff --git a/ptp_packet.py b/ptp_packet.py new file mode 100644 index 0000000..db8afde --- /dev/null +++ b/ptp_packet.py @@ -0,0 +1,87 @@ +#!/usr/bin/env python3 +""" +PTP Packet class for IEEE1588 PTP frame parsing +""" + +import struct +from typing import Dict, Optional + +try: + from scapy.layers.inet import IP, UDP +except ImportError: + print("Error: scapy library not found. Install with: pip install scapy") + exit(1) + + +class PTPPacket: + """Represents an IEEE1588 PTP packet""" + + def __init__(self, packet): + """ + Initialize PTP packet from raw scapy packet + + Args: + packet: Raw scapy packet + """ + self.raw_packet = packet + + # Extract basic packet info + self.timestamp = float(packet.time) + + # Extract IP/UDP info if available + if packet.haslayer(IP) and packet.haslayer(UDP): + ip_layer = packet[IP] + udp_layer = packet[UDP] + + self.src_ip = ip_layer.src + self.dst_ip = ip_layer.dst + self.src_port = udp_layer.sport + self.dst_port = udp_layer.dport + self.payload = bytes(udp_layer.payload) + else: + self.src_ip = "" + self.dst_ip = "" + self.src_port = 0 + self.dst_port = 0 + self.payload = bytes() + + # Parse PTP header + self.ptp_header = self._parse_ptp_header() + + def _parse_ptp_header(self) -> Optional[Dict]: + """Parse PTP header from payload""" + if len(self.payload) < 34: # Minimum PTP header size + return None + + try: + message_type = self.payload[0] & 0x0F + version = (self.payload[1] >> 4) & 0x0F + message_length = struct.unpack('>H', self.payload[2:4])[0] + domain_number = self.payload[4] + flags = struct.unpack('>H', self.payload[6:8])[0] + correction_field = struct.unpack('>Q', self.payload[8:16])[0] + source_port_id = self.payload[20:30].hex() + sequence_id = struct.unpack('>H', self.payload[30:32])[0] + control_field = self.payload[32] + log_message_interval = struct.unpack('b', self.payload[33:34])[0] + + message_types = { + 0x0: 'Sync', 0x1: 'Delay_Req', 0x2: 'Pdelay_Req', 0x3: 'Pdelay_Resp', + 0x8: 'Follow_Up', 0x9: 'Delay_Resp', 0xA: 'Pdelay_Resp_Follow_Up', + 0xB: 'Announce', 0xC: 'Signaling', 0xD: 'Management' + } + + return { + 'message_type': message_types.get(message_type, f'Unknown({message_type})'), + 'version': version, + 'message_length': message_length, + 'domain_number': domain_number, + 'flags': f'0x{flags:04X}', + 'correction_field': correction_field, + 'source_port_id': source_port_id, + 'sequence_id': sequence_id, + 'control_field': control_field, + 'log_message_interval': log_message_interval + } + except (struct.error, IndexError): + return None \ No newline at end of file diff --git a/requirements.txt b/requirements.txt new file mode 100644 index 0000000..089ab45 --- /dev/null +++ b/requirements.txt @@ -0,0 +1,3 @@ +scapy>=2.4.5 +pandas>=1.5.0 +numpy>=1.21.0 \ No newline at end of file diff --git a/tmats_packet.py b/tmats_packet.py new file mode 100644 index 0000000..ab22193 --- /dev/null +++ b/tmats_packet.py @@ -0,0 +1,330 @@ +#!/usr/bin/env python3 +""" +TMATS Packet class for IRIG106 Chapter 10 TMATS frame parsing +""" + +import struct +from typing import Dict, Optional, List + +try: + from scapy.layers.inet import IP, UDP +except ImportError: + print("Error: scapy library not found. Install with: pip install scapy") + exit(1) + + +class TMATSPacket: + """Represents an IRIG106 Chapter 10 TMATS packet""" + + def __init__(self, packet, original_frame_num: Optional[int] = None): + """ + Initialize TMATS packet from raw scapy packet + + Args: + packet: Raw scapy packet + original_frame_num: Original frame number in PCAP file + """ + self.raw_packet = packet + self.original_frame_num: Optional[int] = original_frame_num + + # Extract basic packet info + self.timestamp = float(packet.time) + self.packet_size = len(packet) + + # Extract IP/UDP info if available + if packet.haslayer(IP) and packet.haslayer(UDP): + ip_layer = packet[IP] + udp_layer = packet[UDP] + + self.src_ip = ip_layer.src + self.dst_ip = ip_layer.dst + self.src_port = udp_layer.sport + self.dst_port = udp_layer.dport + self.payload = bytes(udp_layer.payload) + else: + self.src_ip = "" + self.dst_ip = "" + self.src_port = 0 + self.dst_port = 0 + self.payload = bytes() + + # Parse TMATS content + self.tmats_info = self._parse_tmats_content() + self.is_tmats = self.tmats_info is not None + + def _parse_tmats_content(self) -> Optional[Dict]: + """Parse TMATS content from payload""" + if len(self.payload) < 12: + return None + + try: + # Look for Chapter 10 sync pattern + ch10_offset = None + for offset in range(min(16, len(self.payload) - 24)): + if len(self.payload) >= offset + 2: + sync_pattern = struct.unpack('= len(self.payload): + return None + + # Extract TMATS ASCII data + tmats_data = self.payload[data_start:] + + # Try to decode as ASCII + try: + ascii_content = tmats_data.decode('ascii', errors='ignore') + + # Check if this looks like TMATS data + tmats_patterns = ['\\', 'R-1\\', 'G\\', 'T-', 'P-', 'COMMENT:', 'DSI', 'DST'] + if any(pattern in ascii_content for pattern in tmats_patterns): + return { + 'raw_data': tmats_data, + 'ascii_content': ascii_content, + 'data_start_offset': data_start, + 'data_length': len(tmats_data), + 'has_ch10_header': ch10_offset is not None, + 'is_continuation': ch10_offset is None + } + except Exception: + pass + + return None + + except (struct.error, IndexError): + return None + + def _parse_ch10_header(self, offset: int) -> None: + """Parse Chapter 10 header if present""" + try: + base = offset + sync_pattern = struct.unpack(' Optional[int]: + """Find where ASCII TMATS data starts in continuation frames""" + # Look for start of ASCII data by scanning for printable characters + for i in range(min(32, len(self.payload))): + # Check if we have a sequence of printable ASCII characters + if i + 10 < len(self.payload): + sample = self.payload[i:i+10] + try: + decoded = sample.decode('ascii') + # If we can decode it and it contains TMATS-like characters + if any(c in decoded for c in ['R', 'G', 'T', 'P', '\\', '-', ':']): + return i + except: + continue + + # Fallback - assume data starts after a simple header (based on observation) + return 12 if len(self.payload) > 12 else None + + def get_ascii_content(self) -> str: + """Get the ASCII content of the TMATS data""" + if self.tmats_info: + return self.tmats_info['ascii_content'] + return "" + + def is_continuation_frame(self) -> bool: + """Check if this is a TMATS continuation frame (without Ch10 header)""" + if self.tmats_info: + return self.tmats_info['is_continuation'] + return False + + def has_chapter10_header(self) -> bool: + """Check if this frame has a full Chapter 10 header""" + if self.tmats_info: + return self.tmats_info['has_ch10_header'] + return False + + +class TMATSAssembler: + """Assembles TMATS data from multiple frames""" + + def __init__(self): + self.tmats_frames: List[TMATSPacket] = [] + self.assembled_content = "" + self.tmats_files: List[str] = [] + + def add_frame(self, tmats_packet: TMATSPacket) -> None: + """Add a TMATS frame to the assembler""" + if tmats_packet.is_tmats: + self.tmats_frames.append(tmats_packet) + + def assemble(self) -> str: + """Assemble TMATS frames into complete TMATS files, stopping at END markers""" + if not self.tmats_frames: + return "" + + # Sort frames by timestamp to ensure correct order + sorted_frames = sorted(self.tmats_frames, key=lambda x: x.timestamp) + + # Assemble TMATS content, detecting file boundaries + current_tmats = [] + self.tmats_files = [] + + for frame in sorted_frames: + content = frame.get_ascii_content() + if not content: + continue + + current_tmats.append(content) + + # Check if this frame contains a TMATS END marker + if self._contains_tmats_end(content): + # Complete TMATS file found + complete_tmats = ''.join(current_tmats) + self.tmats_files.append(complete_tmats) + current_tmats = [] # Start new TMATS file + + # Handle any remaining partial TMATS content + if current_tmats: + partial_tmats = ''.join(current_tmats) + self.tmats_files.append(partial_tmats) + + # Return the first complete TMATS file, or all if multiple unique files + if self.tmats_files: + # Check if we have multiple unique TMATS files + unique_files = self._get_unique_tmats_files() + if len(unique_files) == 1: + self.assembled_content = unique_files[0] + else: + # Multiple unique TMATS files - show all with separators + self.assembled_content = self._format_multiple_tmats_files(unique_files) + else: + self.assembled_content = "" + + return self.assembled_content + + def _contains_tmats_end(self, content: str) -> bool: + """Check if content contains a TMATS END marker""" + end_patterns = [ + 'TMATS END', + 'TMATS_END', + '-----END', + 'END----' + ] + return any(pattern in content for pattern in end_patterns) + + def _get_unique_tmats_files(self) -> List[str]: + """Get unique TMATS files, removing duplicates""" + unique_files = [] + + for tmats_file in self.tmats_files: + # Clean the content for comparison + cleaned = self._clean_tmats_content(tmats_file) + + # Check if this is a duplicate of an existing file + is_duplicate = False + for existing_file in unique_files: + existing_cleaned = self._clean_tmats_content(existing_file) + if self._are_tmats_equivalent(cleaned, existing_cleaned): + is_duplicate = True + break + + if not is_duplicate and cleaned.strip(): + unique_files.append(tmats_file) + + return unique_files + + def _clean_tmats_content(self, content: str) -> str: + """Clean TMATS content for comparison by removing junk characters""" + # Remove non-printable characters except newlines + cleaned = ''.join(c if c.isprintable() or c == '\n' else '' for c in content) + + # Remove leading junk characters that might vary between transmissions + lines = cleaned.split('\n') + clean_lines = [] + + for line in lines: + # Skip lines that are mostly junk characters + if len(line.strip()) < 3: + continue + + # Look for lines that start with valid TMATS patterns + stripped = line.strip() + if any(stripped.startswith(pattern) for pattern in ['G\\', 'R-', 'V-', 'T-', 'P-', 'COMMENT:']): + clean_lines.append(stripped) + elif any(pattern in stripped for pattern in ['TMATS END', '----']): + clean_lines.append(stripped) + + return '\n'.join(clean_lines) + + def _are_tmats_equivalent(self, content1: str, content2: str) -> bool: + """Check if two TMATS contents are equivalent (accounting for minor differences)""" + # Simple comparison - if they're more than 80% similar, consider them equivalent + lines1 = set(line.strip() for line in content1.split('\n') if line.strip()) + lines2 = set(line.strip() for line in content2.split('\n') if line.strip()) + + if not lines1 or not lines2: + return False + + # Calculate similarity + intersection = lines1.intersection(lines2) + union = lines1.union(lines2) + + similarity = len(intersection) / len(union) if union else 0 + return similarity > 0.8 + + def _format_multiple_tmats_files(self, tmats_files: List[str]) -> str: + """Format multiple TMATS files with separators""" + if not tmats_files: + return "" + + if len(tmats_files) == 1: + return self._clean_tmats_content(tmats_files[0]) + + # Multiple unique files - show with separators + formatted_parts = [] + for i, tmats_file in enumerate(tmats_files): + if i > 0: + formatted_parts.append(f"\n{'='*60}\nTMATS FILE #{i+1}\n{'='*60}\n") + formatted_parts.append(self._clean_tmats_content(tmats_file)) + + return ''.join(formatted_parts) + + def get_frame_count(self) -> int: + """Get the number of TMATS frames""" + return len(self.tmats_frames) + + def get_file_count(self) -> int: + """Get the number of unique TMATS files found""" + return len(self.tmats_files) + + def get_total_length(self) -> int: + """Get the total length of assembled TMATS data""" + return len(self.assembled_content) \ No newline at end of file