Files
StreamLens/analyzer/analysis/statistics.py

291 lines
12 KiB
Python

"""
Statistical analysis engine for timing and outlier detection
"""
import statistics
from typing import Dict, List, Tuple
from ..models import FlowStats, FrameTypeStats
class StatisticsEngine:
    """Compute inter-arrival timing statistics and flag outlier frames.

    Two modes are offered:

    * batch (``calculate_flow_statistics``): recomputes mean, sample standard
      deviation, jitter and outliers from the complete sample lists;
    * real-time (``update_realtime_statistics``): folds samples that have not
      been consumed yet into running sums kept in ``self.realtime_stats``.

    In both modes a sample is an outlier when it is strictly greater than
    ``mean + outlier_threshold_sigma * std``. Only unusually long gaps are
    flagged; unusually short gaps never are.
    """

    def __init__(self, outlier_threshold_sigma: float = 3.0, enable_realtime: bool = False):
        """
        Initialize statistics engine

        Args:
            outlier_threshold_sigma: Number of standard deviations above the
                mean beyond which an inter-arrival time is an outlier
            enable_realtime: Enable real-time running statistics calculation
        """
        self.outlier_threshold_sigma = outlier_threshold_sigma
        self.enable_realtime = enable_realtime
        # Running statistics. Flow-level entries are keyed by ``flow_key`` and
        # frame-type entries by ``(flow_key, frame_type)``; each entry carries
        # a 'scope' marker so the two kinds can be told apart reliably.
        self.realtime_stats = {}

    def calculate_flow_statistics(self, flows: Dict[tuple, FlowStats]) -> None:
        """Calculate timing statistics and detect outliers for all flows"""
        for flow in flows.values():
            self._calculate_single_flow_statistics(flow)

    def calculate_all_statistics(self, analyzer=None) -> None:
        """Calculate statistics for every flow held by ``analyzer``.

        Does nothing when ``analyzer`` is None or has no ``flows`` attribute.
        The check is on the attribute rather than on the truthiness of
        ``analyzer`` so that an analyzer object that evaluates as falsy is
        still processed.
        """
        flows = getattr(analyzer, 'flows', None)
        if flows is not None:
            self.calculate_flow_statistics(flows)

    def _calculate_single_flow_statistics(self, flow: FlowStats) -> None:
        """Calculate batch statistics for a single flow and its frame types.

        Flows with fewer than two inter-arrival samples only get their
        timeline fields (duration / first_seen / last_seen) refreshed, because
        a sample standard deviation needs at least two values.
        """
        # Ensure timeline statistics are calculated
        if len(flow.timestamps) >= 2:
            flow.duration = flow.timestamps[-1] - flow.timestamps[0]
            flow.first_seen = flow.timestamps[0]
            flow.last_seen = flow.timestamps[-1]

        if len(flow.inter_arrival_times) < 2:
            return

        # Mean and sample standard deviation for the overall flow
        flow.avg_inter_arrival = statistics.mean(flow.inter_arrival_times)
        flow.std_inter_arrival = statistics.stdev(flow.inter_arrival_times)

        # Jitter is the coefficient of variation (std normalized by the mean)
        if flow.avg_inter_arrival > 0:
            flow.jitter = flow.std_inter_arrival / flow.avg_inter_arrival
        else:
            flow.jitter = 0.0

        threshold = flow.avg_inter_arrival + (self.outlier_threshold_sigma * flow.std_inter_arrival)

        # Clear existing outliers to recalculate
        flow.outlier_frames.clear()
        flow.outlier_details.clear()

        for i, inter_time in enumerate(flow.inter_arrival_times):
            if inter_time > threshold:
                # inter_arrival_times[i] is the gap that ends at
                # frame_numbers[i + 1], i.e. the frame that arrived late.
                frame_number = flow.frame_numbers[i + 1]
                flow.outlier_frames.append(frame_number)
                flow.outlier_details.append((frame_number, inter_time))

        # Calculate statistics for each frame type
        for ft_stats in flow.frame_types.values():
            self._calculate_frame_type_statistics(ft_stats)

    def _calculate_frame_type_statistics(self, ft_stats: FrameTypeStats) -> None:
        """Calculate batch statistics and outliers for one frame type.

        Returns without touching ``ft_stats`` when fewer than two
        inter-arrival samples are available.
        """
        if len(ft_stats.inter_arrival_times) < 2:
            return

        ft_stats.avg_inter_arrival = statistics.mean(ft_stats.inter_arrival_times)
        ft_stats.std_inter_arrival = statistics.stdev(ft_stats.inter_arrival_times)

        ft_threshold = ft_stats.avg_inter_arrival + (self.outlier_threshold_sigma * ft_stats.std_inter_arrival)

        # Clear existing outliers to recalculate
        ft_stats.outlier_frames.clear()
        ft_stats.outlier_details.clear()
        ft_stats.enhanced_outlier_details.clear()

        for i, inter_time in enumerate(ft_stats.inter_arrival_times):
            if inter_time > ft_threshold:
                frame_number = ft_stats.frame_numbers[i + 1]  # Frame ending the gap
                prev_frame_number = ft_stats.frame_numbers[i]  # Frame starting the gap
                ft_stats.outlier_frames.append(frame_number)
                # Legacy format: (frame, gap)
                ft_stats.outlier_details.append((frame_number, inter_time))
                # Enhanced format: (frame, previous frame, gap)
                ft_stats.enhanced_outlier_details.append((frame_number, prev_frame_number, inter_time))

    def get_flow_summary_statistics(self, flows: Dict[tuple, FlowStats]) -> Dict[str, float]:
        """Get summary statistics across all flows.

        Returns an empty dict when no flow has any inter-arrival sample.
        ``total_outliers`` counts flow-level outliers only.
        """
        all_inter_arrivals = []
        total_packets = 0
        total_outliers = 0

        for flow in flows.values():
            all_inter_arrivals.extend(flow.inter_arrival_times)
            total_packets += flow.frame_count
            total_outliers += len(flow.outlier_frames)

        if not all_inter_arrivals:
            return {}

        return {
            'overall_avg_inter_arrival': statistics.mean(all_inter_arrivals),
            # A sample standard deviation needs at least two values.
            'overall_std_inter_arrival': statistics.stdev(all_inter_arrivals) if len(all_inter_arrivals) > 1 else 0,
            'total_packets': total_packets,
            'total_outliers': total_outliers,
            'outlier_percentage': (total_outliers / total_packets * 100) if total_packets > 0 else 0
        }

    def identify_high_jitter_flows(self, flows: Dict[tuple, FlowStats],
                                   jitter_threshold: float = 0.1) -> List[FlowStats]:
        """Identify flows with high timing jitter.

        Jitter is measured as the coefficient of variation (std / mean) of the
        flow's inter-arrival times. Flows whose mean is not positive are
        skipped. The result is sorted by coefficient of variation, highest
        first.
        """
        scored = []
        for flow in flows.values():
            if flow.avg_inter_arrival > 0:
                cv = flow.std_inter_arrival / flow.avg_inter_arrival
                if cv > jitter_threshold:
                    # Keep the score so it is not recomputed while sorting.
                    scored.append((cv, flow))

        scored.sort(key=lambda item: item[0], reverse=True)
        return [flow for _, flow in scored]

    def calculate_inter_arrival_percentiles(self, flow: FlowStats) -> Dict[str, float]:
        """Calculate percentiles for inter-arrival times.

        Percentiles are linearly interpolated between the two closest ranks
        of the sorted samples. Returns an empty dict when the flow has no
        inter-arrival samples.
        """
        if not flow.inter_arrival_times:
            return {}

        times = sorted(flow.inter_arrival_times)
        n = len(times)

        def percentile(p: float) -> float:
            rank = (n - 1) * p / 100
            lower = int(rank)
            fraction = rank - lower
            if lower == n - 1:
                # Already at the last sample: nothing to interpolate towards.
                return times[lower]
            return times[lower] * (1 - fraction) + times[lower + 1] * fraction

        return {
            'p50': percentile(50),  # Median
            'p90': percentile(90),
            'p95': percentile(95),
            'p99': percentile(99),
            'min': times[0],
            'max': times[-1]
        }

    @staticmethod
    def _new_running_stats(scope: str) -> dict:
        """Create an empty running-statistics record tagged with ``scope``."""
        return {
            'count': 0,
            'sum': 0.0,
            'sum_squares': 0.0,
            'outlier_count': 0,
            'last_avg': 0.0,
            'last_std': 0.0,
            'scope': scope
        }

    def _consume_new_samples(self, stats: dict, target, record_enhanced: bool) -> None:
        """Fold not-yet-consumed inter-arrival samples of ``target`` into ``stats``.

        ``stats['count']`` doubles as the number of samples already consumed,
        so only ``target.inter_arrival_times[count:]`` is processed. This
        relies on the sample list being append-only. Consuming by position
        means a sample is never counted twice and samples recorded before the
        first update are not lost.

        The standard deviation is the sample standard deviation (n - 1
        denominator), matching ``statistics.stdev`` used by the batch path.

        Args:
            stats: Running-statistics record from ``self.realtime_stats``
            target: Object exposing inter_arrival_times, frame_numbers,
                avg_inter_arrival, std_inter_arrival, outlier_frames and
                outlier_details (plus enhanced_outlier_details when
                ``record_enhanced`` is true)
            record_enhanced: Also append (frame, previous frame, gap) tuples
                to ``target.enhanced_outlier_details``
        """
        times = target.inter_arrival_times
        frames = target.frame_numbers
        total = len(times)

        for index in range(stats['count'], total):
            sample = times[index]
            stats['count'] += 1
            stats['sum'] += sample
            stats['sum_squares'] += sample * sample

            count = stats['count']
            if count < 2:
                continue

            avg = stats['sum'] / count
            variance = (stats['sum_squares'] - count * avg * avg) / (count - 1)
            # Rounding can push a zero variance slightly negative.
            std = variance ** 0.5 if variance > 0 else 0.0

            target.avg_inter_arrival = avg
            target.std_inter_arrival = std
            stats['last_avg'] = avg
            stats['last_std'] = std

            if sample <= avg + (self.outlier_threshold_sigma * std):
                continue

            # Align frame numbers from the end of the list: the newest sample
            # always maps to frame_numbers[-1].
            offset = index - total
            if -offset > len(frames):
                # No frame number is available for this sample.
                continue
            frame_number = frames[offset]
            prev_offset = offset - 1
            prev_frame_number = frames[prev_offset] if len(frames) >= -prev_offset else 0

            if frame_number not in target.outlier_frames:
                target.outlier_frames.append(frame_number)
                target.outlier_details.append((frame_number, sample))  # Legacy format
                if record_enhanced:
                    target.enhanced_outlier_details.append(
                        (frame_number, prev_frame_number, sample))  # Enhanced format
                stats['outlier_count'] += 1

    def update_realtime_statistics(self, flow_key: tuple, flow: FlowStats) -> None:
        """Update real-time running statistics for a flow and its frame types.

        No-op when real-time mode is disabled or the flow has fewer than two
        inter-arrival samples. Otherwise every sample not consumed by an
        earlier call is folded into the running sums, the flow's
        avg_inter_arrival / std_inter_arrival are refreshed and newly detected
        outliers are appended to the flow's outlier lists.
        """
        if not self.enable_realtime or len(flow.inter_arrival_times) < 2:
            return

        # Initialize if first time
        if flow_key not in self.realtime_stats:
            self.realtime_stats[flow_key] = self._new_running_stats('flow')

        self._consume_new_samples(self.realtime_stats[flow_key], flow, record_enhanced=False)

        # Update frame type statistics
        for frame_type, ft_stats in flow.frame_types.items():
            self._update_realtime_frame_type_stats(flow_key, frame_type, ft_stats)

    def _update_realtime_frame_type_stats(self, flow_key: tuple, frame_type: str, ft_stats: FrameTypeStats) -> None:
        """Update real-time statistics for one frame type of a flow.

        Frame types that received no new sample since the previous call are
        left unchanged.
        """
        if len(ft_stats.inter_arrival_times) < 2:
            return

        ft_key = (flow_key, frame_type)
        if ft_key not in self.realtime_stats:
            self.realtime_stats[ft_key] = self._new_running_stats('frame_type')

        self._consume_new_samples(self.realtime_stats[ft_key], ft_stats, record_enhanced=True)

    def get_realtime_summary(self) -> Dict[str, object]:
        """Get summary of real-time statistics.

        Returns an empty dict when real-time mode is disabled.
        ``tracked_flows`` counts flow-level entries only. ``total_outliers``
        sums the outlier counters of flow-level and frame-type-level entries,
        so a frame flagged at both levels contributes twice.
        """
        if not self.enable_realtime:
            return {}

        tracked_flows = 0
        total_outliers = 0
        for stats in self.realtime_stats.values():
            total_outliers += stats['outlier_count']
            if stats.get('scope') == 'flow':
                tracked_flows += 1

        return {
            'realtime_enabled': True,
            'tracked_flows': tracked_flows,
            'total_outliers': total_outliers,
            'update_frequency': 'per_packet'
        }

    def get_max_sigma_deviation(self, flow: FlowStats) -> float:
        """Get the maximum sigma deviation for any outlier in this flow.

        Considers both the flow-level outliers and the outliers of each frame
        type, each measured against its own mean and standard deviation.
        Returns 0.0 when there are no outliers or every relevant standard
        deviation is zero.
        """
        max_sigma = 0.0

        # Check flow-level outliers
        if flow.outlier_details and flow.std_inter_arrival > 0:
            for _, inter_arrival_time in flow.outlier_details:
                sigma_deviation = (inter_arrival_time - flow.avg_inter_arrival) / flow.std_inter_arrival
                max_sigma = max(max_sigma, sigma_deviation)

        # Check frame-type-level outliers
        for ft_stats in flow.frame_types.values():
            if ft_stats.outlier_details and ft_stats.std_inter_arrival > 0:
                for _, inter_arrival_time in ft_stats.outlier_details:
                    sigma_deviation = (inter_arrival_time - ft_stats.avg_inter_arrival) / ft_stats.std_inter_arrival
                    max_sigma = max(max_sigma, sigma_deviation)

        return max_sigma