Files
StreamLens/analyzer/gui/main_window.py
2025-07-26 00:02:25 -04:00

586 lines
19 KiB
Python
Raw Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

"""
Main GUI window for StreamLens
"""
import sys
import os
from typing import Optional, List, TYPE_CHECKING, Dict
try:
from PySide6.QtWidgets import (
QMainWindow, QWidget, QVBoxLayout, QHBoxLayout, QSplitter,
QTableWidget, QTableWidgetItem, QTextEdit, QMenuBar, QMenu,
QFileDialog, QMessageBox, QProgressBar, QStatusBar, QLabel,
QHeaderView, QPushButton, QGroupBox, QDockWidget, QToolBar,
QButtonGroup, QComboBox, QSpinBox
)
from PySide6.QtCore import Qt, QThread, Signal, QTimer
from PySide6.QtGui import QAction, QIcon, QFont, QKeySequence, QActionGroup
from .dock_panels import FlowListDockWidget, FlowDetailDockWidget
# Matplotlib integration - lazy loaded
matplotlib = None
FigureCanvas = None
NavigationToolbar = None
Figure = None
plt = None
def _ensure_matplotlib_gui_loaded():
"""Lazy load matplotlib for GUI mode"""
global matplotlib, FigureCanvas, NavigationToolbar, Figure, plt
if matplotlib is not None:
return True
try:
import matplotlib as mpl
matplotlib = mpl
matplotlib.use('Qt5Agg') # Use Qt backend for matplotlib
from matplotlib.backends.backend_qt5agg import FigureCanvasQTAgg as FC
from matplotlib.backends.backend_qt5agg import NavigationToolbar2QT as NT
from matplotlib.figure import Figure as Fig
import matplotlib.pyplot as pyplot
# Turn off interactive mode to prevent floating windows
pyplot.ioff()
# Ensure no figure windows are created
mpl.rcParams['figure.max_open_warning'] = 0
mpl.rcParams['backend'] = 'Qt5Agg'
# Prevent any automatic figure display
mpl.pyplot.show = lambda *args, **kwargs: None
pyplot.show = lambda *args, **kwargs: None
# Also prevent figure.show()
original_figure_show = Fig.show
Fig.show = lambda self, *args, **kwargs: None
FigureCanvas = FC
NavigationToolbar = NT
Figure = Fig
plt = pyplot
return True
except ImportError as e:
print(f"Matplotlib GUI integration not available: {e}")
return False
except ImportError as e:
print(f"GUI dependencies not available: {e}")
sys.exit(1)
if TYPE_CHECKING:
from ..analysis.core import EthernetAnalyzer
from ..models.flow_stats import FlowStats
class PCAPLoadThread(QThread):
    """Thread for loading PCAP files without blocking UI.

    Signals:
        progress_updated(int): percentage of packets processed so far.
        loading_finished(object): the populated analyzer.
        error_occurred(str): description of why loading failed.
    """

    progress_updated = Signal(int)
    loading_finished = Signal(object)  # Emits the analyzer
    error_occurred = Signal(str)

    def __init__(self, file_path: str, parent=None):
        """Remember which capture file to load.

        Args:
            file_path: path of the PCAP file to load.
            parent: optional Qt parent object.
        """
        super().__init__(parent)
        self.file_path = file_path

    def run(self):
        """Load the file, process every packet and emit the result.

        Emits ``error_occurred`` when the file fails validation or any
        exception is raised, otherwise ``loading_finished`` with the
        analyzer. If an interruption has been requested on this thread the
        loop stops early and neither signal is emitted.
        """
        try:
            from ..analysis.core import EthernetAnalyzer
            from ..utils.pcap_loader import PCAPLoader

            # Create analyzer
            analyzer = EthernetAnalyzer()

            # Load PCAP
            loader = PCAPLoader(self.file_path)
            if not loader.validate_file():
                self.error_occurred.emit(f"Invalid PCAP file: {self.file_path}")
                return

            packets = loader.load_all()
            analyzer.all_packets = packets

            # Process packets with progress updates
            total_packets = len(packets)
            for i, packet in enumerate(packets, 1):
                # run() has no event loop, so polling the interruption flag is
                # the only way this worker can be asked to stop early.
                if self.isInterruptionRequested():
                    return

                analyzer._process_single_packet(packet, i)

                # Update progress every 1000 packets
                if i % 1000 == 0 or i == total_packets:
                    progress = int((i / total_packets) * 100)
                    self.progress_updated.emit(progress)

            # Calculate statistics
            analyzer.calculate_statistics()

            self.loading_finished.emit(analyzer)
        except Exception as e:
            # Thread boundary: report everything to the UI instead of dying
            # silently. Fall back to the type name when the message is empty.
            self.error_occurred.emit(str(e) or type(e).__name__)
class StreamLensMainWindow(QMainWindow):
    """Main application window with docking panels"""

    def __init__(self):
        """Initialise window state, then build theme, menus, toolbar, docks and status bar."""
        super().__init__()

        # Nothing is loaded until a capture is opened.
        self.analyzer: Optional['EthernetAnalyzer'] = None
        self.current_file = None
        self.loading_thread = None
        # Flag to prevent early event triggers while the UI is assembled.
        self._initializing = True

        self.setWindowTitle("StreamLens - Ethernet Traffic Analyzer")
        # Larger default size for docking
        self.setGeometry(100, 100, 1600, 1000)

        # Theme first, then each UI section in a fixed order.
        for build_step in (
            self.apply_dark_theme,
            self.setup_menus,
            self.setup_toolbar,
            self.setup_docking_ui,
            self.setup_status_bar,
        ):
            build_step()

        # Initialization complete
        self._initializing = False
def apply_dark_theme(self):
    """Set the dark Qt style sheet on this window.

    The sheet covers the main window, menu bar and menus, toolbar and tool
    buttons, status bar, progress bar, spin boxes, labels, dock widgets and
    splitter handles.
    """
    self.setStyleSheet("""
/* Main Window Dark Theme */
QMainWindow {
background-color: #1e1e1e;
color: #ffffff;
}
/* Menu Bar */
QMenuBar {
background-color: #2d2d2d;
color: #ffffff;
border-bottom: 1px solid #404040;
padding: 2px;
}
QMenuBar::item {
background-color: transparent;
padding: 8px 12px;
border-radius: 4px;
}
QMenuBar::item:selected {
background-color: #404040;
}
QMenuBar::item:pressed {
background-color: #505050;
}
/* Menu Dropdown */
QMenu {
background-color: #2d2d2d;
color: #ffffff;
border: 1px solid #404040;
border-radius: 6px;
padding: 4px;
}
QMenu::item {
padding: 8px 20px;
border-radius: 4px;
}
QMenu::item:selected {
background-color: #404040;
}
/* Toolbar */
QToolBar {
background-color: #2d2d2d;
border: none;
padding: 4px;
spacing: 8px;
}
QToolButton {
background-color: #404040;
color: #ffffff;
border: 1px solid #505050;
border-radius: 6px;
padding: 8px 12px;
font-weight: 500;
}
QToolButton:hover {
background-color: #505050;
border-color: #606060;
}
QToolButton:pressed {
background-color: #353535;
}
/* Status Bar */
QStatusBar {
background-color: #2d2d2d;
color: #cccccc;
border-top: 1px solid #404040;
padding: 4px;
}
/* Progress Bar */
QProgressBar {
background-color: #404040;
border: 1px solid #505050;
border-radius: 4px;
text-align: center;
color: #ffffff;
}
QProgressBar::chunk {
background-color: #0078d4;
border-radius: 3px;
}
/* Spin Box */
QSpinBox {
background-color: #404040;
color: #ffffff;
border: 1px solid #505050;
border-radius: 4px;
padding: 4px 8px;
min-width: 50px;
}
QSpinBox:hover {
border-color: #606060;
}
QSpinBox:focus {
border-color: #0078d4;
}
QSpinBox::up-button, QSpinBox::down-button {
background-color: #505050;
border: none;
width: 16px;
}
QSpinBox::up-button:hover, QSpinBox::down-button:hover {
background-color: #606060;
}
/* Labels */
QLabel {
color: #ffffff;
}
/* Dock Widgets */
QDockWidget {
background-color: #1e1e1e;
color: #ffffff;
titlebar-close-icon: none;
titlebar-normal-icon: none;
}
QDockWidget::title {
background-color: #2d2d2d;
color: #ffffff;
padding: 8px;
border-bottom: 1px solid #404040;
font-weight: bold;
}
/* Splitter */
QSplitter::handle {
background-color: #404040;
}
QSplitter::handle:horizontal {
width: 2px;
}
QSplitter::handle:vertical {
height: 2px;
}
""")
def setup_docking_ui(self):
    """Create the flow list (central widget) and the flow detail dock below it."""
    # Flow list fills the central area so it takes the full width.
    flow_list = FlowListDockWidget(self)
    self.flow_list_dock = flow_list
    # Detail panel is a real dock, placed in the bottom dock area.
    flow_detail = FlowDetailDockWidget(self)
    self.flow_detail_dock = flow_detail

    self.setCentralWidget(flow_list)
    self.addDockWidget(Qt.BottomDockWidgetArea, flow_detail)

    # Selecting a row in the flow table drives the detail panel.
    flow_list.flows_table.itemSelectionChanged.connect(self.on_flow_selection_changed)

    # Set detail dock height
    self.resizeDocks([flow_detail], [150], Qt.Vertical)
def on_flow_selection_changed(self):
    """Update the detail panel to reflect the flow table's current selection.

    Does nothing until the flow list dock exists and an analyzer is loaded.
    With no selected row the detail panel is cleared; otherwise the flow
    stored under Qt.UserRole in column 0 of the first selected row is shown.
    """
    if not hasattr(self, 'flow_list_dock') or not self.analyzer:
        return

    flows_table = self.flow_list_dock.flows_table
    selected_rows = flows_table.selectionModel().selectedRows()
    if not selected_rows:
        self.flow_detail_dock.update_flow_details(None, None)
        return

    # Get selected flow
    row = selected_rows[0].row()
    flow_item = flows_table.item(row, 0)
    # item() returns None for a cell that has no item; treat that like a row
    # without an attached flow instead of raising AttributeError.
    flow = flow_item.data(Qt.UserRole) if flow_item is not None else None
    if flow:
        self.flow_detail_dock.update_flow_details(flow, self.analyzer)
def setup_menus(self):
    """Populate the menu bar with the File, View and Help menus."""
    bar = self.menuBar()

    def make_action(label, tip, slot, shortcut=None):
        # Build a QAction parented to this window; the shortcut is optional.
        act = QAction(label, self)
        if shortcut is not None:
            act.setShortcut(shortcut)
        act.setStatusTip(tip)
        act.triggered.connect(slot)
        return act

    # File menu: open a capture, monitor a NIC, exit.
    file_menu = bar.addMenu("&File")
    file_menu.addAction(make_action(
        "&Open PCAP...", "Open a PCAP file for analysis",
        self.open_pcap_file, QKeySequence.Open,
    ))
    file_menu.addAction(make_action(
        "&Monitor NIC...", "Start live network monitoring",
        self.monitor_nic, "Ctrl+M",
    ))
    file_menu.addSeparator()
    file_menu.addAction(make_action(
        "E&xit", "Exit StreamLens", self.close, QKeySequence.Quit,
    ))

    # View menu: refresh plus a checkable toggle for the detail panel.
    view_menu = bar.addMenu("&View")
    view_menu.addAction(make_action(
        "&Refresh", "Refresh current data",
        self.refresh_data, QKeySequence.Refresh,
    ))
    view_menu.addSeparator()

    details_toggle = QAction("Show Flow &Details", self)
    details_toggle.setCheckable(True)
    details_toggle.setChecked(True)
    # The dock is looked up when the action fires, not when the menu is built.
    details_toggle.triggered.connect(lambda checked: self.flow_detail_dock.setVisible(checked))
    view_menu.addAction(details_toggle)

    # Help menu
    help_menu = bar.addMenu("&Help")
    help_menu.addAction(make_action(
        "&About StreamLens...", "About StreamLens", self.show_about,
    ))
def setup_toolbar(self):
    """Build the fixed main toolbar: Open, Monitor and the outlier threshold control."""
    toolbar = QToolBar("Main Toolbar")
    toolbar.setMovable(False)
    self.addToolBar(toolbar)

    # File operations.
    # No shortcut on this action: the File > Open menu action already owns
    # QKeySequence.Open, and a second action with the same shortcut in the
    # same window makes the key sequence ambiguous.
    open_action = QAction("Open", self)
    open_action.setStatusTip("Open PCAP file")
    open_action.triggered.connect(self.open_pcap_file)
    toolbar.addAction(open_action)

    monitor_action = QAction("Monitor", self)
    monitor_action.setStatusTip("Monitor network interface")
    monitor_action.triggered.connect(self.monitor_nic)
    toolbar.addAction(monitor_action)

    toolbar.addSeparator()

    # Outlier threshold control
    toolbar.addWidget(QLabel("Outlier Threshold: "))

    self.outlier_threshold_spin = QSpinBox()
    self.outlier_threshold_spin.setRange(1, 10)
    # The initial value is set before valueChanged is connected, so it does
    # not invoke the change handler.
    self.outlier_threshold_spin.setValue(3)
    self.outlier_threshold_spin.setSuffix("σ")
    self.outlier_threshold_spin.setStatusTip("Set sigma threshold for outlier detection")
    self.outlier_threshold_spin.valueChanged.connect(self.on_outlier_threshold_changed)
    toolbar.addWidget(self.outlier_threshold_spin)
def setup_status_bar(self):
    """Install a status bar with a hidden progress bar and a "Ready" message."""
    status = QStatusBar()
    self.status_bar = status
    self.setStatusBar(status)

    # Hidden until a load is in progress.
    progress = QProgressBar()
    self.progress_bar = progress
    progress.setVisible(False)
    status.addPermanentWidget(progress)

    status.showMessage("Ready")
def open_pcap_file(self):
    """Ask the user for a PCAP file and load it; do nothing if no path is returned."""
    name_filter = "PCAP Files (*.pcap *.pcapng);;All Files (*)"
    chosen_path, _selected_filter = QFileDialog.getOpenFileName(
        self, "Open PCAP File", "", name_filter
    )
    if not chosen_path:
        return
    self.load_pcap_file(chosen_path)
def load_pcap_file(self, file_path: str):
    """Start loading ``file_path`` on a background thread.

    The request is ignored while a previous load is still running.
    """
    active = self.loading_thread
    if active and active.isRunning():
        return

    self.current_file = file_path
    display_name = os.path.basename(file_path)
    self.status_bar.showMessage(f"Loading {display_name}...")
    self.progress_bar.setVisible(True)
    self.progress_bar.setValue(0)

    # Disable UI during loading
    self.flow_list_dock.flows_table.setEnabled(False)

    # Create the worker, wire its signals to this window, then start it.
    worker = PCAPLoadThread(file_path)
    self.loading_thread = worker
    worker.progress_updated.connect(self.progress_bar.setValue)
    worker.loading_finished.connect(self.on_pcap_loaded)
    worker.error_occurred.connect(self.on_loading_error)
    worker.start()
def on_pcap_loaded(self, analyzer: 'EthernetAnalyzer'):
    """Adopt the loaded analyzer, re-enable the UI and fill the flow table."""
    self.analyzer = analyzer

    # Loading is over: hide progress and re-enable the table.
    self.progress_bar.setVisible(False)
    dock = self.flow_list_dock
    dock.flows_table.setEnabled(True)

    # Give the dock the analyzer and file info, then populate the table.
    dock.set_analyzer(analyzer)
    dock.update_file_info(self.current_file)
    dock.populate_flows_table()

    packet_count = analyzer.get_summary()['total_packets']
    self.status_bar.showMessage(f"Loaded {packet_count:,} packets with embedded plots")
def on_loading_error(self, error_message: str):
    """Restore the UI after a failed load and show the error to the user."""
    # Undo the "loading" state first so the window is usable behind the dialog.
    self.progress_bar.setVisible(False)
    self.flow_list_dock.flows_table.setEnabled(True)

    dialog_text = f"Error loading PCAP file:\n{error_message}"
    QMessageBox.critical(self, "Loading Error", dialog_text)
    self.status_bar.showMessage("Error loading file")
def get_flow_packets(self, flow: 'FlowStats') -> List:
    """Return the loaded packets whose IP source and destination match ``flow``.

    Args:
        flow: object providing ``src_ip`` and ``dst_ip``.

    Returns:
        Matching packets in capture order. The list is empty when no analyzer
        or packets are loaded, or when scapy cannot be imported.
    """
    if not self.analyzer or not self.analyzer.all_packets:
        return []

    # Import once instead of once per packet; without scapy nothing can match.
    try:
        from scapy.all import IP
    except ImportError:
        return []

    flow_packets = []
    for packet in self.analyzer.all_packets:
        try:
            if not hasattr(packet, 'haslayer') or not packet.haslayer(IP):
                continue
            ip_layer = packet[IP]
            if ip_layer.src == flow.src_ip and ip_layer.dst == flow.dst_ip:
                flow_packets.append(packet)
        except Exception:
            # Best effort: skip packets that cannot be inspected, but let
            # KeyboardInterrupt/SystemExit propagate.
            continue
    return flow_packets
def refresh_data(self):
    """Reload the currently open capture file, if there is one."""
    path = self.current_file
    if not path:
        return
    self.load_pcap_file(path)
def closeEvent(self, event):
    """Stop any running load before the window closes, then accept the event.

    Args:
        event: the close event delivered by Qt; it is always accepted.
    """
    worker = self.loading_thread
    if worker and worker.isRunning():
        # quit() only ends a thread's event loop, which the loader's run()
        # never starts. Requesting interruption as well lets a worker that
        # polls isInterruptionRequested() stop early; otherwise wait() simply
        # blocks until the load completes.
        worker.requestInterruption()
        worker.quit()
        worker.wait()
    event.accept()
# Menu and toolbar action handlers
def monitor_nic(self):
    """Placeholder for live monitoring: tells the user it is not available yet."""
    # TODO: Implement NIC monitoring dialog
    notice = "Network interface monitoring will be implemented in a future version."
    QMessageBox.information(self, "Monitor NIC", notice)
def show_about(self):
    """Open the About box with the version line and feature list."""
    # Rich-text body shown by QMessageBox.about.
    about_html = """<h2>StreamLens v1.0</h2>
<p>Advanced Ethernet Traffic Analyzer</p>
<p>Specialized protocol dissection for aviation and industrial networks
with sigma-based outlier identification and interactive signal visualization.</p>
<p><b>Features:</b></p>
<ul>
<li>Chapter 10 (IRIG 106) telemetry protocol support</li>
<li>PTP (IEEE 1588) and IENA protocol dissection</li>
<li>Statistical outlier detection</li>
<li>Interactive signal visualization</li>
<li>Docking panel interface</li>
</ul>
<p>Built with PySide6 and matplotlib</p>"""
    QMessageBox.about(self, "About StreamLens", about_html)
def on_outlier_threshold_changed(self, value: int):
    """Apply a new sigma threshold to the loaded analyzer and refresh the flow table.

    Nothing happens when no analyzer is loaded.
    """
    if not self.analyzer:
        return

    # The statistics engine stores the threshold as a float.
    self.analyzer.statistics_engine.sigma_threshold = float(value)

    # Re-populate the flow table so it reflects the new threshold.
    if hasattr(self, 'flow_list_dock'):
        self.flow_list_dock.populate_flows_table()

    self.status_bar.showMessage(f"Outlier threshold updated to {value}σ")