""" Packet Decoder View - Deep protocol inspection and field extraction Focuses on understanding packet contents and protocol compliance """ import curses from typing import TYPE_CHECKING, List, Optional, Dict, Any from ...models import FlowStats if TYPE_CHECKING: from ...analysis.core import EthernetAnalyzer class PacketDecoderView: """ Packet Decoder View - F2 Deep packet inspection interface: - Protocol field extraction and display - Frame-by-frame analysis - Enhanced decoder output - Field value inspection """ def __init__(self, analyzer: 'EthernetAnalyzer'): self.analyzer = analyzer self.selected_flow = 0 self.selected_frame = 0 self.selected_field = 0 self.scroll_offset = 0 self.field_scroll_offset = 0 self.current_panel = 0 # 0=flows, 1=frames, 2=fields def draw(self, stdscr, selected_flow_key: Optional[str]): """Draw the Packet Decoder view""" height, width = stdscr.getmaxyx() start_y = 3 max_height = height - 2 flows_list = self._get_enhanced_flows() if not flows_list: stdscr.addstr(start_y + 2, 4, "No enhanced decodable flows detected", curses.A_DIM) stdscr.addstr(start_y + 3, 4, "Switch to Flow Analysis (F1) to see all flows", curses.A_DIM) return # Decoder view header stdscr.addstr(start_y, 4, "PACKET DECODER - Enhanced Protocol Analysis", curses.A_BOLD) # Three-panel layout: Flows | Frames | Fields panel_width = width // 3 # Panel 1: Enhanced Flows (left) self._draw_flows_panel(stdscr, start_y + 2, 4, panel_width - 2, max_height - start_y - 2, flows_list) # Separator for y in range(start_y + 2, max_height): stdscr.addstr(y, panel_width, "│", curses.A_DIM) # Panel 2: Frame Details (center) selected_flow = flows_list[self.selected_flow] if flows_list else None self._draw_frames_panel(stdscr, start_y + 2, panel_width + 2, panel_width - 2, max_height - start_y - 2, selected_flow) # Separator for y in range(start_y + 2, max_height): stdscr.addstr(y, 2 * panel_width, "│", curses.A_DIM) # Panel 3: Field Inspection (right) self._draw_fields_panel(stdscr, start_y + 2, 2 * panel_width + 2, panel_width - 2, max_height - start_y - 2, selected_flow) def _draw_flows_panel(self, stdscr, start_y: int, start_x: int, width: int, height: int, flows_list: List[FlowStats]): """Draw enhanced flows panel""" panel_attr = curses.A_BOLD if self.current_panel == 0 else curses.A_NORMAL # Panel header header = "Enhanced Flows" stdscr.addstr(start_y, start_x, header, panel_attr | curses.A_UNDERLINE) current_y = start_y + 2 available_height = height - 2 # Flow list for i, flow in enumerate(flows_list[:available_height]): is_selected = (i == self.selected_flow and self.current_panel == 0) attr = curses.A_REVERSE if is_selected else curses.A_NORMAL # Flow summary line flow_line = self._format_flow_summary(flow, width - 2) stdscr.addstr(current_y + i, start_x, flow_line, attr) # Decoder indicator decoder_indicator = "●" if flow.enhanced_analysis.decoder_type != "Standard" else "○" stdscr.addstr(current_y + i, start_x + width - 3, decoder_indicator, curses.A_BOLD if flow.enhanced_analysis.decoder_type != "Standard" else curses.A_DIM) def _draw_frames_panel(self, stdscr, start_y: int, start_x: int, width: int, height: int, flow: Optional[FlowStats]): """Draw frame details panel""" panel_attr = curses.A_BOLD if self.current_panel == 1 else curses.A_NORMAL # Panel header header = "Frame Analysis" stdscr.addstr(start_y, start_x, header, panel_attr | curses.A_UNDERLINE) if not flow: stdscr.addstr(start_y + 2, start_x, "No flow selected", curses.A_DIM) return current_y = start_y + 2 # Flow details stdscr.addstr(current_y, start_x, f"Flow: {flow.src_ip} → {flow.dst_ip}", curses.A_BOLD) current_y += 1 stdscr.addstr(current_y, start_x, f"Decoder: {flow.enhanced_analysis.decoder_type}") current_y += 2 # Sample frames if flow.enhanced_analysis.sample_decoded_fields: stdscr.addstr(current_y, start_x, "Decoded Frames:", curses.A_UNDERLINE) current_y += 1 for i, (frame_key, frame_data) in enumerate(flow.enhanced_analysis.sample_decoded_fields.items()): is_selected = (i == self.selected_frame and self.current_panel == 1) attr = curses.A_REVERSE if is_selected else curses.A_NORMAL frame_line = f"{frame_key}: {len(frame_data)} fields" stdscr.addstr(current_y + i, start_x, frame_line[:width-2], attr) else: stdscr.addstr(current_y, start_x, "No decoded frame data available", curses.A_DIM) current_y += 1 stdscr.addstr(current_y, start_x, "Decoder may not be implemented", curses.A_DIM) def _draw_fields_panel(self, stdscr, start_y: int, start_x: int, width: int, height: int, flow: Optional[FlowStats]): """Draw field inspection panel""" panel_attr = curses.A_BOLD if self.current_panel == 2 else curses.A_NORMAL # Panel header header = "Field Inspector" stdscr.addstr(start_y, start_x, header, panel_attr | curses.A_UNDERLINE) if not flow or not flow.enhanced_analysis.sample_decoded_fields: stdscr.addstr(start_y + 2, start_x, "No field data available", curses.A_DIM) return current_y = start_y + 2 # Get selected frame data frame_items = list(flow.enhanced_analysis.sample_decoded_fields.items()) if self.selected_frame < len(frame_items): frame_key, frame_data = frame_items[self.selected_frame] stdscr.addstr(current_y, start_x, f"Frame: {frame_key}", curses.A_BOLD) current_y += 2 # Field list with values available_height = height - 4 field_items = list(frame_data.items()) start_field = self.field_scroll_offset end_field = min(start_field + available_height, len(field_items)) for i in range(start_field, end_field): field_name, field_value = field_items[i] display_idx = i - start_field is_selected = (i == self.selected_field and self.current_panel == 2) attr = curses.A_REVERSE if is_selected else curses.A_NORMAL # Format field line field_line = self._format_field_line(field_name, field_value, width - 2) stdscr.addstr(current_y + display_idx, start_x, field_line, attr) # Scroll indicators if start_field > 0: stdscr.addstr(current_y, start_x + width - 5, "↑", curses.A_DIM) if end_field < len(field_items): stdscr.addstr(current_y + available_height - 1, start_x + width - 5, "↓", curses.A_DIM) def _format_flow_summary(self, flow: FlowStats, max_width: int) -> str: """Format flow summary for flows panel""" # Format as "src:port → dst:port | protocol" source = f"{flow.src_ip}:{flow.src_port}" destination = f"{flow.dst_ip}:{flow.dst_port}" protocol = flow.enhanced_analysis.decoder_type if protocol == "Standard": protocol = self._get_primary_protocol(flow) # Calculate available space for src and dst protocol_space = len(protocol) + 3 # " | " + protocol available_space = max_width - protocol_space src_space = available_space // 2 - 2 # Account for " → " dst_space = available_space - src_space - 3 # " → " if len(source) > src_space: source = f"{flow.src_ip[:src_space-6]}…:{flow.src_port}" if len(destination) > dst_space: destination = f"{flow.dst_ip[:dst_space-6]}…:{flow.dst_port}" return f"{source} → {destination} | {protocol}"[:max_width] def _format_field_line(self, field_name: str, field_value: Any, max_width: int) -> str: """Format field name and value for display""" # Clean up field name display_name = field_name.replace('_', ' ').title() # Format value based on type if isinstance(field_value, bool): display_value = "Yes" if field_value else "No" elif isinstance(field_value, float): if "timestamp" in field_name.lower(): display_value = f"{field_value:.6f}s" else: display_value = f"{field_value:.3f}" elif field_value is None: display_value = "N/A" else: display_value = str(field_value) # Truncate if needed available_name_width = max_width // 2 available_value_width = max_width - available_name_width - 3 if len(display_name) > available_name_width: display_name = display_name[:available_name_width-1] + "…" if len(display_value) > available_value_width: display_value = display_value[:available_value_width-1] + "…" return f"{display_name:<{available_name_width}} : {display_value}" def _get_enhanced_flows(self) -> List[FlowStats]: """Get flows with enhanced decoders available""" flows_list = [] for flow in self.analyzer.flows.values(): if (flow.enhanced_analysis.decoder_type != "Standard" or "CHAPTER10" in flow.detected_protocol_types or "PTP" in flow.detected_protocol_types or "IENA" in flow.detected_protocol_types): flows_list.append(flow) # Sort by decoder quality and packet count flows_list.sort(key=lambda x: ( x.enhanced_analysis.decoder_type != "Standard", len(x.enhanced_analysis.sample_decoded_fields), x.frame_count ), reverse=True) return flows_list def _get_primary_protocol(self, flow: FlowStats) -> str: """Get primary protocol for display""" if flow.detected_protocol_types: enhanced_protocols = {'CHAPTER10', 'PTP', 'IENA', 'CH10'} found_enhanced = flow.detected_protocol_types & enhanced_protocols if found_enhanced: return list(found_enhanced)[0] return list(flow.detected_protocol_types)[0] return flow.transport_protocol def handle_input(self, key: int, flows_list: List[FlowStats]) -> str: """Handle input for Packet Decoder view""" enhanced_flows = self._get_enhanced_flows() if key == ord('\t'): # Tab to switch panels self.current_panel = (self.current_panel + 1) % 3 return 'panel_switch' elif key == curses.KEY_UP: if self.current_panel == 0: # Flows panel self.selected_flow = max(0, self.selected_flow - 1) elif self.current_panel == 1: # Frames panel self.selected_frame = max(0, self.selected_frame - 1) elif self.current_panel == 2: # Fields panel self.selected_field = max(0, self.selected_field - 1) self._adjust_field_scroll() return 'selection_change' elif key == curses.KEY_DOWN: if self.current_panel == 0: # Flows panel self.selected_flow = min(len(enhanced_flows) - 1, self.selected_flow + 1) elif self.current_panel == 1: # Frames panel if enhanced_flows and self.selected_flow < len(enhanced_flows): flow = enhanced_flows[self.selected_flow] max_frames = len(flow.enhanced_analysis.sample_decoded_fields) - 1 self.selected_frame = min(max_frames, self.selected_frame + 1) elif self.current_panel == 2: # Fields panel if enhanced_flows and self.selected_flow < len(enhanced_flows): flow = enhanced_flows[self.selected_flow] if flow.enhanced_analysis.sample_decoded_fields: frame_items = list(flow.enhanced_analysis.sample_decoded_fields.items()) if self.selected_frame < len(frame_items): frame_data = frame_items[self.selected_frame][1] max_fields = len(frame_data) - 1 self.selected_field = min(max_fields, self.selected_field + 1) self._adjust_field_scroll() return 'selection_change' elif key == ord('e') or key == ord('E'): return 'export_fields' elif key == ord('c') or key == ord('C'): return 'copy_field' return 'none' def _adjust_field_scroll(self): """Adjust field scroll to keep selected field visible""" # Simple scroll adjustment - could be enhanced if self.selected_field < self.field_scroll_offset: self.field_scroll_offset = self.selected_field elif self.selected_field >= self.field_scroll_offset + 10: # Assume 10 visible fields self.field_scroll_offset = self.selected_field - 9