Files
StreamLens/analyzer/main.py

452 lines
19 KiB
Python
Raw Permalink Normal View History

2025-07-25 15:52:16 -04:00
"""
Main entry point for the Ethernet Traffic Analyzer
"""
import sys
import time
import threading
import argparse
import curses
from .analysis import EthernetAnalyzer
from .tui import TUIInterface
from .tui.modern_interface import ModernTUIInterface
from .tui.textual.app_v2 import StreamLensAppV2
2025-07-25 15:52:16 -04:00
from .utils import PCAPLoader, LiveCapture
2025-07-30 23:48:32 -04:00
from .reporting import FlowReportGenerator
2025-07-25 15:52:16 -04:00
def main():
"""Main entry point"""
parser = argparse.ArgumentParser(description='Ethernet Traffic Analyzer')
parser.add_argument('--pcap', '-p', help='PCAP file to analyze')
parser.add_argument('--live', '-l', action='store_true', help='Start live capture')
parser.add_argument('--interface', '-i', help='Network interface for live capture')
parser.add_argument('--filter', '-f', help='BPF filter for live capture')
parser.add_argument('--no-tui', action='store_true', help='Disable TUI, print to console')
parser.add_argument('--info', action='store_true', help='Show PCAP file information only')
parser.add_argument('--outlier-threshold', type=float, default=3.0,
help='Outlier detection threshold in standard deviations (default: 3.0)')
parser.add_argument('--report', action='store_true',
help='Generate comprehensive outlier report and exit (no TUI)')
2025-07-30 23:48:32 -04:00
parser.add_argument('--flow-report', metavar='OUTPUT_FILE',
help='Generate comprehensive flow analysis report to specified file and exit')
parser.add_argument('--report-format', choices=['markdown', 'html', 'text'], default='markdown',
help='Report output format (default: markdown)')
2025-07-25 21:45:07 -04:00
parser.add_argument('--gui', action='store_true',
help='Launch GUI mode (requires PySide6)')
parser.add_argument('--classic', action='store_true',
help='Use classic TUI interface')
parser.add_argument('--textual', action='store_true',
help='Use modern Textual TUI interface (experimental)')
2025-07-25 15:52:16 -04:00
args = parser.parse_args()
2025-07-25 21:45:07 -04:00
# Handle GUI mode
if args.gui:
launch_gui(args)
return
2025-07-25 15:52:16 -04:00
if not args.pcap and not args.live:
print("Error: Must specify either --pcap file or --live capture")
sys.exit(1)
# Create analyzer - enable real-time stats for live capture
enable_realtime = args.live
analyzer = EthernetAnalyzer(enable_realtime=enable_realtime, outlier_threshold_sigma=args.outlier_threshold)
# Handle PCAP info mode
if args.info and args.pcap:
print("Analyzing PCAP file information...")
info = PCAPLoader.get_file_info(args.pcap)
if 'error' in info:
print(f"Error: {info['error']}")
sys.exit(1)
print(f"\n=== PCAP FILE INFORMATION ===")
print(f"File: {info['file_path']}")
print(f"Packets: {info['packet_count']:,}")
print(f"Total bytes: {info['total_bytes']:,}")
print(f"Duration: {info['duration_seconds']:.2f} seconds")
print(f"Average packet rate: {info['avg_packet_rate']:.1f} packets/sec")
if info['first_timestamp']:
import datetime
first_time = datetime.datetime.fromtimestamp(info['first_timestamp'])
last_time = datetime.datetime.fromtimestamp(info['last_timestamp'])
print(f"First packet: {first_time}")
print(f"Last packet: {last_time}")
return
2025-07-28 18:28:26 -04:00
# Load PCAP file (skip for textual mode which uses background parsing)
if args.pcap and not args.textual:
2025-07-25 15:52:16 -04:00
try:
loader = PCAPLoader(args.pcap)
if not loader.validate_file():
print(f"Error: Invalid or inaccessible PCAP file: {args.pcap}")
sys.exit(1)
packets = loader.load_all()
analyzer.all_packets = packets
print(f"Loaded {len(packets)} packets")
# Process packets
for i, packet in enumerate(packets, 1):
analyzer._process_single_packet(packet, i)
analyzer.calculate_statistics()
except Exception as e:
print(f"Error loading PCAP file: {e}")
sys.exit(1)
2025-07-28 18:28:26 -04:00
elif args.pcap and args.textual:
# For textual mode, just validate the file exists
try:
loader = PCAPLoader(args.pcap)
if not loader.validate_file():
print(f"Error: Invalid or inaccessible PCAP file: {args.pcap}")
sys.exit(1)
except Exception as e:
print(f"Error validating PCAP file: {e}")
sys.exit(1)
2025-07-25 15:52:16 -04:00
# Handle console output mode
if args.no_tui:
print_console_results(analyzer)
return
# Handle report mode
if args.report:
generate_outlier_report(analyzer, args.outlier_threshold)
return
2025-07-30 23:48:32 -04:00
# Handle flow report mode
if args.flow_report:
generate_flow_report(analyzer, args.flow_report, args.report_format)
return
# TUI mode - choose between classic, modern curses, and textual interface
if args.textual:
2025-07-28 18:28:26 -04:00
# Use new Textual-based interface (TipTop-inspired) with background parsing
app = StreamLensAppV2(analyzer)
2025-07-28 18:28:26 -04:00
try:
# Start background parsing if PCAP file provided
if args.pcap:
app.start_background_parsing(args.pcap)
app.run()
except KeyboardInterrupt:
print("\nShutting down...")
finally:
# Cleanup background threads
try:
app.cleanup()
except Exception as e:
print(f"Cleanup error: {e}")
pass
return
elif args.classic:
tui = TUIInterface(analyzer)
else:
tui = ModernTUIInterface(analyzer)
2025-07-25 15:52:16 -04:00
if args.live:
# Start live capture
capture = LiveCapture(args.interface, args.filter)
def packet_handler(packet, frame_num):
analyzer.all_packets.append(packet)
analyzer._process_single_packet(packet, frame_num)
capture.add_packet_handler(packet_handler)
try:
capture.start_capture(threaded=True)
analyzer.is_live = True
print("Starting live capture with real-time statistics enabled...")
print("TUI will update every 0.5 seconds with running averages and outlier detection")
# Give capture a moment to start
time.sleep(1)
2025-07-25 21:45:07 -04:00
# Run TUI with error handling
try:
curses.wrapper(tui.run)
except curses.error as e:
print(f"\nTUI error: {e}")
print("Falling back to console mode...")
print_console_results(analyzer)
2025-07-25 15:52:16 -04:00
except KeyboardInterrupt:
print("\nCapture interrupted by user")
finally:
capture.stop_capture()
else:
# PCAP analysis mode
try:
2025-07-25 21:45:07 -04:00
try:
curses.wrapper(tui.run)
except curses.error as e:
print(f"\nTUI error: {e}")
print("Terminal doesn't support curses. Falling back to console mode...")
print_console_results(analyzer)
2025-07-25 15:52:16 -04:00
except KeyboardInterrupt:
print("\nAnalysis interrupted by user")
def print_console_results(analyzer: EthernetAnalyzer):
"""Print analysis results to console"""
summary = analyzer.get_summary()
print(f"\n=== ETHERNET TRAFFIC ANALYSIS RESULTS ===")
print(f"Total Packets: {summary['total_packets']}")
print(f"Unique IP Flows: {summary['unique_flows']}")
print(f"Unique IP Addresses: {summary['unique_ips']}")
# Show summary statistics
stats = analyzer.get_summary_statistics()
if stats:
print(f"\n=== SUMMARY STATISTICS ===")
print(f"Overall Avg Inter-arrival: {stats.get('overall_avg_inter_arrival', 0):.6f}s")
print(f"Overall Std Deviation: {stats.get('overall_std_inter_arrival', 0):.6f}s")
print(f"Total Outliers: {stats.get('total_outliers', 0)}")
print(f"Outlier Percentage: {stats.get('outlier_percentage', 0):.2f}%")
# Show real-time statistics if enabled
if analyzer.statistics_engine.enable_realtime:
rt_stats = analyzer.statistics_engine.get_realtime_summary()
if rt_stats:
print(f"\n=== REAL-TIME STATISTICS ===")
print(f"Real-time Mode: {rt_stats.get('realtime_enabled', False)}")
print(f"Tracked Flows: {rt_stats.get('tracked_flows', 0)}")
print(f"Update Frequency: {rt_stats.get('update_frequency', 'N/A')}")
print(f"\n=== FLOW STATISTICS ===")
2025-07-25 19:42:33 -04:00
flows_sorted = sorted(summary['flows'].values(), key=lambda x: (
analyzer.statistics_engine.get_max_sigma_deviation(x),
x.frame_count
), reverse=True)
2025-07-25 15:52:16 -04:00
for flow in flows_sorted:
2025-07-25 19:42:33 -04:00
max_sigma = analyzer.statistics_engine.get_max_sigma_deviation(flow)
2025-07-25 15:52:16 -04:00
print(f"\nFlow: {flow.src_ip} -> {flow.dst_ip}")
print(f" Packets: {flow.frame_count}")
print(f" Total Bytes: {flow.total_bytes:,}")
2025-07-25 19:42:33 -04:00
print(f" Max Sigma Deviation: {max_sigma:.2f}σ")
2025-07-25 15:52:16 -04:00
print(f" Protocols: {', '.join(flow.protocols)}")
if flow.detected_protocol_types:
print(f" Enhanced Protocols: {', '.join(flow.detected_protocol_types)}")
if flow.avg_inter_arrival > 0:
print(f" Avg Inter-arrival: {flow.avg_inter_arrival:.6f}s")
print(f" Std Deviation: {flow.std_inter_arrival:.6f}s")
if flow.outlier_frames:
print(f" Outlier Frames: {flow.outlier_frames}")
# Show frame type breakdown
if flow.frame_types:
print(f" Frame Types:")
for frame_type, ft_stats in flow.frame_types.items():
avg_str = f"{ft_stats.avg_inter_arrival:.3f}s" if ft_stats.avg_inter_arrival > 0 else "N/A"
print(f" {frame_type}: {ft_stats.count} packets, avg {avg_str}")
# Show high jitter flows
high_jitter = analyzer.get_high_jitter_flows()
if high_jitter:
print(f"\n=== HIGH JITTER FLOWS ===")
for flow in high_jitter[:5]: # Show top 5
cv = flow.std_inter_arrival / flow.avg_inter_arrival if flow.avg_inter_arrival > 0 else 0
print(f"{flow.src_ip} -> {flow.dst_ip}: CV = {cv:.3f}")
2025-07-30 23:48:32 -04:00
def generate_flow_report(analyzer: EthernetAnalyzer, output_file: str, format_type: str):
"""Generate comprehensive flow analysis report"""
print(f"Generating {format_type} flow analysis report...")
try:
# Create report generator
report_generator = FlowReportGenerator(analyzer)
# Generate report
report_content = report_generator.generate_report(output_file, format_type)
print(f"✅ Flow analysis report generated successfully!")
print(f"📄 Output file: {output_file}")
print(f"📊 Format: {format_type}")
print(f"📈 Flows analyzed: {len(analyzer.flows)}")
# Show preview of report length
lines = report_content.count('\n')
print(f"📝 Report length: {lines} lines")
except Exception as e:
print(f"❌ Error generating flow report: {e}")
sys.exit(1)
2025-07-25 15:52:16 -04:00
def generate_outlier_report(analyzer: EthernetAnalyzer, threshold_sigma: float):
"""Generate comprehensive outlier report without TUI"""
summary = analyzer.get_summary()
print("=" * 80)
print("COMPREHENSIVE OUTLIER ANALYSIS REPORT")
print("=" * 80)
# Analysis parameters
print(f"Outlier Detection Threshold: {threshold_sigma}σ (sigma)")
print(f"Total Packets Analyzed: {summary['total_packets']:,}")
print(f"Unique IP Flows: {summary['unique_flows']}")
print(f"Unique IP Addresses: {summary['unique_ips']}")
# Overall statistics
stats = analyzer.get_summary_statistics()
if stats:
print(f"\nOVERALL TIMING STATISTICS:")
print(f" Average Inter-arrival Time: {stats.get('overall_avg_inter_arrival', 0):.6f}s")
print(f" Standard Deviation: {stats.get('overall_std_inter_arrival', 0):.6f}s")
print(f" Total Outlier Frames: {stats.get('total_outliers', 0)}")
print(f" Outlier Percentage: {stats.get('outlier_percentage', 0):.2f}%")
print("\n" + "=" * 80)
print("DETAILED FLOW ANALYSIS")
print("=" * 80)
2025-07-25 19:42:33 -04:00
flows_sorted = sorted(summary['flows'].values(), key=lambda x: (
analyzer.statistics_engine.get_max_sigma_deviation(x),
x.frame_count
), reverse=True)
2025-07-25 15:52:16 -04:00
for flow_idx, flow in enumerate(flows_sorted, 1):
2025-07-25 19:42:33 -04:00
max_sigma = analyzer.statistics_engine.get_max_sigma_deviation(flow)
2025-07-25 15:52:16 -04:00
print(f"\n[FLOW {flow_idx}] {flow.src_ip} -> {flow.dst_ip}")
print("-" * 60)
# Flow summary
print(f"Total Packets: {flow.frame_count:,}")
print(f"Total Bytes: {flow.total_bytes:,}")
2025-07-25 19:42:33 -04:00
print(f"Max Sigma Deviation: {max_sigma:.2f}σ")
2025-07-25 15:52:16 -04:00
print(f"Protocols: {', '.join(flow.protocols)}")
if flow.detected_protocol_types:
print(f"Enhanced Protocols: {', '.join(flow.detected_protocol_types)}")
# Flow timing statistics
if flow.avg_inter_arrival > 0:
print(f"Flow Timing:")
print(f" Average Inter-arrival: {flow.avg_inter_arrival:.6f}s")
print(f" Standard Deviation: {flow.std_inter_arrival:.6f}s")
print(f" Outlier Threshold: {flow.avg_inter_arrival + (threshold_sigma * flow.std_inter_arrival):.6f}s")
print(f" Flow-level Outliers: {len(flow.outlier_details)}")
# Frame type analysis
if flow.frame_types:
print(f"\nFrame Type Breakdown:")
print(f" {'Type':<15} {'Count':<8} {'Avg ΔT':<12} {'Std Dev':<12} {'Out':<6} {'Out %':<8}")
print(f" {'-' * 15} {'-' * 8} {'-' * 12} {'-' * 12} {'-' * 6} {'-' * 8}")
sorted_frame_types = sorted(flow.frame_types.items(),
key=lambda x: x[1].count, reverse=True)
for frame_type, ft_stats in sorted_frame_types:
outlier_count = len(ft_stats.outlier_details)
outlier_pct = (outlier_count / ft_stats.count * 100) if ft_stats.count > 0 else 0
avg_str = f"{ft_stats.avg_inter_arrival:.6f}s" if ft_stats.avg_inter_arrival > 0 else "N/A"
std_str = f"{ft_stats.std_inter_arrival:.6f}s" if ft_stats.std_inter_arrival > 0 else "N/A"
print(f" {frame_type:<15} {ft_stats.count:<8} {avg_str:<12} {std_str:<12} {outlier_count:<6} {outlier_pct:<7.1f}%")
# Detailed outlier frames
has_outliers = any(ft_stats.outlier_details for ft_stats in flow.frame_types.values())
if has_outliers:
print(f"\nOutlier Frame Details:")
for frame_type, ft_stats in flow.frame_types.items():
if ft_stats.outlier_details:
print(f"\n {frame_type} Outliers ({len(ft_stats.outlier_details)} frames):")
if ft_stats.avg_inter_arrival > 0:
threshold = ft_stats.avg_inter_arrival + (threshold_sigma * ft_stats.std_inter_arrival)
print(f" Threshold: {threshold:.6f}s (>{threshold_sigma}σ from mean {ft_stats.avg_inter_arrival:.6f}s)")
2025-07-30 23:48:32 -04:00
# Use enhanced outlier details if available
if hasattr(ft_stats, 'enhanced_outlier_details') and ft_stats.enhanced_outlier_details:
print(f" {'Frame#':<10} {'From Frame':<10} {'Inter-arrival':<15} {'Deviation':<12}")
print(f" {'-' * 10} {'-' * 10} {'-' * 15} {'-' * 12}")
for frame_num, prev_frame_num, inter_arrival_time in ft_stats.enhanced_outlier_details:
if ft_stats.avg_inter_arrival > 0:
deviation = inter_arrival_time - ft_stats.avg_inter_arrival
sigma_dev = deviation / ft_stats.std_inter_arrival if ft_stats.std_inter_arrival > 0 else 0
dev_str = f"+{sigma_dev:.1f}σ"
else:
dev_str = "N/A"
print(f" {frame_num:<10} {prev_frame_num:<10} {inter_arrival_time:.6f}s{'':<3} {dev_str:<12}")
else:
# Fallback to legacy outlier details
print(f" {'Frame#':<10} {'Inter-arrival':<15} {'Deviation':<12}")
print(f" {'-' * 10} {'-' * 15} {'-' * 12}")
2025-07-25 15:52:16 -04:00
2025-07-30 23:48:32 -04:00
for frame_num, inter_arrival_time in ft_stats.outlier_details:
if ft_stats.avg_inter_arrival > 0:
deviation = inter_arrival_time - ft_stats.avg_inter_arrival
sigma_dev = deviation / ft_stats.std_inter_arrival if ft_stats.std_inter_arrival > 0 else 0
dev_str = f"+{sigma_dev:.1f}σ"
else:
dev_str = "N/A"
print(f" {frame_num:<10} {inter_arrival_time:.6f}s{'':<3} {dev_str:<12}")
2025-07-25 15:52:16 -04:00
# High jitter flows summary
high_jitter = analyzer.get_high_jitter_flows()
if high_jitter:
print(f"\n" + "=" * 80)
print("HIGH JITTER FLOWS (Coefficient of Variation > 0.1)")
print("=" * 80)
for flow in high_jitter[:10]: # Show top 10
cv = flow.std_inter_arrival / flow.avg_inter_arrival if flow.avg_inter_arrival > 0 else 0
print(f"{flow.src_ip} -> {flow.dst_ip}: CV = {cv:.3f}")
print(f"\n" + "=" * 80)
print("REPORT COMPLETE")
print("=" * 80)
2025-07-25 21:45:07 -04:00
def launch_gui(args):
"""Launch GUI mode"""
try:
from .gui import GUI_AVAILABLE, StreamLensMainWindow
from PySide6.QtWidgets import QApplication
if not GUI_AVAILABLE:
print("Error: PySide6 not available. Please install with: pip install PySide6")
sys.exit(1)
# Create QApplication
app = QApplication(sys.argv)
app.setApplicationName("StreamLens")
app.setApplicationDisplayName("StreamLens - Ethernet Traffic Analyzer")
# Create main window
window = StreamLensMainWindow()
window.show()
# If a PCAP file was specified, load it
if args.pcap:
window.load_pcap_file(args.pcap)
# Start event loop
sys.exit(app.exec())
except ImportError as e:
print(f"Error: GUI dependencies not available: {e}")
print("Please install PySide6: pip install PySide6")
sys.exit(1)
except Exception as e:
print(f"Error launching GUI: {e}")
sys.exit(1)
2025-07-25 15:52:16 -04:00
if __name__ == "__main__":
main()