StreamLens/analyzer/main.py
"""
Main entry point for the Ethernet Traffic Analyzer
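
Example invocations (a sketch only: the exact module path or console-script name
depends on how the package is installed; ``capture.pcap``, ``eth0``, and
``flows.md`` are placeholder values, while the flags themselves are defined by
the argument parser below):

    python -m analyzer.main --pcap capture.pcap            # analyze a PCAP in the default TUI
    python -m analyzer.main --pcap capture.pcap --no-tui    # print results to the console
    python -m analyzer.main --live -i eth0 -f "udp"         # live capture with a BPF filter
    python -m analyzer.main --pcap capture.pcap --flow-report flows.md --report-format markdown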
"""
import sys
import time
import threading
import argparse
import curses
from .analysis import EthernetAnalyzer
from .tui import TUIInterface
from .tui.modern_interface import ModernTUIInterface
from .tui.textual.app_v2 import StreamLensAppV2
from .utils import PCAPLoader, LiveCapture
from .reporting import FlowReportGenerator


def main():
    """Main entry point"""
    parser = argparse.ArgumentParser(description='Ethernet Traffic Analyzer')
    parser.add_argument('--pcap', '-p', help='PCAP file to analyze')
    parser.add_argument('--live', '-l', action='store_true', help='Start live capture')
    parser.add_argument('--interface', '-i', help='Network interface for live capture')
    parser.add_argument('--filter', '-f', help='BPF filter for live capture')
    parser.add_argument('--no-tui', action='store_true', help='Disable TUI, print to console')
    parser.add_argument('--info', action='store_true', help='Show PCAP file information only')
    parser.add_argument('--outlier-threshold', type=float, default=3.0,
                        help='Outlier detection threshold in standard deviations (default: 3.0)')
    parser.add_argument('--report', action='store_true',
                        help='Generate comprehensive outlier report and exit (no TUI)')
    parser.add_argument('--flow-report', metavar='OUTPUT_FILE',
                        help='Generate comprehensive flow analysis report to specified file and exit')
    parser.add_argument('--report-format', choices=['markdown', 'html', 'text'], default='markdown',
                        help='Report output format (default: markdown)')
    parser.add_argument('--gui', action='store_true',
                        help='Launch GUI mode (requires PySide6)')
    parser.add_argument('--classic', action='store_true',
                        help='Use classic TUI interface')
    parser.add_argument('--textual', action='store_true',
                        help='Use modern Textual TUI interface (experimental)')
    args = parser.parse_args()

    # Handle GUI mode
    if args.gui:
        launch_gui(args)
        return

    if not args.pcap and not args.live:
        print("Error: Must specify either --pcap file or --live capture")
        sys.exit(1)

    # Create analyzer - enable real-time stats for live capture
    enable_realtime = args.live
    analyzer = EthernetAnalyzer(enable_realtime=enable_realtime, outlier_threshold_sigma=args.outlier_threshold)

    # Handle PCAP info mode
    if args.info and args.pcap:
        print("Analyzing PCAP file information...")
        info = PCAPLoader.get_file_info(args.pcap)
        if 'error' in info:
            print(f"Error: {info['error']}")
            sys.exit(1)
        print("\n=== PCAP FILE INFORMATION ===")
        print(f"File: {info['file_path']}")
        print(f"Packets: {info['packet_count']:,}")
        print(f"Total bytes: {info['total_bytes']:,}")
        print(f"Duration: {info['duration_seconds']:.2f} seconds")
        print(f"Average packet rate: {info['avg_packet_rate']:.1f} packets/sec")
        if info['first_timestamp']:
            import datetime
            first_time = datetime.datetime.fromtimestamp(info['first_timestamp'])
            last_time = datetime.datetime.fromtimestamp(info['last_timestamp'])
            print(f"First packet: {first_time}")
            print(f"Last packet: {last_time}")
        return

    # Load PCAP file (skip for textual mode which uses background parsing)
    if args.pcap and not args.textual:
        try:
            loader = PCAPLoader(args.pcap)
            if not loader.validate_file():
                print(f"Error: Invalid or inaccessible PCAP file: {args.pcap}")
                sys.exit(1)
            packets = loader.load_all()
            analyzer.all_packets = packets
            print(f"Loaded {len(packets)} packets")
            # Process packets
            for i, packet in enumerate(packets, 1):
                analyzer._process_single_packet(packet, i)
            analyzer.calculate_statistics()
        except Exception as e:
            print(f"Error loading PCAP file: {e}")
            sys.exit(1)
    elif args.pcap and args.textual:
        # For textual mode, just validate the file exists
        try:
            loader = PCAPLoader(args.pcap)
            if not loader.validate_file():
                print(f"Error: Invalid or inaccessible PCAP file: {args.pcap}")
                sys.exit(1)
        except Exception as e:
            print(f"Error validating PCAP file: {e}")
            sys.exit(1)

    # Handle console output mode
    if args.no_tui:
        print_console_results(analyzer)
        return

    # Handle report mode
    if args.report:
        generate_outlier_report(analyzer, args.outlier_threshold)
        return

    # Handle flow report mode
    if args.flow_report:
        generate_flow_report(analyzer, args.flow_report, args.report_format)
        return

    # TUI mode - choose between classic, modern curses, and textual interface
    if args.textual:
        # Use new Textual-based interface (TipTop-inspired) with background parsing
        app = StreamLensAppV2(analyzer)
        try:
            # Start background parsing if PCAP file provided
            if args.pcap:
                app.start_background_parsing(args.pcap)
            app.run()
        except KeyboardInterrupt:
            print("\nShutting down...")
        finally:
            # Cleanup background threads
            try:
                app.cleanup()
            except Exception as e:
                print(f"Cleanup error: {e}")
        return
    elif args.classic:
        tui = TUIInterface(analyzer)
    else:
        tui = ModernTUIInterface(analyzer)

    if args.live:
        # Start live capture
        capture = LiveCapture(args.interface, args.filter)

        def packet_handler(packet, frame_num):
            analyzer.all_packets.append(packet)
            analyzer._process_single_packet(packet, frame_num)

        capture.add_packet_handler(packet_handler)
        try:
            capture.start_capture(threaded=True)
            analyzer.is_live = True
            print("Starting live capture with real-time statistics enabled...")
            print("TUI will update every 0.5 seconds with running averages and outlier detection")
            # Give capture a moment to start
            time.sleep(1)
            # Run TUI with error handling
            try:
                curses.wrapper(tui.run)
            except curses.error as e:
                print(f"\nTUI error: {e}")
                print("Falling back to console mode...")
                print_console_results(analyzer)
        except KeyboardInterrupt:
            print("\nCapture interrupted by user")
        finally:
            capture.stop_capture()
    else:
        # PCAP analysis mode
        try:
            try:
                curses.wrapper(tui.run)
            except curses.error as e:
                print(f"\nTUI error: {e}")
                print("Terminal doesn't support curses. Falling back to console mode...")
                print_console_results(analyzer)
        except KeyboardInterrupt:
            print("\nAnalysis interrupted by user")


def print_console_results(analyzer: EthernetAnalyzer):
    """Print analysis results to console"""
    summary = analyzer.get_summary()

    print("\n=== ETHERNET TRAFFIC ANALYSIS RESULTS ===")
    print(f"Total Packets: {summary['total_packets']}")
    print(f"Unique IP Flows: {summary['unique_flows']}")
    print(f"Unique IP Addresses: {summary['unique_ips']}")

    # Show summary statistics
    stats = analyzer.get_summary_statistics()
    if stats:
        print("\n=== SUMMARY STATISTICS ===")
        print(f"Overall Avg Inter-arrival: {stats.get('overall_avg_inter_arrival', 0):.6f}s")
        print(f"Overall Std Deviation: {stats.get('overall_std_inter_arrival', 0):.6f}s")
        print(f"Total Outliers: {stats.get('total_outliers', 0)}")
        print(f"Outlier Percentage: {stats.get('outlier_percentage', 0):.2f}%")

    # Show real-time statistics if enabled
    if analyzer.statistics_engine.enable_realtime:
        rt_stats = analyzer.statistics_engine.get_realtime_summary()
        if rt_stats:
            print("\n=== REAL-TIME STATISTICS ===")
            print(f"Real-time Mode: {rt_stats.get('realtime_enabled', False)}")
            print(f"Tracked Flows: {rt_stats.get('tracked_flows', 0)}")
            print(f"Update Frequency: {rt_stats.get('update_frequency', 'N/A')}")

    print("\n=== FLOW STATISTICS ===")
    flows_sorted = sorted(summary['flows'].values(), key=lambda x: (
        analyzer.statistics_engine.get_max_sigma_deviation(x),
        x.frame_count
    ), reverse=True)
    for flow in flows_sorted:
        max_sigma = analyzer.statistics_engine.get_max_sigma_deviation(flow)
        print(f"\nFlow: {flow.src_ip} -> {flow.dst_ip}")
        print(f"  Packets: {flow.frame_count}")
        print(f"  Total Bytes: {flow.total_bytes:,}")
        print(f"  Max Sigma Deviation: {max_sigma:.2f}σ")
        print(f"  Protocols: {', '.join(flow.protocols)}")
        if flow.detected_protocol_types:
            print(f"  Enhanced Protocols: {', '.join(flow.detected_protocol_types)}")
        if flow.avg_inter_arrival > 0:
            print(f"  Avg Inter-arrival: {flow.avg_inter_arrival:.6f}s")
            print(f"  Std Deviation: {flow.std_inter_arrival:.6f}s")
        if flow.outlier_frames:
            print(f"  Outlier Frames: {flow.outlier_frames}")
        # Show frame type breakdown
        if flow.frame_types:
            print("  Frame Types:")
            for frame_type, ft_stats in flow.frame_types.items():
                avg_str = f"{ft_stats.avg_inter_arrival:.3f}s" if ft_stats.avg_inter_arrival > 0 else "N/A"
                print(f"    {frame_type}: {ft_stats.count} packets, avg {avg_str}")

    # Show high jitter flows
    high_jitter = analyzer.get_high_jitter_flows()
    if high_jitter:
        print("\n=== HIGH JITTER FLOWS ===")
        for flow in high_jitter[:5]:  # Show top 5
            cv = flow.std_inter_arrival / flow.avg_inter_arrival if flow.avg_inter_arrival > 0 else 0
            print(f"{flow.src_ip} -> {flow.dst_ip}: CV = {cv:.3f}")


def generate_flow_report(analyzer: EthernetAnalyzer, output_file: str, format_type: str):
    """Generate comprehensive flow analysis report"""
    print(f"Generating {format_type} flow analysis report...")
    try:
        # Create report generator
        report_generator = FlowReportGenerator(analyzer)

        # Generate report
        report_content = report_generator.generate_report(output_file, format_type)

        print("✅ Flow analysis report generated successfully!")
        print(f"📄 Output file: {output_file}")
        print(f"📊 Format: {format_type}")
        print(f"📈 Flows analyzed: {len(analyzer.flows)}")

        # Show preview of report length
        lines = report_content.count('\n')
        print(f"📝 Report length: {lines} lines")
    except Exception as e:
        print(f"❌ Error generating flow report: {e}")
        sys.exit(1)


def generate_outlier_report(analyzer: EthernetAnalyzer, threshold_sigma: float):
    """Generate comprehensive outlier report without TUI"""
    summary = analyzer.get_summary()

    print("=" * 80)
    print("COMPREHENSIVE OUTLIER ANALYSIS REPORT")
    print("=" * 80)

    # Analysis parameters
    print(f"Outlier Detection Threshold: {threshold_sigma}σ (sigma)")
    print(f"Total Packets Analyzed: {summary['total_packets']:,}")
    print(f"Unique IP Flows: {summary['unique_flows']}")
    print(f"Unique IP Addresses: {summary['unique_ips']}")

    # Overall statistics
    stats = analyzer.get_summary_statistics()
    if stats:
        print("\nOVERALL TIMING STATISTICS:")
        print(f"  Average Inter-arrival Time: {stats.get('overall_avg_inter_arrival', 0):.6f}s")
        print(f"  Standard Deviation: {stats.get('overall_std_inter_arrival', 0):.6f}s")
        print(f"  Total Outlier Frames: {stats.get('total_outliers', 0)}")
        print(f"  Outlier Percentage: {stats.get('outlier_percentage', 0):.2f}%")

    print("\n" + "=" * 80)
    print("DETAILED FLOW ANALYSIS")
    print("=" * 80)

    flows_sorted = sorted(summary['flows'].values(), key=lambda x: (
        analyzer.statistics_engine.get_max_sigma_deviation(x),
        x.frame_count
    ), reverse=True)

    for flow_idx, flow in enumerate(flows_sorted, 1):
        max_sigma = analyzer.statistics_engine.get_max_sigma_deviation(flow)
        print(f"\n[FLOW {flow_idx}] {flow.src_ip} -> {flow.dst_ip}")
        print("-" * 60)

        # Flow summary
        print(f"Total Packets: {flow.frame_count:,}")
        print(f"Total Bytes: {flow.total_bytes:,}")
        print(f"Max Sigma Deviation: {max_sigma:.2f}σ")
        print(f"Protocols: {', '.join(flow.protocols)}")
        if flow.detected_protocol_types:
            print(f"Enhanced Protocols: {', '.join(flow.detected_protocol_types)}")

        # Flow timing statistics
        if flow.avg_inter_arrival > 0:
            print("Flow Timing:")
            print(f"  Average Inter-arrival: {flow.avg_inter_arrival:.6f}s")
            print(f"  Standard Deviation: {flow.std_inter_arrival:.6f}s")
            print(f"  Outlier Threshold: {flow.avg_inter_arrival + (threshold_sigma * flow.std_inter_arrival):.6f}s")
            print(f"  Flow-level Outliers: {len(flow.outlier_details)}")

        # Frame type analysis
        if flow.frame_types:
            print("\nFrame Type Breakdown:")
            print(f"  {'Type':<15} {'Count':<8} {'Avg ΔT':<12} {'Std Dev':<12} {'Out':<6} {'Out %':<8}")
            print(f"  {'-' * 15} {'-' * 8} {'-' * 12} {'-' * 12} {'-' * 6} {'-' * 8}")
            sorted_frame_types = sorted(flow.frame_types.items(),
                                        key=lambda x: x[1].count, reverse=True)
            for frame_type, ft_stats in sorted_frame_types:
                outlier_count = len(ft_stats.outlier_details)
                outlier_pct = (outlier_count / ft_stats.count * 100) if ft_stats.count > 0 else 0
                avg_str = f"{ft_stats.avg_inter_arrival:.6f}s" if ft_stats.avg_inter_arrival > 0 else "N/A"
                std_str = f"{ft_stats.std_inter_arrival:.6f}s" if ft_stats.std_inter_arrival > 0 else "N/A"
                print(f"  {frame_type:<15} {ft_stats.count:<8} {avg_str:<12} {std_str:<12} {outlier_count:<6} {outlier_pct:<7.1f}%")

        # Detailed outlier frames
        has_outliers = any(ft_stats.outlier_details for ft_stats in flow.frame_types.values())
        if has_outliers:
            print("\nOutlier Frame Details:")
            for frame_type, ft_stats in flow.frame_types.items():
                if ft_stats.outlier_details:
                    print(f"\n  {frame_type} Outliers ({len(ft_stats.outlier_details)} frames):")
                    if ft_stats.avg_inter_arrival > 0:
                        threshold = ft_stats.avg_inter_arrival + (threshold_sigma * ft_stats.std_inter_arrival)
                        print(f"    Threshold: {threshold:.6f}s (>{threshold_sigma}σ from mean {ft_stats.avg_inter_arrival:.6f}s)")

                    # Use enhanced outlier details if available
                    if hasattr(ft_stats, 'enhanced_outlier_details') and ft_stats.enhanced_outlier_details:
                        print(f"    {'Frame#':<10} {'From Frame':<10} {'Inter-arrival':<15} {'Deviation':<12}")
                        print(f"    {'-' * 10} {'-' * 10} {'-' * 15} {'-' * 12}")
                        for frame_num, prev_frame_num, inter_arrival_time in ft_stats.enhanced_outlier_details:
                            if ft_stats.avg_inter_arrival > 0:
                                deviation = inter_arrival_time - ft_stats.avg_inter_arrival
                                sigma_dev = deviation / ft_stats.std_inter_arrival if ft_stats.std_inter_arrival > 0 else 0
                                dev_str = f"+{sigma_dev:.1f}σ"
                            else:
                                dev_str = "N/A"
                            print(f"    {frame_num:<10} {prev_frame_num:<10} {inter_arrival_time:.6f}s{'':<3} {dev_str:<12}")
                    else:
                        # Fallback to legacy outlier details
                        print(f"    {'Frame#':<10} {'Inter-arrival':<15} {'Deviation':<12}")
                        print(f"    {'-' * 10} {'-' * 15} {'-' * 12}")
                        for frame_num, inter_arrival_time in ft_stats.outlier_details:
                            if ft_stats.avg_inter_arrival > 0:
                                deviation = inter_arrival_time - ft_stats.avg_inter_arrival
                                sigma_dev = deviation / ft_stats.std_inter_arrival if ft_stats.std_inter_arrival > 0 else 0
                                dev_str = f"+{sigma_dev:.1f}σ"
                            else:
                                dev_str = "N/A"
                            print(f"    {frame_num:<10} {inter_arrival_time:.6f}s{'':<3} {dev_str:<12}")

    # High jitter flows summary
    high_jitter = analyzer.get_high_jitter_flows()
    if high_jitter:
        print("\n" + "=" * 80)
        print("HIGH JITTER FLOWS (Coefficient of Variation > 0.1)")
        print("=" * 80)
        for flow in high_jitter[:10]:  # Show top 10
            cv = flow.std_inter_arrival / flow.avg_inter_arrival if flow.avg_inter_arrival > 0 else 0
            print(f"{flow.src_ip} -> {flow.dst_ip}: CV = {cv:.3f}")

    print("\n" + "=" * 80)
    print("REPORT COMPLETE")
    print("=" * 80)


def launch_gui(args):
    """Launch GUI mode"""
    try:
        from .gui import GUI_AVAILABLE, StreamLensMainWindow
        from PySide6.QtWidgets import QApplication

        if not GUI_AVAILABLE:
            print("Error: PySide6 not available. Please install with: pip install PySide6")
            sys.exit(1)

        # Create QApplication
        app = QApplication(sys.argv)
        app.setApplicationName("StreamLens")
        app.setApplicationDisplayName("StreamLens - Ethernet Traffic Analyzer")

        # Create main window
        window = StreamLensMainWindow()
        window.show()

        # If a PCAP file was specified, load it
        if args.pcap:
            window.load_pcap_file(args.pcap)

        # Start event loop
        sys.exit(app.exec())
    except ImportError as e:
        print(f"Error: GUI dependencies not available: {e}")
        print("Please install PySide6: pip install PySide6")
        sys.exit(1)
    except Exception as e:
        print(f"Error launching GUI: {e}")
        sys.exit(1)


if __name__ == "__main__":
    main()