Files
StreamLens/test_progress_integration.py

106 lines
3.6 KiB
Python

#!/usr/bin/env python3
"""
Test the progress integration without requiring textual UI
"""
import sys
sys.path.append('.')
from analyzer.analysis import EthernetAnalyzer
from analyzer.analysis.background_analyzer import BackgroundAnalyzer, ParsingProgress
import time
def test_progress_callback():
    """Run a background parse and check that progress callbacks are delivered.

    Builds an ``EthernetAnalyzer`` wrapped in a ``BackgroundAnalyzer`` with a
    deliberately tiny batch size, starts parsing the sample capture, and
    records every ``ParsingProgress`` handed to the progress callback. The
    wait loop ends when the analyzer stops parsing, a completion update has
    been seen, or 30 seconds have elapsed. A summary of the recorded updates
    is printed afterwards.

    Returns:
        bool: True if at least one progress update was received.
    """
    print("=== Testing Progress Integration ===")

    progress_updates = []
    completed = False

    def progress_callback(progress: ParsingProgress):
        """Record one progress snapshot and echo it to stdout."""
        nonlocal completed

        progress_updates.append({
            'processed': progress.processed_packets,
            'total': progress.total_packets,
            'percent': progress.percent_complete,
            'pps': progress.packets_per_second,
            'eta': progress.estimated_time_remaining,
            'complete': progress.is_complete,
            'error': progress.error,
        })

        print(f"📊 Progress: {progress.processed_packets:,}/{progress.total_packets:,} "
              f"({progress.percent_complete:.1f}%) @ {progress.packets_per_second:.0f} pkt/s "
              f"ETA: {progress.estimated_time_remaining:.1f}s")

        if progress.is_complete:
            # Lets the wait loop below exit as soon as completion is reported.
            completed = True
            print("✅ Parsing completed!")

        if progress.error:
            print(f"❌ Error: {progress.error}")

    def flow_update_callback():
        """Announce that flow data was updated."""
        print("🔄 Flow data updated")

    # Create analyzer with progress callback
    analyzer = EthernetAnalyzer(enable_realtime=False, outlier_threshold_sigma=3.0)
    bg_analyzer = BackgroundAnalyzer(
        analyzer=analyzer,
        num_threads=1,
        batch_size=10,  # Very small batches to slow down processing for testing
        progress_callback=progress_callback,
        flow_update_callback=flow_update_callback,
    )

    # Test with our PCAP file
    pcap_file = "1 PTPGM.pcapng"
    print(f"🚀 Starting parsing of {pcap_file}")
    bg_analyzer.start_parsing(pcap_file)

    # Wait for completion. A monotonic clock keeps the 30 second timeout
    # correct even if the system wall clock is adjusted while waiting.
    deadline = time.monotonic() + 30
    try:
        while bg_analyzer.is_parsing and not completed:
            time.sleep(0.1)
            if time.monotonic() > deadline:
                print("⏰ Timeout reached")
                break
    finally:
        # Clean up even when the wait is interrupted (e.g. Ctrl-C).
        bg_analyzer.cleanup()

    print("\n📈 Progress Statistics:")
    print(f" Total updates: {len(progress_updates)}")

    if progress_updates:
        first = progress_updates[0]
        last = progress_updates[-1]
        print(f" First update: {first['processed']}/{first['total']} ({first['percent']:.1f}%)")
        print(f" Last update: {last['processed']}/{last['total']} ({last['percent']:.1f}%)")
        print(f" Max rate: {max(u['pps'] for u in progress_updates):.0f} pkt/s")
        print(f" Completed: {last['complete']}")

        # Show sample of progress updates: first, quartile positions and last.
        # dict.fromkeys drops repeated indices (common when there are only a
        # few updates) while keeping them in ascending order.
        print("\n📝 Sample Progress Updates:")
        count = len(progress_updates)
        sample_indices = list(dict.fromkeys(
            [0, count // 4, count // 2, 3 * count // 4, count - 1]
        ))
        for i in sample_indices:
            u = progress_updates[i]
            print(f" {u['processed']:>4}/{u['total']} ({u['percent']:>5.1f}%) "
                  f"@ {u['pps']:>6.0f} pkt/s")

    print("\n🎉 Progress integration test completed!")
    return len(progress_updates) > 0
if __name__ == "__main__":
    # Script entry point: run the integration check and signal failure to
    # the caller through a non-zero exit status.
    success = test_progress_callback()
    if not success:
        print("❌ Tests failed!")
        sys.exit(1)
    print("✅ All tests passed!")