""" Main TUI interface controller """ import curses from typing import TYPE_CHECKING from .navigation import NavigationHandler from .panels import FlowListPanel, DetailPanel, TimelinePanel from ..utils.signal_visualizer import signal_visualizer if TYPE_CHECKING: from ..analysis.core import EthernetAnalyzer class TUIInterface: """Text User Interface for the analyzer""" def __init__(self, analyzer: 'EthernetAnalyzer'): self.analyzer = analyzer self.navigation = NavigationHandler() # Initialize panels self.flow_list_panel = FlowListPanel() self.detail_panel = DetailPanel() self.timeline_panel = TimelinePanel() def run(self, stdscr): """Main TUI loop""" curses.curs_set(0) # Hide cursor stdscr.keypad(True) # Set timeout based on whether we're in live mode if self.analyzer.is_live: stdscr.timeout(500) # 0.5 second timeout for live updates else: stdscr.timeout(1000) # 1 second timeout for static analysis while True: stdscr.clear() if self.navigation.current_view == 'main': self._draw_main_view(stdscr) elif self.navigation.current_view == 'dissection': self._draw_dissection(stdscr) # Draw status bar self._draw_status_bar(stdscr) stdscr.refresh() # Handle input key = stdscr.getch() # Handle timeout (no key pressed) - refresh for live capture if key == -1 and self.analyzer.is_live: continue # Just refresh the display action = self.navigation.handle_input(key, self._get_flows_list()) if action == 'quit': if self.analyzer.is_live: self.analyzer.stop_capture = True break elif action == 'visualize': self._handle_visualization() def _draw_main_view(self, stdscr): """Draw three-panel main view: flows list, details, and timeline""" height, width = stdscr.getmaxyx() # Calculate panel dimensions based on timeline visibility if self.navigation.show_timeline: # Top section: 70% of height, split into left 60% / right 40% # Bottom section: 30% of height, full width top_height = int(height * 0.7) bottom_height = height - top_height - 2 # -2 for separators and status bar else: # Use full height for top section when timeline is hidden top_height = height - 2 # -2 for status bar bottom_height = 0 left_width = int(width * 0.6) right_width = width - left_width - 1 # -1 for separator # Draw title stdscr.addstr(0, 0, "=== ETHERNET TRAFFIC ANALYZER ===", curses.A_BOLD) # Draw summary info summary = self.analyzer.get_summary() info_line = f"Packets: {summary['total_packets']} | " \ f"Flows: {summary['unique_flows']} | " \ f"IPs: {summary['unique_ips']}" # Add real-time statistics if enabled if self.analyzer.is_live and self.analyzer.statistics_engine.enable_realtime: rt_summary = self.analyzer.statistics_engine.get_realtime_summary() info_line += f" | Outliers: {rt_summary.get('total_outliers', 0)}" stdscr.addstr(1, 0, info_line) if self.analyzer.is_live: status_text = "LIVE CAPTURE" if not self.analyzer.statistics_engine.enable_realtime else "LIVE+STATS" stdscr.addstr(1, left_width - len(status_text) - 2, status_text, curses.A_BLINK) flows_list = self._get_flows_list() # Draw left panel (flows list) self.flow_list_panel.draw(stdscr, 0, 3, left_width, top_height - 3, flows_list, self.navigation.selected_flow) # Draw vertical separator for top section for y in range(1, top_height): stdscr.addstr(y, left_width, "│") # Draw right panel (details) self.detail_panel.draw(stdscr, left_width + 2, 1, right_width - 2, flows_list, self.navigation.selected_flow, top_height - 2) # Draw timeline panel if enabled if self.navigation.show_timeline and bottom_height > 0: # Draw horizontal separator separator_line = "─" * width stdscr.addstr(top_height, 0, separator_line) # Draw bottom panel (timeline) timeline_start_y = top_height + 1 self.timeline_panel.draw(stdscr, 0, timeline_start_y, width, bottom_height, flows_list, self.navigation.selected_flow) def _draw_dissection(self, stdscr): """Draw frame dissection view""" stdscr.addstr(0, 0, "=== FRAME DISSECTION ===", curses.A_BOLD) if not self.analyzer.all_packets: stdscr.addstr(2, 0, "No packets available") return # Show dissection of first few packets for i, packet in enumerate(self.analyzer.all_packets[:5]): if i * 6 + 2 >= curses.LINES - 3: break dissection = self.analyzer.dissector.dissect_frame(packet, i + 1) y_start = i * 6 + 2 stdscr.addstr(y_start, 0, f"Frame {dissection['frame_number']}:", curses.A_BOLD) stdscr.addstr(y_start + 1, 2, f"Timestamp: {dissection['timestamp']:.6f}") stdscr.addstr(y_start + 2, 2, f"Size: {dissection['size']} bytes") # Show detected protocols protocols = dissection.get('protocols', []) if protocols: proto_str = ", ".join(protocols) stdscr.addstr(y_start + 3, 2, f"Protocols: {proto_str}") layers_str = ", ".join([k for k in dissection['layers'].keys() if not dissection['layers'][k].get('error')]) stdscr.addstr(y_start + 4, 2, f"Layers: {layers_str}") # Show specialized protocol info if 'chapter10' in dissection['layers'] and 'data_type_name' in dissection['layers']['chapter10']: ch10_info = dissection['layers']['chapter10'] stdscr.addstr(y_start + 5, 2, f"CH10: {ch10_info['data_type_name']}") elif 'ptp' in dissection['layers'] and 'message_type_name' in dissection['layers']['ptp']: ptp_info = dissection['layers']['ptp'] stdscr.addstr(y_start + 5, 2, f"PTP: {ptp_info['message_type_name']}") elif 'iena' in dissection['layers'] and 'packet_type_name' in dissection['layers']['iena']: iena_info = dissection['layers']['iena'] stdscr.addstr(y_start + 5, 2, f"IENA: {iena_info['packet_type_name']}") elif 'ip' in dissection['layers']: ip_info = dissection['layers']['ip'] stdscr.addstr(y_start + 5, 2, f"IP: {ip_info['src']} -> {ip_info['dst']}") def _draw_status_bar(self, stdscr): """Draw status bar at bottom""" height, width = stdscr.getmaxyx() status_y = height - 1 status = self.navigation.get_status_bar_text() stdscr.addstr(status_y, 0, status[:width-1], curses.A_REVERSE) def _get_flows_list(self): """Get sorted list of flows - prioritize by largest sigma outlier""" flows_list = list(self.analyzer.flows.values()) # Sort by maximum sigma deviation first, then by frame count as secondary criterion flows_list.sort(key=lambda x: ( self.analyzer.statistics_engine.get_max_sigma_deviation(x), x.frame_count ), reverse=True) return flows_list def _handle_visualization(self): """Handle Chapter 10 signal visualization for selected flow""" flows_list = self._get_flows_list() if not flows_list or self.navigation.selected_flow >= len(flows_list): return selected_flow = flows_list[self.navigation.selected_flow] flow_key = f"{selected_flow.src_ip}->{selected_flow.dst_ip}" # Check if this flow has Chapter 10 data if not self.navigation.has_chapter10_data(selected_flow): return # Get packets for this flow flow_packets = self._get_flow_packets(selected_flow) if not flow_packets: return # Launch visualization in TUI mode (will save plots to files) try: # Set TUI context to avoid GUI windows signal_visualizer._in_tui_context = True # Temporarily show status (will be overwritten by next TUI refresh) print(f"Generating signal visualization for flow {flow_key}...") signal_visualizer.visualize_flow_signals(selected_flow, flow_packets, gui_mode=False) except Exception as e: # Log error but don't disrupt TUI print(f"Visualization error: {e}") pass def _get_flow_packets(self, flow): """Get all packets belonging to a specific flow""" flow_packets = [] # Iterate through all packets and filter by source/destination for packet in self.analyzer.all_packets: try: # Check if packet matches this flow if hasattr(packet, 'haslayer'): from scapy.all import IP if packet.haslayer(IP): ip_layer = packet[IP] if ip_layer.src == flow.src_ip and ip_layer.dst == flow.dst_ip: flow_packets.append(packet) except: continue return flow_packets