""" Enhanced Analysis Data Models This module defines data structures for enhanced protocol analysis including timing analysis, quality metrics, and decoded data representation. """ from dataclasses import dataclass, field from typing import Dict, List, Set, Any, Optional from enum import Enum class TimingQuality(Enum): """Timing quality classifications""" EXCELLENT = "excellent" # < 1ppm drift, stable GOOD = "good" # 1-10ppm drift, mostly stable MODERATE = "moderate" # 10-100ppm drift, variable POOR = "poor" # > 100ppm drift, unstable UNKNOWN = "unknown" class TimingStability(Enum): """Timing stability classifications""" STABLE = "stable" # Consistent timing behavior VARIABLE = "variable" # Some timing variations UNSTABLE = "unstable" # Highly variable timing UNKNOWN = "unknown" class DataType(Enum): """Primary data types in enhanced protocols""" ANALOG = "analog" PCM = "pcm" DISCRETE = "discrete" TIME = "time" VIDEO = "video" TMATS = "tmats" UNKNOWN = "unknown" @dataclass class TimingAnalysis: """Timing analysis results for enhanced protocols""" avg_clock_drift_ppm: float = 0.0 max_clock_drift_ppm: float = 0.0 min_clock_drift_ppm: float = 0.0 drift_variance: float = 0.0 quality: TimingQuality = TimingQuality.UNKNOWN stability: TimingStability = TimingStability.UNKNOWN # Timing accuracy metrics timing_accuracy_percent: float = 0.0 sync_errors: int = 0 timing_anomalies: int = 0 anomaly_rate_percent: float = 0.0 # Internal timing capabilities has_internal_timing: bool = False rtc_sync_available: bool = False def calculate_quality(self) -> TimingQuality: """Calculate timing quality based on drift measurements""" max_drift = abs(max(self.max_clock_drift_ppm, self.min_clock_drift_ppm, key=abs)) if max_drift < 1.0: return TimingQuality.EXCELLENT elif max_drift < 10.0: return TimingQuality.GOOD elif max_drift < 100.0: return TimingQuality.MODERATE else: return TimingQuality.POOR def calculate_stability(self) -> TimingStability: """Calculate timing stability based on variance""" if self.drift_variance < 1.0: return TimingStability.STABLE elif self.drift_variance < 25.0: return TimingStability.VARIABLE else: return TimingStability.UNSTABLE @dataclass class QualityMetrics: """Quality metrics for enhanced protocol data""" # Frame quality metrics avg_frame_quality_percent: float = 0.0 frame_quality_samples: List[float] = field(default_factory=list) # Signal quality metrics avg_signal_quality_percent: float = 0.0 signal_quality_samples: List[float] = field(default_factory=list) # Error counts sequence_gaps: int = 0 format_errors: int = 0 overflow_errors: int = 0 checksum_errors: int = 0 # Confidence metrics avg_confidence_score: float = 0.0 confidence_samples: List[float] = field(default_factory=list) low_confidence_frames: int = 0 # Data integrity corrupted_frames: int = 0 missing_frames: int = 0 duplicate_frames: int = 0 def calculate_overall_quality(self) -> float: """Calculate overall quality score (0-100)""" if not self.frame_quality_samples and not self.signal_quality_samples: return 0.0 frame_score = self.avg_frame_quality_percent if self.frame_quality_samples else 100.0 signal_score = self.avg_signal_quality_percent if self.signal_quality_samples else 100.0 confidence_score = self.avg_confidence_score * 100 if self.confidence_samples else 100.0 # Weight the scores weighted_score = (frame_score * 0.4 + signal_score * 0.4 + confidence_score * 0.2) # Apply error penalties total_frames = len(self.frame_quality_samples) or 1 error_rate = (self.format_errors + self.overflow_errors + self.corrupted_frames) / total_frames penalty = min(error_rate * 50, 50) # Max 50% penalty return max(0.0, weighted_score - penalty) @dataclass class DecodedData: """Container for decoded protocol data""" # Channel information channel_count: int = 0 analog_channels: int = 0 pcm_channels: int = 0 discrete_channels: int = 0 # Data type classification primary_data_type: DataType = DataType.UNKNOWN secondary_data_types: Set[DataType] = field(default_factory=set) # Decoded field information sample_decoded_fields: Dict[str, Any] = field(default_factory=dict) available_field_names: List[str] = field(default_factory=list) field_count: int = 0 critical_fields: List[str] = field(default_factory=list) # Frame type analysis frame_types: Set[str] = field(default_factory=set) frame_type_distribution: Dict[str, int] = field(default_factory=dict) # Special frame counts tmats_frames: int = 0 setup_frames: int = 0 data_frames: int = 0 # Decoder metadata decoder_type: str = "Standard" decoder_version: Optional[str] = None decode_success_rate: float = 1.0 def add_frame_type(self, frame_type: str): """Add a frame type to the analysis""" self.frame_types.add(frame_type) self.frame_type_distribution[frame_type] = self.frame_type_distribution.get(frame_type, 0) + 1 def get_dominant_frame_type(self) -> Optional[str]: """Get the most common frame type""" if not self.frame_type_distribution: return None return max(self.frame_type_distribution.items(), key=lambda x: x[1])[0] def update_data_type_classification(self): """Update primary data type based on channel counts""" if self.analog_channels > 0 and self.analog_channels >= self.pcm_channels: self.primary_data_type = DataType.ANALOG elif self.pcm_channels > 0: self.primary_data_type = DataType.PCM elif self.discrete_channels > 0: self.primary_data_type = DataType.DISCRETE elif self.tmats_frames > 0: self.primary_data_type = DataType.TMATS # Add secondary types if self.analog_channels > 0: self.secondary_data_types.add(DataType.ANALOG) if self.pcm_channels > 0: self.secondary_data_types.add(DataType.PCM) if self.discrete_channels > 0: self.secondary_data_types.add(DataType.DISCRETE) if self.tmats_frames > 0: self.secondary_data_types.add(DataType.TMATS) @dataclass class EnhancedAnalysisData: """Complete enhanced analysis data combining all analysis types""" timing: TimingAnalysis = field(default_factory=TimingAnalysis) quality: QualityMetrics = field(default_factory=QualityMetrics) decoded: DecodedData = field(default_factory=DecodedData) # Legacy compatibility fields (will be deprecated) avg_clock_drift_ppm: float = field(init=False) max_clock_drift_ppm: float = field(init=False) timing_quality: str = field(init=False) timing_stability: str = field(init=False) anomaly_rate: float = field(init=False) avg_confidence_score: float = field(init=False) avg_frame_quality: float = field(init=False) sequence_gaps: int = field(init=False) rtc_sync_errors: int = field(init=False) format_errors: int = field(init=False) overflow_errors: int = field(init=False) channel_count: int = field(init=False) analog_channels: int = field(init=False) pcm_channels: int = field(init=False) tmats_frames: int = field(init=False) has_internal_timing: bool = field(init=False) primary_data_type: str = field(init=False) decoder_type: str = field(init=False) sample_decoded_fields: Dict[str, Any] = field(init=False) available_field_names: List[str] = field(init=False) field_count: int = field(init=False) frame_types: Set[str] = field(init=False) timing_accuracy: float = field(init=False) signal_quality: float = field(init=False) def __post_init__(self): """Initialize legacy compatibility properties""" self._update_legacy_fields() def _update_legacy_fields(self): """Update legacy fields from new structured data""" # Timing fields self.avg_clock_drift_ppm = self.timing.avg_clock_drift_ppm self.max_clock_drift_ppm = self.timing.max_clock_drift_ppm self.timing_quality = self.timing.quality.value self.timing_stability = self.timing.stability.value self.anomaly_rate = self.timing.anomaly_rate_percent self.has_internal_timing = self.timing.has_internal_timing self.timing_accuracy = self.timing.timing_accuracy_percent # Quality fields self.avg_confidence_score = self.quality.avg_confidence_score self.avg_frame_quality = self.quality.avg_frame_quality_percent self.sequence_gaps = self.quality.sequence_gaps self.rtc_sync_errors = self.timing.sync_errors self.format_errors = self.quality.format_errors self.overflow_errors = self.quality.overflow_errors self.signal_quality = self.quality.avg_signal_quality_percent # Decoded data fields self.channel_count = self.decoded.channel_count self.analog_channels = self.decoded.analog_channels self.pcm_channels = self.decoded.pcm_channels self.tmats_frames = self.decoded.tmats_frames self.primary_data_type = self.decoded.primary_data_type.value self.decoder_type = self.decoded.decoder_type self.sample_decoded_fields = self.decoded.sample_decoded_fields self.available_field_names = self.decoded.available_field_names self.field_count = self.decoded.field_count self.frame_types = self.decoded.frame_types def update_from_components(self): """Update legacy fields when component objects change""" self._update_legacy_fields() def get_overall_health_score(self) -> float: """Calculate overall health score for the enhanced analysis""" quality_score = self.quality.calculate_overall_quality() # Factor in timing quality timing_score = 100.0 if self.timing.quality == TimingQuality.EXCELLENT: timing_score = 100.0 elif self.timing.quality == TimingQuality.GOOD: timing_score = 80.0 elif self.timing.quality == TimingQuality.MODERATE: timing_score = 60.0 elif self.timing.quality == TimingQuality.POOR: timing_score = 30.0 else: timing_score = 50.0 # Unknown # Weight quality more heavily than timing return (quality_score * 0.7 + timing_score * 0.3)