"""
IENA (Improved Ethernet Network Architecture) dissector for Airbus protocols
"""

import calendar
import struct
import time
from typing import Dict, Optional, Any

try:
    from scapy.all import Packet, UDP, Raw
except ImportError:
    print("Error: scapy library required. Install with: pip install scapy")
    import sys
    sys.exit(1)

from .base import ProtocolDissector, DissectionResult, ProtocolType


class IENADissector(ProtocolDissector):
    """Airbus IENA (Improved Ethernet Network Architecture) dissector.

    A packet is treated as IENA when it is UDP, uses one of the configured
    ports as source or destination, and carries at least a full 14-byte
    header in its Raw layer.  The header is decoded into a flat dictionary
    of fields; the payload is then decoded according to the packet type
    derived from the key-status byte.
    """

    IENA_TYPES = {
        0: "P-type",
        1: "D-type (with delay)",
        2: "N-type",
        3: "M-type (with delay)",
        4: "Q-type"
    }

    # Layout constants used by the parsers below.
    _HEADER_BYTES = 14        # key, size, 6-byte time, 2 status bytes, sequence
    _TRAILER_BYTES = 2        # trailer word excluded from the payload
    _MAX_FIXED_RECORDS = 10   # cap on decoded N-/D-type records per packet
    _MAX_MQ_RECORDS = 20      # cap on decoded M-/Q-type records per packet
    _MAX_HEX_BYTES = 32       # message data longer than this is truncated

    def __init__(self):
        self.iena_ports = {50000, 50001}
        self.lxrs_id = 0xF6AE

    def can_dissect(self, packet: Packet) -> bool:
        """Check if packet is IENA.

        Returns True only for UDP packets on a known IENA port whose Raw
        layer holds at least one complete IENA header.
        """
        if not packet.haslayer(UDP):
            return False

        udp_layer = packet[UDP]
        if (udp_layer.dport not in self.iena_ports
                and udp_layer.sport not in self.iena_ports):
            return False

        if not packet.haslayer(Raw):
            return False

        raw_data = bytes(packet[Raw])
        return len(raw_data) >= self._HEADER_BYTES  # Minimum IENA header size

    def get_protocol_type(self) -> ProtocolType:
        """Return the protocol type handled by this dissector."""
        return ProtocolType.IENA

    def dissect(self, packet: Packet) -> Optional[DissectionResult]:
        """Dissect IENA packet.

        Returns None when the packet is not IENA.  Otherwise returns a
        DissectionResult whose ``fields`` hold the decoded header and, when
        the declared size is consistent with the captured data, the decoded
        payload.  Any parsing exception is reported through the result's
        ``errors`` list instead of being raised.
        """
        if not self.can_dissect(packet):
            return None

        raw_data = bytes(packet[Raw])

        try:
            header = self._parse_iena_header(raw_data[:self._HEADER_BYTES])

            result = DissectionResult(
                protocol=ProtocolType.IENA,
                fields=header
            )

            # Parse payload based on packet type
            packet_type = header.get('packet_type', 0)
            iena_size = header.get('size_in_words', 0)
            total_bytes = iena_size * 2  # size field counts 16-bit words

            # 8 words = 14-byte header + 2-byte trailer, i.e. no payload.
            # The payload is only decoded when the whole declared packet
            # was captured.
            if iena_size > 8 and len(raw_data) >= total_bytes:
                payload_data = raw_data[
                    self._HEADER_BYTES:total_bytes - self._TRAILER_BYTES
                ]  # Exclude trailer
                payload_info = self._parse_payload(
                    packet_type, payload_data, header
                )
                if payload_info:
                    result.fields.update(payload_info)

                result.payload = payload_data

            return result

        except Exception as e:
            # Dissection boundary: report the failure to the caller rather
            # than aborting the whole capture analysis.
            return DissectionResult(
                protocol=ProtocolType.IENA,
                fields={},
                errors=[f"IENA parsing error: {str(e)}"]
            )

    def _parse_iena_header(self, header_data: bytes) -> Dict[str, Any]:
        """Parse IENA header (14 bytes).

        Layout (big endian): key id (2), size in words (2), time (6),
        key status (1), N2 status (1), sequence number (2).

        Raises:
            ValueError: if fewer than 14 bytes are supplied.
        """
        if len(header_data) < self._HEADER_BYTES:
            raise ValueError("IENA header too short")

        key_id, size_words = struct.unpack_from('>HH', header_data, 0)

        # Time field is 6 bytes, which struct has no format code for.
        time_value = int.from_bytes(header_data[4:10], 'big')

        key_status = header_data[10]
        n2_status = header_data[11]
        sequence_num = struct.unpack_from('>H', header_data, 12)[0]

        # Parse key status bits
        is_positional = bool(key_status & 0x80)
        is_discard = bool(key_status & 0x40)
        is_msg = bool(key_status & 0x20)
        has_delay = bool(key_status & 0x10)
        n4_restriction = bool(key_status & 0x08)
        word_size = key_status & 0x07

        # Determine packet type
        if is_positional:
            packet_type = 0  # P-type
        elif is_msg:
            packet_type = 3 if has_delay else 4  # M-type or Q-type
        else:
            packet_type = 1 if has_delay else 2  # D-type or N-type

        # Convert time to readable format.  The value is treated as
        # microseconds elapsed since the start of the current year.
        # calendar.timegm is used (not time.mktime) because the result is
        # formatted with time.gmtime below: mktime would interpret the
        # tuple as *local* time and shift the output by the UTC offset.
        current_year = time.gmtime().tm_year
        year_start = calendar.timegm((current_year, 1, 1, 0, 0, 0, 0, 0, 0))
        time_sec = year_start + (time_value / 1000000.0)  # IENA time is in microseconds

        return {
            'key_id': key_id,
            'size_in_words': size_words,
            'time_value': time_value,
            'time_readable': time.strftime(
                "%H:%M:%S %d %b %Y", time.gmtime(time_sec)
            ),
            'key_status': key_status,
            'is_positional': is_positional,
            'is_discard': is_discard,
            'is_message': is_msg,
            'has_delay': has_delay,
            'n4_restriction': n4_restriction,
            'word_size': word_size,
            'n2_status': n2_status,
            'sequence_number': sequence_num,
            'packet_type': packet_type,
            'packet_type_name': self.IENA_TYPES.get(packet_type, "Unknown")
        }

    def _parse_payload(self, packet_type: int, payload: bytes,
                       header: Dict) -> Optional[Dict[str, Any]]:
        """Parse IENA payload based on packet type.

        Dispatches to the type-specific parser.  P-type (and any unknown
        type) payloads are returned as a hex string.  A failure inside a
        parser is reported as ``{'parse_error': ...}`` so that the header
        fields already decoded are still returned to the caller.
        """
        try:
            word_size = header.get('word_size', 0)

            if packet_type == 2:  # N-type
                return self._parse_n_type(payload, word_size)
            if packet_type == 1:  # D-type
                return self._parse_d_type(payload, word_size)
            if packet_type in (3, 4):  # M-type or Q-type
                return self._parse_mq_type(payload, packet_type)
            # P-type
            return {'payload_data': payload.hex()}

        except Exception as e:
            return {'parse_error': str(e)}

    def _parse_n_type(self, payload: bytes, word_size: int) -> Dict[str, Any]:
        """Parse N-type message payload.

        Each record is a 16-bit parameter id followed by ``word_size``
        16-bit data words.  ``n_message_count`` is the number of whole
        records in the payload; only the first 10 are decoded.
        """
        if len(payload) < 2:
            return {}

        n_len_bytes = (word_size + 1) * 2  # ParamID + data words
        if n_len_bytes <= 0:
            return {}

        n_instances = len(payload) // n_len_bytes
        end = len(payload)
        messages = []

        # Limit to first 10 messages
        for i in range(min(n_instances, self._MAX_FIXED_RECORDS)):
            offset = i * n_len_bytes
            if offset + 2 > end:
                break  # offsets only grow, so no later record can fit

            param_id = struct.unpack_from('>H', payload, offset)[0]
            data_words = [
                struct.unpack_from('>H', payload, pos)[0]
                for pos in (offset + 2 + j * 2 for j in range(word_size))
                if pos + 2 <= end
            ]

            messages.append({
                'param_id': param_id,
                'data_words': data_words
            })

        return {
            'n_message_count': n_instances,
            'n_messages': messages
        }

    def _parse_d_type(self, payload: bytes, word_size: int) -> Dict[str, Any]:
        """Parse D-type message payload.

        Each record is a 16-bit parameter id, a 16-bit delay and
        ``word_size`` 16-bit data words.  ``d_message_count`` is the number
        of whole records in the payload; only the first 10 are decoded.
        """
        if len(payload) < 4:
            return {}

        d_len_bytes = (word_size + 2) * 2  # ParamID + Delay + data words
        if d_len_bytes <= 0:
            return {}

        d_instances = len(payload) // d_len_bytes
        end = len(payload)
        messages = []

        for i in range(min(d_instances, self._MAX_FIXED_RECORDS)):
            offset = i * d_len_bytes
            if offset + 4 > end:
                break  # offsets only grow, so no later record can fit

            param_id, delay = struct.unpack_from('>HH', payload, offset)
            data_words = [
                struct.unpack_from('>H', payload, pos)[0]
                for pos in (offset + 4 + j * 2 for j in range(word_size))
                if pos + 2 <= end
            ]

            messages.append({
                'param_id': param_id,
                'delay': delay,
                'data_words': data_words
            })

        return {
            'd_message_count': d_instances,
            'd_messages': messages
        }

    def _parse_mq_type(self, payload: bytes, packet_type: int) -> Dict[str, Any]:
        """Parse M-type or Q-type message payload.

        Records are variable length.  M-type (``packet_type == 3``) records
        start with parameter id, delay and length (16 bits each); any other
        type is decoded as Q-type, whose records start with parameter id and
        length only.  ``length`` is the number of data bytes that follow;
        the next record starts on an even offset.  At most 20 records are
        decoded and parsing stops at the first record that does not fit.
        Data longer than 32 bytes is shown as its first 32 bytes in hex
        followed by ``...``.
        """
        is_m_type = packet_type == 3
        end = len(payload)
        messages = []
        offset = 0

        # Every read below is bounds-checked before it happens, so no
        # exception handler is needed inside the loop.
        # NOTE(review): the `end - 4` guard requires at least 5 remaining
        # bytes, so a final 4-byte Q-type record with zero length is not
        # reported - confirm whether that is intended (e.g. padding).
        while offset < end - 4 and len(messages) < self._MAX_MQ_RECORDS:
            if is_m_type:
                if offset + 6 > end:
                    break
                param_id, delay, length = struct.unpack_from(
                    '>HHH', payload, offset
                )
                data_offset = offset + 6
            else:  # Q-type
                if offset + 4 > end:
                    break
                param_id, length = struct.unpack_from('>HH', payload, offset)
                delay = None
                data_offset = offset + 4

            # Ensure length is reasonable
            if length > end - data_offset:
                break

            msg_data = payload[data_offset:data_offset + length]
            if len(msg_data) <= self._MAX_HEX_BYTES:
                data_hex = msg_data.hex()
            else:
                data_hex = f"{msg_data[:self._MAX_HEX_BYTES].hex()}..."

            msg_info = {
                'param_id': param_id,
                'length': length,
                'data': data_hex
            }
            if delay is not None:
                msg_info['delay'] = delay

            messages.append(msg_info)

            # Calculate next offset (ensure even alignment)
            next_offset = data_offset + length
            offset = next_offset + (next_offset % 2)

        type_key = 'm' if is_m_type else 'q'
        return {
            f'{type_key}_message_count': len(messages),
            f'{type_key}_messages': messages
        }