From 0f2fc8f92c170ba4909eaa6c1b7ba743280b5597 Mon Sep 17 00:00:00 2001 From: noisedestroyers Date: Sat, 26 Jul 2025 16:51:37 -0400 Subject: [PATCH] re-focus on TUI and core --- analyzer/analysis/flow_manager.py | 65 +++++++++ analyzer/models/flow_stats.py | 26 ++-- analyzer/tui/interface.py | 4 +- analyzer/tui/panels/flow_list.py | 71 ++++++++-- architecture_redesign.md | 214 ++++++++++++++++++++++++++++++ 5 files changed, 359 insertions(+), 21 deletions(-) create mode 100644 architecture_redesign.md diff --git a/analyzer/analysis/flow_manager.py b/analyzer/analysis/flow_manager.py index 72e4ff1..bacb982 100644 --- a/analyzer/analysis/flow_manager.py +++ b/analyzer/analysis/flow_manager.py @@ -40,6 +40,9 @@ class FlowManager: timestamp = float(packet.time) packet_size = len(packet) + # Extract transport layer information + transport_info = self._extract_transport_info(packet) + # Determine basic protocol protocols = self._detect_basic_protocols(packet) @@ -51,6 +54,10 @@ class FlowManager: self.flows[flow_key] = FlowStats( src_ip=src_ip, dst_ip=dst_ip, + src_port=transport_info['src_port'], + dst_port=transport_info['dst_port'], + transport_protocol=transport_info['protocol'], + traffic_classification=self._classify_traffic(dst_ip), frame_count=0, timestamps=[], frame_numbers=[], @@ -314,6 +321,64 @@ class FlowManager: inter_arrival = timestamp - ft_stats.timestamps[-2] ft_stats.inter_arrival_times.append(inter_arrival) + def _extract_transport_info(self, packet: Packet) -> Dict[str, any]: + """Extract transport protocol and port information from packet""" + transport_info = { + 'protocol': 'Unknown', + 'src_port': 0, + 'dst_port': 0 + } + + if packet.haslayer(UDP): + udp_layer = packet[UDP] + transport_info['protocol'] = 'UDP' + transport_info['src_port'] = udp_layer.sport + transport_info['dst_port'] = udp_layer.dport + elif packet.haslayer(TCP): + tcp_layer = packet[TCP] + transport_info['protocol'] = 'TCP' + transport_info['src_port'] = tcp_layer.sport + transport_info['dst_port'] = tcp_layer.dport + elif packet.haslayer(IP): + ip_layer = packet[IP] + if ip_layer.proto == 1: + transport_info['protocol'] = 'ICMP' + elif ip_layer.proto == 2: + transport_info['protocol'] = 'IGMP' + elif ip_layer.proto == 6: + transport_info['protocol'] = 'TCP' + elif ip_layer.proto == 17: + transport_info['protocol'] = 'UDP' + else: + transport_info['protocol'] = f'IP-{ip_layer.proto}' + + return transport_info + + def _classify_traffic(self, dst_ip: str) -> str: + """Classify traffic as Unicast, Multicast, or Broadcast based on destination IP""" + try: + # Check for broadcast address + if dst_ip == '255.255.255.255': + return 'Broadcast' + + # Check for multicast ranges + if dst_ip.startswith('224.') or dst_ip.startswith('239.'): + return 'Multicast' + + # Check for other multicast ranges (224.0.0.0 to 239.255.255.255) + ip_parts = dst_ip.split('.') + if len(ip_parts) == 4: + first_octet = int(ip_parts[0]) + if 224 <= first_octet <= 239: + return 'Multicast' + + # Everything else is unicast + return 'Unicast' + + except (ValueError, IndexError): + # If IP parsing fails, default to unknown + return 'Unknown' + def get_flows_summary(self) -> Dict: """Get summary of all flows""" unique_ips = set() diff --git a/analyzer/models/flow_stats.py b/analyzer/models/flow_stats.py index 99b6c55..b97337c 100644 --- a/analyzer/models/flow_stats.py +++ b/analyzer/models/flow_stats.py @@ -26,15 +26,19 @@ class FlowStats: """Statistics for a source-destination IP pair""" src_ip: str dst_ip: str - frame_count: int - timestamps: List[float] - frame_numbers: List[int] - inter_arrival_times: List[float] - avg_inter_arrival: float - std_inter_arrival: float - outlier_frames: List[int] - outlier_details: List[Tuple[int, float]] # (frame_number, time_delta) - total_bytes: int - protocols: Set[str] - detected_protocol_types: Set[str] # Enhanced protocol detection (CH10, PTP, IENA, etc) + src_port: int = 0 # Source port (0 if not applicable/unknown) + dst_port: int = 0 # Destination port (0 if not applicable/unknown) + transport_protocol: str = "Unknown" # TCP, UDP, ICMP, IGMP, etc. + traffic_classification: str = "Unknown" # Unicast, Multicast, Broadcast + frame_count: int = 0 + timestamps: List[float] = field(default_factory=list) + frame_numbers: List[int] = field(default_factory=list) + inter_arrival_times: List[float] = field(default_factory=list) + avg_inter_arrival: float = 0.0 + std_inter_arrival: float = 0.0 + outlier_frames: List[int] = field(default_factory=list) + outlier_details: List[Tuple[int, float]] = field(default_factory=list) # (frame_number, time_delta) + total_bytes: int = 0 + protocols: Set[str] = field(default_factory=set) + detected_protocol_types: Set[str] = field(default_factory=set) # Enhanced protocol detection (CH10, PTP, IENA, etc) frame_types: Dict[str, FrameTypeStats] = field(default_factory=dict) # Per-frame-type statistics \ No newline at end of file diff --git a/analyzer/tui/interface.py b/analyzer/tui/interface.py index b30b9b2..f3f21bb 100644 --- a/analyzer/tui/interface.py +++ b/analyzer/tui/interface.py @@ -71,7 +71,7 @@ class TUIInterface: # Calculate panel dimensions based on timeline visibility if self.navigation.show_timeline: - # Top section: 70% of height, split into left 60% / right 40% + # Top section: 70% of height, split into left 70% / right 30% # Bottom section: 30% of height, full width top_height = int(height * 0.7) bottom_height = height - top_height - 2 # -2 for separators and status bar @@ -80,7 +80,7 @@ class TUIInterface: top_height = height - 2 # -2 for status bar bottom_height = 0 - left_width = int(width * 0.6) + left_width = int(width * 0.7) # Increased from 60% to 70% for better IP:port display right_width = width - left_width - 1 # -1 for separator # Draw title diff --git a/analyzer/tui/panels/flow_list.py b/analyzer/tui/panels/flow_list.py index 2c5d9f0..567eb64 100644 --- a/analyzer/tui/panels/flow_list.py +++ b/analyzer/tui/panels/flow_list.py @@ -19,9 +19,9 @@ class FlowListPanel: flows_list: List[FlowStats], selected_flow: int): """Draw the flow list panel""" - # Draw flows table header + # Draw flows table header with adjusted column widths for better alignment stdscr.addstr(y_offset, x_offset, "FLOWS:", curses.A_BOLD) - headers = f"{'Source IP':15} {'Dest IP':15} {'Pkts':5} {'Protocol':18} {'ΔT Avg':10} {'Out':4}" + headers = f"{'Src:Port':22} {'Dst:Port':22} {'Proto':6} {'Cast':5} {'#Frames':>7} {'Bytes':>7} {'Encoding':12} {'ΔT Avg':>9}" stdscr.addstr(y_offset + 1, x_offset, headers[:width-1], curses.A_UNDERLINE) # Calculate scrolling parameters @@ -40,11 +40,23 @@ class FlowListPanel: for flow_idx, flow in enumerate(flows_list): # Check if main flow line should be displayed if display_item >= scroll_offset and visible_items < max_rows: - # Draw main flow line - protocol_str = self._get_protocol_display(flow) + # Draw main flow line with new column layout + src_endpoint = f"{flow.src_ip}:{flow.src_port}" if flow.src_port > 0 else flow.src_ip + dst_endpoint = f"{flow.dst_ip}:{flow.dst_port}" if flow.dst_port > 0 else flow.dst_ip + + # Format bytes with K/M suffix + bytes_str = self._format_bytes(flow.total_bytes) + + # Get encoding information (primary detected protocol) + encoding_str = self._get_encoding_display(flow) + + # Format average time avg_time = f"{flow.avg_inter_arrival:.3f}s" if flow.avg_inter_arrival > 0 else "N/A" - line = f"{flow.src_ip:15} {flow.dst_ip:15} {flow.frame_count:5} {protocol_str:18} {avg_time:10} {'':4}" + # Abbreviate traffic classification + cast_abbrev = flow.traffic_classification[:4] if flow.traffic_classification != "Unknown" else "Unk" + + line = f"{src_endpoint:22} {dst_endpoint:22} {flow.transport_protocol:6} {cast_abbrev:5} {flow.frame_count:>7} {bytes_str:>7} {encoding_str:12} {avg_time:>9}" if display_item == selected_flow: stdscr.addstr(current_row, x_offset, line[:width-1], curses.A_REVERSE) @@ -66,8 +78,9 @@ class FlowListPanel: ft_avg = f"{ft_stats.avg_inter_arrival:.3f}s" if ft_stats.avg_inter_arrival > 0 else "N/A" outlier_count = len(ft_stats.outlier_details) if ft_stats.outlier_details else 0 - # Create frame type line aligned with columns - ft_line = f"{'':15} {'':15} {ft_stats.count:5} {frame_type:18} {ft_avg:10} {outlier_count:4}" + # Create frame type line aligned with new column layout + bytes_str_ft = self._format_bytes(ft_stats.total_bytes) + ft_line = f" └─{frame_type:18} {'':22} {'':6} {'':5} {ft_stats.count:>7} {bytes_str_ft:>7} {'':12} {ft_avg:>9}" if display_item == selected_flow: stdscr.addstr(current_row, x_offset, ft_line[:width-1], curses.A_REVERSE) @@ -124,4 +137,46 @@ class FlowListPanel: def get_total_display_items(self, flows_list: List[FlowStats]) -> int: """Public method to get total display items""" - return self._get_total_display_items(flows_list) \ No newline at end of file + return self._get_total_display_items(flows_list) + + def _format_bytes(self, bytes_count: int) -> str: + """Format byte count with K/M/G suffixes, always include magnitude indicator""" + if bytes_count >= 1_000_000_000: + return f"{bytes_count / 1_000_000_000:.1f}G" + elif bytes_count >= 1_000_000: + return f"{bytes_count / 1_000_000:.1f}M" + elif bytes_count >= 1_000: + return f"{bytes_count / 1_000:.1f}K" + else: + return f"{bytes_count}B" # Add "B" for plain bytes + + def _get_encoding_display(self, flow: FlowStats) -> str: + """Get the primary encoding/application protocol for display""" + # Prioritize specialized protocols (Chapter 10, PTP, IENA) + if flow.detected_protocol_types: + specialized = {'CH10', 'PTP', 'IENA', 'Chapter10', 'TMATS'} + found_specialized = flow.detected_protocol_types.intersection(specialized) + if found_specialized: + return list(found_specialized)[0] + + # Use first detected protocol type + return list(flow.detected_protocol_types)[0] + + # Fallback to frame types if available + if flow.frame_types: + frame_types = list(flow.frame_types.keys()) + # Look for interesting frame types first + priority_types = ['TMATS', 'CH10-Data', 'PTP-Sync', 'IENA-P', 'IENA-D'] + for ptype in priority_types: + if ptype in frame_types: + return ptype + return frame_types[0] + + # Last resort - check basic protocols + if flow.protocols: + app_protocols = {'DNS', 'HTTP', 'HTTPS', 'NTP', 'DHCP'} + found_app = flow.protocols.intersection(app_protocols) + if found_app: + return list(found_app)[0] + + return "Unknown" \ No newline at end of file diff --git a/architecture_redesign.md b/architecture_redesign.md new file mode 100644 index 0000000..0895825 --- /dev/null +++ b/architecture_redesign.md @@ -0,0 +1,214 @@ +# StreamLens Architecture Redesign + +## Core Data Model Hierarchy + +### 1. Flow Level +```python +@dataclass +class Flow: + src_ip: str + dst_ip: str + src_port: Optional[int] = None + dst_port: Optional[int] = None + transport_protocols: Dict[str, TransportProtocol] = field(default_factory=dict) + traffic_type: str = "Unknown" # Unicast/Multicast/Broadcast + first_seen: float = 0.0 + last_seen: float = 0.0 + total_packets: int = 0 + total_bytes: int = 0 +``` + +### 2. Transport Protocol Level +```python +@dataclass +class TransportProtocol: + protocol_name: str # TCP, UDP, ICMP, IGMP + port_info: Dict[str, int] # src_port, dst_port + application_encodings: Dict[str, ApplicationEncoding] = field(default_factory=dict) + packet_count: int = 0 + byte_count: int = 0 + timing_stats: TimingStatistics = field(default_factory=TimingStatistics) +``` + +### 3. Application Encoding Level +```python +@dataclass +class ApplicationEncoding: + encoding_name: str # PTP, IENA, Chapter10, RTP, etc. + packet_types: Dict[str, PacketTypeStats] = field(default_factory=dict) + decoder_metadata: Dict[str, Any] = field(default_factory=dict) + packet_count: int = 0 + byte_count: int = 0 + timing_stats: TimingStatistics = field(default_factory=TimingStatistics) + confidence_score: float = 1.0 # How sure we are about this encoding +``` + +### 4. Packet Type Level +```python +@dataclass +class PacketTypeStats: + type_name: str # Sync, Follow_Up, Delay_Req, TMATS, PCM_Data, etc. + packet_count: int = 0 + byte_count: int = 0 + timing_stats: TimingStatistics = field(default_factory=TimingStatistics) + payload_characteristics: Dict[str, Any] = field(default_factory=dict) +``` + +## Modular Decoder Framework + +### Base Decoder Interface +```python +class StreamDecoder(ABC): + @property + @abstractmethod + def encoding_name(self) -> str: + pass + + @property + @abstractmethod + def detection_ports(self) -> List[int]: + pass + + @abstractmethod + def can_decode(self, packet: Packet, transport_info: Dict) -> float: + """Return confidence score 0.0-1.0 that this decoder can handle the packet""" + pass + + @abstractmethod + def decode_packet(self, packet: Packet) -> DecodingResult: + """Decode packet and return structured result""" + pass + + @abstractmethod + def get_packet_type(self, decoded_data: Dict) -> str: + """Return specific packet type within this encoding""" + pass +``` + +### Decoder Registry +```python +class DecoderRegistry: + def __init__(self): + self.decoders = [] + self._register_default_decoders() + + def register_decoder(self, decoder: StreamDecoder): + self.decoders.append(decoder) + + def find_decoders(self, packet: Packet, transport_info: Dict) -> List[Tuple[StreamDecoder, float]]: + """Return list of (decoder, confidence) tuples sorted by confidence""" + candidates = [] + for decoder in self.decoders: + confidence = decoder.can_decode(packet, transport_info) + if confidence > 0.0: + candidates.append((decoder, confidence)) + return sorted(candidates, key=lambda x: x[1], reverse=True) +``` + +## Enhanced TUI Layout + +### Main Flow Table +``` +┌─ FLOWS ────────────────────────────────────────────────────────────────┐ +│ Src:Port │ Dst:Port │ Proto │ Cast │ #Frames │ Bytes │ ΔT │ +├─────────────────┼─────────────────┼───────┼──────┼─────────┼───────┼─────┤ +│ 192.168.1.10:0 │ 239.255.0.1:319 │ UDP │ Multi│ 1,234 │ 1.2M │ 1ms │ +│ ├─ PTP (v2) │ │ │ │ 856 │ 856K │ 1ms │ +│ │ ├─ Sync │ │ │ │ 428 │ 428K │ 2ms │ +│ │ └─ Follow_Up │ │ │ │ 428 │ 428K │ 2ms │ +│ └─ IENA │ │ │ │ 378 │ 378K │ 3ms │ +│ ├─ P-type │ │ │ │ 200 │ 200K │ 5ms │ +│ └─ D-type │ │ │ │ 178 │ 178K │ 5ms │ +└─────────────────┴─────────────────┴───────┴──────┴─────────┴───────┴─────┘ +``` + +### Navigation +- **↑↓**: Navigate flows and subflows +- **→**: Expand flow to show encodings +- **←**: Collapse flow +- **Space**: Toggle detailed packet type view +- **Enter**: View detailed statistics and timing analysis + +## Web GUI Streaming Architecture + +### Real-Time Data Pipeline +```python +class StreamLensWebAPI: + def __init__(self): + self.websocket_clients = set() + self.data_aggregator = DataAggregator() + + async def stream_flow_updates(self): + """Stream real-time flow statistics to connected clients""" + while True: + updates = self.data_aggregator.get_recent_updates() + if updates and self.websocket_clients: + await self.broadcast_updates(updates) + await asyncio.sleep(0.1) # 10Hz update rate + + async def broadcast_updates(self, updates: Dict): + """Send updates to all connected WebSocket clients""" + message = json.dumps({ + 'timestamp': time.time(), + 'flows': updates, + 'type': 'flow_update' + }) + disconnected = set() + for websocket in self.websocket_clients: + try: + await websocket.send(message) + except websockets.exceptions.ConnectionClosed: + disconnected.add(websocket) + self.websocket_clients -= disconnected +``` + +### Web Frontend Data Structure +```javascript +class StreamLensClient { + constructor() { + this.flows = new Map(); + this.websocket = null; + this.charts = new Map(); + } + + connect() { + this.websocket = new WebSocket('ws://localhost:8080/stream'); + this.websocket.onmessage = (event) => { + const data = JSON.parse(event.data); + this.handleFlowUpdate(data); + }; + } + + handleFlowUpdate(data) { + for (const [flowKey, flowData] of Object.entries(data.flows)) { + this.updateFlowDisplay(flowKey, flowData); + this.updateTimingCharts(flowKey, flowData.timing_stats); + } + } +} +``` + +## Implementation Phases + +### Phase 1: Core Redesign +1. **New Data Models**: Implement hierarchical flow structure +2. **Decoder Framework**: Create modular decoder registry +3. **Enhanced Analysis**: Multi-level statistics and timing analysis + +### Phase 2: Expanded Decoders +1. **RTP/RTCP Decoder**: Media streaming detection +2. **Industrial Protocol Decoders**: EtherCAT, PROFINET, Modbus TCP +3. **Broadcast Decoders**: SMPTE, AES67, NDI + +### Phase 3: Enhanced TUI +1. **Hierarchical Display**: Expandable tree view of flows +2. **Real-time Updates**: Live statistics during capture +3. **Advanced Filtering**: Protocol, encoding, and statistical filters + +### Phase 4: Web API & GUI +1. **WebSocket API**: Real-time data streaming +2. **React Frontend**: Interactive web interface +3. **Live Charts**: Real-time timing and throughput visualization +4. **Remote Monitoring**: Multiple StreamLens instances aggregation + +This architecture provides a solid foundation for your enhanced vision while maintaining modularity and extensibility. \ No newline at end of file