Files
airstream/airstream_pyshark.py
2025-08-03 20:20:55 -04:00

179 lines
5.6 KiB
Python
Executable File

#!/usr/bin/env python3
"""
Airstream PyShark Proof of Concept
===================================
Alternative implementation using PyShark for enhanced protocol support.
Key Features:
- Uses Wireshark dissectors for comprehensive protocol decoding
- Supports custom Lua dissectors automatically
- Provides Wireshark display filter support
- Can decode any protocol that Wireshark understands
Requirements:
- pip install pyshark
- Wireshark/tshark must be installed on the system
Usage:
./airstream_pyshark.py -p capture.pcap
./airstream_pyshark.py -i eth0 -c 100
./airstream_pyshark.py -p capture.pcap --filter "tcp.port==80"
"""
import argparse
import sys
from typing import List
try:
import pyshark
except ImportError:
print("Error: pyshark not installed. Run: pip install pyshark")
sys.exit(1)
from pyshark_poc.analyzer import PySharkAnalyzer
from pyshark_poc.stats import STATS_TYPES
def get_interfaces():
    """Return the capture interface names reported by ``tshark -D``.

    The ``tshark`` executable is run as a subprocess and each output line
    of the form ``"<n>. <name> ..."`` contributes ``<name>`` (the first
    space-delimited word after the ``". "`` separator). Lines without that
    separator are ignored.

    Returns:
        list[str]: The interface names in the order tshark printed them.
        If tshark cannot be started, exits with a non-zero status, or its
        output cannot be decoded, a single-element list holding a
        human-readable explanation is returned instead, so callers can
        always iterate over the result and print it.
    """
    # Imported locally: this is the only place the module shells out.
    import subprocess

    unavailable = ["Unable to list interfaces - tshark may not be installed"]

    try:
        result = subprocess.run(['tshark', '-D'], capture_output=True, text=True)
    except (OSError, ValueError, subprocess.SubprocessError):
        # OSError: tshark missing or not executable.
        # ValueError: output could not be decoded as text.
        # KeyboardInterrupt/SystemExit are deliberately NOT swallowed here.
        return unavailable

    if result.returncode != 0:
        return unavailable

    interfaces = []
    for line in result.stdout.strip().splitlines():
        # Parse lines like "1. eth0" or "2. lo (Loopback)".
        _, separator, remainder = line.partition('. ')
        if separator:
            interfaces.append(remainder.split(' ')[0])
    return interfaces
def _build_parser():
    """Build the argparse parser describing every command-line option."""
    parser = argparse.ArgumentParser(
        description="Airstream PyShark - Enhanced packet flow analyzer with Wireshark dissector support",
        formatter_class=argparse.RawDescriptionHelpFormatter,
        epilog="""
Examples:
Analyze PCAP file:
%(prog)s -p capture.pcap
Live capture with filter:
%(prog)s -i eth0 -c 1000 --filter "tcp.port==443"
Use BPF filter for capture:
%(prog)s -i eth0 --bpf "port 80 or port 443"
Export to CSV:
%(prog)s -p capture.pcap -o results.csv
PyShark Advantages:
- Full Wireshark protocol decoding
- Custom Lua dissector support
- Advanced display filters
- Automatic field extraction
"""
    )

    # Input sources: a capture file and a live interface are mutually exclusive.
    input_group = parser.add_mutually_exclusive_group()
    input_group.add_argument('-p', '--pcap', help='PCAP file to analyze')
    input_group.add_argument('-i', '--interface', help='Network interface for live capture')

    # Capture options
    parser.add_argument('-c', '--count', type=int, default=100,
                        help='Number of packets to capture (default: 100)')

    # Filtering options
    parser.add_argument('--filter', '--display-filter', dest='display_filter',
                        help='Wireshark display filter (e.g., "tcp.port==80")')
    parser.add_argument('--bpf', '--bpf-filter', dest='bpf_filter',
                        help='BPF capture filter for live capture (e.g., "port 80")')

    # Output options
    parser.add_argument('-o', '--output', help='CSV output file')

    # Statistics options
    parser.add_argument('-s', '--stats', nargs='+',
                        choices=list(STATS_TYPES.keys()) + ['all'],
                        default=['overview'],
                        help='Statistics types to use (default: overview)')

    # Utility options
    parser.add_argument('-l', '--list-interfaces', action='store_true',
                        help='List available network interfaces')
    parser.add_argument('-v', '--verbose', action='store_true',
                        help='Verbose output')
    return parser


def main():
    """Command-line entry point: parse arguments, run the analysis, report.

    Flow:
      1. ``--list-interfaces`` prints the interface list and returns.
      2. Otherwise one of ``--pcap`` / ``--interface`` is required;
         ``parser.error`` exits with a usage message if neither is given.
      3. The requested statistics names are resolved through ``STATS_TYPES``
         and handed to ``PySharkAnalyzer``.
      4. The analyzer processes the capture file or the live interface,
         prints its summary, optionally prints the protocol distribution
         (``--verbose``) and optionally writes a CSV file (``--output``).

    Exit behaviour:
      * Ctrl-C during analysis prints a notice and returns normally.
      * Any other exception is reported on stderr (with a traceback when
        ``--verbose`` is set) and the process exits with status 1.
    """
    parser = _build_parser()
    args = parser.parse_args()

    # Handle interface listing
    if args.list_interfaces:
        print("Available interfaces:")
        for iface in get_interfaces():
            print(f" {iface}")
        return

    # Validate input
    if not (args.pcap or args.interface):
        parser.error("Specify either --pcap or --interface")

    # Handle statistics selection. Repeated names on the command line are
    # collapsed (order preserved) so the same statistics class is not
    # handed to the analyzer twice.
    if 'all' in args.stats:
        selected_stats = list(STATS_TYPES.keys())
    else:
        selected_stats = list(dict.fromkeys(args.stats))
    stats_classes = [STATS_TYPES[name] for name in selected_stats]
    print(f"Using stats: {', '.join(selected_stats)}")

    # The BPF filter is only forwarded to the live-capture path below, so
    # tell the user instead of dropping it silently.
    if args.pcap and args.bpf_filter:
        print("Warning: --bpf only applies to live capture and is ignored with --pcap",
              file=sys.stderr)

    # Create analyzer
    analyzer = PySharkAnalyzer(stats_classes)

    try:
        # Perform analysis
        if args.pcap:
            analyzer.analyze_pcap(args.pcap, display_filter=args.display_filter)
        else:
            analyzer.analyze_live(
                args.interface,
                args.count,
                display_filter=args.display_filter,
                bpf_filter=args.bpf_filter
            )

        # Print results
        analyzer.print_summary()

        # Show protocol distribution if verbose
        if args.verbose:
            print("\nProtocol distribution:")
            protocol_summary = analyzer.get_protocol_summary()
            if not protocol_summary.empty:
                print(protocol_summary.to_string(index=False))

        # Save to CSV if requested
        if args.output:
            # NOTE(review): summary() presumably returns a pandas DataFrame
            # (it is used via .to_csv) - confirm against PySharkAnalyzer.
            df = analyzer.summary()
            df.to_csv(args.output, index=False)
            print(f"\nSaved results to: {args.output}")
    except KeyboardInterrupt:
        print("\nInterrupted by user")
    except Exception as e:
        # Diagnostics belong on stderr so they do not pollute piped output.
        print(f"Error: {e}", file=sys.stderr)
        if args.verbose:
            import traceback
            traceback.print_exc()
        sys.exit(1)
# Run the CLI only when executed as a script, not when imported as a module.
if __name__ == "__main__":
    main()