Files
StreamLens/analyzer/tui/interface.py

240 lines
9.9 KiB
Python

"""
Main TUI interface controller
"""
import curses
from typing import TYPE_CHECKING
from .navigation import NavigationHandler
from .panels import FlowListPanel, DetailPanel, TimelinePanel
from ..utils.signal_visualizer import signal_visualizer
if TYPE_CHECKING:
    # Needed only for the 'EthernetAnalyzer' type annotation; this branch is
    # skipped at runtime because typing.TYPE_CHECKING is False.
    from ..analysis.core import EthernetAnalyzer
class TUIInterface:
    """Text User Interface for the analyzer.

    Owns a ``NavigationHandler`` and three panels (flow list, details,
    timeline) and redraws them in the loop implemented by ``run``.
    """

    # Glyphs drawn between the panels of the main view.
    _VERTICAL_SEPARATOR = "│"
    _HORIZONTAL_SEPARATOR = "─"

    # Layout of the dissection view: each frame occupies a fixed block of rows.
    _DISSECTION_FIRST_ROW = 2
    _DISSECTION_ROWS_PER_FRAME = 6
    _DISSECTION_MAX_FRAMES = 5

    # (layer name, field name, label) for the protocol-specific detail row of
    # the dissection view, checked in this order; plain IP is the fallback.
    _SPECIALIZED_LAYERS = (
        ('chapter10', 'data_type_name', 'CH10'),
        ('ptp', 'message_type_name', 'PTP'),
        ('iena', 'packet_type_name', 'IENA'),
    )

    def __init__(self, analyzer: 'EthernetAnalyzer'):
        """Store the analyzer and create the navigation handler and panels."""
        self.analyzer = analyzer
        self.navigation = NavigationHandler()

        # Initialize panels
        self.flow_list_panel = FlowListPanel()
        self.detail_panel = DetailPanel()
        self.timeline_panel = TimelinePanel()

    def run(self, stdscr) -> None:
        """Run the main TUI loop until the navigation handler reports 'quit'.

        Args:
            stdscr: The curses window to draw on and read keys from.
        """
        try:
            curses.curs_set(0)  # Hide cursor
        except curses.error:
            # Raised when the terminal cannot change cursor visibility;
            # a visible cursor is cosmetic, so carry on.
            pass
        stdscr.keypad(True)

        # getch() timeout: 0.5 s for live updates, 1 s for static analysis.
        stdscr.timeout(500 if self.analyzer.is_live else 1000)

        while True:
            stdscr.clear()

            if self.navigation.current_view == 'main':
                self._draw_main_view(stdscr)
            elif self.navigation.current_view == 'dissection':
                self._draw_dissection(stdscr)

            self._draw_status_bar(stdscr)
            stdscr.refresh()

            key = stdscr.getch()

            # Timeout (no key pressed) during live capture: just redraw.
            if key == -1 and self.analyzer.is_live:
                continue

            action = self.navigation.handle_input(key, self._get_flows_list())

            if action == 'quit':
                if self.analyzer.is_live:
                    # Signal the capture side to stop before leaving the loop.
                    self.analyzer.stop_capture = True
                break
            elif action == 'visualize':
                self._handle_visualization()

    @staticmethod
    def _safe_addstr(stdscr, y, x, text, attr=None) -> None:
        """Write *text* at (y, x), clipped to the window instead of raising.

        Positions outside the window are skipped and text running past the
        right edge is truncated, so a small terminal cannot crash the TUI.
        """
        height, width = stdscr.getmaxyx()
        if not (0 <= y < height and 0 <= x < width):
            return
        clipped = text[:width - x]
        if not clipped:
            return
        try:
            if attr is None:
                stdscr.addstr(y, x, clipped)
            else:
                stdscr.addstr(y, x, clipped, attr)
        except curses.error:
            # curses raises after printing into the lower-right cell because
            # the cursor cannot advance; the text is already on screen.
            pass

    def _draw_main_view(self, stdscr) -> None:
        """Draw three-panel main view: flows list, details, and timeline."""
        height, width = stdscr.getmaxyx()

        # Calculate panel dimensions based on timeline visibility
        if self.navigation.show_timeline:
            # Top section: 70% of height, split into left 70% / right 30%
            # Bottom section: remaining height, full width
            top_height = int(height * 0.7)
            bottom_height = height - top_height - 2  # -2 for separators and status bar
        else:
            # Use full height for top section when timeline is hidden
            top_height = height - 2  # -2 for status bar
            bottom_height = 0

        left_width = int(width * 0.7)
        right_width = width - left_width - 1  # -1 for separator

        # Title
        self._safe_addstr(stdscr, 0, 0, "=== ETHERNET TRAFFIC ANALYZER ===", curses.A_BOLD)

        # Summary line
        summary = self.analyzer.get_summary()
        info_line = (
            f"Packets: {summary['total_packets']} | "
            f"Flows: {summary['unique_flows']} | "
            f"IPs: {summary['unique_ips']}"
        )

        # Real-time statistics are only consulted during live capture.
        realtime_stats = (
            self.analyzer.is_live
            and self.analyzer.statistics_engine.enable_realtime
        )
        if realtime_stats:
            rt_summary = self.analyzer.statistics_engine.get_realtime_summary()
            info_line += f" | Outliers: {rt_summary.get('total_outliers', 0)}"

        self._safe_addstr(stdscr, 1, 0, info_line)

        if self.analyzer.is_live:
            status_text = "LIVE+STATS" if realtime_stats else "LIVE CAPTURE"
            # Right-aligned inside the left panel; on a very narrow terminal
            # the column goes negative and the label is skipped.
            self._safe_addstr(stdscr, 1, left_width - len(status_text) - 2,
                              status_text, curses.A_BLINK)

        flows_list = self._get_flows_list()

        # Left panel (flows list)
        self.flow_list_panel.draw(stdscr, 0, 3, left_width, top_height - 3,
                                  flows_list, self.navigation.selected_flow)

        # Vertical separator for top section
        for y in range(1, top_height):
            self._safe_addstr(stdscr, y, left_width, self._VERTICAL_SEPARATOR)

        # Right panel (details)
        self.detail_panel.draw(stdscr, left_width + 2, 1, right_width - 2,
                               flows_list, self.navigation.selected_flow, top_height - 2)

        # Timeline panel if enabled
        if self.navigation.show_timeline and bottom_height > 0:
            # Horizontal separator
            self._safe_addstr(stdscr, top_height, 0, self._HORIZONTAL_SEPARATOR * width)

            # Bottom panel (timeline)
            timeline_start_y = top_height + 1
            self.timeline_panel.draw(stdscr, 0, timeline_start_y, width, bottom_height,
                                     flows_list, self.navigation.selected_flow)

    def _draw_dissection(self, stdscr) -> None:
        """Draw frame dissection view for the first few captured packets.

        Each frame takes a fixed block of rows; frames whose block would not
        fit above the status bar (the last row) are not drawn.
        """
        height, _ = stdscr.getmaxyx()
        self._safe_addstr(stdscr, 0, 0, "=== FRAME DISSECTION ===", curses.A_BOLD)

        if not self.analyzer.all_packets:
            self._safe_addstr(stdscr, 2, 0, "No packets available")
            return

        rows_per_frame = self._DISSECTION_ROWS_PER_FRAME
        first_packets = self.analyzer.all_packets[:self._DISSECTION_MAX_FRAMES]

        for i, packet in enumerate(first_packets):
            y_start = i * rows_per_frame + self._DISSECTION_FIRST_ROW
            # The block's last row must stay above the status bar at height - 1.
            if y_start + rows_per_frame - 1 >= height - 1:
                break

            dissection = self.analyzer.dissector.dissect_frame(packet, i + 1)
            layers = dissection['layers']

            self._safe_addstr(stdscr, y_start, 0,
                              f"Frame {dissection['frame_number']}:", curses.A_BOLD)
            self._safe_addstr(stdscr, y_start + 1, 2,
                              f"Timestamp: {dissection['timestamp']:.6f}")
            self._safe_addstr(stdscr, y_start + 2, 2,
                              f"Size: {dissection['size']} bytes")

            # Detected protocols
            protocols = dissection.get('protocols', [])
            if protocols:
                proto_str = ", ".join(protocols)
                self._safe_addstr(stdscr, y_start + 3, 2, f"Protocols: {proto_str}")

            # Only layers that dissected without an 'error' entry are listed.
            layers_str = ", ".join(
                name for name, info in layers.items() if not info.get('error')
            )
            self._safe_addstr(stdscr, y_start + 4, 2, f"Layers: {layers_str}")

            # Specialized protocol info: first matching layer wins, IP last.
            for layer_name, field, label in self._SPECIALIZED_LAYERS:
                if layer_name in layers and field in layers[layer_name]:
                    self._safe_addstr(stdscr, y_start + 5, 2,
                                      f"{label}: {layers[layer_name][field]}")
                    break
            else:
                if 'ip' in layers:
                    ip_info = layers['ip']
                    self._safe_addstr(stdscr, y_start + 5, 2,
                                      f"IP: {ip_info['src']} -> {ip_info['dst']}")

    def _draw_status_bar(self, stdscr) -> None:
        """Draw status bar on the last row, in reverse video."""
        height, width = stdscr.getmaxyx()
        status = self.navigation.get_status_bar_text()
        # Leave the last column free, as the lower-right cell is awkward in curses.
        self._safe_addstr(stdscr, height - 1, 0, status[:max(0, width - 1)],
                          curses.A_REVERSE)

    def _get_flows_list(self) -> list:
        """Get sorted list of flows - prioritize by largest sigma outlier.

        Returns:
            The analyzer's flows ordered by maximum sigma deviation, then by
            frame count, both descending.
        """
        max_sigma = self.analyzer.statistics_engine.get_max_sigma_deviation
        return sorted(
            self.analyzer.flows.values(),
            key=lambda flow: (max_sigma(flow), flow.frame_count),
            reverse=True,
        )

    def _handle_visualization(self) -> None:
        """Handle Chapter 10 signal visualization for selected flow.

        Does nothing when there is no valid selection, when the navigation
        handler reports no Chapter 10 data for the flow, or when no packets
        match the flow.
        """
        flows_list = self._get_flows_list()

        if not flows_list or self.navigation.selected_flow >= len(flows_list):
            return

        selected_flow = flows_list[self.navigation.selected_flow]
        flow_key = f"{selected_flow.src_ip}->{selected_flow.dst_ip}"

        # Check if this flow has Chapter 10 data
        if not self.navigation.has_chapter10_data(selected_flow):
            return

        # Get packets for this flow
        flow_packets = self._get_flow_packets(selected_flow)
        if not flow_packets:
            return

        # Launch visualization in TUI mode (will save plots to files)
        try:
            # Set TUI context to avoid GUI windows.
            # NOTE(review): this flag is never reset here - confirm that is intended.
            signal_visualizer._in_tui_context = True

            # Temporarily show status (will be overwritten by next TUI refresh)
            print(f"Generating signal visualization for flow {flow_key}...")

            signal_visualizer.visualize_flow_signals(selected_flow, flow_packets, gui_mode=False)
        except Exception as e:
            # Best effort: report the error but don't disrupt the TUI loop.
            print(f"Visualization error: {e}")

    def _get_flow_packets(self, flow) -> list:
        """Get all packets belonging to a specific flow.

        A packet matches when its IP source and destination equal the flow's
        ``src_ip`` and ``dst_ip``. Packets that cannot be inspected are
        skipped.

        Returns:
            The matching packets in capture order; empty if scapy cannot be
            imported.
        """
        try:
            # Imported once per call rather than once per packet.
            from scapy.all import IP
        except ImportError:
            return []

        flow_packets = []
        for packet in self.analyzer.all_packets:
            try:
                if hasattr(packet, 'haslayer') and packet.haslayer(IP):
                    ip_layer = packet[IP]
                    if ip_layer.src == flow.src_ip and ip_layer.dst == flow.dst_ip:
                        flow_packets.append(packet)
            except Exception:
                # Skip malformed packets, but let KeyboardInterrupt/SystemExit through.
                continue

        return flow_packets