""" Ethernet Data decoders for Chapter 10 data types Supports Ethernet Data Formats 0-1 (0x68-0x69) """ import struct from typing import Dict, Any, Optional from .base import DataTypeDecoder, DecodedPayload class EthernetDecoder(DataTypeDecoder): """Decoder for Ethernet Data types (0x68-0x69)""" def __init__(self): super().__init__() self.data_type_base = 0x68 self.data_type_name = "Ethernet Data" self.supported_formats = [0x68, 0x69] def can_decode(self, data_type: int) -> bool: return data_type in [0x68, 0x69] def get_data_type_name(self, data_type: int) -> str: format_names = { 0x68: "Ethernet Data Format 0", 0x69: "Ethernet UDP Payload" } return format_names.get(data_type, f"Ethernet Format {data_type & 0x0F}") def decode(self, payload: bytes, ch10_header: Dict[str, Any]) -> Optional[DecodedPayload]: """Decode Ethernet payload""" data_type = ch10_header.get('data_type', 0) if not self.can_decode(data_type): return None if data_type == 0x68: return self._decode_ethernet_format0(payload, ch10_header) elif data_type == 0x69: return self._decode_ethernet_udp_payload(payload, ch10_header) return None def _decode_ethernet_format0(self, payload: bytes, ch10_header: Dict[str, Any]) -> DecodedPayload: """Decode Ethernet Format 0 (Full Ethernet Frame)""" decoded_data = {} errors = [] # Parse IPH iph = self._parse_intra_packet_header(payload) if iph: decoded_data.update(iph) data_start = iph['data_start'] else: data_start = 0 errors.append("Failed to parse intra-packet header") # Parse Ethernet Format 0 header if data_start + 12 <= len(payload): eth_header = self._safe_unpack('> 28) & 0x3 }) # Decode content type content_types = { 0: "Full MAC frame", 1: "Payload only", 2: "Reserved", 3: "Reserved" } decoded_data['content_type_description'] = content_types.get( decoded_data['content_type'], "Unknown" ) # Extract Ethernet frame data frame_data_start = data_start + 12 frame_length = decoded_data['frame_length'] if frame_data_start + frame_length <= len(payload): frame_data = payload[frame_data_start:frame_data_start + frame_length] # Parse Ethernet header if full MAC frame if decoded_data['content_type'] == 0 and len(frame_data) >= 14: eth_parsed = self._parse_ethernet_header(frame_data) decoded_data.update(eth_parsed) else: decoded_data['raw_frame_data'] = frame_data[:64].hex() # First 64 bytes decoded_data['actual_frame_length'] = len(frame_data) else: errors.append("Frame data extends beyond payload") else: errors.append("Failed to parse Ethernet Format 0 header") else: errors.append("Insufficient data for Ethernet header") return DecodedPayload( data_type=0x68, data_type_name="Ethernet Data Format 0", format_version=0, decoded_data=decoded_data, raw_payload=payload, errors=errors, metadata={'decoder': 'EthernetDecoder'} ) def _decode_ethernet_udp_payload(self, payload: bytes, ch10_header: Dict[str, Any]) -> DecodedPayload: """Decode Ethernet UDP Payload (Format 1)""" decoded_data = {} errors = [] # Parse IPH iph = self._parse_intra_packet_header(payload) if iph: decoded_data.update(iph) data_start = iph['data_start'] else: data_start = 0 errors.append("Failed to parse intra-packet header") # Parse UDP payload header if data_start + 16 <= len(payload): udp_header = self._safe_unpack(' Dict[str, Any]: """Parse Ethernet MAC header""" if len(frame_data) < 14: return {'eth_parse_error': 'Insufficient data for Ethernet header'} # Parse MAC addresses and EtherType dst_mac = frame_data[0:6] src_mac = frame_data[6:12] ethertype = struct.unpack('>H', frame_data[12:14])[0] eth_data = { 'dst_mac': ':'.join(f'{b:02x}' for b in dst_mac), 'src_mac': ':'.join(f'{b:02x}' for b in src_mac), 'ethertype': f'0x{ethertype:04x}', 'ethertype_description': self._decode_ethertype(ethertype) } # Parse payload based on EtherType if ethertype == 0x0800 and len(frame_data) >= 34: # IPv4 ip_data = self._parse_ip_header(frame_data[14:]) eth_data.update(ip_data) elif ethertype == 0x0806 and len(frame_data) >= 42: # ARP arp_data = self._parse_arp_header(frame_data[14:]) eth_data.update(arp_data) return eth_data def _parse_ip_header(self, ip_data: bytes) -> Dict[str, Any]: """Parse IPv4 header""" if len(ip_data) < 20: return {'ip_parse_error': 'Insufficient data for IP header'} version_ihl = ip_data[0] version = (version_ihl >> 4) & 0x0F ihl = version_ihl & 0x0F if version != 4: return {'ip_parse_error': f'Unsupported IP version: {version}'} tos, total_length, identification, flags_fragment = struct.unpack('>BHHH', ip_data[1:9]) ttl, protocol, checksum = struct.unpack('>BBH', ip_data[8:12]) src_ip = struct.unpack('>I', ip_data[12:16])[0] dst_ip = struct.unpack('>I', ip_data[16:20])[0] return { 'ip_version': version, 'ip_header_length': ihl * 4, 'ip_tos': tos, 'ip_total_length': total_length, 'ip_id': identification, 'ip_ttl': ttl, 'ip_protocol': protocol, 'ip_src': self._ip_to_string(src_ip), 'ip_dst': self._ip_to_string(dst_ip), 'ip_protocol_name': self._decode_ip_protocol(protocol) } def _parse_arp_header(self, arp_data: bytes) -> Dict[str, Any]: """Parse ARP header""" if len(arp_data) < 28: return {'arp_parse_error': 'Insufficient data for ARP header'} hw_type, proto_type, hw_len, proto_len, opcode = struct.unpack('>HHBBH', arp_data[0:8]) sender_hw = arp_data[8:14] sender_proto = struct.unpack('>I', arp_data[14:18])[0] target_hw = arp_data[18:24] target_proto = struct.unpack('>I', arp_data[24:28])[0] return { 'arp_hw_type': hw_type, 'arp_proto_type': f'0x{proto_type:04x}', 'arp_opcode': opcode, 'arp_opcode_description': 'Request' if opcode == 1 else 'Reply' if opcode == 2 else f'Unknown ({opcode})', 'arp_sender_hw': ':'.join(f'{b:02x}' for b in sender_hw), 'arp_sender_ip': self._ip_to_string(sender_proto), 'arp_target_hw': ':'.join(f'{b:02x}' for b in target_hw), 'arp_target_ip': self._ip_to_string(target_proto) } def _analyze_udp_payload(self, payload: bytes) -> Dict[str, Any]: """Analyze UDP payload content""" analysis = {} if len(payload) == 0: return {'payload_analysis': 'Empty payload'} # Check for common protocols if len(payload) >= 4: # Check for DNS (port 53 patterns) if payload[0:2] in [b'\x00\x01', b'\x81\x80', b'\x01\x00']: analysis['possible_protocol'] = 'DNS' # Check for DHCP magic cookie elif payload[:4] == b'\x63\x82\x53\x63': analysis['possible_protocol'] = 'DHCP' # Check for RTP (version 2) elif (payload[0] & 0xC0) == 0x80: analysis['possible_protocol'] = 'RTP' else: analysis['possible_protocol'] = 'Unknown' # Basic statistics analysis['payload_entropy'] = self._calculate_entropy(payload[:256]) # First 256 bytes analysis['null_bytes'] = payload.count(0) analysis['printable_chars'] = sum(1 for b in payload[:256] if 32 <= b <= 126) return analysis def _calculate_entropy(self, data: bytes) -> float: """Calculate Shannon entropy of data""" if not data: return 0.0 counts = [0] * 256 for byte in data: counts[byte] += 1 entropy = 0.0 length = len(data) for count in counts: if count > 0: p = count / length entropy -= p * (p.bit_length() - 1) # log2(p) return entropy def _ip_to_string(self, ip_int: int) -> str: """Convert 32-bit integer to IP address string""" return f"{(ip_int >> 24) & 0xFF}.{(ip_int >> 16) & 0xFF}.{(ip_int >> 8) & 0xFF}.{ip_int & 0xFF}" def _decode_ethertype(self, ethertype: int) -> str: """Decode EtherType field""" types = { 0x0800: "IPv4", 0x0806: "ARP", 0x86DD: "IPv6", 0x8100: "VLAN", 0x88F7: "PTP" } return types.get(ethertype, f"Unknown (0x{ethertype:04x})") def _decode_ip_protocol(self, protocol: int) -> str: """Decode IP protocol field""" protocols = { 1: "ICMP", 6: "TCP", 17: "UDP", 89: "OSPF", 132: "SCTP" } return protocols.get(protocol, f"Unknown ({protocol})")