116 lines
4.3 KiB
Python
116 lines
4.3 KiB
Python
"""
|
|
Core analysis engine for the Ethernet Traffic Analyzer
|
|
"""
|
|
|
|
import sys
|
|
import threading
|
|
from typing import List, Dict
|
|
|
|
try:
|
|
from scapy.all import rdpcap, sniff, Packet
|
|
except ImportError:
|
|
print("Error: scapy library required. Install with: pip install scapy")
|
|
sys.exit(1)
|
|
|
|
from .flow_manager import FlowManager
|
|
from .statistics import StatisticsEngine
|
|
from ..models import AnalysisResult
|
|
|
|
|
|
class EthernetAnalyzer:
|
|
"""Main analyzer class for ethernet traffic analysis"""
|
|
|
|
def __init__(self, enable_realtime: bool = False, outlier_threshold_sigma: float = 3.0):
|
|
self.statistics_engine = StatisticsEngine(outlier_threshold_sigma=outlier_threshold_sigma, enable_realtime=enable_realtime)
|
|
self.flow_manager = FlowManager(self.statistics_engine)
|
|
self.all_packets: List[Packet] = []
|
|
self.is_live = False
|
|
self.stop_capture = False
|
|
self.is_parsing = False # Flag to track parsing state
|
|
|
|
# Expose flows for backward compatibility
|
|
self.flows = self.flow_manager.flows
|
|
|
|
# Create a simple dissector for backward compatibility
|
|
self.dissector = SimpleFrameDissector(self.flow_manager)
|
|
|
|
def analyze_pcap(self, pcap_file: str) -> None:
|
|
"""Analyze a pcap file"""
|
|
print(f"Loading pcap file: {pcap_file}")
|
|
try:
|
|
packets = rdpcap(pcap_file)
|
|
self.all_packets = packets
|
|
print(f"Loaded {len(packets)} packets")
|
|
self._process_packets(packets)
|
|
except Exception as e:
|
|
print(f"Error loading pcap file: {e}")
|
|
sys.exit(1)
|
|
|
|
def start_live_capture(self, interface: str = None, filter_str: str = None) -> None:
|
|
"""Start live packet capture"""
|
|
self.is_live = True
|
|
print(f"Starting live capture on interface: {interface or 'default'}")
|
|
|
|
def packet_handler(packet):
|
|
if self.stop_capture:
|
|
return
|
|
self.all_packets.append(packet)
|
|
self._process_single_packet(packet, len(self.all_packets))
|
|
|
|
try:
|
|
sniff(iface=interface, filter=filter_str, prn=packet_handler,
|
|
stop_filter=lambda x: self.stop_capture)
|
|
except Exception as e:
|
|
print(f"Error during live capture: {e}")
|
|
|
|
def _process_packets(self, packets: List[Packet]) -> None:
|
|
"""Process a list of packets"""
|
|
for i, packet in enumerate(packets, 1):
|
|
self._process_single_packet(packet, i)
|
|
|
|
def _process_single_packet(self, packet: Packet, frame_num: int) -> None:
|
|
"""Process a single packet"""
|
|
self.flow_manager.process_packet(packet, frame_num)
|
|
|
|
def calculate_statistics(self) -> None:
|
|
"""Calculate timing statistics and detect outliers"""
|
|
self.statistics_engine.calculate_flow_statistics(self.flows)
|
|
|
|
def get_summary(self) -> Dict:
|
|
"""Get analysis summary"""
|
|
flow_summary = self.flow_manager.get_flows_summary()
|
|
return {
|
|
'total_packets': len(self.all_packets),
|
|
'unique_flows': flow_summary['total_flows'],
|
|
'unique_ips': flow_summary['unique_ips'],
|
|
'flows': flow_summary['flows']
|
|
}
|
|
|
|
def get_analysis_result(self) -> AnalysisResult:
|
|
"""Get structured analysis result"""
|
|
summary = self.get_summary()
|
|
return AnalysisResult(
|
|
total_packets=summary['total_packets'],
|
|
unique_flows=summary['unique_flows'],
|
|
unique_ips=summary['unique_ips'],
|
|
flows=summary['flows']
|
|
)
|
|
|
|
def get_high_jitter_flows(self, threshold: float = 0.1):
|
|
"""Get flows with high timing jitter"""
|
|
return self.statistics_engine.identify_high_jitter_flows(self.flows, threshold)
|
|
|
|
def get_summary_statistics(self) -> Dict:
|
|
"""Get summary statistics across all flows"""
|
|
return self.statistics_engine.get_flow_summary_statistics(self.flows)
|
|
|
|
|
|
class SimpleFrameDissector:
|
|
"""Simple frame dissector for backward compatibility"""
|
|
|
|
def __init__(self, flow_manager: FlowManager):
|
|
self.flow_manager = flow_manager
|
|
|
|
def dissect_frame(self, packet: Packet, frame_num: int) -> Dict:
|
|
"""Dissect a frame using the flow manager's dissection system"""
|
|
return self.flow_manager._dissect_packet(packet, frame_num) |