Files
StreamLens/architecture_redesign.md

7.6 KiB

StreamLens Architecture Redesign

Core Data Model Hierarchy

1. Flow Level

@dataclass
class Flow:
    src_ip: str
    dst_ip: str
    src_port: Optional[int] = None
    dst_port: Optional[int] = None
    transport_protocols: Dict[str, TransportProtocol] = field(default_factory=dict)
    traffic_type: str = "Unknown"  # Unicast/Multicast/Broadcast
    first_seen: float = 0.0
    last_seen: float = 0.0
    total_packets: int = 0
    total_bytes: int = 0

2. Transport Protocol Level

@dataclass 
class TransportProtocol:
    protocol_name: str  # TCP, UDP, ICMP, IGMP
    port_info: Dict[str, int]  # src_port, dst_port
    application_encodings: Dict[str, ApplicationEncoding] = field(default_factory=dict)
    packet_count: int = 0
    byte_count: int = 0
    timing_stats: TimingStatistics = field(default_factory=TimingStatistics)

3. Application Encoding Level

@dataclass
class ApplicationEncoding:
    encoding_name: str  # PTP, IENA, Chapter10, RTP, etc.
    packet_types: Dict[str, PacketTypeStats] = field(default_factory=dict)
    decoder_metadata: Dict[str, Any] = field(default_factory=dict)
    packet_count: int = 0
    byte_count: int = 0
    timing_stats: TimingStatistics = field(default_factory=TimingStatistics)
    confidence_score: float = 1.0  # How sure we are about this encoding

4. Packet Type Level

@dataclass
class PacketTypeStats:
    type_name: str  # Sync, Follow_Up, Delay_Req, TMATS, PCM_Data, etc.
    packet_count: int = 0
    byte_count: int = 0
    timing_stats: TimingStatistics = field(default_factory=TimingStatistics)
    payload_characteristics: Dict[str, Any] = field(default_factory=dict)

Modular Decoder Framework

Base Decoder Interface

class StreamDecoder(ABC):
    @property
    @abstractmethod
    def encoding_name(self) -> str:
        pass
    
    @property
    @abstractmethod
    def detection_ports(self) -> List[int]:
        pass
    
    @abstractmethod
    def can_decode(self, packet: Packet, transport_info: Dict) -> float:
        """Return confidence score 0.0-1.0 that this decoder can handle the packet"""
        pass
    
    @abstractmethod
    def decode_packet(self, packet: Packet) -> DecodingResult:
        """Decode packet and return structured result"""
        pass
    
    @abstractmethod
    def get_packet_type(self, decoded_data: Dict) -> str:
        """Return specific packet type within this encoding"""
        pass

Decoder Registry

class DecoderRegistry:
    def __init__(self):
        self.decoders = []
        self._register_default_decoders()
    
    def register_decoder(self, decoder: StreamDecoder):
        self.decoders.append(decoder)
    
    def find_decoders(self, packet: Packet, transport_info: Dict) -> List[Tuple[StreamDecoder, float]]:
        """Return list of (decoder, confidence) tuples sorted by confidence"""
        candidates = []
        for decoder in self.decoders:
            confidence = decoder.can_decode(packet, transport_info)
            if confidence > 0.0:
                candidates.append((decoder, confidence))
        return sorted(candidates, key=lambda x: x[1], reverse=True)

Enhanced TUI Layout

Main Flow Table

┌─ FLOWS ────────────────────────────────────────────────────────────────┐
│ Src:Port        │ Dst:Port        │ Proto │ Cast │ #Frames │ Bytes │ ΔT  │
├─────────────────┼─────────────────┼───────┼──────┼─────────┼───────┼─────┤
│ 192.168.1.10:0  │ 239.255.0.1:319 │ UDP   │ Multi│   1,234 │  1.2M │ 1ms │
│ ├─ PTP (v2)     │                 │       │      │     856 │  856K │ 1ms │
│ │  ├─ Sync      │                 │       │      │     428 │  428K │ 2ms │
│ │  └─ Follow_Up │                 │       │      │     428 │  428K │ 2ms │
│ └─ IENA         │                 │       │      │     378 │  378K │ 3ms │
│    ├─ P-type    │                 │       │      │     200 │  200K │ 5ms │
│    └─ D-type    │                 │       │      │     178 │  178K │ 5ms │
└─────────────────┴─────────────────┴───────┴──────┴─────────┴───────┴─────┘

Navigation

  • ↑↓: Navigate flows and subflows
  • : Expand flow to show encodings
  • : Collapse flow
  • Space: Toggle detailed packet type view
  • Enter: View detailed statistics and timing analysis

Web GUI Streaming Architecture

Real-Time Data Pipeline

class StreamLensWebAPI:
    def __init__(self):
        self.websocket_clients = set()
        self.data_aggregator = DataAggregator()
        
    async def stream_flow_updates(self):
        """Stream real-time flow statistics to connected clients"""
        while True:
            updates = self.data_aggregator.get_recent_updates()
            if updates and self.websocket_clients:
                await self.broadcast_updates(updates)
            await asyncio.sleep(0.1)  # 10Hz update rate
    
    async def broadcast_updates(self, updates: Dict):
        """Send updates to all connected WebSocket clients"""
        message = json.dumps({
            'timestamp': time.time(),
            'flows': updates,
            'type': 'flow_update'
        })
        disconnected = set()
        for websocket in self.websocket_clients:
            try:
                await websocket.send(message)
            except websockets.exceptions.ConnectionClosed:
                disconnected.add(websocket)
        self.websocket_clients -= disconnected

Web Frontend Data Structure

class StreamLensClient {
    constructor() {
        this.flows = new Map();
        this.websocket = null;
        this.charts = new Map();
    }
    
    connect() {
        this.websocket = new WebSocket('ws://localhost:8080/stream');
        this.websocket.onmessage = (event) => {
            const data = JSON.parse(event.data);
            this.handleFlowUpdate(data);
        };
    }
    
    handleFlowUpdate(data) {
        for (const [flowKey, flowData] of Object.entries(data.flows)) {
            this.updateFlowDisplay(flowKey, flowData);
            this.updateTimingCharts(flowKey, flowData.timing_stats);
        }
    }
}

Implementation Phases

Phase 1: Core Redesign

  1. New Data Models: Implement hierarchical flow structure
  2. Decoder Framework: Create modular decoder registry
  3. Enhanced Analysis: Multi-level statistics and timing analysis

Phase 2: Expanded Decoders

  1. RTP/RTCP Decoder: Media streaming detection
  2. Industrial Protocol Decoders: EtherCAT, PROFINET, Modbus TCP
  3. Broadcast Decoders: SMPTE, AES67, NDI

Phase 3: Enhanced TUI

  1. Hierarchical Display: Expandable tree view of flows
  2. Real-time Updates: Live statistics during capture
  3. Advanced Filtering: Protocol, encoding, and statistical filters

Phase 4: Web API & GUI

  1. WebSocket API: Real-time data streaming
  2. React Frontend: Interactive web interface
  3. Live Charts: Real-time timing and throughput visualization
  4. Remote Monitoring: Multiple StreamLens instances aggregation

This architecture provides a solid foundation for your enhanced vision while maintaining modularity and extensibility.