#!/usr/bin/env python3
"""
Textual State Visualizer - Real-time state monitoring for Textual apps
"""

import json
import time
import threading
from typing import Dict, Any, List, Optional
from pathlib import Path
import webbrowser
import http.server
import socketserver
from urllib.parse import urlparse, parse_qs

# Maximum number of snapshots kept in TextualStateMonitor.state_history.
MAX_HISTORY = 100

# Maximum number of characters of a widget's ``text`` stored per snapshot.
MAX_TEXT_LENGTH = 200

# Page served at "/". It only talks to the JSON endpoints defined by the
# request handler below (/api/state and /api/changes) and writes everything
# through textContent, so captured widget values are never parsed as HTML.
_DASHBOARD_HTML = r'''<!DOCTYPE html>
<html lang="en">
<head>
<meta charset="utf-8">
<title>Textual State Visualizer</title>
<style>
body { font-family: sans-serif; margin: 1.5em; }
section { margin-bottom: 1.5em; }
pre { background: #f4f4f4; padding: 0.75em; overflow: auto; }
</style>
</head>
<body>
<h1>🔍 Textual State Visualizer</h1>

<section>
<h2>Controls</h2>
<button id="refresh" type="button">Refresh</button>
<label><input id="auto" type="checkbox" checked> Auto-refresh</label>
</section>

<section>
<h2>Current State</h2>
<pre id="current-state">Loading...</pre>
</section>

<section>
<h2>Widget Tree</h2>
<pre id="widget-tree">Loading...</pre>
</section>

<section>
<h2>Recent Changes</h2>
<pre id="recent-changes">Loading...</pre>
</section>

<script>
function show(id, text) {
    document.getElementById(id).textContent = text;
}

function fetchJson(url) {
    return fetch(url).then(function (response) {
        if (!response.ok) {
            throw new Error("HTTP " + response.status);
        }
        return response.json();
    });
}

function renderTree(widgets) {
    var paths = Object.keys(widgets);
    if (paths.length === 0) {
        return "(no widgets)";
    }
    return paths.map(function (path) {
        var widget = widgets[path];
        var depth = path.split("/").length - 1;
        var flags = [];
        if (widget.has_focus) { flags.push("FOCUSED"); }
        if (!widget.visible) { flags.push("HIDDEN"); }
        var suffix = flags.length ? " [" + flags.join(", ") + "]" : "";
        var indent = new Array(depth + 1).join("  ");
        return indent + widget.id + " (" + widget.type + ")" + suffix;
    }).join("\n");
}

function refresh() {
    fetchJson("/api/state").then(function (state) {
        var summary = {
            timestamp: state.timestamp,
            focused_widget: state.focused_widget,
            screen_info: state.screen_info,
            reactive_values: state.reactive_values
        };
        show("current-state", JSON.stringify(summary, null, 2));
        show("widget-tree", renderTree(state.widgets || {}));
    }).catch(function (err) {
        show("current-state", "Error: " + err.message);
        show("widget-tree", "Error: " + err.message);
    });

    fetchJson("/api/changes").then(function (changes) {
        var recent = changes.slice(-10).reverse();
        show(
            "recent-changes",
            recent.length ? JSON.stringify(recent, null, 2) : "(no changes recorded)"
        );
    }).catch(function (err) {
        show("recent-changes", "Error: " + err.message);
    });
}

document.getElementById("refresh").addEventListener("click", refresh);
setInterval(function () {
    if (document.getElementById("auto").checked) {
        refresh();
    }
}, 1000);
refresh();
</script>
</body>
</html>
'''


class TextualStateMonitor:
    """Monitor and capture Textual app state changes"""

    def __init__(self, app_instance):
        """Create a monitor for *app_instance*; monitoring starts stopped."""
        self.app = app_instance
        self.state_history = []
        self.monitoring = False
        self.monitor_thread = None
        # Set by stop_monitoring() so the loop wakes up immediately instead
        # of finishing its current sleep.
        self._stop_event = threading.Event()

    @staticmethod
    def _size_of(obj) -> Dict[str, Any]:
        """Return ``{'width', 'height'}`` of ``obj.size``, 0 where missing."""
        size = getattr(obj, 'size', None)
        return {
            'width': getattr(size, 'width', 0),
            'height': getattr(size, 'height', 0),
        }

    @staticmethod
    def _class_list(obj) -> List[Any]:
        """Return ``obj.classes`` as a list in a stable order.

        ``classes`` may be an unordered collection; sorting keeps two
        snapshots of an unchanged widget equal when they are compared in
        get_state_changes().
        """
        return sorted(getattr(obj, 'classes', []), key=str)

    def capture_state(self) -> Dict[str, Any]:
        """Capture current app state

        Returns:
            A dict with the keys ``timestamp``, ``focused_widget``,
            ``widgets``, ``screen_info`` and ``reactive_values``.
            ``widgets`` and ``screen_info`` stay empty when the app has no
            ``screen`` attribute (or it is None).
        """
        state = {
            'timestamp': time.time(),
            'focused_widget': None,
            'widgets': {},
            'screen_info': {},
            'reactive_values': {},
        }

        # Capture focused widget. Compare against None rather than relying
        # on truthiness: a widget that defines __len__ is falsy when empty.
        focused = getattr(self.app, 'focused', None)
        if focused is not None:
            state['focused_widget'] = {
                'type': focused.__class__.__name__,
                'id': getattr(focused, 'id', None),
                'classes': self._class_list(focused),
            }

        # Capture screen info and the widget tree below it. The screen is
        # looked up once so both captures are skipped together when absent.
        screen = getattr(self.app, 'screen', None)
        if screen is not None:
            state['screen_info'] = {
                'type': screen.__class__.__name__,
                'id': getattr(screen, 'id', None),
                'size': self._size_of(screen),
            }
            self._capture_widget_states(screen, state['widgets'])

        # Capture reactive values
        self._capture_reactive_values(self.app, state['reactive_values'])

        return state

    def _capture_widget_states(self, widget, widget_dict: Dict[str, Any], path: str = ""):
        """Recursively capture widget states

        Each widget is stored in *widget_dict* under its slash-separated
        path from the root. Widgets without an ``id`` get a synthetic one
        built from the class name and ``id()`` of the object.
        """
        widget_id = getattr(widget, 'id', None) or f"{widget.__class__.__name__}_{id(widget)}"
        full_path = f"{path}/{widget_id}" if path else widget_id

        widget_state = {
            'type': widget.__class__.__name__,
            'id': widget_id,
            'path': full_path,
            'visible': getattr(widget, 'visible', True),
            'has_focus': getattr(widget, 'has_focus', False),
            'classes': self._class_list(widget),
            'size': self._size_of(widget),
        }

        # Add widget-specific data
        if hasattr(widget, 'label'):
            widget_state['label'] = str(widget.label)
        if hasattr(widget, 'text'):
            widget_state['text'] = str(widget.text)[:MAX_TEXT_LENGTH]  # Truncate long text
        if hasattr(widget, 'value'):
            widget_state['value'] = str(widget.value)

        widget_dict[full_path] = widget_state

        # Process children
        for child in getattr(widget, 'children', ()):
            self._capture_widget_states(child, widget_dict, full_path)

    def _capture_reactive_values(self, obj, reactive_dict: Dict[str, Any], path: str = ""):
        """Capture reactive attribute values

        Records every entry of ``obj.__dict__`` whose class (as printed by
        ``str``) contains the substring ``reactive``.
        """
        # NOTE(review): only the instance __dict__ is scanned. If the
        # framework stores reactives as class-level descriptors they will not
        # show up here -- confirm against the Textual version in use.
        if not hasattr(obj, '__dict__'):
            return
        for attr_name, attr_value in list(vars(obj).items()):
            if 'reactive' in str(attr_value.__class__):
                key = f"{path}.{attr_name}" if path else attr_name
                reactive_dict[key] = {
                    'value': str(attr_value),
                    'type': str(type(attr_value)),
                }

    def start_monitoring(self, interval: float = 1.0):
        """Start monitoring state changes

        Args:
            interval: Seconds to wait between two snapshots.

        Calling this while a monitor thread is already running does nothing.
        """
        if self.monitor_thread is not None and self.monitor_thread.is_alive():
            return
        self._stop_event.clear()
        self.monitoring = True
        self.monitor_thread = threading.Thread(
            target=self._monitor_loop,
            args=(interval,),
            daemon=True,
        )
        self.monitor_thread.start()

    def stop_monitoring(self):
        """Stop monitoring and wait for the monitor thread to finish"""
        self.monitoring = False
        self._stop_event.set()
        thread = self.monitor_thread
        # A thread cannot join itself; skip the join in that case.
        if thread is not None and thread is not threading.current_thread():
            thread.join()

    def _monitor_loop(self, interval: float):
        """Main monitoring loop"""
        while self.monitoring:
            try:
                state = self.capture_state()
            except Exception as e:
                # Best effort: one failed capture must not end monitoring.
                print(f"Error capturing state: {e}")
            else:
                self.state_history.append(state)
                # Keep only the last MAX_HISTORY states; trim in place so
                # existing references to the list stay valid.
                if len(self.state_history) > MAX_HISTORY:
                    del self.state_history[:-MAX_HISTORY]

            # Returns True as soon as stop_monitoring() sets the event.
            if self._stop_event.wait(interval):
                break

    def get_state_changes(self, widget_path: Optional[str] = None) -> List[Dict[str, Any]]:
        """Get state changes for a specific widget or all widgets

        Args:
            widget_path: When given, only widgets whose path contains this
                substring are compared.

        Returns:
            One entry per pair of consecutive snapshots that differ, shaped
            ``{'timestamp': ..., 'changes': {path: {key: {'old', 'new'}}}}``.
            Only widgets present in the newer snapshot are compared.
        """
        # Work on a snapshot: the monitor thread may append or trim meanwhile.
        history = list(self.state_history)
        changes = []

        for prev_state, curr_state in zip(history, history[1:]):
            prev_widgets = prev_state['widgets']
            changed = {}

            for widget_id, widget_state in curr_state['widgets'].items():
                if widget_path and widget_path not in widget_id:
                    continue

                prev_widget = prev_widgets.get(widget_id, {})
                widget_changes = {
                    key: {'old': prev_widget.get(key), 'new': value}
                    for key, value in widget_state.items()
                    if key not in prev_widget or prev_widget[key] != value
                }
                if widget_changes:
                    changed[widget_id] = widget_changes

            if changed:
                changes.append({
                    'timestamp': curr_state['timestamp'],
                    'changes': changed,
                })

        return changes

    def export_state_history(self, filename: str = "textual_state_history.json"):
        """Export state history to JSON file"""
        snapshot = list(self.state_history)
        with open(filename, 'w', encoding='utf-8') as f:
            json.dump(snapshot, f, indent=2, default=str)
        print(f"📁 State history exported to {filename}")


class _ReusableTCPServer(socketserver.TCPServer):
    """TCPServer that can rebind its port right after a previous stop."""

    allow_reuse_address = True


class TextualStateWebServer:
    """Web server for visualizing Textual app state"""

    def __init__(self, monitor: TextualStateMonitor, port: int = 8080, host: str = ""):
        """Create a server for *monitor*.

        Args:
            monitor: Source of the state served by the JSON endpoints.
            port: TCP port to listen on; 0 lets the OS pick a free one.
            host: Address to bind. The default "" listens on all interfaces;
                pass "127.0.0.1" to accept local connections only.
        """
        self.monitor = monitor
        self.port = port
        self.host = host
        self.httpd = None

    def start(self, open_browser: bool = True):
        """Start the web server

        Args:
            open_browser: Open the dashboard in the default browser.

        Calling this while the server is already running does nothing.
        """
        if self.httpd is not None:
            return

        handler = self._create_handler()
        self.httpd = _ReusableTCPServer((self.host, self.port), handler)
        # With port=0 the OS chooses the port; remember the real one.
        self.port = self.httpd.server_address[1]

        print(f"🌐 Starting state visualizer at http://localhost:{self.port}")

        # Start server in background thread
        server_thread = threading.Thread(target=self.httpd.serve_forever, daemon=True)
        server_thread.start()

        # Open browser
        if open_browser:
            webbrowser.open(f"http://localhost:{self.port}")

    def stop(self):
        """Stop the web server and release its listening socket"""
        httpd, self.httpd = self.httpd, None
        if httpd is not None:
            httpd.shutdown()
            # shutdown() only ends serve_forever(); the socket must be
            # closed explicitly or the port stays bound.
            httpd.server_close()

    def _create_handler(self):
        """Create HTTP request handler"""
        monitor = self.monitor

        # BaseHTTPRequestHandler rather than SimpleHTTPRequestHandler: the
        # latter also answers HEAD requests with metadata of files in the
        # current working directory.
        class StateHandler(http.server.BaseHTTPRequestHandler):
            def do_GET(self):
                """Route a GET request by its path, ignoring the query."""
                route = urlparse(self.path).path
                if route == '/':
                    self.send_html_dashboard()
                elif route == '/api/state':
                    self.send_current_state()
                elif route == '/api/history':
                    self.send_state_history()
                elif route.startswith('/api/changes'):
                    self.send_state_changes()
                else:
                    self.send_error(404)

            def send_html_dashboard(self):
                """Send HTML dashboard"""
                body = self._generate_dashboard_html().encode('utf-8')
                self.send_response(200)
                # The page contains non-ASCII characters; declare the charset.
                self.send_header('Content-type', 'text/html; charset=utf-8')
                self.send_header('Content-Length', str(len(body)))
                self.end_headers()
                self.wfile.write(body)

            def send_current_state(self):
                """Send current app state as JSON"""
                try:
                    state = monitor.capture_state()
                except Exception as e:
                    # Report the failure to the client instead of dropping
                    # the connection.
                    self.send_error(500, f"Error capturing state: {e}")
                    return
                self.send_json_response(state)

            def send_state_history(self):
                """Send state history as JSON"""
                # Copy first: the monitor thread may modify the list while
                # it is being serialized.
                self.send_json_response(list(monitor.state_history))

            def send_state_changes(self):
                """Send state changes as JSON"""
                query = parse_qs(urlparse(self.path).query)
                widget_path = query.get('widget', [None])[0]
                changes = monitor.get_state_changes(widget_path)
                self.send_json_response(changes)

            def send_json_response(self, data):
                """Send JSON response"""
                body = json.dumps(data, indent=2, default=str).encode('utf-8')
                self.send_response(200)
                self.send_header('Content-type', 'application/json')
                self.send_header('Access-Control-Allow-Origin', '*')
                self.send_header('Content-Length', str(len(body)))
                self.end_headers()
                self.wfile.write(body)

            def _generate_dashboard_html(self):
                """Generate HTML dashboard"""
                return _DASHBOARD_HTML

        return StateHandler


def create_textual_debug_setup() -> str:
    """Create a setup snippet for easy debugging"""
    return '''
# Add this to your Textual app for easy state monitoring

from textual_state_visualizer import TextualStateMonitor, TextualStateWebServer


class DebugMixin:
    """Mixin to add debugging capabilities to Textual apps"""

    def __init__(self, *args, **kwargs):
        super().__init__(*args, **kwargs)
        self._debug_monitor = None
        self._debug_server = None

    def start_debug_monitoring(self, web_interface: bool = True, port: int = 8080):
        """Start debug monitoring with optional web interface"""
        self._debug_monitor = TextualStateMonitor(self)
        self._debug_monitor.start_monitoring()

        if web_interface:
            self._debug_server = TextualStateWebServer(self._debug_monitor, port)
            self._debug_server.start()

        print(f"🔍 Debug monitoring started!")
        if web_interface:
            print(f"🌐 Web interface: http://localhost:{port}")

    def stop_debug_monitoring(self):
        """Stop debug monitoring"""
        if self._debug_monitor:
            self._debug_monitor.stop_monitoring()
        if self._debug_server:
            self._debug_server.stop()

    def debug_current_state(self):
        """Print current state to console"""
        if self._debug_monitor:
            state = self._debug_monitor.capture_state()
            print("🔍 Current Textual App State:")
            print(f" Focused: {state.get('focused_widget', 'None')}")
            print(f" Widgets: {len(state.get('widgets', {}))}")

            for path, widget in state.get('widgets', {}).items():
                status = []
                if widget.get('has_focus'):
                    status.append('FOCUSED')
                if not widget.get('visible'):
                    status.append('HIDDEN')

                status_str = f" [{', '.join(status)}]" if status else ""
                print(f" {path}: {widget['type']}{status_str}")


# Usage in your app (list DebugMixin first so its __init__ runs):
# class MyApp(DebugMixin, App):
#     def on_mount(self):
#         self.start_debug_monitoring()
'''


def main():
    """Print the feature list and the setup snippet."""
    print("🔍 Textual State Visualizer")
    print("=" * 50)
    print("This tool provides real-time monitoring of Textual app state.")
    print("\nFeatures:")
    print(" 📊 Real-time widget tree visualization")
    print(" 🔄 State change tracking")
    print(" 🌐 Web-based dashboard")
    print(" 📁 State history export")
    print(" 🎯 Focus tracking")
    print("\n" + "=" * 50)
    print("📝 Setup code for your app:")
    print(create_textual_debug_setup())


if __name__ == "__main__":
    main()