96 lines
3.0 KiB
Python
96 lines
3.0 KiB
Python
#!/usr/bin/env python3
|
||
"""
|
||
Test signal visualization outside of TUI context
|
||
"""
|
||
|
||
import sys
|
||
import os
|
||
|
||
# Add the analyzer package to the path
|
||
sys.path.insert(0, os.path.dirname(os.path.abspath(__file__)))
|
||
|
||
from analyzer.utils.signal_visualizer import signal_visualizer
|
||
from analyzer.analysis.core import EthernetAnalyzer
|
||
from analyzer.utils.pcap_loader import PCAPLoader
|
||
|
||
def test_signal_visualization():
    """Exercise the Chapter 10 signal visualization outside the TUI.

    Steps, in order:

    1. Load every packet from ``FSTDaircraft.pcapng`` (path is relative to
       the current working directory) and run each one through an
       ``EthernetAnalyzer``.
    2. Order the analyzer's flows by max sigma deviation, then frame count
       (both descending), and pick the first flow whose frame types mention
       ``CH10`` or ``TMATS``.
    3. Collect the packets whose IP source/destination match that flow
       (only the address pair is compared).
    4. Call ``signal_visualizer.visualize_flow_signals`` with the flow and
       its packets, then look for ``signal_plot_*.png`` files in the current
       directory.

    Progress and results are printed to stdout. Nothing is returned; the
    function returns early when no Chapter 10 flow is found. A failure in
    the visualization step is reported with a traceback, not raised.
    """
    # Kept function-local, as in the original script, so the module's
    # top-level import block stays untouched.
    import glob
    import traceback

    print("Testing Chapter 10 Signal Visualization")
    print("=" * 50)

    # Load PCAP file
    print("Loading PCAP file...")
    analyzer = EthernetAnalyzer()
    loader = PCAPLoader("FSTDaircraft.pcapng")
    packets = loader.load_all()
    analyzer.all_packets = packets

    # Process packets (frame numbers are 1-based)
    print("Processing packets...")
    for frame_number, packet in enumerate(packets, 1):
        analyzer._process_single_packet(packet, frame_number)

    analyzer.calculate_statistics()

    # Find a Chapter 10 flow, preferring the highest sigma deviation and
    # then the highest frame count.
    print("Finding Chapter 10 flows...")
    stats = analyzer.statistics_engine
    ranked_flows = sorted(
        analyzer.flows.values(),
        key=lambda flow: (stats.get_max_sigma_deviation(flow), flow.frame_count),
        reverse=True,
    )
    ch10_flow = next(
        (
            flow
            for flow in ranked_flows
            if any('CH10' in ft or 'TMATS' in ft for ft in flow.frame_types)
        ),
        None,
    )

    if ch10_flow is None:
        print("No Chapter 10 flows found!")
        return

    print(f"Found Chapter 10 flow: {ch10_flow.src_ip} -> {ch10_flow.dst_ip}")
    print(f" Packets: {ch10_flow.frame_count}")
    print(f" Frame types: {list(ch10_flow.frame_types.keys())}")
    print(f" Max sigma deviation: {stats.get_max_sigma_deviation(ch10_flow):.2f}σ")

    # Get packets for this flow.
    # Import scapy's IP layer once instead of once per packet. If scapy is
    # unavailable no packet can be matched, which is the same outcome the
    # per-packet import produced when its ImportError was swallowed.
    try:
        from scapy.all import IP
    except ImportError:
        IP = None

    flow_packets = []
    if IP is not None:
        for packet in packets:
            try:
                if not hasattr(packet, 'haslayer') or not packet.haslayer(IP):
                    continue
                ip_layer = packet[IP]
                if ip_layer.src == ch10_flow.src_ip and ip_layer.dst == ch10_flow.dst_ip:
                    flow_packets.append(packet)
            except Exception:
                # Best effort: skip packets that cannot be inspected, but let
                # KeyboardInterrupt / SystemExit propagate (a bare ``except:``
                # would swallow those too).
                continue

    print(f" Flow packets: {len(flow_packets)}")

    # Test signal visualization (should save to files)
    print("\nTesting signal visualization...")
    try:
        signal_visualizer.visualize_flow_signals(ch10_flow, flow_packets)
        print("✓ Signal visualization completed successfully!")

        # Check if files were created. This matches any signal_plot_*.png in
        # the working directory, including ones left over from earlier runs.
        plot_files = glob.glob("signal_plot_*.png")
        if plot_files:
            print(f"✓ Generated plot files: {plot_files}")
        else:
            print("⚠ No plot files found - visualization may have used interactive mode")

    except Exception as e:
        # Top-level boundary of the script: report the failure and carry on
        # to the final status line instead of crashing.
        print(f"✗ Signal visualization failed: {e}")
        traceback.print_exc()

    print("\nTest completed!")
|
||
|
||
# Script entry point: run the visualization check only when this file is
# executed directly, not when it is imported.
if __name__ == "__main__":
    test_signal_visualization()