127 lines
5.5 KiB
Python
127 lines
5.5 KiB
Python
|
|
"""
|
||
|
|
Left panel - Flow list with frame type breakdowns
|
||
|
|
"""
|
||
|
|
|
||
|
|
from typing import List, Optional
|
||
|
|
import curses
|
||
|
|
|
||
|
|
from ...models import FlowStats
|
||
|
|
|
||
|
|
|
||
|
|
class FlowListPanel:
|
||
|
|
"""Left panel showing flows and frame type breakdowns"""
|
||
|
|
|
||
|
|
def __init__(self):
|
||
|
|
self.selected_item = 0
|
||
|
|
self.scroll_offset = 0
|
||
|
|
|
||
|
|
def draw(self, stdscr, x_offset: int, y_offset: int, width: int, height: int,
|
||
|
|
flows_list: List[FlowStats], selected_flow: int):
|
||
|
|
"""Draw the flow list panel"""
|
||
|
|
|
||
|
|
# Draw flows table header
|
||
|
|
stdscr.addstr(y_offset, x_offset, "FLOWS:", curses.A_BOLD)
|
||
|
|
headers = f"{'Source IP':15} {'Dest IP':15} {'Pkts':5} {'Protocol':18} {'ΔT Avg':10} {'Out':4}"
|
||
|
|
stdscr.addstr(y_offset + 1, x_offset, headers[:width-1], curses.A_UNDERLINE)
|
||
|
|
|
||
|
|
# Calculate scrolling parameters
|
||
|
|
start_row = y_offset + 2
|
||
|
|
max_rows = height - 3 # Account for header and title
|
||
|
|
total_items = self._get_total_display_items(flows_list)
|
||
|
|
|
||
|
|
# Calculate scroll offset to keep selected item visible
|
||
|
|
scroll_offset = self._calculate_scroll_offset(selected_flow, max_rows, total_items)
|
||
|
|
|
||
|
|
# Draw flows list with frame type breakdowns
|
||
|
|
current_row = start_row
|
||
|
|
display_item = 0 # Track selectable items (flows + frame types)
|
||
|
|
visible_items = 0 # Track items actually drawn
|
||
|
|
|
||
|
|
for flow_idx, flow in enumerate(flows_list):
|
||
|
|
# Check if main flow line should be displayed
|
||
|
|
if display_item >= scroll_offset and visible_items < max_rows:
|
||
|
|
# Draw main flow line
|
||
|
|
protocol_str = self._get_protocol_display(flow)
|
||
|
|
avg_time = f"{flow.avg_inter_arrival:.3f}s" if flow.avg_inter_arrival > 0 else "N/A"
|
||
|
|
|
||
|
|
line = f"{flow.src_ip:15} {flow.dst_ip:15} {flow.frame_count:5} {protocol_str:18} {avg_time:10} {'':4}"
|
||
|
|
|
||
|
|
if display_item == selected_flow:
|
||
|
|
stdscr.addstr(current_row, x_offset, line[:width-1], curses.A_REVERSE)
|
||
|
|
else:
|
||
|
|
stdscr.addstr(current_row, x_offset, line[:width-1], curses.A_BOLD)
|
||
|
|
|
||
|
|
current_row += 1
|
||
|
|
visible_items += 1
|
||
|
|
|
||
|
|
display_item += 1
|
||
|
|
|
||
|
|
# Draw frame type breakdowns for this flow
|
||
|
|
if flow.frame_types:
|
||
|
|
sorted_frame_types = sorted(flow.frame_types.items(), key=lambda x: x[1].count, reverse=True)
|
||
|
|
|
||
|
|
for frame_type, ft_stats in sorted_frame_types:
|
||
|
|
if display_item >= scroll_offset and visible_items < max_rows:
|
||
|
|
# Calculate frame type timing display
|
||
|
|
ft_avg = f"{ft_stats.avg_inter_arrival:.3f}s" if ft_stats.avg_inter_arrival > 0 else "N/A"
|
||
|
|
outlier_count = len(ft_stats.outlier_details) if ft_stats.outlier_details else 0
|
||
|
|
|
||
|
|
# Create frame type line aligned with columns
|
||
|
|
ft_line = f"{'':15} {'':15} {ft_stats.count:5} {frame_type:18} {ft_avg:10} {outlier_count:4}"
|
||
|
|
|
||
|
|
if display_item == selected_flow:
|
||
|
|
stdscr.addstr(current_row, x_offset, ft_line[:width-1], curses.A_REVERSE)
|
||
|
|
else:
|
||
|
|
stdscr.addstr(current_row, x_offset, ft_line[:width-1])
|
||
|
|
|
||
|
|
current_row += 1
|
||
|
|
visible_items += 1
|
||
|
|
|
||
|
|
display_item += 1
|
||
|
|
|
||
|
|
def _get_protocol_display(self, flow: FlowStats) -> str:
|
||
|
|
"""Get display string for flow protocols"""
|
||
|
|
if flow.detected_protocol_types:
|
||
|
|
# Prioritize specialized protocols
|
||
|
|
specialized = {'CHAPTER10', 'PTP', 'IENA'}
|
||
|
|
found_specialized = flow.detected_protocol_types & specialized
|
||
|
|
if found_specialized:
|
||
|
|
return list(found_specialized)[0]
|
||
|
|
|
||
|
|
# Use first detected protocol type
|
||
|
|
return list(flow.detected_protocol_types)[0]
|
||
|
|
|
||
|
|
# Fallback to basic protocols
|
||
|
|
if flow.protocols:
|
||
|
|
return list(flow.protocols)[0]
|
||
|
|
|
||
|
|
return "Unknown"
|
||
|
|
|
||
|
|
def _get_total_display_items(self, flows_list: List[FlowStats]) -> int:
|
||
|
|
"""Calculate total number of selectable items (flows + frame types)"""
|
||
|
|
total = 0
|
||
|
|
for flow in flows_list:
|
||
|
|
total += 1 # Flow itself
|
||
|
|
total += len(flow.frame_types) # Frame types under this flow
|
||
|
|
return total
|
||
|
|
|
||
|
|
def _calculate_scroll_offset(self, selected_item: int, max_visible: int, total_items: int) -> int:
|
||
|
|
"""Calculate scroll offset to keep selected item visible"""
|
||
|
|
if total_items <= max_visible:
|
||
|
|
return 0 # No scrolling needed
|
||
|
|
|
||
|
|
# Keep selected item in the middle third of visible area when possible
|
||
|
|
middle_position = max_visible // 3
|
||
|
|
|
||
|
|
# Calculate ideal scroll offset
|
||
|
|
scroll_offset = max(0, selected_item - middle_position)
|
||
|
|
|
||
|
|
# Ensure we don't scroll past the end
|
||
|
|
max_scroll = max(0, total_items - max_visible)
|
||
|
|
scroll_offset = min(scroll_offset, max_scroll)
|
||
|
|
|
||
|
|
return scroll_offset
|
||
|
|
|
||
|
|
def get_total_display_items(self, flows_list: List[FlowStats]) -> int:
|
||
|
|
"""Public method to get total display items"""
|
||
|
|
return self._get_total_display_items(flows_list)
|