477 lines
18 KiB
Python
477 lines
18 KiB
Python
#!/usr/bin/env python3
|
|
"""
|
|
Textual State Visualizer - Real-time state monitoring for Textual apps
|
|
"""
|
|
|
|
import json
|
|
import time
|
|
import threading
|
|
from typing import Dict, Any, List, Optional
|
|
from pathlib import Path
|
|
import webbrowser
|
|
import http.server
|
|
import socketserver
|
|
from urllib.parse import urlparse, parse_qs
|
|
|
|
class TextualStateMonitor:
|
|
"""Monitor and capture Textual app state changes"""
|
|
|
|
def __init__(self, app_instance):
|
|
self.app = app_instance
|
|
self.state_history = []
|
|
self.monitoring = False
|
|
self.monitor_thread = None
|
|
|
|
def capture_state(self) -> Dict[str, Any]:
|
|
"""Capture current app state"""
|
|
state = {
|
|
'timestamp': time.time(),
|
|
'focused_widget': None,
|
|
'widgets': {},
|
|
'screen_info': {},
|
|
'reactive_values': {}
|
|
}
|
|
|
|
# Capture focused widget
|
|
if hasattr(self.app, 'focused') and self.app.focused:
|
|
focused = self.app.focused
|
|
state['focused_widget'] = {
|
|
'type': focused.__class__.__name__,
|
|
'id': getattr(focused, 'id', None),
|
|
'classes': list(getattr(focused, 'classes', []))
|
|
}
|
|
|
|
# Capture screen info
|
|
if hasattr(self.app, 'screen'):
|
|
screen = self.app.screen
|
|
state['screen_info'] = {
|
|
'type': screen.__class__.__name__,
|
|
'id': getattr(screen, 'id', None),
|
|
'size': {
|
|
'width': getattr(getattr(screen, 'size', None), 'width', 0) if hasattr(screen, 'size') else 0,
|
|
'height': getattr(getattr(screen, 'size', None), 'height', 0) if hasattr(screen, 'size') else 0
|
|
}
|
|
}
|
|
|
|
# Capture widget states
|
|
self._capture_widget_states(self.app.screen, state['widgets'])
|
|
|
|
# Capture reactive values
|
|
self._capture_reactive_values(self.app, state['reactive_values'])
|
|
|
|
return state
|
|
|
|
def _capture_widget_states(self, widget, widget_dict: Dict[str, Any], path: str = ""):
|
|
"""Recursively capture widget states"""
|
|
widget_id = getattr(widget, 'id', None) or f"{widget.__class__.__name__}_{id(widget)}"
|
|
full_path = f"{path}/{widget_id}" if path else widget_id
|
|
|
|
widget_state = {
|
|
'type': widget.__class__.__name__,
|
|
'id': widget_id,
|
|
'path': full_path,
|
|
'visible': getattr(widget, 'visible', True),
|
|
'has_focus': getattr(widget, 'has_focus', False),
|
|
'classes': list(getattr(widget, 'classes', [])),
|
|
'size': {
|
|
'width': getattr(getattr(widget, 'size', None), 'width', 0) if hasattr(widget, 'size') else 0,
|
|
'height': getattr(getattr(widget, 'size', None), 'height', 0) if hasattr(widget, 'size') else 0
|
|
}
|
|
}
|
|
|
|
# Add widget-specific data
|
|
if hasattr(widget, 'label'):
|
|
widget_state['label'] = str(widget.label)
|
|
if hasattr(widget, 'text'):
|
|
widget_state['text'] = str(widget.text)[:200] # Truncate long text
|
|
if hasattr(widget, 'value'):
|
|
widget_state['value'] = str(widget.value)
|
|
|
|
widget_dict[full_path] = widget_state
|
|
|
|
# Process children
|
|
if hasattr(widget, 'children'):
|
|
for child in widget.children:
|
|
self._capture_widget_states(child, widget_dict, full_path)
|
|
|
|
def _capture_reactive_values(self, obj, reactive_dict: Dict[str, Any], path: str = ""):
|
|
"""Capture reactive attribute values"""
|
|
if hasattr(obj, '__dict__'):
|
|
for attr_name, attr_value in obj.__dict__.items():
|
|
if hasattr(attr_value, '__class__') and 'reactive' in str(attr_value.__class__):
|
|
key = f"{path}.{attr_name}" if path else attr_name
|
|
reactive_dict[key] = {
|
|
'value': str(attr_value),
|
|
'type': str(type(attr_value))
|
|
}
|
|
|
|
def start_monitoring(self, interval: float = 1.0):
|
|
"""Start monitoring state changes"""
|
|
self.monitoring = True
|
|
self.monitor_thread = threading.Thread(
|
|
target=self._monitor_loop,
|
|
args=(interval,),
|
|
daemon=True
|
|
)
|
|
self.monitor_thread.start()
|
|
|
|
def stop_monitoring(self):
|
|
"""Stop monitoring"""
|
|
self.monitoring = False
|
|
if self.monitor_thread:
|
|
self.monitor_thread.join()
|
|
|
|
def _monitor_loop(self, interval: float):
|
|
"""Main monitoring loop"""
|
|
while self.monitoring:
|
|
try:
|
|
state = self.capture_state()
|
|
self.state_history.append(state)
|
|
|
|
# Keep only last 100 states
|
|
if len(self.state_history) > 100:
|
|
self.state_history.pop(0)
|
|
|
|
except Exception as e:
|
|
print(f"Error capturing state: {e}")
|
|
|
|
time.sleep(interval)
|
|
|
|
def get_state_changes(self, widget_path: Optional[str] = None) -> List[Dict[str, Any]]:
|
|
"""Get state changes for a specific widget or all widgets"""
|
|
changes = []
|
|
|
|
for i in range(1, len(self.state_history)):
|
|
prev_state = self.state_history[i-1]
|
|
curr_state = self.state_history[i]
|
|
|
|
# Compare states
|
|
change = {
|
|
'timestamp': curr_state['timestamp'],
|
|
'changes': {}
|
|
}
|
|
|
|
# Compare widgets
|
|
for widget_id, widget_state in curr_state['widgets'].items():
|
|
if widget_path and widget_path not in widget_id:
|
|
continue
|
|
|
|
prev_widget = prev_state['widgets'].get(widget_id, {})
|
|
|
|
widget_changes = {}
|
|
for key, value in widget_state.items():
|
|
if key not in prev_widget or prev_widget[key] != value:
|
|
widget_changes[key] = {
|
|
'old': prev_widget.get(key),
|
|
'new': value
|
|
}
|
|
|
|
if widget_changes:
|
|
change['changes'][widget_id] = widget_changes
|
|
|
|
if change['changes']:
|
|
changes.append(change)
|
|
|
|
return changes
|
|
|
|
def export_state_history(self, filename: str = "textual_state_history.json"):
|
|
"""Export state history to JSON file"""
|
|
with open(filename, 'w') as f:
|
|
json.dump(self.state_history, f, indent=2, default=str)
|
|
print(f"📁 State history exported to {filename}")
|
|
|
|
class TextualStateWebServer:
|
|
"""Web server for visualizing Textual app state"""
|
|
|
|
def __init__(self, monitor: TextualStateMonitor, port: int = 8080):
|
|
self.monitor = monitor
|
|
self.port = port
|
|
self.httpd = None
|
|
|
|
def start(self):
|
|
"""Start the web server"""
|
|
handler = self._create_handler()
|
|
self.httpd = socketserver.TCPServer(("", self.port), handler)
|
|
|
|
print(f"🌐 Starting state visualizer at http://localhost:{self.port}")
|
|
|
|
# Start server in background thread
|
|
server_thread = threading.Thread(target=self.httpd.serve_forever, daemon=True)
|
|
server_thread.start()
|
|
|
|
# Open browser
|
|
webbrowser.open(f"http://localhost:{self.port}")
|
|
|
|
def stop(self):
|
|
"""Stop the web server"""
|
|
if self.httpd:
|
|
self.httpd.shutdown()
|
|
|
|
def _create_handler(self):
|
|
"""Create HTTP request handler"""
|
|
monitor = self.monitor
|
|
|
|
class StateHandler(http.server.SimpleHTTPRequestHandler):
|
|
def do_GET(self):
|
|
if self.path == '/':
|
|
self.send_html_dashboard()
|
|
elif self.path == '/api/state':
|
|
self.send_current_state()
|
|
elif self.path == '/api/history':
|
|
self.send_state_history()
|
|
elif self.path.startswith('/api/changes'):
|
|
self.send_state_changes()
|
|
else:
|
|
self.send_error(404)
|
|
|
|
def send_html_dashboard(self):
|
|
"""Send HTML dashboard"""
|
|
html = self._generate_dashboard_html()
|
|
self.send_response(200)
|
|
self.send_header('Content-type', 'text/html')
|
|
self.end_headers()
|
|
self.wfile.write(html.encode())
|
|
|
|
def send_current_state(self):
|
|
"""Send current app state as JSON"""
|
|
state = monitor.capture_state()
|
|
self.send_json_response(state)
|
|
|
|
def send_state_history(self):
|
|
"""Send state history as JSON"""
|
|
self.send_json_response(monitor.state_history)
|
|
|
|
def send_state_changes(self):
|
|
"""Send state changes as JSON"""
|
|
query = parse_qs(urlparse(self.path).query)
|
|
widget_path = query.get('widget', [None])[0]
|
|
changes = monitor.get_state_changes(widget_path)
|
|
self.send_json_response(changes)
|
|
|
|
def send_json_response(self, data):
|
|
"""Send JSON response"""
|
|
self.send_response(200)
|
|
self.send_header('Content-type', 'application/json')
|
|
self.send_header('Access-Control-Allow-Origin', '*')
|
|
self.end_headers()
|
|
json_data = json.dumps(data, indent=2, default=str)
|
|
self.wfile.write(json_data.encode())
|
|
|
|
def _generate_dashboard_html(self):
|
|
"""Generate HTML dashboard"""
|
|
return '''
|
|
<!DOCTYPE html>
|
|
<html>
|
|
<head>
|
|
<title>Textual State Visualizer</title>
|
|
<style>
|
|
body { font-family: Arial, sans-serif; margin: 20px; }
|
|
.container { max-width: 1200px; margin: 0 auto; }
|
|
.section { margin: 20px 0; padding: 15px; border: 1px solid #ddd; border-radius: 5px; }
|
|
.widget-tree { font-family: monospace; }
|
|
.widget-item { margin: 2px 0; padding: 2px; }
|
|
.widget-focused { background-color: #ffffcc; }
|
|
.widget-hidden { opacity: 0.5; }
|
|
.changes { background-color: #f0f8ff; }
|
|
.timestamp { color: #666; font-size: 0.9em; }
|
|
button { margin: 5px; padding: 8px 16px; }
|
|
#auto-refresh { color: green; }
|
|
</style>
|
|
</head>
|
|
<body>
|
|
<div class="container">
|
|
<h1>🔍 Textual State Visualizer</h1>
|
|
|
|
<div class="section">
|
|
<h2>Controls</h2>
|
|
<button onclick="refreshState()">Refresh State</button>
|
|
<button onclick="toggleAutoRefresh()" id="auto-refresh-btn">Start Auto-refresh</button>
|
|
<button onclick="exportHistory()">Export History</button>
|
|
<span id="auto-refresh" style="display: none;">Auto-refreshing every 2s...</span>
|
|
</div>
|
|
|
|
<div class="section">
|
|
<h2>Current State</h2>
|
|
<div id="current-state">Loading...</div>
|
|
</div>
|
|
|
|
<div class="section">
|
|
<h2>Widget Tree</h2>
|
|
<div id="widget-tree" class="widget-tree">Loading...</div>
|
|
</div>
|
|
|
|
<div class="section">
|
|
<h2>Recent Changes</h2>
|
|
<div id="recent-changes">Loading...</div>
|
|
</div>
|
|
</div>
|
|
|
|
<script>
|
|
let autoRefreshInterval = null;
|
|
|
|
function refreshState() {
|
|
fetch('/api/state')
|
|
.then(response => response.json())
|
|
.then(data => {
|
|
updateCurrentState(data);
|
|
updateWidgetTree(data);
|
|
});
|
|
|
|
fetch('/api/changes')
|
|
.then(response => response.json())
|
|
.then(data => updateRecentChanges(data));
|
|
}
|
|
|
|
function updateCurrentState(state) {
|
|
const div = document.getElementById('current-state');
|
|
div.innerHTML = `
|
|
<p><strong>Timestamp:</strong> ${new Date(state.timestamp * 1000).toLocaleString()}</p>
|
|
<p><strong>Focused Widget:</strong> ${state.focused_widget ?
|
|
`${state.focused_widget.type}#${state.focused_widget.id}` : 'None'}</p>
|
|
<p><strong>Screen:</strong> ${state.screen_info.type} (${state.screen_info.size.width}x${state.screen_info.size.height})</p>
|
|
<p><strong>Total Widgets:</strong> ${Object.keys(state.widgets).length}</p>
|
|
`;
|
|
}
|
|
|
|
function updateWidgetTree(state) {
|
|
const div = document.getElementById('widget-tree');
|
|
let html = '';
|
|
|
|
for (const [path, widget] of Object.entries(state.widgets)) {
|
|
const indent = ' '.repeat((path.match(/\\//g) || []).length);
|
|
const classes = widget.has_focus ? 'widget-focused' : (widget.visible ? '' : 'widget-hidden');
|
|
|
|
html += `<div class="widget-item ${classes}">`;
|
|
html += `${indent}📦 ${widget.type}`;
|
|
if (widget.id) html += ` #${widget.id}`;
|
|
if (widget.label) html += ` [${widget.label}]`;
|
|
html += ` (${widget.size.width}x${widget.size.height})`;
|
|
if (widget.has_focus) html += ' 🎯';
|
|
if (!widget.visible) html += ' 👻';
|
|
html += '</div>';
|
|
}
|
|
|
|
div.innerHTML = html;
|
|
}
|
|
|
|
function updateRecentChanges(changes) {
|
|
const div = document.getElementById('recent-changes');
|
|
let html = '';
|
|
|
|
changes.slice(-10).forEach(change => {
|
|
html += `<div class="changes">`;
|
|
html += `<div class="timestamp">${new Date(change.timestamp * 1000).toLocaleString()}</div>`;
|
|
|
|
for (const [widget, widgetChanges] of Object.entries(change.changes)) {
|
|
html += `<strong>${widget}:</strong><br/>`;
|
|
for (const [prop, change] of Object.entries(widgetChanges)) {
|
|
html += ` ${prop}: ${change.old} → ${change.new}<br/>`;
|
|
}
|
|
}
|
|
html += '</div>';
|
|
});
|
|
|
|
div.innerHTML = html || 'No recent changes';
|
|
}
|
|
|
|
function toggleAutoRefresh() {
|
|
const btn = document.getElementById('auto-refresh-btn');
|
|
const indicator = document.getElementById('auto-refresh');
|
|
|
|
if (autoRefreshInterval) {
|
|
clearInterval(autoRefreshInterval);
|
|
autoRefreshInterval = null;
|
|
btn.textContent = 'Start Auto-refresh';
|
|
indicator.style.display = 'none';
|
|
} else {
|
|
autoRefreshInterval = setInterval(refreshState, 2000);
|
|
btn.textContent = 'Stop Auto-refresh';
|
|
indicator.style.display = 'inline';
|
|
}
|
|
}
|
|
|
|
function exportHistory() {
|
|
window.open('/api/history', '_blank');
|
|
}
|
|
|
|
// Initial load
|
|
refreshState();
|
|
</script>
|
|
</body>
|
|
</html>
|
|
'''
|
|
|
|
return StateHandler
|
|
|
|
def create_textual_debug_setup() -> str:
|
|
"""Create a setup snippet for easy debugging"""
|
|
return '''
|
|
# Add this to your Textual app for easy state monitoring
|
|
|
|
from textual_state_visualizer import TextualStateMonitor, TextualStateWebServer
|
|
|
|
class DebugMixin:
|
|
"""Mixin to add debugging capabilities to Textual apps"""
|
|
|
|
def __init__(self, *args, **kwargs):
|
|
super().__init__(*args, **kwargs)
|
|
self._debug_monitor = None
|
|
self._debug_server = None
|
|
|
|
def start_debug_monitoring(self, web_interface: bool = True, port: int = 8080):
|
|
"""Start debug monitoring with optional web interface"""
|
|
self._debug_monitor = TextualStateMonitor(self)
|
|
self._debug_monitor.start_monitoring()
|
|
|
|
if web_interface:
|
|
self._debug_server = TextualStateWebServer(self._debug_monitor, port)
|
|
self._debug_server.start()
|
|
|
|
print(f"🔍 Debug monitoring started!")
|
|
if web_interface:
|
|
print(f"🌐 Web interface: http://localhost:{port}")
|
|
|
|
def stop_debug_monitoring(self):
|
|
"""Stop debug monitoring"""
|
|
if self._debug_monitor:
|
|
self._debug_monitor.stop_monitoring()
|
|
if self._debug_server:
|
|
self._debug_server.stop()
|
|
|
|
def debug_current_state(self):
|
|
"""Print current state to console"""
|
|
if self._debug_monitor:
|
|
state = self._debug_monitor.capture_state()
|
|
print("🔍 Current Textual App State:")
|
|
print(f" Focused: {state.get('focused_widget', 'None')}")
|
|
print(f" Widgets: {len(state.get('widgets', {}))}")
|
|
for path, widget in state.get('widgets', {}).items():
|
|
status = []
|
|
if widget.get('has_focus'): status.append('FOCUSED')
|
|
if not widget.get('visible'): status.append('HIDDEN')
|
|
status_str = f" [{', '.join(status)}]" if status else ""
|
|
print(f" {path}: {widget['type']}{status_str}")
|
|
|
|
# Usage in your app:
|
|
# class MyApp(App, DebugMixin):
|
|
# def on_mount(self):
|
|
# self.start_debug_monitoring()
|
|
'''
|
|
|
|
def main():
|
|
print("🔍 Textual State Visualizer")
|
|
print("=" * 50)
|
|
print("This tool provides real-time monitoring of Textual app state.")
|
|
print("\nFeatures:")
|
|
print(" 📊 Real-time widget tree visualization")
|
|
print(" 🔄 State change tracking")
|
|
print(" 🌐 Web-based dashboard")
|
|
print(" 📁 State history export")
|
|
print(" 🎯 Focus tracking")
|
|
|
|
print("\n" + "=" * 50)
|
|
print("📝 Setup code for your app:")
|
|
print(create_textual_debug_setup())
|
|
|
|
if __name__ == "__main__":
|
|
main() |