284 lines
10 KiB
Python
284 lines
10 KiB
Python
"""
|
|
IENA (Improved Ethernet Network Architecture) dissector for Airbus protocols
|
|
"""
|
|
|
|
import struct
|
|
import time
|
|
from typing import Dict, Optional, Any
|
|
|
|
try:
|
|
from scapy.all import Packet, UDP, Raw
|
|
except ImportError:
|
|
print("Error: scapy library required. Install with: pip install scapy")
|
|
import sys
|
|
sys.exit(1)
|
|
|
|
from .base import ProtocolDissector, DissectionResult, ProtocolType
|
|
|
|
|
|
class IENADissector(ProtocolDissector):
|
|
"""Airbus IENA (Improved Ethernet Network Architecture) dissector"""
|
|
|
|
IENA_TYPES = {
|
|
0: "P-type",
|
|
1: "D-type (with delay)",
|
|
2: "N-type",
|
|
3: "M-type (with delay)",
|
|
4: "Q-type"
|
|
}
|
|
|
|
def __init__(self):
|
|
self.iena_ports = {50000, 50001}
|
|
self.lxrs_id = 0xF6AE
|
|
|
|
def can_dissect(self, packet: Packet) -> bool:
|
|
"""Check if packet is IENA"""
|
|
if not packet.haslayer(UDP):
|
|
return False
|
|
|
|
udp_layer = packet[UDP]
|
|
if udp_layer.dport not in self.iena_ports and udp_layer.sport not in self.iena_ports:
|
|
return False
|
|
|
|
if not packet.haslayer(Raw):
|
|
return False
|
|
|
|
raw_data = bytes(packet[Raw])
|
|
return len(raw_data) >= 14 # Minimum IENA header size
|
|
|
|
def get_protocol_type(self) -> ProtocolType:
|
|
return ProtocolType.IENA
|
|
|
|
def dissect(self, packet: Packet) -> Optional[DissectionResult]:
|
|
"""Dissect IENA packet"""
|
|
if not self.can_dissect(packet):
|
|
return None
|
|
|
|
raw_data = bytes(packet[Raw])
|
|
|
|
try:
|
|
header = self._parse_iena_header(raw_data[:14])
|
|
|
|
result = DissectionResult(
|
|
protocol=ProtocolType.IENA,
|
|
fields=header
|
|
)
|
|
|
|
# Parse payload based on packet type
|
|
packet_type = header.get('packet_type', 0)
|
|
iena_size = header.get('size_in_words', 0)
|
|
|
|
if iena_size > 8 and len(raw_data) >= iena_size * 2:
|
|
payload_data = raw_data[14:iena_size * 2 - 2] # Exclude trailer
|
|
payload_info = self._parse_payload(packet_type, payload_data, header)
|
|
if payload_info:
|
|
result.fields.update(payload_info)
|
|
|
|
result.payload = payload_data
|
|
|
|
return result
|
|
|
|
except Exception as e:
|
|
return DissectionResult(
|
|
protocol=ProtocolType.IENA,
|
|
fields={},
|
|
errors=[f"IENA parsing error: {str(e)}"]
|
|
)
|
|
|
|
def _parse_iena_header(self, header_data: bytes) -> Dict[str, Any]:
|
|
"""Parse IENA header (14 bytes)"""
|
|
if len(header_data) < 14:
|
|
raise ValueError("IENA header too short")
|
|
|
|
# Unpack header fields (big endian for most fields)
|
|
key_id = struct.unpack('>H', header_data[0:2])[0]
|
|
size_words = struct.unpack('>H', header_data[2:4])[0]
|
|
|
|
# Time field is 6 bytes
|
|
time_bytes = header_data[4:10]
|
|
time_value = int.from_bytes(time_bytes, 'big')
|
|
|
|
key_status = header_data[10]
|
|
n2_status = header_data[11]
|
|
sequence_num = struct.unpack('>H', header_data[12:14])[0]
|
|
|
|
# Parse key status bits
|
|
is_positional = bool(key_status & 0x80)
|
|
is_discard = bool(key_status & 0x40)
|
|
is_msg = bool(key_status & 0x20)
|
|
has_delay = bool(key_status & 0x10)
|
|
n4_restriction = bool(key_status & 0x08)
|
|
word_size = key_status & 0x07
|
|
|
|
# Determine packet type
|
|
packet_type = 0 # P-type default
|
|
if not is_positional and is_msg:
|
|
packet_type = 3 if has_delay else 4 # M-type or Q-type
|
|
elif not is_positional and not is_msg:
|
|
packet_type = 1 if has_delay else 2 # D-type or N-type
|
|
|
|
# Convert time to readable format
|
|
current_year = time.gmtime().tm_year
|
|
year_start = time.mktime((current_year, 1, 1, 0, 0, 0, 0, 0, 0))
|
|
time_sec = year_start + (time_value / 1000000.0) # IENA time is in microseconds
|
|
|
|
return {
|
|
'key_id': key_id,
|
|
'size_in_words': size_words,
|
|
'time_value': time_value,
|
|
'time_readable': time.strftime("%H:%M:%S %d %b %Y", time.gmtime(time_sec)),
|
|
'key_status': key_status,
|
|
'is_positional': is_positional,
|
|
'is_discard': is_discard,
|
|
'is_message': is_msg,
|
|
'has_delay': has_delay,
|
|
'n4_restriction': n4_restriction,
|
|
'word_size': word_size,
|
|
'n2_status': n2_status,
|
|
'sequence_number': sequence_num,
|
|
'packet_type': packet_type,
|
|
'packet_type_name': self.IENA_TYPES.get(packet_type, "Unknown")
|
|
}
|
|
|
|
def _parse_payload(self, packet_type: int, payload: bytes, header: Dict) -> Optional[Dict[str, Any]]:
|
|
"""Parse IENA payload based on packet type"""
|
|
try:
|
|
word_size = header.get('word_size', 0)
|
|
|
|
if packet_type == 2: # N-type
|
|
return self._parse_n_type(payload, word_size)
|
|
elif packet_type == 1: # D-type
|
|
return self._parse_d_type(payload, word_size)
|
|
elif packet_type in [3, 4]: # M-type or Q-type
|
|
return self._parse_mq_type(payload, packet_type)
|
|
else: # P-type
|
|
return {'payload_data': payload.hex()}
|
|
|
|
except Exception as e:
|
|
return {'parse_error': str(e)}
|
|
|
|
def _parse_n_type(self, payload: bytes, word_size: int) -> Dict[str, Any]:
|
|
"""Parse N-type message payload"""
|
|
if len(payload) < 2:
|
|
return {}
|
|
|
|
n_len_bytes = (word_size + 1) * 2
|
|
if n_len_bytes <= 0:
|
|
return {}
|
|
|
|
n_instances = len(payload) // n_len_bytes
|
|
messages = []
|
|
|
|
for i in range(min(n_instances, 10)): # Limit to first 10 messages
|
|
offset = i * n_len_bytes
|
|
if offset + 2 <= len(payload):
|
|
param_id = struct.unpack('>H', payload[offset:offset+2])[0]
|
|
data_words = []
|
|
|
|
for j in range(word_size):
|
|
word_offset = offset + 2 + (j * 2)
|
|
if word_offset + 2 <= len(payload):
|
|
word = struct.unpack('>H', payload[word_offset:word_offset+2])[0]
|
|
data_words.append(word)
|
|
|
|
messages.append({
|
|
'param_id': param_id,
|
|
'data_words': data_words
|
|
})
|
|
|
|
return {
|
|
'n_message_count': n_instances,
|
|
'n_messages': messages
|
|
}
|
|
|
|
def _parse_d_type(self, payload: bytes, word_size: int) -> Dict[str, Any]:
|
|
"""Parse D-type message payload"""
|
|
if len(payload) < 4:
|
|
return {}
|
|
|
|
d_len_bytes = (word_size + 2) * 2 # ParamID + Delay + data words
|
|
if d_len_bytes <= 0:
|
|
return {}
|
|
|
|
d_instances = len(payload) // d_len_bytes
|
|
messages = []
|
|
|
|
for i in range(min(d_instances, 10)):
|
|
offset = i * d_len_bytes
|
|
if offset + 4 <= len(payload):
|
|
param_id = struct.unpack('>H', payload[offset:offset+2])[0]
|
|
delay = struct.unpack('>H', payload[offset+2:offset+4])[0]
|
|
|
|
data_words = []
|
|
for j in range(word_size):
|
|
word_offset = offset + 4 + (j * 2)
|
|
if word_offset + 2 <= len(payload):
|
|
word = struct.unpack('>H', payload[word_offset:word_offset+2])[0]
|
|
data_words.append(word)
|
|
|
|
messages.append({
|
|
'param_id': param_id,
|
|
'delay': delay,
|
|
'data_words': data_words
|
|
})
|
|
|
|
return {
|
|
'd_message_count': d_instances,
|
|
'd_messages': messages
|
|
}
|
|
|
|
def _parse_mq_type(self, payload: bytes, packet_type: int) -> Dict[str, Any]:
|
|
"""Parse M-type or Q-type message payload"""
|
|
messages = []
|
|
offset = 0
|
|
msg_count = 0
|
|
|
|
while offset < len(payload) - 4 and msg_count < 20: # Limit messages
|
|
try:
|
|
if packet_type == 3: # M-type
|
|
if offset + 6 > len(payload):
|
|
break
|
|
param_id = struct.unpack('>H', payload[offset:offset+2])[0]
|
|
delay = struct.unpack('>H', payload[offset+2:offset+4])[0]
|
|
length = struct.unpack('>H', payload[offset+4:offset+6])[0]
|
|
data_offset = offset + 6
|
|
else: # Q-type
|
|
if offset + 4 > len(payload):
|
|
break
|
|
param_id = struct.unpack('>H', payload[offset:offset+2])[0]
|
|
length = struct.unpack('>H', payload[offset+2:offset+4])[0]
|
|
delay = None
|
|
data_offset = offset + 4
|
|
|
|
# Ensure length is reasonable
|
|
if length > len(payload) - data_offset:
|
|
break
|
|
|
|
msg_data = payload[data_offset:data_offset + length] if length > 0 else b''
|
|
|
|
msg_info = {
|
|
'param_id': param_id,
|
|
'length': length,
|
|
'data': msg_data.hex() if len(msg_data) <= 32 else f"{msg_data[:32].hex()}..."
|
|
}
|
|
|
|
if delay is not None:
|
|
msg_info['delay'] = delay
|
|
|
|
messages.append(msg_info)
|
|
|
|
# Calculate next offset (ensure even alignment)
|
|
next_offset = data_offset + length
|
|
if next_offset % 2 == 1:
|
|
next_offset += 1
|
|
offset = next_offset
|
|
msg_count += 1
|
|
|
|
except:
|
|
break
|
|
|
|
type_key = 'm' if packet_type == 3 else 'q'
|
|
return {
|
|
f'{type_key}_message_count': len(messages),
|
|
f'{type_key}_messages': messages
|
|
} |