""" Flow Details Panel - Detailed information for selected flow """ from textual.widget import Widget from textual.containers import Vertical from textual.widgets import Static from rich.text import Text from rich.panel import Panel from rich.console import RenderableType, Group from rich.table import Table from typing import TYPE_CHECKING, Optional if TYPE_CHECKING: from ....models import FlowStats class FlowDetailsPanel(Vertical): """ Detailed flow information panel Shows: - Flow identification - Enhanced decoder status - Timing analysis - Frame type breakdown - Quality metrics """ DEFAULT_CSS = """ FlowDetailsPanel { height: 1fr; padding: 1; } FlowDetailsPanel Static { margin-bottom: 1; } """ def __init__(self, **kwargs): super().__init__(**kwargs) self.current_flow = None def compose(self): """Create the details panel layout""" yield Static("Flow Details", classes="panel-header") yield Static( Panel("Select a flow to view details", border_style="dim"), id="details-content" ) def update_flow(self, flow: Optional['FlowStats']) -> None: """Update panel with flow details""" self.current_flow = flow content_widget = self.query_one("#details-content", Static) if not flow: content_widget.update( Panel("Select a flow to view details", border_style="dim") ) return # Create detailed content details = self._create_flow_details(flow) content_widget.update(details) def _create_flow_details(self, flow: 'FlowStats') -> RenderableType: """Create comprehensive flow details display""" sections = [] # Flow identification id_table = Table(show_header=False, box=None, padding=0) id_table.add_column(style="dim", width=12) id_table.add_column() id_table.add_row("Source:", f"{flow.src_ip}:{flow.src_port}") id_table.add_row("Destination:", f"{flow.dst_ip}:{flow.dst_port}") id_table.add_row("Protocol:", flow.transport_protocol) id_table.add_row("Packets:", f"{flow.frame_count:,}") id_table.add_row("Volume:", self._format_bytes(flow.total_bytes)) sections.append(Panel(id_table, title="Flow Information", border_style="blue")) # Enhanced analysis if flow.enhanced_analysis.decoder_type != "Standard": enhanced_table = Table(show_header=False, box=None, padding=0) enhanced_table.add_column(style="dim", width=12) enhanced_table.add_column() enhanced_table.add_row("Decoder:", flow.enhanced_analysis.decoder_type) enhanced_table.add_row("Quality:", f"{flow.enhanced_analysis.avg_frame_quality:.1f}%") enhanced_table.add_row("Fields:", str(flow.enhanced_analysis.field_count)) if flow.enhanced_analysis.frame_types: types_str = ", ".join(list(flow.enhanced_analysis.frame_types)[:3]) if len(flow.enhanced_analysis.frame_types) > 3: types_str += f" +{len(flow.enhanced_analysis.frame_types) - 3}" enhanced_table.add_row("Types:", types_str) sections.append(Panel(enhanced_table, title="Enhanced Analysis", border_style="green")) # Timing analysis timing_table = Table(show_header=False, box=None, padding=0) timing_table.add_column(style="dim", width=12) timing_table.add_column() timing_table.add_row("Duration:", f"{flow.duration:.2f}s") timing_table.add_row("Avg Interval:", f"{flow.avg_inter_arrival * 1000:.1f}ms") timing_table.add_row("Jitter:", f"{flow.jitter * 1000:.2f}ms") timing_table.add_row("First Seen:", self._format_timestamp(flow.first_seen)) timing_table.add_row("Last Seen:", self._format_timestamp(flow.last_seen)) sections.append(Panel(timing_table, title="Timing Analysis", border_style="cyan")) # Frame type breakdown (if multiple types) if len(flow.frame_types) > 1: frame_table = Table(show_header=True, box=None) frame_table.add_column("Type", style="blue") frame_table.add_column("Count", justify="right") frame_table.add_column("%", justify="right") total = flow.frame_count for frame_type, stats in sorted( flow.frame_types.items(), key=lambda x: x[1].count, reverse=True )[:5]: percentage = (stats.count / total * 100) if total > 0 else 0 frame_table.add_row( frame_type[:15], f"{stats.count:,}", f"{percentage:.1f}%" ) sections.append(Panel(frame_table, title="Frame Types", border_style="yellow")) # Quality metrics if flow.outlier_frames or flow.enhanced_analysis.decoder_type != "Standard": quality_lines = [] if flow.outlier_frames: outlier_pct = len(flow.outlier_frames) / flow.frame_count * 100 quality_lines.append(f"Outliers: {len(flow.outlier_frames)} ({outlier_pct:.1f}%)") if flow.enhanced_analysis.timing_accuracy: quality_lines.append(f"Timing: {flow.enhanced_analysis.timing_accuracy}") if flow.enhanced_analysis.signal_quality: quality_lines.append(f"Signal: {flow.enhanced_analysis.signal_quality:.1f}%") if quality_lines: quality_text = "\n".join(quality_lines) sections.append(Panel(quality_text, title="Quality Metrics", border_style="magenta")) return Group(*sections) def _format_bytes(self, bytes_count: int) -> str: """Format byte count with units""" if bytes_count >= 1_000_000_000: return f"{bytes_count / 1_000_000_000:.2f} GB" elif bytes_count >= 1_000_000: return f"{bytes_count / 1_000_000:.2f} MB" elif bytes_count >= 1_000: return f"{bytes_count / 1_000:.2f} KB" else: return f"{bytes_count} B" def _format_timestamp(self, timestamp: float) -> str: """Format timestamp for display""" import datetime dt = datetime.datetime.fromtimestamp(timestamp) return dt.strftime("%H:%M:%S.%f")[:-3] # Show milliseconds