7.6 KiB
7.6 KiB
StreamLens Architecture Redesign
Core Data Model Hierarchy
1. Flow Level
@dataclass
class Flow:
    """Top level of the data model: traffic between one source and one destination.

    Holds the endpoint addresses, running packet/byte totals, and the
    per-transport-protocol breakdown one level down in the hierarchy.
    """

    src_ip: str
    dst_ip: str
    src_port: Optional[int] = None
    dst_port: Optional[int] = None
    # Quoted on purpose: TransportProtocol is declared after Flow, so a bare
    # name here would raise NameError while this class body is evaluated.
    # NOTE(review): presumably keyed by protocol name - confirm with the analyzer.
    transport_protocols: Dict[str, "TransportProtocol"] = field(default_factory=dict)
    traffic_type: str = "Unknown"  # Unicast/Multicast/Broadcast
    first_seen: float = 0.0
    last_seen: float = 0.0
    total_packets: int = 0
    total_bytes: int = 0
2. Transport Protocol Level
@dataclass
class TransportProtocol:
    """Second level of the data model: one transport protocol inside a flow.

    Carries its own packet/byte counters and timing statistics, plus the
    application encodings detected on top of this transport.
    """

    protocol_name: str  # TCP, UDP, ICMP, IGMP
    port_info: Dict[str, int]  # src_port, dst_port
    # Quoted on purpose: ApplicationEncoding is declared after this class, so
    # a bare name here would raise NameError while the class body is evaluated.
    application_encodings: Dict[str, "ApplicationEncoding"] = field(default_factory=dict)
    packet_count: int = 0
    byte_count: int = 0
    # default_factory gives every instance its own TimingStatistics object.
    timing_stats: TimingStatistics = field(default_factory=TimingStatistics)
3. Application Encoding Level
@dataclass
class ApplicationEncoding:
    """Third level of the data model: an application encoding on a transport.

    Tracks counters and timing for the encoding as a whole, the packet types
    seen within it, and free-form metadata supplied by the decoder.
    """

    encoding_name: str  # PTP, IENA, Chapter10, RTP, etc.
    # Quoted on purpose: PacketTypeStats is declared after this class, so a
    # bare name here would raise NameError while the class body is evaluated.
    packet_types: Dict[str, "PacketTypeStats"] = field(default_factory=dict)
    decoder_metadata: Dict[str, Any] = field(default_factory=dict)
    packet_count: int = 0
    byte_count: int = 0
    # default_factory gives every instance its own TimingStatistics object.
    timing_stats: TimingStatistics = field(default_factory=TimingStatistics)
    confidence_score: float = 1.0  # How sure we are about this encoding
4. Packet Type Level
@dataclass
class PacketTypeStats:
    """Lowest level of the data model: statistics for one packet type.

    A packet type is a specific message kind within an application encoding;
    each one keeps its own counters, timing statistics, and payload notes.
    """

    type_name: str  # Sync, Follow_Up, Delay_Req, TMATS, PCM_Data, etc.
    packet_count: int = 0
    byte_count: int = 0
    # default_factory gives every instance its own TimingStatistics object.
    timing_stats: TimingStatistics = field(default_factory=TimingStatistics)
    # Free-form per-type details; the schema is not fixed by this class.
    payload_characteristics: Dict[str, Any] = field(default_factory=dict)
Modular Decoder Framework
Base Decoder Interface
class StreamDecoder(ABC):
    """Abstract interface that every pluggable stream decoder implements.

    Concrete decoders must provide both properties and all three methods
    below; the class cannot be instantiated until they are overridden.
    """

    @property
    @abstractmethod
    def encoding_name(self) -> str:
        """Name of the application encoding handled by this decoder."""

    @property
    @abstractmethod
    def detection_ports(self) -> List[int]:
        """Port numbers tied to this encoding.

        NOTE(review): presumably used as a detection hint - confirm how
        callers consume this list.
        """

    @abstractmethod
    def can_decode(self, packet: Packet, transport_info: Dict) -> float:
        """Return confidence score 0.0-1.0 that this decoder can handle the packet"""

    @abstractmethod
    def decode_packet(self, packet: Packet) -> DecodingResult:
        """Decode packet and return structured result"""

    @abstractmethod
    def get_packet_type(self, decoded_data: Dict) -> str:
        """Return specific packet type within this encoding"""
Decoder Registry
class DecoderRegistry:
    """Keeps the list of available decoders and ranks them for a packet."""

    def __init__(self):
        # Start with an empty list, then load the built-in decoders.
        # NOTE(review): _register_default_decoders is not shown in this
        # section - it must exist on the class for construction to succeed.
        self.decoders = []
        self._register_default_decoders()

    def register_decoder(self, decoder: StreamDecoder):
        """Add *decoder* to the end of the list consulted by find_decoders."""
        self.decoders.append(decoder)

    def find_decoders(self, packet: Packet, transport_info: Dict) -> List[Tuple[StreamDecoder, float]]:
        """Return list of (decoder, confidence) tuples sorted by confidence

        Every registered decoder is asked once, in registration order.
        Decoders reporting a confidence of 0.0 or less are dropped, and the
        rest are ordered from most to least confident.
        """
        scored = (
            (candidate, candidate.can_decode(packet, transport_info))
            for candidate in self.decoders
        )
        ranked = [pair for pair in scored if pair[1] > 0.0]
        # Stable in-place sort: ties keep their registration order.
        ranked.sort(key=lambda pair: pair[1], reverse=True)
        return ranked
Enhanced TUI Layout
Main Flow Table
┌─ FLOWS ────────────────────────────────────────────────────────────────┐
│ Src:Port │ Dst:Port │ Proto │ Cast │ #Frames │ Bytes │ ΔT │
├─────────────────┼─────────────────┼───────┼──────┼─────────┼───────┼─────┤
│ 192.168.1.10:0 │ 239.255.0.1:319 │ UDP │ Multi│ 1,234 │ 1.2M │ 1ms │
│ ├─ PTP (v2) │ │ │ │ 856 │ 856K │ 1ms │
│ │ ├─ Sync │ │ │ │ 428 │ 428K │ 2ms │
│ │ └─ Follow_Up │ │ │ │ 428 │ 428K │ 2ms │
│ └─ IENA │ │ │ │ 378 │ 378K │ 3ms │
│ ├─ P-type │ │ │ │ 200 │ 200K │ 5ms │
│ └─ D-type │ │ │ │ 178 │ 178K │ 5ms │
└─────────────────┴─────────────────┴───────┴──────┴─────────┴───────┴─────┘
Navigation
- ↑↓: Navigate flows and subflows
- →: Expand flow to show encodings
- ←: Collapse flow
- Space: Toggle detailed packet type view
- Enter: View detailed statistics and timing analysis
Web GUI Streaming Architecture
Real-Time Data Pipeline
class StreamLensWebAPI:
    """Pushes aggregated flow statistics to connected WebSocket clients."""

    def __init__(self):
        # Connected client sockets.
        # NOTE(review): the code that adds clients to this set is not shown here.
        self.websocket_clients = set()
        self.data_aggregator = DataAggregator()

    async def stream_flow_updates(self):
        """Stream real-time flow statistics to connected clients

        Loops forever: polls the aggregator, broadcasts when there is both
        something to send and someone to send it to, then sleeps.
        """
        while True:
            updates = self.data_aggregator.get_recent_updates()
            if updates and self.websocket_clients:
                await self.broadcast_updates(updates)
            await asyncio.sleep(0.1)  # 10Hz update rate

    async def broadcast_updates(self, updates: Dict):
        """Send updates to all connected WebSocket clients

        Clients whose connection is already closed are removed from
        ``websocket_clients`` once the broadcast has finished.

        Args:
            updates: JSON-serializable flow data, sent under the ``flows`` key.
        """
        message = json.dumps({
            'timestamp': time.time(),
            'flows': updates,
            'type': 'flow_update'
        })
        disconnected = set()
        # Iterate over a snapshot: each await yields to the event loop, and a
        # client being added or removed meanwhile would otherwise raise
        # "RuntimeError: Set changed size during iteration".
        for websocket in tuple(self.websocket_clients):
            try:
                await websocket.send(message)
            except websockets.exceptions.ConnectionClosed:
                disconnected.add(websocket)
        # In-place removal so clients added during the broadcast are kept.
        self.websocket_clients.difference_update(disconnected)
Web Frontend Data Structure
/**
 * Browser-side client that receives flow updates over a WebSocket and
 * forwards each flow to the display and timing-chart update hooks.
 */
class StreamLensClient {
    /**
     * @param {string} [url] WebSocket endpoint to connect to; defaults to the
     *     local development server.
     */
    constructor(url = 'ws://localhost:8080/stream') {
        this.url = url;
        this.flows = new Map();
        this.websocket = null;
        this.charts = new Map();
    }

    /** Open the WebSocket and route every incoming message to handleFlowUpdate. */
    connect() {
        this.websocket = new WebSocket(this.url);
        this.websocket.onmessage = (event) => {
            const data = JSON.parse(event.data);
            this.handleFlowUpdate(data);
        };
    }

    /**
     * Apply one parsed server message.
     *
     * Messages without a `flows` object are ignored; previously they made
     * Object.entries throw a TypeError inside the message handler.
     *
     * @param {{flows?: Object<string, {timing_stats: *}>}} data Parsed message.
     */
    handleFlowUpdate(data) {
        const flows = data && data.flows;
        if (!flows || typeof flows !== 'object') {
            return;
        }
        for (const [flowKey, flowData] of Object.entries(flows)) {
            this.updateFlowDisplay(flowKey, flowData);
            this.updateTimingCharts(flowKey, flowData.timing_stats);
        }
    }
}
Implementation Phases
Phase 1: Core Redesign
- New Data Models: Implement hierarchical flow structure
- Decoder Framework: Create modular decoder registry
- Enhanced Analysis: Multi-level statistics and timing analysis
Phase 2: Expanded Decoders
- RTP/RTCP Decoder: Media streaming detection
- Industrial Protocol Decoders: EtherCAT, PROFINET, Modbus TCP
- Broadcast Decoders: SMPTE, AES67, NDI
Phase 3: Enhanced TUI
- Hierarchical Display: Expandable tree view of flows
- Real-time Updates: Live statistics during capture
- Advanced Filtering: Protocol, encoding, and statistical filters
Phase 4: Web API & GUI
- WebSocket API: Real-time data streaming
- React Frontend: Interactive web interface
- Live Charts: Real-time timing and throughput visualization
- Remote Monitoring: Aggregation of data from multiple StreamLens instances
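This architecture separates the data model (flow → transport protocol → application encoding → packet type) from the pluggable decoders and from the TUI and web front ends, so new protocols and interfaces can be added without changing the core.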