Files
StreamLens/debug_analyzer.py
2025-07-25 21:45:07 -04:00

188 lines
6.3 KiB
Python

#!/usr/bin/env python3
"""
Debug version of StreamLens to identify segmentation fault issues
"""
import sys
import traceback
import gc
def debug_pcap_loading(
    pcap_path: str = "/Users/noise/Code/streamlens/FSTDaircraft.pcapng",
) -> bool:
    """Exercise PCAP loading in progressively larger steps.

    The steps are: import scapy, read 10 packets, read 100 packets, then
    read the whole capture and (when psutil is installed) report the
    process resident memory.

    Args:
        pcap_path: Capture file to read. Defaults to the path this script
            was originally written against.

    Returns:
        True when every step completed, False as soon as one step raised.
    """
    try:
        print("Step 1: Testing basic scapy import...")
        from scapy.all import rdpcap
        print("✓ Scapy imported successfully")

        print("Step 2: Testing small packet read...")
        try:
            # Try to read just the first few packets
            packets = rdpcap(pcap_path, count=10)
            print(f"✓ Successfully read {len(packets)} packets")

            # Test packet access
            if packets:
                first = packets[0]
                print(f"✓ First packet: {len(first)} bytes")
                print(f"✓ Packet timestamp: {first.time}")
                # Test packet layers
                print(f"✓ Packet layers: {first.layers()}")
        except Exception as e:
            print(f"✗ Error reading packets: {e}")
            traceback.print_exc()
            return False

        print("Step 3: Testing larger packet batch...")
        try:
            packets = rdpcap(pcap_path, count=100)
            print(f"✓ Successfully read {len(packets)} packets")
        except Exception as e:
            print(f"✗ Error reading 100 packets: {e}")
            traceback.print_exc()
            return False

        print("Step 4: Testing full file load with memory management...")
        try:
            # Drop the previous batch first so the collection below can
            # actually reclaim it before the full load starts.
            del packets
            gc.collect()
            packets = rdpcap(pcap_path)
            print(f"✓ Successfully loaded {len(packets)} total packets")
        except Exception as e:
            print(f"✗ Error loading full file: {e}")
            traceback.print_exc()
            return False

        # Memory reporting is optional. The psutil import is probed on its
        # own so that an ImportError raised while loading the capture is
        # never mistaken for "psutil is missing" and counted as a pass.
        try:
            import psutil
        except ImportError:
            print("Note: psutil not available for memory monitoring")
        else:
            memory_mb = psutil.Process().memory_info().rss / 1024 / 1024
            print(f"✓ Memory usage: {memory_mb:.1f} MB")

        return True
    except Exception as e:
        # Reached when scapy itself cannot be imported or anything outside
        # the per-step handlers fails.
        print(f"✗ Critical error in debug: {e}")
        traceback.print_exc()
        return False
def debug_analyzer_components(
    project_root: str = '/Users/noise/Code/streamlens',
) -> bool:
    """Import and construct each analyzer component one at a time.

    Components are exercised in this order: FlowManager (import),
    StatisticsEngine, FlowManager (construction), Chapter10Dissector and
    EthernetAnalyzer. A progress line is printed after each one so the
    last line printed shows how far the sequence got.

    Args:
        project_root: Directory added to ``sys.path`` so the ``analyzer``
            package can be imported. Defaults to the original checkout
            location used by this script.

    Returns:
        True when every component was imported and constructed, False when
        any import or constructor raised.
    """
    try:
        print("\nStep 5: Testing analyzer components...")
        # Only add the root once; repeated calls would otherwise keep
        # growing sys.path with duplicate entries.
        if project_root not in sys.path:
            sys.path.insert(0, project_root)

        print(" 5a: Importing flow manager...")
        from analyzer.analysis.flow_manager import FlowManager
        print(" ✓ FlowManager imported")

        print(" 5b: Importing statistics engine...")
        from analyzer.analysis.statistics import StatisticsEngine
        stats_engine = StatisticsEngine()
        print(" ✓ StatisticsEngine created")

        print(" 5c: Creating flow manager...")
        # The instances below are bound only to exercise their constructors.
        flow_manager = FlowManager(stats_engine)
        print(" ✓ FlowManager created")

        print(" 5d: Testing protocol dissectors...")
        from analyzer.protocols.chapter10 import Chapter10Dissector
        ch10_dissector = Chapter10Dissector()
        print(" ✓ Chapter10Dissector created")

        print(" 5e: Importing main analyzer...")
        from analyzer.analysis.core import EthernetAnalyzer
        analyzer = EthernetAnalyzer()
        print(" ✓ EthernetAnalyzer created")

        return True
    except Exception as e:
        print(f"✗ Error in analyzer components: {e}")
        traceback.print_exc()
        return False
def debug_packet_processing(
    pcap_path: str = "/Users/noise/Code/streamlens/FSTDaircraft.pcapng",
    project_root: str = '/Users/noise/Code/streamlens',
) -> bool:
    """Feed a small batch of packets through EthernetAnalyzer.

    Reads up to 50 packets from the capture, passes the first 10 to
    ``EthernetAnalyzer._process_single_packet`` one at a time (numbered
    from 1), then calls ``calculate_statistics``.

    Args:
        pcap_path: Capture file to read. Defaults to the path this script
            was originally written against.
        project_root: Directory added to ``sys.path`` so the ``analyzer``
            package can be imported.

    Returns:
        True when all packets were processed and statistics were computed,
        False when any stage raised.
    """
    try:
        print("\nStep 6: Testing packet processing...")

        # Load small batch
        from scapy.all import rdpcap
        packets = rdpcap(pcap_path, count=50)
        print(f" Loaded {len(packets)} packets for processing test")

        # Create analyzer
        if project_root not in sys.path:
            sys.path.insert(0, project_root)
        from analyzer.analysis.core import EthernetAnalyzer
        analyzer = EthernetAnalyzer()

        # Process packets one by one
        for i, packet in enumerate(packets[:10]):  # Test first 10
            try:
                analyzer._process_single_packet(packet, i + 1)
                if i % 5 == 0:
                    print(f" ✓ Processed packet {i+1}")
            except Exception as e:
                print(f" ✗ Error processing packet {i+1}: {e}")
                # Show the stack like every other handler in this script;
                # the message alone rarely locates the failing code.
                traceback.print_exc()
                return False
        print(f" ✓ Successfully processed {min(10, len(packets))} packets")

        # Test statistics calculation
        try:
            analyzer.calculate_statistics()
            print(" ✓ Statistics calculation successful")
        except Exception as e:
            print(f" ✗ Error in statistics: {e}")
            traceback.print_exc()
            return False

        return True
    except Exception as e:
        print(f"✗ Error in packet processing test: {e}")
        traceback.print_exc()
        return False
def main() -> bool:
    """Run the debug steps in order, stopping at the first failure.

    Each step is a zero-argument callable returning a truthy value on
    success. The sequence stops as soon as a step reports failure or
    raises; garbage is collected after each step that passes.

    Returns:
        True when every step passed, False when a step failed or crashed.
    """
    import faulthandler

    print("🐛 StreamLens Debug Mode")
    print("=" * 50)

    # A segmentation fault kills the interpreter without raising, so the
    # try/except below can never see it. faulthandler makes the interpreter
    # dump the Python stack on fatal signals instead of dying silently.
    try:
        faulthandler.enable()
    except (AttributeError, ValueError, OSError, RuntimeError) as e:
        # stderr may be missing or lack a real file descriptor; carry on.
        print(f"Note: faulthandler could not be enabled: {e}")

    # Test each component
    steps = [
        ("PCAP Loading", debug_pcap_loading),
        ("Analyzer Components", debug_analyzer_components),
        ("Packet Processing", debug_packet_processing),
    ]

    all_passed = True
    for step_name, step_func in steps:
        print(f"\n🔍 Testing: {step_name}")
        # Flush so the step header is visible even if the step takes the
        # whole process down while output is buffered (e.g. piped).
        print("-" * 30, flush=True)

        try:
            success = step_func()
        except Exception as e:
            print(f"💥 {step_name} - CRASHED: {e}")
            traceback.print_exc()
            all_passed = False
            break

        if not success:
            print(f"{step_name} - FAILED")
            print("Stopping debug sequence due to failure")
            all_passed = False
            break

        print(f"{step_name} - PASSED")

        # Force garbage collection between steps
        gc.collect()

    print("\n" + "=" * 50)
    print("Debug sequence complete")
    return all_passed
# Script entry point: run the debug sequence only when this file is executed
# directly, not when it is imported.
if __name__ == "__main__":
    main()