96 lines
3.0 KiB
Python
96 lines
3.0 KiB
Python
|
|
#!/usr/bin/env python3
|
|||
|
|
"""
Test signal visualization outside of the TUI context.
"""
|
|||
|
|
|
|||
|
|
import sys
|
|||
|
|
import os
|
|||
|
|
|
|||
|
|
# Add the analyzer package to the path
|
|||
|
|
sys.path.insert(0, os.path.dirname(os.path.abspath(__file__)))
|
|||
|
|
|
|||
|
|
from analyzer.utils.signal_visualizer import signal_visualizer
|
|||
|
|
from analyzer.analysis.core import EthernetAnalyzer
|
|||
|
|
from analyzer.utils.pcap_loader import PCAPLoader
|
|||
|
|
|
|||
|
|
def _find_ch10_flow(analyzer):
    """Return the first flow with a CH10 or TMATS frame type, or None.

    Flows are examined in descending order of the tuple
    (statistics engine's max sigma deviation, frame count).
    """
    ranked_flows = sorted(
        analyzer.flows.values(),
        key=lambda flow: (
            analyzer.statistics_engine.get_max_sigma_deviation(flow),
            flow.frame_count,
        ),
        reverse=True,
    )
    for flow in ranked_flows:
        if any('CH10' in ft or 'TMATS' in ft for ft in flow.frame_types):
            return flow
    return None


def _collect_flow_packets(packets, flow):
    """Return the packets whose IP src/dst equal the flow's src_ip/dst_ip.

    Matching is best effort: if scapy cannot be imported an empty list is
    returned, and any packet that raises while being inspected is skipped.
    """
    # Import once instead of once per packet; scapy is only needed here.
    try:
        from scapy.all import IP
    except ImportError:
        print("⚠ scapy is not available - cannot match packets to the flow")
        return []

    matched = []
    for packet in packets:
        try:
            if not hasattr(packet, 'haslayer') or not packet.haslayer(IP):
                continue
            ip_layer = packet[IP]
            if ip_layer.src == flow.src_ip and ip_layer.dst == flow.dst_ip:
                matched.append(packet)
        except Exception:
            # Skip packets that cannot be inspected, but (unlike a bare
            # ``except:``) let KeyboardInterrupt / SystemExit propagate.
            continue
    return matched


def _plot_file_mtimes(pattern="signal_plot_*.png"):
    """Map every file matching *pattern* to its modification time in ns."""
    import glob

    mtimes = {}
    for path in glob.glob(pattern):
        try:
            mtimes[path] = os.stat(path).st_mtime_ns
        except OSError:
            # The file disappeared between glob() and stat(); ignore it.
            continue
    return mtimes


def test_signal_visualization(pcap_path="FSTDaircraft.pcapng"):
    """Test Chapter 10 signal visualization.

    Loads the capture at *pcap_path*, feeds every packet through an
    ``EthernetAnalyzer``, picks a CH10/TMATS flow, and passes that flow and
    its packets to ``signal_visualizer.visualize_flow_signals``. Progress and
    results are printed; nothing is returned.

    Args:
        pcap_path: Capture file handed to ``PCAPLoader``. Defaults to the
            file name this script has always used, so existing callers
            (including running the file directly) behave as before.
    """
    import traceback

    print("Testing Chapter 10 Signal Visualization")
    print("=" * 50)

    # Load PCAP file
    print("Loading PCAP file...")
    analyzer = EthernetAnalyzer()
    loader = PCAPLoader(pcap_path)
    packets = loader.load_all()
    analyzer.all_packets = packets

    # Process packets (frame numbers are 1-based)
    print("Processing packets...")
    for frame_number, packet in enumerate(packets, 1):
        analyzer._process_single_packet(packet, frame_number)

    analyzer.calculate_statistics()

    # Find a Chapter 10 flow
    print("Finding Chapter 10 flows...")
    ch10_flow = _find_ch10_flow(analyzer)
    if ch10_flow is None:
        print("No Chapter 10 flows found!")
        return

    print(f"Found Chapter 10 flow: {ch10_flow.src_ip} -> {ch10_flow.dst_ip}")
    print(f"  Packets: {ch10_flow.frame_count}")
    print(f"  Frame types: {list(ch10_flow.frame_types.keys())}")
    print(f"  Max sigma deviation: {analyzer.statistics_engine.get_max_sigma_deviation(ch10_flow):.2f}σ")

    # Get packets for this flow
    flow_packets = _collect_flow_packets(packets, ch10_flow)
    print(f"  Flow packets: {len(flow_packets)}")

    # Test signal visualization (should save to files)
    print("\nTesting signal visualization...")
    try:
        # Snapshot existing plots first so that files left over from an
        # earlier run are not mistaken for output of this run.
        plots_before = _plot_file_mtimes()

        signal_visualizer.visualize_flow_signals(ch10_flow, flow_packets)
        print("✓ Signal visualization completed successfully!")

        # Check if files were created (or rewritten) by this run
        plots_after = _plot_file_mtimes()
        plot_files = sorted(
            path
            for path, mtime in plots_after.items()
            if plots_before.get(path) != mtime
        )
        if plot_files:
            print(f"✓ Generated plot files: {plot_files}")
        else:
            print("⚠ No plot files found - visualization may have used interactive mode")

    except Exception as e:
        print(f"✗ Signal visualization failed: {e}")
        traceback.print_exc()

    print("\nTest completed!")
|
|||
|
|
|
|||
|
|
# Script entry point: run the visualization check when executed directly.
if __name__ == "__main__":
    test_signal_visualization()
|