# StreamLens Architecture Redesign

## Core Data Model Hierarchy

### 1. Flow Level
```python
from dataclasses import dataclass, field
from typing import Dict, Optional

@dataclass
class Flow:
    src_ip: str
    dst_ip: str
    src_port: Optional[int] = None
    dst_port: Optional[int] = None
    # Quoted forward reference: TransportProtocol is defined in the next section
    transport_protocols: Dict[str, "TransportProtocol"] = field(default_factory=dict)
    traffic_type: str = "Unknown"  # Unicast/Multicast/Broadcast
    first_seen: float = 0.0
    last_seen: float = 0.0
    total_packets: int = 0
    total_bytes: int = 0
```
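
The `traffic_type` field distinguishes unicast, multicast, and broadcast flows. One way it might be derived from the destination address is sketched below with the standard `ipaddress` module. The helper name `classify_traffic_type` is hypothetical, and only the limited broadcast address is recognized, because a subnet-directed broadcast cannot be identified without knowing the netmask.

```python
import ipaddress

def classify_traffic_type(dst_ip: str) -> str:
    """Hypothetical helper: map a destination IP to a Flow.traffic_type value."""
    try:
        addr = ipaddress.ip_address(dst_ip)
    except ValueError:
        # Not a parseable IPv4/IPv6 address
        return "Unknown"
    if addr.is_multicast:
        return "Multicast"
    # Assumption: only the IPv4 limited broadcast address counts as broadcast here
    if addr.version == 4 and addr == ipaddress.IPv4Address("255.255.255.255"):
        return "Broadcast"
    return "Unicast"
```

A capture loop could call this once when a flow is first created, for example `Flow(src_ip, dst_ip, traffic_type=classify_traffic_type(dst_ip))`.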

### 2. Transport Protocol Level
```python
@dataclass
class TransportProtocol:
    protocol_name: str  # TCP, UDP, ICMP, IGMP
    port_info: Dict[str, int]  # src_port, dst_port
    # Quoted forward reference: ApplicationEncoding is defined in the next section
    application_encodings: Dict[str, "ApplicationEncoding"] = field(default_factory=dict)
    packet_count: int = 0
    byte_count: int = 0
    # TimingStatistics is not defined in this document; it must exist before this class
    timing_stats: TimingStatistics = field(default_factory=TimingStatistics)
```
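
`TimingStatistics` is used at every level of the hierarchy but is not specified in this document. The following is a minimal sketch of what it could contain, assuming it tracks packet inter-arrival intervals with Welford's online algorithm. All field and method names (`sample_count`, `mean_interval`, `m2`, `record`, `jitter`) are assumptions made for illustration.

```python
import math
from dataclasses import dataclass
from typing import Optional

@dataclass
class TimingStatistics:
    """Hypothetical sketch: running inter-arrival statistics (Welford's method)."""
    sample_count: int = 0                   # number of inter-arrival intervals seen
    mean_interval: float = 0.0              # running mean of the intervals, in seconds
    m2: float = 0.0                         # running sum of squared deviations
    last_timestamp: Optional[float] = None  # timestamp of the previous packet

    def record(self, timestamp: float) -> None:
        """Fold one packet timestamp (seconds) into the running statistics."""
        if self.last_timestamp is not None:
            interval = timestamp - self.last_timestamp
            self.sample_count += 1
            delta = interval - self.mean_interval
            self.mean_interval += delta / self.sample_count
            self.m2 += delta * (interval - self.mean_interval)
        self.last_timestamp = timestamp

    @property
    def jitter(self) -> float:
        """Population standard deviation of the inter-arrival intervals."""
        if self.sample_count == 0:
            return 0.0
        return math.sqrt(self.m2 / self.sample_count)
```

Because every field has a default, `field(default_factory=TimingStatistics)` works as written in the data models, and the statistics update in constant time per packet without storing individual timestamps.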

### 3. Application Encoding Level
```python
from typing import Any

@dataclass
class ApplicationEncoding:
    encoding_name: str  # PTP, IENA, Chapter10, RTP, etc.
    # Quoted forward reference: PacketTypeStats is defined in the next section
    packet_types: Dict[str, "PacketTypeStats"] = field(default_factory=dict)
    decoder_metadata: Dict[str, Any] = field(default_factory=dict)
    packet_count: int = 0
    byte_count: int = 0
    timing_stats: TimingStatistics = field(default_factory=TimingStatistics)
    confidence_score: float = 1.0  # How sure we are about this encoding
```

### 4. Packet Type Level
```python
@dataclass
class PacketTypeStats:
    type_name: str  # Sync, Follow_Up, Delay_Req, TMATS, PCM_Data, etc.
    packet_count: int = 0
    byte_count: int = 0
    timing_stats: TimingStatistics = field(default_factory=TimingStatistics)
    payload_characteristics: Dict[str, Any] = field(default_factory=dict)
```
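
Each packet contributes to the counters at all four levels, so a parent's count should equal the sum of its children's counts. The sketch below illustrates that roll-up with a simplified stand-in, `StatsNode`, rather than the four dataclasses above; both `StatsNode` and `record_packet` are hypothetical names, and the packet sizes are made-up sample values.

```python
from dataclasses import dataclass, field
from typing import Dict, Sequence

@dataclass
class StatsNode:
    """Simplified stand-in for one level of the hierarchy (hypothetical)."""
    name: str
    packet_count: int = 0
    byte_count: int = 0
    children: Dict[str, "StatsNode"] = field(default_factory=dict)

def record_packet(flow: StatsNode, path: Sequence[str], size: int) -> None:
    """Count one packet at the flow level and at every level named in path."""
    node = flow
    node.packet_count += 1
    node.byte_count += size
    for name in path:
        # Create the child level on first sight, then count the packet there too
        node = node.children.setdefault(name, StatsNode(name))
        node.packet_count += 1
        node.byte_count += size

# path = [transport protocol, application encoding, packet type]
flow = StatsNode("192.168.1.10 -> 239.255.0.1")
record_packet(flow, ["UDP", "PTP", "Sync"], 86)
record_packet(flow, ["UDP", "PTP", "Follow_Up"], 86)
record_packet(flow, ["UDP", "IENA", "P-type"], 512)
```

Updating every level on ingest keeps reads cheap: a display layer can show any row without re-aggregating its children.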

## Modular Decoder Framework

### Base Decoder Interface
```python
from abc import ABC, abstractmethod
from typing import Dict, List

# Packet and DecodingResult are not defined in this document; their annotations
# are quoted so that defining the class does not require them.
class StreamDecoder(ABC):
    @property
    @abstractmethod
    def encoding_name(self) -> str:
        pass

    @property
    @abstractmethod
    def detection_ports(self) -> List[int]:
        pass

    @abstractmethod
    def can_decode(self, packet: "Packet", transport_info: Dict) -> float:
        """Return confidence score 0.0-1.0 that this decoder can handle the packet"""
        pass

    @abstractmethod
    def decode_packet(self, packet: "Packet") -> "DecodingResult":
        """Decode packet and return structured result"""
        pass

    @abstractmethod
    def get_packet_type(self, decoded_data: Dict) -> str:
        """Return specific packet type within this encoding"""
        pass
```
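
To make the interface concrete, here is a rough sketch of a PTP decoder that mirrors the five members above. To stay self-contained it does not subclass `StreamDecoder`, it works on the raw UDP payload instead of a `Packet`, and it returns a plain dict instead of a `DecodingResult`; those substitutions, the class name `PTPDecoderSketch`, and the confidence values 0.9 and 0.5 are all assumptions. The port numbers (319 and 320), the 34-byte common header, and the message type codes follow PTPv2.

```python
from typing import Dict, List

# PTPv2 messageType values (low nibble of the first header byte); partial list
PTP_MESSAGE_TYPES = {
    0x0: "Sync",
    0x1: "Delay_Req",
    0x8: "Follow_Up",
    0x9: "Delay_Resp",
    0xB: "Announce",
}
PTP_HEADER_LENGTH = 34  # bytes in the PTPv2 common header

class PTPDecoderSketch:
    """Hypothetical decoder mirroring the StreamDecoder interface on raw bytes."""
    encoding_name = "PTP"
    detection_ports: List[int] = [319, 320]  # PTP event and general ports

    def can_decode(self, payload: bytes, transport_info: Dict) -> float:
        """Return an illustrative confidence based on port and version nibble."""
        if len(payload) < PTP_HEADER_LENGTH:
            return 0.0
        if transport_info.get("dst_port") not in self.detection_ports:
            return 0.0
        version_is_2 = (payload[1] & 0x0F) == 2
        return 0.9 if version_is_2 else 0.5

    def decode_packet(self, payload: bytes) -> Dict:
        """Extract the two header fields this sketch cares about."""
        return {"message_type": payload[0] & 0x0F, "version": payload[1] & 0x0F}

    def get_packet_type(self, decoded_data: Dict) -> str:
        return PTP_MESSAGE_TYPES.get(decoded_data["message_type"], "Unknown")
```

Returning 0.0 when the port does not match keeps this decoder out of the candidate list for unrelated traffic; a real decoder might instead return a low non-zero score to allow detection on non-standard ports.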

### Decoder Registry
```python
from typing import Dict, List, Tuple

class DecoderRegistry:
    def __init__(self):
        self.decoders = []
        self._register_default_decoders()

    def _register_default_decoders(self):
        # Placeholder: the built-in decoders to register are not listed in this document
        pass

    def register_decoder(self, decoder: StreamDecoder):
        self.decoders.append(decoder)

    def find_decoders(self, packet: "Packet", transport_info: Dict) -> List[Tuple[StreamDecoder, float]]:
        """Return list of (decoder, confidence) tuples sorted by confidence"""
        candidates = []
        for decoder in self.decoders:
            confidence = decoder.can_decode(packet, transport_info)
            if confidence > 0.0:
                candidates.append((decoder, confidence))
        # Highest confidence first
        return sorted(candidates, key=lambda x: x[1], reverse=True)
```
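
The ranking behaviour of `find_decoders` can be tried in isolation. The sketch below repeats the same filter-and-sort logic as a free function and feeds it stand-in decoders that always report a fixed confidence; `FixedConfidenceDecoder` and the confidence values are hypothetical and exist only for this demonstration.

```python
from typing import Dict, List, Tuple

class FixedConfidenceDecoder:
    """Hypothetical stand-in decoder that always reports the same confidence."""
    def __init__(self, name: str, confidence: float):
        self.encoding_name = name
        self._confidence = confidence

    def can_decode(self, packet: bytes, transport_info: Dict) -> float:
        return self._confidence

def find_decoders(
    decoders: List[FixedConfidenceDecoder], packet: bytes, transport_info: Dict
) -> List[Tuple[FixedConfidenceDecoder, float]]:
    """Same ranking logic as DecoderRegistry.find_decoders, as a free function."""
    candidates = [(d, d.can_decode(packet, transport_info)) for d in decoders]
    matching = [c for c in candidates if c[1] > 0.0]
    return sorted(matching, key=lambda c: c[1], reverse=True)

decoders = [
    FixedConfidenceDecoder("RTP", 0.3),
    FixedConfidenceDecoder("PTP", 0.9),
    FixedConfidenceDecoder("Chapter10", 0.0),  # filtered out: zero confidence
]
ranked = find_decoders(decoders, b"", {"dst_port": 319})
best_name = ranked[0][0].encoding_name if ranked else "Unknown"
```

One plausible use of the result is to decode with the top-ranked candidate and store its score in `ApplicationEncoding.confidence_score`, though the document does not prescribe how ties or low scores are handled.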

## Enhanced TUI Layout

### Main Flow Table
```
┌─ FLOWS ────────────────────────────────────────────────────────────────────┐
│ Src:Port          │ Dst:Port        │ Proto │ Cast │ #Frames │ Bytes │ ΔT  │
├───────────────────┼─────────────────┼───────┼──────┼─────────┼───────┼─────┤
│ 192.168.1.10:0    │ 239.255.0.1:319 │ UDP   │ Multi│ 1,234   │ 1.2M  │ 1ms │
│ ├─ PTP (v2)       │                 │       │      │ 856     │ 856K  │ 1ms │
│ │  ├─ Sync        │                 │       │      │ 428     │ 428K  │ 2ms │
│ │  └─ Follow_Up   │                 │       │      │ 428     │ 428K  │ 2ms │
│ └─ IENA           │                 │       │      │ 378     │ 378K  │ 3ms │
│    ├─ P-type      │                 │       │      │ 200     │ 200K  │ 5ms │
│    └─ D-type      │                 │       │      │ 178     │ 178K  │ 5ms │
└───────────────────┴─────────────────┴───────┴──────┴─────────┴───────┴─────┘
```
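
The tree column in the table above can be produced by a small recursive walk. This is a sketch only: it assumes a hypothetical nested structure mapping each name to a `(frame_count, children)` tuple instead of the dataclasses from the data model, and it emits just the label and frame count rather than full table rows.

```python
from typing import Dict, List, Tuple

# Hypothetical nested structure: name -> (frame_count, children)
Tree = Dict[str, Tuple[int, dict]]

def render_rows(tree: Tree, prefix: str = "") -> List[str]:
    """Flatten a tree into display lines with box-drawing branch markers."""
    rows: List[str] = []
    items = list(tree.items())
    for index, (name, (frames, children)) in enumerate(items):
        is_last = index == len(items) - 1
        branch = "└─ " if is_last else "├─ "
        rows.append(f"{prefix}{branch}{name} [{frames:,}]")
        # Children of a non-final item keep a vertical bar in their prefix
        child_prefix = prefix + ("   " if is_last else "│  ")
        rows.extend(render_rows(children, child_prefix))
    return rows

encodings: Tree = {
    "PTP (v2)": (856, {"Sync": (428, {}), "Follow_Up": (428, {})}),
    "IENA": (378, {"P-type": (200, {}), "D-type": (178, {})}),
}
rows = render_rows(encodings)
```

Printing `"\n".join(rows)` yields the same branch layout as the mock-up, with the last child at each level drawn with `└─`.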

### Navigation
- **↑↓**: Navigate flows and subflows
- **→**: Expand flow to show encodings
- **←**: Collapse flow
- **Space**: Toggle detailed packet type view
- **Enter**: View detailed statistics and timing analysis
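
The arrow-key bindings above imply a small amount of view state: which row is selected and which flows are expanded. A minimal sketch of that state follows, independent of any TUI library. The class name `FlowTreeCursor` and the key strings (`"up"`, `"down"`, `"left"`, `"right"`) are assumptions, and for brevity the cursor moves over top-level flows only, not subflows.

```python
from typing import List, Set

class FlowTreeCursor:
    """Hypothetical cursor over top-level flow rows with expand/collapse state."""
    def __init__(self, flow_keys: List[str]):
        self.flow_keys = flow_keys
        self.selected = 0               # index into flow_keys
        self.expanded: Set[str] = set()  # keys of flows currently expanded

    def handle_key(self, key: str) -> None:
        """Apply one navigation key; unknown keys are ignored."""
        if not self.flow_keys:
            return
        current = self.flow_keys[self.selected]
        if key == "up":
            self.selected = max(0, self.selected - 1)
        elif key == "down":
            self.selected = min(len(self.flow_keys) - 1, self.selected + 1)
        elif key == "right":
            self.expanded.add(current)
        elif key == "left":
            self.expanded.discard(current)
```

Keeping this state separate from rendering means the same cursor logic could be unit-tested without a terminal and reused by whichever TUI toolkit is chosen.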

## Web GUI Streaming Architecture

### Real-Time Data Pipeline
```python
import asyncio
import json
import time
from typing import Dict

import websockets

class StreamLensWebAPI:
    def __init__(self):
        self.websocket_clients = set()
        # DataAggregator is not defined in this document
        self.data_aggregator = DataAggregator()

    async def stream_flow_updates(self):
        """Stream real-time flow statistics to connected clients"""
        while True:
            updates = self.data_aggregator.get_recent_updates()
            if updates and self.websocket_clients:
                await self.broadcast_updates(updates)
            await asyncio.sleep(0.1)  # 10Hz update rate

    async def broadcast_updates(self, updates: Dict):
        """Send updates to all connected WebSocket clients"""
        message = json.dumps({
            'timestamp': time.time(),
            'flows': updates,
            'type': 'flow_update'
        })
        disconnected = set()
        # Iterate over a snapshot: clients may connect while a send is awaited,
        # and mutating a set during iteration raises RuntimeError
        for websocket in list(self.websocket_clients):
            try:
                await websocket.send(message)
            except websockets.exceptions.ConnectionClosed:
                disconnected.add(websocket)
        self.websocket_clients -= disconnected
```
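
`DataAggregator` is referenced by the pipeline but not specified. The sketch below shows one possible shape, assuming the capture code runs in a different thread from the asyncio loop (hence the lock) and that only the latest statistics per flow need to reach clients. The method name `record_update` and the per-flow dict layout are assumptions; `get_recent_updates` matches the call in `stream_flow_updates`.

```python
import threading
from typing import Any, Dict

class DataAggregator:
    """Hypothetical sketch: collects per-flow updates between broadcasts."""
    def __init__(self) -> None:
        self._lock = threading.Lock()
        self._pending: Dict[str, Dict[str, Any]] = {}

    def record_update(self, flow_key: str, stats: Dict[str, Any]) -> None:
        """Store the latest stats for a flow, replacing any unsent older value."""
        with self._lock:
            self._pending[flow_key] = stats

    def get_recent_updates(self) -> Dict[str, Dict[str, Any]]:
        """Return everything recorded since the last call and clear the buffer."""
        with self._lock:
            updates, self._pending = self._pending, {}
        return updates
```

When nothing has changed, `get_recent_updates` returns an empty dict, which is falsy, so the `if updates and self.websocket_clients` check skips the broadcast for that tick.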

### Web Frontend Data Structure
```javascript
class StreamLensClient {
  constructor() {
    this.flows = new Map();
    this.websocket = null;
    this.charts = new Map();
  }

  connect() {
    this.websocket = new WebSocket('ws://localhost:8080/stream');
    this.websocket.onmessage = (event) => {
      const data = JSON.parse(event.data);
      this.handleFlowUpdate(data);
    };
  }

  handleFlowUpdate(data) {
    for (const [flowKey, flowData] of Object.entries(data.flows)) {
      this.updateFlowDisplay(flowKey, flowData);
      this.updateTimingCharts(flowKey, flowData.timing_stats);
    }
  }
}
```

## Implementation Phases

### Phase 1: Core Redesign
1. **New Data Models**: Implement hierarchical flow structure
2. **Decoder Framework**: Create modular decoder registry
3. **Enhanced Analysis**: Multi-level statistics and timing analysis

### Phase 2: Expanded Decoders
1. **RTP/RTCP Decoder**: Media streaming detection
2. **Industrial Protocol Decoders**: EtherCAT, PROFINET, Modbus TCP
3. **Broadcast Decoders**: SMPTE, AES67, NDI

### Phase 3: Enhanced TUI
1. **Hierarchical Display**: Expandable tree view of flows
2. **Real-time Updates**: Live statistics during capture
3. **Advanced Filtering**: Protocol, encoding, and statistical filters

### Phase 4: Web API & GUI
1. **WebSocket API**: Real-time data streaming
2. **React Frontend**: Interactive web interface
3. **Live Charts**: Real-time timing and throughput visualization
4. **Remote Monitoring**: Multiple StreamLens instances aggregation

This architecture provides a solid foundation for the enhanced StreamLens design while maintaining modularity and extensibility.