186 lines
7.7 KiB
Python
186 lines
7.7 KiB
Python
"""
|
|
Main TUI interface controller
|
|
"""
|
|
|
|
import curses
|
|
from typing import TYPE_CHECKING
|
|
|
|
from .navigation import NavigationHandler
|
|
from .panels import FlowListPanel, DetailPanel, TimelinePanel
|
|
|
|
if TYPE_CHECKING:
|
|
from ..analysis.core import EthernetAnalyzer
|
|
|
|
|
|
class TUIInterface:
    """Text User Interface for the analyzer.

    Draws either the main view (flow list, detail panel and an optional
    timeline) or the frame-dissection view, always followed by a status bar,
    and forwards key presses to the navigation handler until it answers
    ``'quit'``.
    """

    # Dissection view layout: frames start on this row, each one occupies a
    # fixed number of rows, and only the first few packets are shown.
    _DISSECTION_FIRST_ROW = 2
    _DISSECTION_ROWS_PER_FRAME = 6
    _DISSECTION_MAX_FRAMES = 5

    # (layer key, field holding the display name, label), tried in order
    # before falling back to the plain IP source/destination line.
    _SPECIAL_LAYERS = (
        ('chapter10', 'data_type_name', 'CH10'),
        ('ptp', 'message_type_name', 'PTP'),
        ('iena', 'packet_type_name', 'IENA'),
    )

    def __init__(self, analyzer: 'EthernetAnalyzer'):
        """Store the analyzer and create the navigation handler and panels."""
        self.analyzer = analyzer
        self.navigation = NavigationHandler()

        # Initialize panels
        self.flow_list_panel = FlowListPanel()
        self.detail_panel = DetailPanel()
        self.timeline_panel = TimelinePanel()

    @staticmethod
    def _safe_addstr(stdscr, y, x, text, attr=curses.A_NORMAL) -> None:
        """Write ``text`` at ``(y, x)``, clipped to the window.

        Nothing is drawn when the start position lies outside the window.
        ``curses.error`` is swallowed on purpose: curses raises it when a
        write reaches the bottom-right cell or the terminal shrank between
        measuring and drawing, and a cosmetic glitch must not end the UI.
        """
        height, width = stdscr.getmaxyx()
        if not (0 <= y < height and 0 <= x < width):
            return
        try:
            stdscr.addstr(y, x, text[:width - x], attr)
        except curses.error:
            pass

    def run(self, stdscr) -> None:
        """Main TUI loop.

        Redraws the current view and the status bar, then waits for a key.
        The loop ends when the navigation handler returns ``'quit'``; in
        live mode ``analyzer.stop_capture`` is set to ``True`` first.
        """
        try:
            curses.curs_set(0)  # Hide cursor
        except curses.error:
            # Some terminals cannot change cursor visibility; carry on.
            pass
        stdscr.keypad(True)

        # Live capture polls twice a second so new data shows up; static
        # analysis only needs to wake up once a second.
        stdscr.timeout(500 if self.analyzer.is_live else 1000)

        while True:
            # erase() instead of clear(): clear() also forces a complete
            # repaint on the next refresh, which flickers on every frame.
            stdscr.erase()

            view = self.navigation.current_view
            if view == 'main':
                self._draw_main_view(stdscr)
            elif view == 'dissection':
                self._draw_dissection(stdscr)

            self._draw_status_bar(stdscr)
            stdscr.refresh()

            key = stdscr.getch()

            # getch() returns -1 on timeout; in live mode that only means
            # "redraw with fresh data".
            if key == -1 and self.analyzer.is_live:
                continue

            action = self.navigation.handle_input(key, self._get_flows_list())
            if action == 'quit':
                if self.analyzer.is_live:
                    self.analyzer.stop_capture = True
                break

    def _draw_main_view(self, stdscr) -> None:
        """Draw three-panel main view: flows list, details, and timeline."""
        height, width = stdscr.getmaxyx()

        if self.navigation.show_timeline:
            # Top section takes 70% of the height; the timeline gets the
            # rest minus the separator row and the status bar.
            top_height = int(height * 0.7)
            bottom_height = height - top_height - 2
        else:
            # Timeline hidden: the top section uses everything above the
            # status bar.
            top_height = height - 2
            bottom_height = 0

        # Top section is split 60% / 40% with one separator column.
        left_width = int(width * 0.6)
        right_width = width - left_width - 1

        self._draw_header(stdscr, left_width)

        flows_list = self._get_flows_list()
        selected = self.navigation.selected_flow

        # Left panel (flows list)
        self.flow_list_panel.draw(stdscr, 0, 3, left_width, top_height - 3,
                                  flows_list, selected)

        # Vertical separator for the top section
        for y in range(1, top_height):
            self._safe_addstr(stdscr, y, left_width, "│")

        # Right panel (details)
        self.detail_panel.draw(stdscr, left_width + 2, 1, right_width - 2,
                               flows_list, selected, top_height - 2)

        # Timeline panel, preceded by a horizontal separator
        if self.navigation.show_timeline and bottom_height > 0:
            self._safe_addstr(stdscr, top_height, 0, "─" * width)
            self.timeline_panel.draw(stdscr, 0, top_height + 1, width,
                                     bottom_height, flows_list, selected)

    def _draw_header(self, stdscr, left_width) -> None:
        """Draw the title, the summary line and the live-capture label."""
        self._safe_addstr(stdscr, 0, 0, "=== ETHERNET TRAFFIC ANALYZER ===",
                          curses.A_BOLD)

        summary = self.analyzer.get_summary()
        info_line = (f"Packets: {summary['total_packets']} | "
                     f"Flows: {summary['unique_flows']} | "
                     f"IPs: {summary['unique_ips']}")

        stats = self.analyzer.statistics_engine
        is_live = self.analyzer.is_live

        # Add real-time statistics if enabled
        if is_live and stats.enable_realtime:
            rt_summary = stats.get_realtime_summary()
            info_line += f" | Outliers: {rt_summary.get('total_outliers', 0)}"

        self._safe_addstr(stdscr, 1, 0, info_line)

        if is_live:
            status_text = "LIVE+STATS" if stats.enable_realtime else "LIVE CAPTURE"
            # Right-aligned inside the left panel. On a very narrow window
            # the column is negative and the label is simply not drawn.
            self._safe_addstr(stdscr, 1, left_width - len(status_text) - 2,
                              status_text, curses.A_BLINK)

    def _draw_dissection(self, stdscr) -> None:
        """Draw frame dissection view for the first few packets.

        Only frames that fit completely above the status bar are drawn.
        """
        self._safe_addstr(stdscr, 0, 0, "=== FRAME DISSECTION ===",
                          curses.A_BOLD)

        packets = self.analyzer.all_packets
        if not packets:
            self._safe_addstr(stdscr, 2, 0, "No packets available")
            return

        # Measure the real window (stays correct after a resize); the last
        # row belongs to the status bar.
        height, _ = stdscr.getmaxyx()
        last_usable_row = height - 2
        rows = self._DISSECTION_ROWS_PER_FRAME

        for i, packet in enumerate(packets[:self._DISSECTION_MAX_FRAMES]):
            y_start = self._DISSECTION_FIRST_ROW + i * rows
            if y_start + rows - 1 > last_usable_row:
                break

            dissection = self.analyzer.dissector.dissect_frame(packet, i + 1)
            layers = dissection['layers']

            self._safe_addstr(stdscr, y_start, 0,
                              f"Frame {dissection['frame_number']}:",
                              curses.A_BOLD)
            self._safe_addstr(stdscr, y_start + 1, 2,
                              f"Timestamp: {dissection['timestamp']:.6f}")
            self._safe_addstr(stdscr, y_start + 2, 2,
                              f"Size: {dissection['size']} bytes")

            # Show detected protocols
            protocols = dissection.get('protocols', [])
            if protocols:
                self._safe_addstr(stdscr, y_start + 3, 2,
                                  f"Protocols: {', '.join(protocols)}")

            # Only layers without an 'error' entry are listed.
            layers_str = ", ".join(
                name for name, info in layers.items() if not info.get('error')
            )
            self._safe_addstr(stdscr, y_start + 4, 2, f"Layers: {layers_str}")

            # Show specialized protocol info
            detail = self._protocol_detail(layers)
            if detail is not None:
                self._safe_addstr(stdscr, y_start + 5, 2, detail)

    def _protocol_detail(self, layers):
        """Return the protocol-specific summary line for a frame, or None."""
        for layer_key, field, label in self._SPECIAL_LAYERS:
            info = layers.get(layer_key)
            if info is not None and field in info:
                return f"{label}: {info[field]}"
        if 'ip' in layers:
            ip_info = layers['ip']
            return f"IP: {ip_info['src']} -> {ip_info['dst']}"
        return None

    def _draw_status_bar(self, stdscr) -> None:
        """Draw status bar at bottom."""
        height, width = stdscr.getmaxyx()
        status = self.navigation.get_status_bar_text()
        # Stop one column short so the bottom-right cell is never written.
        self._safe_addstr(stdscr, height - 1, 0, status[:width - 1],
                          curses.A_REVERSE)

    def _get_flows_list(self):
        """Get sorted list of flows - prioritize by largest sigma outlier.

        Flows are ordered by maximum sigma deviation, then by frame count,
        both descending.
        """
        max_sigma = self.analyzer.statistics_engine.get_max_sigma_deviation
        return sorted(
            self.analyzer.flows.values(),
            key=lambda flow: (max_sigma(flow), flow.frame_count),
            reverse=True,
        )