# Files
#
# 143 lines
# 5.1 KiB
# Python

"""
PTP (IEEE 1588-2019) Precision Time Protocol dissector
"""
import struct
from typing import Dict, Optional, Any
try:
from scapy.all import Packet, UDP, Raw
except ImportError:
print("Error: scapy library required. Install with: pip install scapy")
import sys
sys.exit(1)
from .base import ProtocolDissector, DissectionResult, ProtocolType
class PTPDissector(ProtocolDissector):
    """IEEE 1588-2019 Precision Time Protocol dissector.

    Recognises PTP carried over UDP (ports 319 and 320), decodes the
    34-octet common header and, for a subset of message types, the leading
    message-specific fields that follow it.
    """

    # messageType (low nibble of header octet 0) -> human-readable name.
    PTP_MESSAGE_TYPES = {
        0x0: "Sync",
        0x1: "Delay_Req",
        0x2: "Pdelay_Req",
        0x3: "Pdelay_Resp",
        0x8: "Follow_Up",
        0x9: "Delay_Resp",
        0xA: "Pdelay_Resp_Follow_Up",
        0xB: "Announce",
        0xC: "Signaling",
        0xD: "Management",
    }

    # Length in octets of the PTP common header.
    _HEADER_LEN = 34

    # Common header layout (network byte order, no padding):
    #   B   majorSdoId/transportSpecific (high nibble) | messageType (low nibble)
    #   B   minorVersionPTP (high nibble) | versionPTP (low nibble)
    #   H   messageLength
    #   B   domainNumber
    #   B   minorSdoId / reserved
    #   H   flagField
    #   q   correctionField (signed)
    #   4s  messageTypeSpecific / reserved
    #   8s  sourcePortIdentity.clockIdentity
    #   H   sourcePortIdentity.portNumber
    #   H   sequenceId
    #   B   controlField
    #   b   logMessageInterval (signed)
    _HEADER_FORMAT = '>BBHBBHq4s8sHHBb'

    # Minimum Announce body length needed to read every field decoded here:
    # grandmasterIdentity ends at body offset 27.
    _ANNOUNCE_MIN_LEN = 27

    def __init__(self):
        self.ptp_ports = {319, 320}  # PTP event and general ports

    def can_dissect(self, packet: Packet) -> bool:
        """Check if packet is PTP.

        A packet qualifies when it has a UDP layer whose source or
        destination port is a PTP port and a Raw payload of at least one
        full common header.
        """
        if not packet.haslayer(UDP):
            return False
        udp_layer = packet[UDP]
        if (udp_layer.dport not in self.ptp_ports
                and udp_layer.sport not in self.ptp_ports):
            return False
        if not packet.haslayer(Raw):
            return False
        raw_data = bytes(packet[Raw])
        return len(raw_data) >= self._HEADER_LEN  # Minimum PTP header size

    def get_protocol_type(self) -> ProtocolType:
        """Return the protocol type handled by this dissector."""
        return ProtocolType.PTP

    def dissect(self, packet: Packet) -> Optional[DissectionResult]:
        """Dissect PTP packet.

        Returns:
            None when ``can_dissect`` rejects the packet; otherwise a
            DissectionResult whose ``fields`` hold the header fields plus any
            message-specific fields. If parsing raises, a result with empty
            ``fields`` and a single entry in ``errors`` is returned instead.
        """
        if not self.can_dissect(packet):
            return None
        raw_data = bytes(packet[Raw])
        try:
            header = self._parse_ptp_header(raw_data[:self._HEADER_LEN])
            result = DissectionResult(
                protocol=ProtocolType.PTP,
                fields=header
            )
            # Parse message-specific fields
            msg_type = header.get('message_type', 0)
            if len(raw_data) > self._HEADER_LEN:
                msg_fields = self._parse_message_fields(
                    msg_type, raw_data[self._HEADER_LEN:]
                )
                if msg_fields:
                    result.fields.update(msg_fields)
            return result
        except Exception as e:
            # Dissector boundary: report malformed input rather than raise.
            return DissectionResult(
                protocol=ProtocolType.PTP,
                fields={},
                errors=[f"PTP parsing error: {str(e)}"]
            )

    def _parse_ptp_header(self, header_data: bytes) -> Dict[str, Any]:
        """Parse PTP common header.

        Args:
            header_data: At least the first 34 octets of the PTP message.

        Returns:
            Mapping of decoded header fields. ``correction_field`` is the raw
            signed 64-bit value; ``source_port_identity`` is the hex of the
            full 10-octet identity, also split into ``clock_identity`` and
            ``port_number``.

        Raises:
            ValueError: If fewer than 34 octets are supplied.
        """
        if len(header_data) < self._HEADER_LEN:
            raise ValueError("PTP header too short")

        (type_octet, version_octet, message_length, domain_number,
         _minor_sdo_id, flags, correction, _type_specific,
         clock_identity, port_number, sequence_id, control,
         log_mean_message_interval) = struct.unpack_from(
            self._HEADER_FORMAT, header_data
        )

        # Octets 0 and 1 each pack two 4-bit fields.
        message_type = type_octet & 0x0F
        transport_specific = (type_octet >> 4) & 0x0F
        ptp_version = version_octet & 0x0F
        ptp_minor_version = (version_octet >> 4) & 0x0F

        return {
            'message_type': message_type,
            'message_type_name': self.PTP_MESSAGE_TYPES.get(message_type, f"Unknown (0x{message_type:x})"),
            'transport_specific': transport_specific,
            'ptp_version': ptp_version,
            'ptp_minor_version': ptp_minor_version,
            'domain_number': domain_number,
            'message_length': message_length,
            'flags': flags,
            'correction_field': correction,
            # Octets 20-29: clockIdentity (8) followed by portNumber (2).
            'source_port_identity': header_data[20:30].hex(),
            'clock_identity': clock_identity.hex(),
            'port_number': port_number,
            'sequence_id': sequence_id,
            'control_field': control,
            'log_mean_message_interval': log_mean_message_interval
        }

    def _parse_message_fields(self, msg_type: int, payload: bytes) -> Optional[Dict[str, Any]]:
        """Parse message-specific fields.

        Args:
            msg_type: The messageType value from the common header.
            payload: The message bytes following the 34-octet header.

        Returns:
            A dict of decoded fields, or None when the message type is not
            handled or the payload is too short for the fields decoded.
        """
        if msg_type in (0x0, 0x1, 0x2, 0x3):  # Sync, Delay_Req, Pdelay_Req, Pdelay_Resp
            if len(payload) >= 10:
                # 10-octet timestamp: 48-bit seconds (16 msb + 32 lsb)
                # followed by 32-bit nanoseconds.
                seconds_msb, seconds_lsb, nanoseconds = struct.unpack_from(
                    '>HII', payload
                )
                return {
                    'origin_timestamp_sec': (seconds_msb << 32) | seconds_lsb,
                    'origin_timestamp_nsec': nanoseconds
                }
        elif msg_type == 0xB:  # Announce
            # Require every decoded field to be fully present so that
            # grandmaster_identity is never silently truncated.
            if len(payload) >= self._ANNOUNCE_MIN_LEN:
                return {
                    'current_utc_offset': struct.unpack_from('>h', payload, 10)[0],
                    'grandmaster_priority1': payload[13],
                    'grandmaster_clock_quality': payload[14:18].hex(),
                    'grandmaster_priority2': payload[18],
                    'grandmaster_identity': payload[19:27].hex()
                }
        return None