#!/usr/bin/env python3 """ Textual State Visualizer - Real-time state monitoring for Textual apps """ import json import time import threading from typing import Dict, Any, List, Optional from pathlib import Path import webbrowser import http.server import socketserver from urllib.parse import urlparse, parse_qs class TextualStateMonitor: """Monitor and capture Textual app state changes""" def __init__(self, app_instance): self.app = app_instance self.state_history = [] self.monitoring = False self.monitor_thread = None def capture_state(self) -> Dict[str, Any]: """Capture current app state""" state = { 'timestamp': time.time(), 'focused_widget': None, 'widgets': {}, 'screen_info': {}, 'reactive_values': {} } # Capture focused widget if hasattr(self.app, 'focused') and self.app.focused: focused = self.app.focused state['focused_widget'] = { 'type': focused.__class__.__name__, 'id': getattr(focused, 'id', None), 'classes': list(getattr(focused, 'classes', [])) } # Capture screen info if hasattr(self.app, 'screen'): screen = self.app.screen state['screen_info'] = { 'type': screen.__class__.__name__, 'id': getattr(screen, 'id', None), 'size': { 'width': getattr(getattr(screen, 'size', None), 'width', 0) if hasattr(screen, 'size') else 0, 'height': getattr(getattr(screen, 'size', None), 'height', 0) if hasattr(screen, 'size') else 0 } } # Capture widget states self._capture_widget_states(self.app.screen, state['widgets']) # Capture reactive values self._capture_reactive_values(self.app, state['reactive_values']) return state def _capture_widget_states(self, widget, widget_dict: Dict[str, Any], path: str = ""): """Recursively capture widget states""" widget_id = getattr(widget, 'id', None) or f"{widget.__class__.__name__}_{id(widget)}" full_path = f"{path}/{widget_id}" if path else widget_id widget_state = { 'type': widget.__class__.__name__, 'id': widget_id, 'path': full_path, 'visible': getattr(widget, 'visible', True), 'has_focus': getattr(widget, 'has_focus', False), 'classes': list(getattr(widget, 'classes', [])), 'size': { 'width': getattr(getattr(widget, 'size', None), 'width', 0) if hasattr(widget, 'size') else 0, 'height': getattr(getattr(widget, 'size', None), 'height', 0) if hasattr(widget, 'size') else 0 } } # Add widget-specific data if hasattr(widget, 'label'): widget_state['label'] = str(widget.label) if hasattr(widget, 'text'): widget_state['text'] = str(widget.text)[:200] # Truncate long text if hasattr(widget, 'value'): widget_state['value'] = str(widget.value) widget_dict[full_path] = widget_state # Process children if hasattr(widget, 'children'): for child in widget.children: self._capture_widget_states(child, widget_dict, full_path) def _capture_reactive_values(self, obj, reactive_dict: Dict[str, Any], path: str = ""): """Capture reactive attribute values""" if hasattr(obj, '__dict__'): for attr_name, attr_value in obj.__dict__.items(): if hasattr(attr_value, '__class__') and 'reactive' in str(attr_value.__class__): key = f"{path}.{attr_name}" if path else attr_name reactive_dict[key] = { 'value': str(attr_value), 'type': str(type(attr_value)) } def start_monitoring(self, interval: float = 1.0): """Start monitoring state changes""" self.monitoring = True self.monitor_thread = threading.Thread( target=self._monitor_loop, args=(interval,), daemon=True ) self.monitor_thread.start() def stop_monitoring(self): """Stop monitoring""" self.monitoring = False if self.monitor_thread: self.monitor_thread.join() def _monitor_loop(self, interval: float): """Main monitoring loop""" while self.monitoring: try: state = self.capture_state() self.state_history.append(state) # Keep only last 100 states if len(self.state_history) > 100: self.state_history.pop(0) except Exception as e: print(f"Error capturing state: {e}") time.sleep(interval) def get_state_changes(self, widget_path: Optional[str] = None) -> List[Dict[str, Any]]: """Get state changes for a specific widget or all widgets""" changes = [] for i in range(1, len(self.state_history)): prev_state = self.state_history[i-1] curr_state = self.state_history[i] # Compare states change = { 'timestamp': curr_state['timestamp'], 'changes': {} } # Compare widgets for widget_id, widget_state in curr_state['widgets'].items(): if widget_path and widget_path not in widget_id: continue prev_widget = prev_state['widgets'].get(widget_id, {}) widget_changes = {} for key, value in widget_state.items(): if key not in prev_widget or prev_widget[key] != value: widget_changes[key] = { 'old': prev_widget.get(key), 'new': value } if widget_changes: change['changes'][widget_id] = widget_changes if change['changes']: changes.append(change) return changes def export_state_history(self, filename: str = "textual_state_history.json"): """Export state history to JSON file""" with open(filename, 'w') as f: json.dump(self.state_history, f, indent=2, default=str) print(f"📁 State history exported to {filename}") class TextualStateWebServer: """Web server for visualizing Textual app state""" def __init__(self, monitor: TextualStateMonitor, port: int = 8080): self.monitor = monitor self.port = port self.httpd = None def start(self): """Start the web server""" handler = self._create_handler() self.httpd = socketserver.TCPServer(("", self.port), handler) print(f"🌐 Starting state visualizer at http://localhost:{self.port}") # Start server in background thread server_thread = threading.Thread(target=self.httpd.serve_forever, daemon=True) server_thread.start() # Open browser webbrowser.open(f"http://localhost:{self.port}") def stop(self): """Stop the web server""" if self.httpd: self.httpd.shutdown() def _create_handler(self): """Create HTTP request handler""" monitor = self.monitor class StateHandler(http.server.SimpleHTTPRequestHandler): def do_GET(self): if self.path == '/': self.send_html_dashboard() elif self.path == '/api/state': self.send_current_state() elif self.path == '/api/history': self.send_state_history() elif self.path.startswith('/api/changes'): self.send_state_changes() else: self.send_error(404) def send_html_dashboard(self): """Send HTML dashboard""" html = self._generate_dashboard_html() self.send_response(200) self.send_header('Content-type', 'text/html') self.end_headers() self.wfile.write(html.encode()) def send_current_state(self): """Send current app state as JSON""" state = monitor.capture_state() self.send_json_response(state) def send_state_history(self): """Send state history as JSON""" self.send_json_response(monitor.state_history) def send_state_changes(self): """Send state changes as JSON""" query = parse_qs(urlparse(self.path).query) widget_path = query.get('widget', [None])[0] changes = monitor.get_state_changes(widget_path) self.send_json_response(changes) def send_json_response(self, data): """Send JSON response""" self.send_response(200) self.send_header('Content-type', 'application/json') self.send_header('Access-Control-Allow-Origin', '*') self.end_headers() json_data = json.dumps(data, indent=2, default=str) self.wfile.write(json_data.encode()) def _generate_dashboard_html(self): """Generate HTML dashboard""" return '''