Files
StreamLens/analyzer/protocols/standard.py

97 lines
2.8 KiB
Python

"""
Standard protocol dissectors (Ethernet, IP, TCP, UDP, etc.)
"""
from typing import Dict, Optional
try:
from scapy.all import Packet, Ether, IP, UDP, TCP
except ImportError:
print("Error: scapy library required. Install with: pip install scapy")
import sys
sys.exit(1)
class StandardProtocolDissectors:
"""Collection of standard protocol dissectors"""
def __init__(self):
self.dissectors = {
'ethernet': self._dissect_ethernet,
'ip': self._dissect_ip,
'udp': self._dissect_udp,
'tcp': self._dissect_tcp
}
def dissect_all(self, packet: Packet) -> Dict[str, Optional[Dict]]:
"""Apply all standard dissectors to a packet"""
results = {}
for name, dissector in self.dissectors.items():
try:
results[name] = dissector(packet)
except Exception as e:
results[name] = {'error': str(e)}
return results
def _dissect_ethernet(self, packet: Packet) -> Optional[Dict]:
"""Dissect Ethernet layer"""
try:
if packet.haslayer(Ether):
eth = packet[Ether]
return {
'src_mac': eth.src,
'dst_mac': eth.dst,
'type': hex(eth.type)
}
except:
pass
return None
def _dissect_ip(self, packet: Packet) -> Optional[Dict]:
"""Dissect IP layer"""
try:
if packet.haslayer(IP):
ip = packet[IP]
return {
'version': ip.version,
'src': ip.src,
'dst': ip.dst,
'protocol': ip.proto,
'ttl': ip.ttl,
'length': ip.len
}
except:
pass
return None
def _dissect_udp(self, packet: Packet) -> Optional[Dict]:
"""Dissect UDP layer"""
try:
if packet.haslayer(UDP):
udp = packet[UDP]
return {
'src_port': udp.sport,
'dst_port': udp.dport,
'length': udp.len,
'checksum': hex(udp.chksum)
}
except:
pass
return None
def _dissect_tcp(self, packet: Packet) -> Optional[Dict]:
"""Dissect TCP layer"""
try:
if packet.haslayer(TCP):
tcp = packet[TCP]
return {
'src_port': tcp.sport,
'dst_port': tcp.dport,
'seq': tcp.seq,
'ack': tcp.ack,
'flags': tcp.flags,
'window': tcp.window
}
except:
pass
return None