""" StreamLens Textual Application V2 - TipTop-Inspired Design Modern TUI with real-time metrics, sparklines, and professional monitoring aesthetic """ from textual.app import App, ComposeResult from textual.containers import Container, Horizontal, Vertical, ScrollableContainer from textual.widgets import Header, Footer, Static, DataTable, Label from textual.reactive import reactive from textual.timer import Timer from textual.events import MouseDown, MouseMove from typing import TYPE_CHECKING from rich.text import Text from rich.console import Group from rich.panel import Panel from rich.table import Table import time from .widgets.sparkline import SparklineWidget from .widgets.metric_card import MetricCard from .widgets.flow_table_v2 import EnhancedFlowTable from .widgets.split_flow_details import FlowMainDetailsPanel, SubFlowDetailsPanel if TYPE_CHECKING: from ...analysis.core import EthernetAnalyzer class StreamLensAppV2(App): """ StreamLens TipTop-Inspired Interface Features: - Real-time metrics with sparklines - Color-coded quality indicators - Compact information display - Multi-column layout - Smooth live updates """ CSS_PATH = "styles/streamlens_v2.tcss" ENABLE_COMMAND_PALETTE = False AUTO_FOCUS = None BINDINGS = [ ("q", "quit", "Quit"), ("1", "sort('flows')", "Sort Flows"), ("2", "sort('packets')", "Sort Packets"), ("3", "sort('volume')", "Sort Volume"), ("4", "sort('quality')", "Sort Quality"), ("p", "toggle_pause", "Pause"), ("d", "show_details", "Details"), ("?", "toggle_help", "Help"), ] # Reactive attributes total_flows = reactive(0) total_packets = reactive(0) packets_per_sec = reactive(0.0) bytes_per_sec = reactive(0.0) enhanced_flows = reactive(0) outlier_count = reactive(0) # Update timers metric_timer: Timer = None flow_timer: Timer = None def __init__(self, analyzer: 'EthernetAnalyzer'): super().__init__() self.analyzer = analyzer self.title = "StreamLens" self.sub_title = "Network Flow Analysis" self.paused = False # Metrics history for sparklines self.packets_history = [] self.bytes_history = [] self.flows_history = [] self.max_history = 60 # 60 seconds of history def compose(self) -> ComposeResult: """Create TipTop-inspired layout""" yield Header() with Container(id="main-container"): # Ultra-compact metrics bar with Horizontal(id="metrics-bar"): yield MetricCard("Flows", f"{self.total_flows}", id="flows-metric") yield MetricCard("Pkts/s", f"{self.packets_per_sec:.0f}", id="packets-metric") yield MetricCard("Vol/s", self._format_bytes_per_sec(self.bytes_per_sec), id="volume-metric") yield MetricCard("Enhanced", f"{self.enhanced_flows}", color="success", id="enhanced-metric") yield MetricCard("Outliers", f"{self.outlier_count}", color="warning" if self.outlier_count > 0 else "normal", id="outliers-metric") # Main content area with 3 clean panels with Horizontal(id="content-area"): # Left - Enhanced flow table yield EnhancedFlowTable( self.analyzer, id="flow-table", classes="panel-wide" ) # Right top - Main flow details with Vertical(id="right-panels"): yield FlowMainDetailsPanel(id="main-flow-details") yield SubFlowDetailsPanel(id="sub-flow-details") yield Footer() def on_mount(self) -> None: """Initialize the application with TipTop-style updates""" self.update_metrics() # Set up update intervals like TipTop self.metric_timer = self.set_interval(0.5, self.update_metrics) # 2Hz for smooth graphs self.flow_timer = self.set_interval(1.0, self.update_flows) # 1Hz for flow data # Initialize sparkline history self._initialize_history() # Set initial focus to the flow table for immediate keyboard navigation self.call_after_refresh(self._set_initial_focus) def _set_initial_focus(self): """Set initial focus to the flow table after widgets are ready""" try: flow_table = self.query_one("#flow-table", EnhancedFlowTable) data_table = flow_table.query_one("#flows-data-table", DataTable) data_table.focus() except Exception: # If table isn't ready yet, try again after a short delay self.set_timer(0.1, self._set_initial_focus) def _initialize_history(self): """Initialize metrics history arrays""" current_time = time.time() for _ in range(self.max_history): self.packets_history.append(0) self.bytes_history.append(0) self.flows_history.append(0) def update_metrics(self) -> None: """Update real-time metrics and sparklines""" if self.paused: return # Get current metrics summary = self.analyzer.get_summary() self.total_flows = summary.get('unique_flows', 0) self.total_packets = summary.get('total_packets', 0) # Calculate rates (simplified for now) # In real implementation, track deltas over time current_time = time.time() if not hasattr(self, '_start_time'): self._start_time = current_time elapsed = max(1, current_time - self._start_time) self.packets_per_sec = self.total_packets / elapsed self.bytes_per_sec = summary.get('total_bytes', 0) / elapsed # Count enhanced and outliers enhanced = 0 outliers = 0 for flow in self.analyzer.flows.values(): if flow.enhanced_analysis.decoder_type != "Standard": enhanced += 1 outliers += len(flow.outlier_frames) self.enhanced_flows = enhanced self.outlier_count = outliers # Update metric cards self._update_metric_cards() # Update sparklines (removed - no longer in left panel) # self._update_sparklines() def _update_metric_cards(self): """Update the metric card displays""" # Update flows metric flows_card = self.query_one("#flows-metric", MetricCard) flows_card.update_value(f"{self.total_flows}") # Update packets/s with color coding packets_card = self.query_one("#packets-metric", MetricCard) packets_card.update_value(f"{self.packets_per_sec:.1f}") if self.packets_per_sec > 10000: packets_card.color = "warning" elif self.packets_per_sec > 50000: packets_card.color = "error" else: packets_card.color = "success" # Update volume/s volume_card = self.query_one("#volume-metric", MetricCard) volume_card.update_value(self._format_bytes_per_sec(self.bytes_per_sec)) # Update enhanced flows enhanced_card = self.query_one("#enhanced-metric", MetricCard) enhanced_card.update_value(f"{self.enhanced_flows}") # Update outliers with color outliers_card = self.query_one("#outliers-metric", MetricCard) outliers_card.update_value(f"{self.outlier_count}") if self.outlier_count > 100: outliers_card.color = "error" elif self.outlier_count > 10: outliers_card.color = "warning" else: outliers_card.color = "normal" def _update_sparklines(self): """Update sparkline charts with latest data""" # Add new data points self.packets_history.append(self.packets_per_sec) self.bytes_history.append(self.bytes_per_sec) self.flows_history.append(self.total_flows) # Keep only recent history if len(self.packets_history) > self.max_history: self.packets_history.pop(0) self.bytes_history.pop(0) self.flows_history.pop(0) # Update sparkline widgets flow_spark = self.query_one("#flow-rate-spark", SparklineWidget) flow_spark.update_data(self.flows_history) packet_spark = self.query_one("#packet-rate-spark", SparklineWidget) packet_spark.update_data(self.packets_history) def update_flows(self) -> None: """Update flow table data""" if self.paused: return # Update flow table flow_table = self.query_one("#flow-table", EnhancedFlowTable) flow_table.refresh_data() def on_enhanced_flow_table_flow_selected(self, event: EnhancedFlowTable.FlowSelected) -> None: """Handle flow selection events""" if event.flow: # Update main flow details panel main_panel = self.query_one("#main-flow-details", FlowMainDetailsPanel) main_panel.update_flow(event.flow) # Update sub-flow details panel sub_panel = self.query_one("#sub-flow-details", SubFlowDetailsPanel) sub_panel.update_flow(event.flow, event.subflow_type) def _format_bytes_per_sec(self, bps: float) -> str: """Format bytes per second with appropriate units""" if bps >= 1_000_000_000: return f"{bps / 1_000_000_000:.1f} GB/s" elif bps >= 1_000_000: return f"{bps / 1_000_000:.1f} MB/s" elif bps >= 1_000: return f"{bps / 1_000:.1f} KB/s" else: return f"{bps:.0f} B/s" def action_toggle_pause(self) -> None: """Toggle pause state""" self.paused = not self.paused status = "PAUSED" if self.paused else "LIVE" self.sub_title = f"Network Flow Analysis - {status}" def action_sort(self, key: str) -> None: """Sort flow table by specified key""" flow_table = self.query_one("#flow-table", EnhancedFlowTable) flow_table.sort_by(key) def action_show_details(self) -> None: """Show detailed view for selected flow""" # TODO: Implement detailed flow modal pass def on_mouse_down(self, event: MouseDown) -> None: """Prevent default mouse down behavior to disable mouse interaction.""" event.prevent_default() def on_mouse_move(self, event: MouseMove) -> None: """Prevent default mouse move behavior to disable mouse interaction.""" event.prevent_default()