#!/usr/bin/env python3
"""
Debug version of StreamLens to identify segmentation fault issues

The script runs three diagnostic stages in order (PCAP loading, analyzer
component construction, packet processing) and stops at the first stage
that fails.  Because the failure being hunted is a hard crash rather than
a Python exception, every progress line is flushed immediately and
``faulthandler`` is enabled so a fatal signal still leaves a traceback.
"""

import faulthandler
import gc
import sys
import traceback

# Location of the StreamLens checkout and of the capture used for testing.
PROJECT_ROOT = "/Users/noise/Code/streamlens"
DEFAULT_PCAP_PATH = "/Users/noise/Code/streamlens/FSTDaircraft.pcapng"


def _say(message=""):
    """Print *message* and flush stdout right away.

    A segmentation fault terminates the interpreter without flushing
    buffered output, so an unflushed progress marker would be lost exactly
    when it is needed to locate the crash.
    """
    print(message, flush=True)


def _ensure_project_on_path():
    """Put ``PROJECT_ROOT`` at the front of ``sys.path`` exactly once."""
    if PROJECT_ROOT not in sys.path:
        sys.path.insert(0, PROJECT_ROOT)


def _enable_fault_handler():
    """Ask the interpreter to dump a traceback on fatal signals (best effort)."""
    try:
        faulthandler.enable()
    except (AttributeError, OSError, RuntimeError, ValueError):
        # stderr may be missing or replaced by an object with no file
        # descriptor; the debug run is still useful without the handler.
        _say("Note: faulthandler could not be enabled")


def _report_memory_usage():
    """Print the resident set size of this process if psutil is installed."""
    try:
        import psutil
    except ImportError:
        _say("Note: psutil not available for memory monitoring")
        return

    process = psutil.Process()
    memory_mb = process.memory_info().rss / 1024 / 1024
    _say(f"✓ Memory usage: {memory_mb:.1f} MB")


def debug_pcap_loading(pcap_path=DEFAULT_PCAP_PATH):
    """Debug PCAP loading step by step

    Reads the capture with scapy in growing batches (10 packets, 100
    packets, then the whole file) so that the batch size at which a
    failure appears is visible in the output.

    Args:
        pcap_path: Capture file to read.  Defaults to the capture inside
            the StreamLens checkout.

    Returns:
        True when every stage succeeded, False as soon as one stage raised.
    """
    try:
        _say("Step 1: Testing basic scapy import...")
        from scapy.all import rdpcap
        _say("✓ Scapy imported successfully")

        _say("Step 2: Testing small packet read...")
        try:
            # Try to read just the first few packets
            packets = rdpcap(pcap_path, count=10)
            _say(f"✓ Successfully read {len(packets)} packets")

            # Test packet access
            if packets:
                first = packets[0]
                _say(f"✓ First packet: {len(first)} bytes")
                _say(f"✓ Packet timestamp: {first.time}")

                # Test packet layers
                _say(f"✓ Packet layers: {first.layers()}")
        except Exception as e:
            _say(f"✗ Error reading packets: {e}")
            traceback.print_exc()
            return False

        _say("Step 3: Testing larger packet batch...")
        try:
            packets = rdpcap(pcap_path, count=100)
            _say(f"✓ Successfully read {len(packets)} packets")
        except Exception as e:
            _say(f"✗ Error reading 100 packets: {e}")
            traceback.print_exc()
            return False

        _say("Step 4: Testing full file load with memory management...")
        try:
            # Force garbage collection before loading
            gc.collect()

            packets = rdpcap(pcap_path)
            _say(f"✓ Successfully loaded {len(packets)} total packets")
        except Exception as e:
            _say(f"✗ Error loading full file: {e}")
            traceback.print_exc()
            return False

        # Memory reporting is optional and kept apart from the load itself
        # so a missing psutil cannot be confused with a load failure.
        _report_memory_usage()

        return True

    except Exception as e:
        _say(f"✗ Critical error in debug: {e}")
        traceback.print_exc()
        return False


def debug_analyzer_components():
    """Debug analyzer components individually

    Imports and instantiates each analyzer component in turn, printing a
    marker after every step so the last marker shown identifies the
    component that failed.

    Returns:
        True when every component imported and constructed, False otherwise.
    """
    try:
        _say("\nStep 5: Testing analyzer components...")

        _ensure_project_on_path()

        _say(" 5a: Importing flow manager...")
        from analyzer.analysis.flow_manager import FlowManager
        _say(" ✓ FlowManager imported")

        _say(" 5b: Importing statistics engine...")
        from analyzer.analysis.statistics import StatisticsEngine
        stats_engine = StatisticsEngine()
        _say(" ✓ StatisticsEngine created")

        _say(" 5c: Creating flow manager...")
        FlowManager(stats_engine)
        _say(" ✓ FlowManager created")

        _say(" 5d: Testing protocol dissectors...")
        from analyzer.protocols.chapter10 import Chapter10Dissector
        Chapter10Dissector()
        _say(" ✓ Chapter10Dissector created")

        _say(" 5e: Importing main analyzer...")
        from analyzer.analysis.core import EthernetAnalyzer
        EthernetAnalyzer()
        _say(" ✓ EthernetAnalyzer created")

        return True

    except Exception as e:
        _say(f"✗ Error in analyzer components: {e}")
        traceback.print_exc()
        return False


def debug_packet_processing(pcap_path=DEFAULT_PCAP_PATH):
    """Debug packet processing with small batch

    Loads 50 packets, feeds the first 10 to ``EthernetAnalyzer`` one at a
    time, then runs the statistics calculation.

    Args:
        pcap_path: Capture file to read.  Defaults to the capture inside
            the StreamLens checkout.

    Returns:
        True when all packets and the statistics pass succeeded, False as
        soon as any of them raised.
    """
    try:
        _say("\nStep 6: Testing packet processing...")

        # Load small batch
        from scapy.all import rdpcap
        packets = rdpcap(pcap_path, count=50)
        _say(f" Loaded {len(packets)} packets for processing test")

        # Create analyzer
        _ensure_project_on_path()
        from analyzer.analysis.core import EthernetAnalyzer
        analyzer = EthernetAnalyzer()

        # Process packets one by one
        for i, packet in enumerate(packets[:10]):  # Test first 10
            try:
                analyzer._process_single_packet(packet, i + 1)
                if i % 5 == 0:
                    _say(f" ✓ Processed packet {i+1}")
            except Exception as e:
                _say(f" ✗ Error processing packet {i+1}: {e}")
                traceback.print_exc()
                return False

        _say(f" ✓ Successfully processed {min(10, len(packets))} packets")

        # Test statistics calculation
        try:
            analyzer.calculate_statistics()
            _say(" ✓ Statistics calculation successful")
        except Exception as e:
            _say(f" ✗ Error in statistics: {e}")
            traceback.print_exc()
            return False

        return True

    except Exception as e:
        _say(f"✗ Error in packet processing test: {e}")
        traceback.print_exc()
        return False


def main():
    """Run debug sequence

    Executes the diagnostic stages in order and stops at the first stage
    that reports failure or raises.
    """
    # Python-level exception handling cannot observe a segmentation fault;
    # faulthandler makes the interpreter dump the active traceback instead.
    _enable_fault_handler()

    _say("🐛 StreamLens Debug Mode")
    _say("=" * 50)

    # Test each component
    steps = [
        ("PCAP Loading", debug_pcap_loading),
        ("Analyzer Components", debug_analyzer_components),
        ("Packet Processing", debug_packet_processing),
    ]

    for step_name, step_func in steps:
        _say(f"\n🔍 Testing: {step_name}")
        _say("-" * 30)

        try:
            success = step_func()
            if success:
                _say(f"✅ {step_name} - PASSED")
            else:
                _say(f"❌ {step_name} - FAILED")
                _say("Stopping debug sequence due to failure")
                break
        except Exception as e:
            _say(f"💥 {step_name} - CRASHED: {e}")
            traceback.print_exc()
            break

        # Force garbage collection between steps
        gc.collect()

    _say("\n" + "=" * 50)
    _say("Debug sequence complete")


if __name__ == "__main__":
    main()