106 lines
3.6 KiB
Python
106 lines
3.6 KiB
Python
#!/usr/bin/env python3
|
|
"""
|
|
Test the progress integration without requiring textual UI
|
|
"""
|
|
|
|
import sys
|
|
sys.path.append('.')
|
|
|
|
from analyzer.analysis import EthernetAnalyzer
|
|
from analyzer.analysis.background_analyzer import BackgroundAnalyzer, ParsingProgress
|
|
import time
|
|
|
|
def test_progress_callback():
|
|
"""Test that progress callbacks work correctly"""
|
|
print("=== Testing Progress Integration ===")
|
|
|
|
progress_updates = []
|
|
completed = False
|
|
|
|
def progress_callback(progress: ParsingProgress):
|
|
"""Capture progress updates"""
|
|
progress_updates.append({
|
|
'processed': progress.processed_packets,
|
|
'total': progress.total_packets,
|
|
'percent': progress.percent_complete,
|
|
'pps': progress.packets_per_second,
|
|
'eta': progress.estimated_time_remaining,
|
|
'complete': progress.is_complete,
|
|
'error': progress.error
|
|
})
|
|
|
|
print(f"📊 Progress: {progress.processed_packets:,}/{progress.total_packets:,} "
|
|
f"({progress.percent_complete:.1f}%) @ {progress.packets_per_second:.0f} pkt/s "
|
|
f"ETA: {progress.estimated_time_remaining:.1f}s")
|
|
|
|
if progress.is_complete:
|
|
nonlocal completed
|
|
completed = True
|
|
print("✅ Parsing completed!")
|
|
|
|
if progress.error:
|
|
print(f"❌ Error: {progress.error}")
|
|
|
|
def flow_update_callback():
|
|
"""Handle flow updates"""
|
|
print("🔄 Flow data updated")
|
|
|
|
# Create analyzer with progress callback
|
|
analyzer = EthernetAnalyzer(enable_realtime=False, outlier_threshold_sigma=3.0)
|
|
bg_analyzer = BackgroundAnalyzer(
|
|
analyzer=analyzer,
|
|
num_threads=1,
|
|
batch_size=10, # Very small batches to slow down processing for testing
|
|
progress_callback=progress_callback,
|
|
flow_update_callback=flow_update_callback
|
|
)
|
|
|
|
# Test with our PCAP file
|
|
pcap_file = "1 PTPGM.pcapng"
|
|
print(f"🚀 Starting parsing of {pcap_file}")
|
|
|
|
bg_analyzer.start_parsing(pcap_file)
|
|
|
|
# Wait for completion
|
|
start_time = time.time()
|
|
while bg_analyzer.is_parsing and not completed:
|
|
time.sleep(0.1)
|
|
# Timeout after 30 seconds
|
|
if time.time() - start_time > 30:
|
|
print("⏰ Timeout reached")
|
|
break
|
|
|
|
# Clean up
|
|
bg_analyzer.cleanup()
|
|
|
|
print(f"\\n📈 Progress Statistics:")
|
|
print(f" Total updates: {len(progress_updates)}")
|
|
|
|
if progress_updates:
|
|
first = progress_updates[0]
|
|
last = progress_updates[-1]
|
|
print(f" First update: {first['processed']}/{first['total']} ({first['percent']:.1f}%)")
|
|
print(f" Last update: {last['processed']}/{last['total']} ({last['percent']:.1f}%)")
|
|
print(f" Max rate: {max(u['pps'] for u in progress_updates):.0f} pkt/s")
|
|
print(f" Completed: {last['complete']}")
|
|
|
|
# Show sample of progress updates
|
|
print(f"\\n📝 Sample Progress Updates:")
|
|
sample_indices = [0, len(progress_updates)//4, len(progress_updates)//2,
|
|
3*len(progress_updates)//4, -1]
|
|
for i in sample_indices:
|
|
if i < len(progress_updates):
|
|
u = progress_updates[i]
|
|
print(f" {u['processed']:>4}/{u['total']} ({u['percent']:>5.1f}%) "
|
|
f"@ {u['pps']:>6.0f} pkt/s")
|
|
|
|
print("\\n🎉 Progress integration test completed!")
|
|
return len(progress_updates) > 0
|
|
|
|
if __name__ == "__main__":
|
|
success = test_progress_callback()
|
|
if success:
|
|
print("✅ All tests passed!")
|
|
else:
|
|
print("❌ Tests failed!")
|
|
sys.exit(1) |