Files
StreamLens/test_signal_visualization.py
2025-07-25 21:45:07 -04:00

96 lines
3.0 KiB
Python
Raw Permalink Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

#!/usr/bin/env python3
"""
Test signal visualization outside of TUI context
"""
import sys
import os
# Add the analyzer package to the path
sys.path.insert(0, os.path.dirname(os.path.abspath(__file__)))
from analyzer.utils.signal_visualizer import signal_visualizer
from analyzer.analysis.core import EthernetAnalyzer
from analyzer.utils.pcap_loader import PCAPLoader
def test_signal_visualization():
"""Test Chapter 10 signal visualization"""
print("Testing Chapter 10 Signal Visualization")
print("=" * 50)
# Load PCAP file
print("Loading PCAP file...")
analyzer = EthernetAnalyzer()
loader = PCAPLoader("FSTDaircraft.pcapng")
packets = loader.load_all()
analyzer.all_packets = packets
# Process packets
print("Processing packets...")
for i, packet in enumerate(packets, 1):
analyzer._process_single_packet(packet, i)
analyzer.calculate_statistics()
# Find a Chapter 10 flow
print("Finding Chapter 10 flows...")
flows_list = list(analyzer.flows.values())
flows_list.sort(key=lambda x: (
analyzer.statistics_engine.get_max_sigma_deviation(x),
x.frame_count
), reverse=True)
ch10_flow = None
for flow in flows_list:
if any('CH10' in ft or 'TMATS' in ft for ft in flow.frame_types.keys()):
ch10_flow = flow
break
if not ch10_flow:
print("No Chapter 10 flows found!")
return
print(f"Found Chapter 10 flow: {ch10_flow.src_ip} -> {ch10_flow.dst_ip}")
print(f" Packets: {ch10_flow.frame_count}")
print(f" Frame types: {list(ch10_flow.frame_types.keys())}")
print(f" Max sigma deviation: {analyzer.statistics_engine.get_max_sigma_deviation(ch10_flow):.2f}σ")
# Get packets for this flow
flow_packets = []
for packet in packets:
try:
if hasattr(packet, 'haslayer'):
from scapy.all import IP
if packet.haslayer(IP):
ip_layer = packet[IP]
if ip_layer.src == ch10_flow.src_ip and ip_layer.dst == ch10_flow.dst_ip:
flow_packets.append(packet)
except:
continue
print(f" Flow packets: {len(flow_packets)}")
# Test signal visualization (should save to files)
print("\nTesting signal visualization...")
try:
signal_visualizer.visualize_flow_signals(ch10_flow, flow_packets)
print("✓ Signal visualization completed successfully!")
# Check if files were created
import glob
plot_files = glob.glob("signal_plot_*.png")
if plot_files:
print(f"✓ Generated plot files: {plot_files}")
else:
print("⚠ No plot files found - visualization may have used interactive mode")
except Exception as e:
print(f"✗ Signal visualization failed: {e}")
import traceback
traceback.print_exc()
print("\nTest completed!")
if __name__ == "__main__":
test_signal_visualization()