143 lines
5.1 KiB
Python
143 lines
5.1 KiB
Python
"""
|
|
PTP (IEEE 1588-2019) Precision Time Protocol dissector
|
|
"""
|
|
|
|
import struct
|
|
from typing import Dict, Optional, Any
|
|
|
|
try:
|
|
from scapy.all import Packet, UDP, Raw
|
|
except ImportError:
|
|
print("Error: scapy library required. Install with: pip install scapy")
|
|
import sys
|
|
sys.exit(1)
|
|
|
|
from .base import ProtocolDissector, DissectionResult, ProtocolType
|
|
|
|
|
|
class PTPDissector(ProtocolDissector):
|
|
"""IEEE 1588-2019 Precision Time Protocol dissector"""
|
|
|
|
PTP_MESSAGE_TYPES = {
|
|
0x0: "Sync",
|
|
0x1: "Delay_Req",
|
|
0x2: "Pdelay_Req",
|
|
0x3: "Pdelay_Resp",
|
|
0x8: "Follow_Up",
|
|
0x9: "Delay_Resp",
|
|
0xA: "Pdelay_Resp_Follow_Up",
|
|
0xB: "Announce",
|
|
0xC: "Signaling",
|
|
0xD: "Management"
|
|
}
|
|
|
|
def __init__(self):
|
|
self.ptp_ports = {319, 320} # PTP event and general ports
|
|
|
|
def can_dissect(self, packet: Packet) -> bool:
|
|
"""Check if packet is PTP"""
|
|
if not packet.haslayer(UDP):
|
|
return False
|
|
|
|
udp_layer = packet[UDP]
|
|
if udp_layer.dport not in self.ptp_ports and udp_layer.sport not in self.ptp_ports:
|
|
return False
|
|
|
|
if not packet.haslayer(Raw):
|
|
return False
|
|
|
|
raw_data = bytes(packet[Raw])
|
|
return len(raw_data) >= 34 # Minimum PTP header size
|
|
|
|
def get_protocol_type(self) -> ProtocolType:
|
|
return ProtocolType.PTP
|
|
|
|
def dissect(self, packet: Packet) -> Optional[DissectionResult]:
|
|
"""Dissect PTP packet"""
|
|
if not self.can_dissect(packet):
|
|
return None
|
|
|
|
raw_data = bytes(packet[Raw])
|
|
|
|
try:
|
|
header = self._parse_ptp_header(raw_data[:34])
|
|
|
|
result = DissectionResult(
|
|
protocol=ProtocolType.PTP,
|
|
fields=header
|
|
)
|
|
|
|
# Parse message-specific fields
|
|
msg_type = header.get('message_type', 0)
|
|
if len(raw_data) > 34:
|
|
msg_fields = self._parse_message_fields(msg_type, raw_data[34:])
|
|
if msg_fields:
|
|
result.fields.update(msg_fields)
|
|
|
|
return result
|
|
|
|
except Exception as e:
|
|
return DissectionResult(
|
|
protocol=ProtocolType.PTP,
|
|
fields={},
|
|
errors=[f"PTP parsing error: {str(e)}"]
|
|
)
|
|
|
|
def _parse_ptp_header(self, header_data: bytes) -> Dict[str, Any]:
|
|
"""Parse PTP common header"""
|
|
if len(header_data) < 34:
|
|
raise ValueError("PTP header too short")
|
|
|
|
# Parse first 4 bytes
|
|
first_word = struct.unpack('>I', header_data[:4])[0]
|
|
|
|
message_type = first_word & 0xF
|
|
transport_specific = (first_word >> 4) & 0xF
|
|
ptp_version = (first_word >> 8) & 0xFF
|
|
domain_number = (first_word >> 24) & 0xFF
|
|
|
|
# Parse remaining header fields
|
|
message_length = struct.unpack('>H', header_data[4:6])[0]
|
|
flags = struct.unpack('>H', header_data[6:8])[0]
|
|
correction = struct.unpack('>Q', header_data[8:16])[0]
|
|
source_port_id = header_data[20:28]
|
|
sequence_id = struct.unpack('>H', header_data[30:32])[0]
|
|
control = header_data[32]
|
|
log_mean_message_interval = struct.unpack('b', header_data[33:34])[0]
|
|
|
|
return {
|
|
'message_type': message_type,
|
|
'message_type_name': self.PTP_MESSAGE_TYPES.get(message_type, f"Unknown (0x{message_type:x})"),
|
|
'transport_specific': transport_specific,
|
|
'ptp_version': ptp_version,
|
|
'domain_number': domain_number,
|
|
'message_length': message_length,
|
|
'flags': flags,
|
|
'correction_field': correction,
|
|
'source_port_identity': source_port_id.hex(),
|
|
'sequence_id': sequence_id,
|
|
'control_field': control,
|
|
'log_mean_message_interval': log_mean_message_interval
|
|
}
|
|
|
|
def _parse_message_fields(self, msg_type: int, payload: bytes) -> Optional[Dict[str, Any]]:
|
|
"""Parse message-specific fields"""
|
|
if msg_type in [0x0, 0x1, 0x2, 0x3]: # Sync, Delay_Req, Pdelay_Req, Pdelay_Resp
|
|
if len(payload) >= 10:
|
|
timestamp = struct.unpack('>HI', payload[:6]) # seconds_msb, seconds_lsb, nanoseconds
|
|
nanoseconds = struct.unpack('>I', payload[6:10])[0]
|
|
return {
|
|
'origin_timestamp_sec': (timestamp[0] << 32) | timestamp[1],
|
|
'origin_timestamp_nsec': nanoseconds
|
|
}
|
|
elif msg_type == 0xB: # Announce
|
|
if len(payload) >= 20:
|
|
return {
|
|
'current_utc_offset': struct.unpack('>h', payload[10:12])[0],
|
|
'grandmaster_priority1': payload[13],
|
|
'grandmaster_clock_quality': payload[14:18].hex(),
|
|
'grandmaster_priority2': payload[18],
|
|
'grandmaster_identity': payload[19:27].hex()
|
|
}
|
|
|
|
return None |