Files
StreamLens/analyzer/tui/textual/widgets/flow_details.py

173 lines
6.6 KiB
Python
Raw Normal View History

"""
Flow Details Panel - Detailed information for selected flow
"""
import datetime
from typing import TYPE_CHECKING, Optional

from rich.console import RenderableType, Group
from rich.panel import Panel
from rich.table import Table
from rich.text import Text
from textual.containers import Vertical
from textual.widget import Widget
from textual.widgets import Static

if TYPE_CHECKING:
    from ....models import FlowStats


class FlowDetailsPanel(Vertical):
    """
    Detailed flow information panel

    Shows:
    - Flow identification
    - Enhanced decoder status
    - Timing analysis
    - Frame type breakdown
    - Quality metrics

    The panel is composed of a header ``Static`` and a content ``Static``
    (id ``details-content``). ``update_flow`` swaps the content between a
    placeholder panel and a group of Rich panels built from the given flow.
    """

    DEFAULT_CSS = """
    FlowDetailsPanel {
        height: 1fr;
        padding: 1;
    }
    FlowDetailsPanel Static {
        margin-bottom: 1;
    }
    """

    # Message shown while no flow is selected.
    _PLACEHOLDER_TEXT = "Select a flow to view details"
    # Decoder type for which the enhanced-analysis section is omitted.
    _STANDARD_DECODER = "Standard"
    # Number of enhanced frame-type names listed before collapsing to "+N".
    _MAX_ENHANCED_TYPES = 3
    # Number of rows shown in the frame type breakdown table.
    _MAX_FRAME_TYPE_ROWS = 5
    # Frame type names are cut to this many characters in the breakdown.
    _FRAME_TYPE_NAME_WIDTH = 15

    def __init__(self, **kwargs):
        """Create the panel with no flow selected.

        Args:
            **kwargs: Passed through unchanged to ``Vertical.__init__``.
        """
        super().__init__(**kwargs)
        # Last value handed to update_flow(); None until a flow is selected.
        self.current_flow: Optional['FlowStats'] = None

    def compose(self):
        """Create the details panel layout"""
        yield Static("Flow Details", classes="panel-header")
        yield Static(self._placeholder(), id="details-content")

    def update_flow(self, flow: Optional['FlowStats']) -> None:
        """Update panel with flow details

        Args:
            flow: The flow to display. A falsy value (normally ``None``)
                restores the placeholder panel.
        """
        self.current_flow = flow
        # NOTE(review): query_one needs the "#details-content" widget to
        # exist already - confirm callers only invoke this after compose().
        content_widget = self.query_one("#details-content", Static)

        if not flow:
            content_widget.update(self._placeholder())
            return

        content_widget.update(self._create_flow_details(flow))

    def _create_flow_details(self, flow: 'FlowStats') -> RenderableType:
        """Create comprehensive flow details display

        Args:
            flow: The flow whose statistics are rendered.

        Returns:
            A Rich ``Group`` of the section panels, in display order.
            Optional sections (enhanced analysis, frame types, quality
            metrics) are left out when they have nothing to show.
        """
        sections = [
            self._build_identity_panel(flow),
            self._build_enhanced_panel(flow),
            self._build_timing_panel(flow),
            self._build_frame_types_panel(flow),
            self._build_quality_panel(flow),
        ]
        return Group(*(section for section in sections if section is not None))

    def _placeholder(self) -> Panel:
        """Return the dim panel shown while no flow is selected."""
        return Panel(self._PLACEHOLDER_TEXT, border_style="dim")

    @staticmethod
    def _new_kv_table() -> Table:
        """Return a header-less, borderless two-column label/value table."""
        table = Table(show_header=False, box=None, padding=0)
        table.add_column(style="dim", width=12)
        table.add_column()
        return table

    @staticmethod
    def _percentage(part: float, total: float) -> float:
        """Return ``part`` as a percentage of ``total``, or 0.0 if total <= 0."""
        return (part / total * 100) if total > 0 else 0.0

    def _has_enhanced_decoder(self, flow: 'FlowStats') -> bool:
        """Return True when the flow's decoder type is not "Standard"."""
        return flow.enhanced_analysis.decoder_type != self._STANDARD_DECODER

    def _build_identity_panel(self, flow: 'FlowStats') -> Panel:
        """Build the "Flow Information" panel (endpoints, protocol, volume)."""
        table = self._new_kv_table()
        table.add_row("Source:", f"{flow.src_ip}:{flow.src_port}")
        table.add_row("Destination:", f"{flow.dst_ip}:{flow.dst_port}")
        table.add_row("Protocol:", flow.transport_protocol)
        table.add_row("Packets:", f"{flow.frame_count:,}")
        table.add_row("Volume:", self._format_bytes(flow.total_bytes))
        return Panel(table, title="Flow Information", border_style="blue")

    def _build_enhanced_panel(self, flow: 'FlowStats') -> Optional[Panel]:
        """Build the "Enhanced Analysis" panel, or None for "Standard" flows."""
        if not self._has_enhanced_decoder(flow):
            return None

        analysis = flow.enhanced_analysis
        table = self._new_kv_table()
        table.add_row("Decoder:", analysis.decoder_type)
        table.add_row("Quality:", f"{analysis.avg_frame_quality:.1f}%")
        table.add_row("Fields:", str(analysis.field_count))

        if analysis.frame_types:
            # List only the first few names and summarise the rest as "+N".
            shown = list(analysis.frame_types)[:self._MAX_ENHANCED_TYPES]
            types_str = ", ".join(shown)
            hidden = len(analysis.frame_types) - len(shown)
            if hidden > 0:
                types_str += f" +{hidden}"
            table.add_row("Types:", types_str)

        return Panel(table, title="Enhanced Analysis", border_style="green")

    def _build_timing_panel(self, flow: 'FlowStats') -> Panel:
        """Build the "Timing Analysis" panel (duration, interval, jitter)."""
        table = self._new_kv_table()
        table.add_row("Duration:", f"{flow.duration:.2f}s")
        # The values are multiplied by 1000 for the "ms" labels.
        # NOTE(review): this assumes the model stores seconds - confirm.
        table.add_row("Avg Interval:", f"{flow.avg_inter_arrival * 1000:.1f}ms")
        table.add_row("Jitter:", f"{flow.jitter * 1000:.2f}ms")
        table.add_row("First Seen:", self._format_timestamp(flow.first_seen))
        table.add_row("Last Seen:", self._format_timestamp(flow.last_seen))
        return Panel(table, title="Timing Analysis", border_style="cyan")

    def _build_frame_types_panel(self, flow: 'FlowStats') -> Optional[Panel]:
        """Build the "Frame Types" panel, or None with fewer than two types."""
        if len(flow.frame_types) <= 1:
            return None

        table = Table(show_header=True, box=None)
        table.add_column("Type", style="blue")
        table.add_column("Count", justify="right")
        table.add_column("%", justify="right")

        total = flow.frame_count
        # Most frequent types first; only the top rows are displayed.
        ranked = sorted(
            flow.frame_types.items(),
            key=lambda item: item[1].count,
            reverse=True,
        )
        for frame_type, stats in ranked[:self._MAX_FRAME_TYPE_ROWS]:
            percentage = self._percentage(stats.count, total)
            table.add_row(
                frame_type[:self._FRAME_TYPE_NAME_WIDTH],
                f"{stats.count:,}",
                f"{percentage:.1f}%",
            )

        return Panel(table, title="Frame Types", border_style="yellow")

    def _build_quality_panel(self, flow: 'FlowStats') -> Optional[Panel]:
        """Build the "Quality Metrics" panel, or None when nothing applies.

        The section is considered only when the flow has outlier frames or
        a non-"Standard" decoder, and is emitted only if at least one line
        was produced.
        """
        if not (flow.outlier_frames or self._has_enhanced_decoder(flow)):
            return None

        analysis = flow.enhanced_analysis
        quality_lines = []

        if flow.outlier_frames:
            outlier_count = len(flow.outlier_frames)
            # Guarded division: a zero frame_count must not crash the panel.
            outlier_pct = self._percentage(outlier_count, flow.frame_count)
            quality_lines.append(f"Outliers: {outlier_count} ({outlier_pct:.1f}%)")

        if analysis.timing_accuracy:
            quality_lines.append(f"Timing: {analysis.timing_accuracy}")

        if analysis.signal_quality:
            quality_lines.append(f"Signal: {analysis.signal_quality:.1f}%")

        if not quality_lines:
            return None

        return Panel(
            "\n".join(quality_lines),
            title="Quality Metrics",
            border_style="magenta",
        )

    def _format_bytes(self, bytes_count: int) -> str:
        """Format byte count with units

        Uses decimal thresholds (1 KB = 1,000 B). Values of 1,000 and above
        are shown with two decimals; smaller values are shown as plain bytes.
        """
        units = (
            (1_000_000_000, "GB"),
            (1_000_000, "MB"),
            (1_000, "KB"),
        )
        for threshold, unit in units:
            if bytes_count >= threshold:
                return f"{bytes_count / threshold:.2f} {unit}"
        return f"{bytes_count} B"

    def _format_timestamp(self, timestamp: float) -> str:
        """Format timestamp for display

        Args:
            timestamp: Value accepted by ``datetime.fromtimestamp``.

        Returns:
            ``HH:MM:SS.mmm`` in the local timezone.
        """
        moment = datetime.datetime.fromtimestamp(timestamp)
        # %f yields microseconds (6 digits); drop the last 3 for milliseconds.
        return moment.strftime("%H:%M:%S.%f")[:-3]