Files
StreamLens/textual_state_visualizer.py

477 lines
18 KiB
Python

#!/usr/bin/env python3
"""
Textual State Visualizer - Real-time state monitoring for Textual apps
"""
import json
import time
import threading
from typing import Dict, Any, List, Optional
from pathlib import Path
import webbrowser
import http.server
import socketserver
from urllib.parse import urlparse, parse_qs
class TextualStateMonitor:
    """Monitor and capture Textual app state changes.

    A snapshot (see :meth:`capture_state`) is a plain dict describing the
    focused widget, the current screen, every widget reachable from the
    screen, and any reactive values found on the app object.  Snapshots can
    be taken on demand or collected periodically by a background thread
    into :attr:`state_history`, which keeps at most ``max_history`` entries.
    """

    # Prefix under which Textual is understood to store reactive values in an
    # instance ``__dict__`` (``_reactive_<name>``).  This is an implementation
    # detail of Textual -- verify against the installed version.
    _REACTIVE_PREFIX = "_reactive_"

    def __init__(self, app_instance, max_history: int = 100):
        """Create a monitor for ``app_instance``.

        Args:
            app_instance: The app object to inspect.  Only ``focused``,
                ``screen`` and ``__dict__`` are read, all optionally.
            max_history: Maximum number of snapshots kept in
                ``state_history`` by the monitoring loop.
        """
        self.app = app_instance
        self.state_history: List[Dict[str, Any]] = []
        self.max_history = max_history
        self.monitoring = False
        self.monitor_thread: Optional[threading.Thread] = None
        # Set by stop_monitoring() so the loop wakes up immediately instead
        # of finishing a full sleep interval first.
        self._stop_event = threading.Event()

    @staticmethod
    def _size_of(node) -> Dict[str, int]:
        """Return ``{'width', 'height'}`` of ``node.size``, 0 when missing."""
        size = getattr(node, 'size', None)
        return {
            'width': getattr(size, 'width', 0),
            'height': getattr(size, 'height', 0),
        }

    def capture_state(self) -> Dict[str, Any]:
        """Capture the current app state.

        Returns:
            A dict with the keys ``timestamp`` (``time.time()``),
            ``focused_widget`` (dict or ``None``), ``widgets`` (path ->
            widget state), ``screen_info`` and ``reactive_values``.
        """
        state = {
            'timestamp': time.time(),
            'focused_widget': None,
            'widgets': {},
            'screen_info': {},
            'reactive_values': {}
        }

        # Identity check rather than truthiness: a widget class that defines
        # __len__/__bool__ could otherwise be reported as "not focused".
        focused = getattr(self.app, 'focused', None)
        if focused is not None:
            state['focused_widget'] = {
                'type': focused.__class__.__name__,
                'id': getattr(focused, 'id', None),
                'classes': list(getattr(focused, 'classes', []))
            }

        # Screen info and the widget tree both require a screen; an app
        # object without one simply yields empty sections.
        screen = getattr(self.app, 'screen', None)
        if screen is not None:
            state['screen_info'] = {
                'type': screen.__class__.__name__,
                'id': getattr(screen, 'id', None),
                'size': self._size_of(screen)
            }
            self._capture_widget_states(screen, state['widgets'])

        self._capture_reactive_values(self.app, state['reactive_values'])
        return state

    def _capture_widget_states(self, widget, widget_dict: Dict[str, Any], path: str = ""):
        """Recursively record ``widget`` and its children into ``widget_dict``.

        Entries are keyed by a ``/``-separated path of widget ids.  Widgets
        without an id get ``<ClassName>_<id(widget)>`` so that siblings of
        the same class do not overwrite each other.
        """
        widget_id = getattr(widget, 'id', None) or f"{widget.__class__.__name__}_{id(widget)}"
        full_path = f"{path}/{widget_id}" if path else widget_id

        widget_state = {
            'type': widget.__class__.__name__,
            'id': widget_id,
            'path': full_path,
            'visible': getattr(widget, 'visible', True),
            'has_focus': getattr(widget, 'has_focus', False),
            'classes': list(getattr(widget, 'classes', [])),
            'size': self._size_of(widget)
        }

        # Optional, widget-specific data; everything is stringified so the
        # snapshot stays JSON-serialisable.
        if hasattr(widget, 'label'):
            widget_state['label'] = str(widget.label)
        if hasattr(widget, 'text'):
            widget_state['text'] = str(widget.text)[:200]  # Truncate long text
        if hasattr(widget, 'value'):
            widget_state['value'] = str(widget.value)

        widget_dict[full_path] = widget_state

        for child in getattr(widget, 'children', ()):
            self._capture_widget_states(child, widget_dict, full_path)

    def _capture_reactive_values(self, obj, reactive_dict: Dict[str, Any], path: str = ""):
        """Record reactive attribute values of ``obj`` into ``reactive_dict``.

        Two kinds of instance attributes are reported:

        * attributes named ``_reactive_<name>`` -- reported under ``<name>``
          (presumed to be Textual's storage slot for reactive values;
          confirm against the Textual version in use);
        * attributes whose value's class mentions ``reactive`` -- reported
          under their own name.
        """
        attributes = getattr(obj, '__dict__', None)
        if not attributes:
            return

        prefix = self._REACTIVE_PREFIX
        # Iterate a snapshot: this usually runs on the monitor thread while
        # the app may be adding attributes on its own thread.
        for attr_name, attr_value in list(attributes.items()):
            if attr_name.startswith(prefix):
                public_name = attr_name[len(prefix):]
            elif 'reactive' in str(attr_value.__class__):
                public_name = attr_name
            else:
                continue
            key = f"{path}.{public_name}" if path else public_name
            reactive_dict[key] = {
                'value': str(attr_value),
                'type': str(type(attr_value))
            }

    def start_monitoring(self, interval: float = 1.0):
        """Start capturing a snapshot every ``interval`` seconds.

        Runs in a daemon thread.  Calling this while a monitor thread is
        already running is a no-op, so only one loop ever appends to the
        history.
        """
        if self.monitor_thread is not None and self.monitor_thread.is_alive():
            return
        self._stop_event.clear()
        self.monitoring = True
        self.monitor_thread = threading.Thread(
            target=self._monitor_loop,
            args=(interval,),
            daemon=True
        )
        self.monitor_thread.start()

    def stop_monitoring(self):
        """Stop the monitoring loop and wait for its thread to finish."""
        self.monitoring = False
        self._stop_event.set()
        thread = self.monitor_thread
        # Joining the current thread would raise RuntimeError.
        if thread is not None and thread is not threading.current_thread():
            thread.join()

    def _record_state(self, state: Dict[str, Any]):
        """Append ``state`` to the history and drop the oldest overflow."""
        self.state_history.append(state)
        excess = len(self.state_history) - self.max_history
        if excess > 0:
            del self.state_history[:excess]

    def _monitor_loop(self, interval: float):
        """Capture and record snapshots until monitoring is stopped."""
        while self.monitoring:
            try:
                self._record_state(self.capture_state())
            except Exception as e:
                # Best effort: one failed capture must not end monitoring.
                print(f"Error capturing state: {e}")
            # Interruptible sleep; returns True as soon as stop is requested.
            if self._stop_event.wait(interval):
                break

    def get_state_changes(self, widget_path: Optional[str] = None) -> List[Dict[str, Any]]:
        """Diff consecutive snapshots in the history.

        Args:
            widget_path: When given, only widgets whose path contains this
                substring are compared.

        Returns:
            One ``{'timestamp', 'changes'}`` dict per snapshot that differs
            from its predecessor; ``changes`` maps widget path to
            ``{property: {'old', 'new'}}``.  Widgets that are new in a
            snapshot report every property with ``old`` set to ``None``.
            Widgets that disappeared are not reported.
        """
        # Work on a copy so the monitor thread can keep appending/trimming.
        history = list(self.state_history)
        changes = []

        for prev_state, curr_state in zip(history, history[1:]):
            changed_widgets = {}
            previous_widgets = prev_state['widgets']

            for widget_id, widget_state in curr_state['widgets'].items():
                if widget_path and widget_path not in widget_id:
                    continue
                prev_widget = previous_widgets.get(widget_id, {})
                widget_changes = {
                    key: {'old': prev_widget.get(key), 'new': value}
                    for key, value in widget_state.items()
                    if key not in prev_widget or prev_widget[key] != value
                }
                if widget_changes:
                    changed_widgets[widget_id] = widget_changes

            if changed_widgets:
                changes.append({
                    'timestamp': curr_state['timestamp'],
                    'changes': changed_widgets
                })

        return changes

    def export_state_history(self, filename: str = "textual_state_history.json"):
        """Write the state history to ``filename`` as indented JSON."""
        snapshot = list(self.state_history)
        with open(filename, 'w', encoding='utf-8') as f:
            json.dump(snapshot, f, indent=2, default=str)
        print(f"📁 State history exported to {filename}")
class TextualStateWebServer:
    """Web server for visualizing Textual app state.

    Serves a small HTML dashboard at ``/`` and a JSON API:

    * ``/api/state``   -- a fresh snapshot from ``monitor.capture_state()``
    * ``/api/history`` -- the monitor's recorded ``state_history``
    * ``/api/changes`` -- ``monitor.get_state_changes()``; an optional
      ``?widget=<substring>`` query restricts the diff to matching widgets
    """

    class _Server(socketserver.TCPServer):
        # Allow rebinding right after a restart instead of failing with
        # "Address already in use" while the old socket is in TIME_WAIT.
        allow_reuse_address = True

    def __init__(self, monitor: "TextualStateMonitor", port: int = 8080, host: str = ""):
        """Create a server for ``monitor``.

        Args:
            monitor: Object providing ``capture_state()``,
                ``get_state_changes(widget_path)`` and ``state_history``.
            port: TCP port to listen on (``0`` picks a free port).
            host: Interface to bind.  The default ``""`` listens on all
                interfaces, which exposes the app state to the network;
                pass ``"127.0.0.1"`` to restrict access to this machine.
        """
        self.monitor = monitor
        self.port = port
        self.host = host
        self.httpd = None

    def start(self, open_browser: bool = True):
        """Start serving in a daemon thread.

        Args:
            open_browser: Open the dashboard in the default web browser
                once the server is listening.

        Does nothing if the server is already running.
        """
        if self.httpd is not None:
            return

        handler = self._create_handler()
        self.httpd = self._Server((self.host, self.port), handler)
        # Reflect the real port when an ephemeral one (0) was requested.
        self.port = self.httpd.server_address[1]
        print(f"🌐 Starting state visualizer at http://localhost:{self.port}")

        # Start server in background thread
        server_thread = threading.Thread(target=self.httpd.serve_forever, daemon=True)
        server_thread.start()

        if open_browser:
            webbrowser.open(f"http://localhost:{self.port}")

    def stop(self):
        """Stop the web server and release its listening socket."""
        httpd, self.httpd = self.httpd, None
        if httpd is not None:
            httpd.shutdown()
            # shutdown() only ends serve_forever(); the socket stays bound
            # until it is closed explicitly.
            httpd.server_close()

    def _create_handler(self):
        """Create the HTTP request handler class bound to this monitor."""
        monitor = self.monitor

        class StateHandler(http.server.SimpleHTTPRequestHandler):
            def do_GET(self):
                """Dispatch on the URL path, ignoring any query string."""
                route = urlparse(self.path).path.rstrip('/') or '/'
                routes = {
                    '/': self.send_html_dashboard,
                    '/api/state': self.send_current_state,
                    '/api/history': self.send_state_history,
                    '/api/changes': self.send_state_changes,
                }
                responder = routes.get(route)
                if responder is None:
                    self.send_error(404)
                    return
                responder()

            def _send_body(self, body: bytes, content_type: str, cors: bool = False):
                """Send a complete 200 response with an explicit length."""
                self.send_response(200)
                self.send_header('Content-Type', content_type)
                self.send_header('Content-Length', str(len(body)))
                if cors:
                    # NOTE(review): '*' lets any web page open in the user's
                    # browser read the app state; kept for compatibility.
                    self.send_header('Access-Control-Allow-Origin', '*')
                self.end_headers()
                self.wfile.write(body)

            def send_html_dashboard(self):
                """Send HTML dashboard"""
                html = self._generate_dashboard_html()
                # The page contains non-ASCII characters, so the charset
                # must be declared for browsers to decode it correctly.
                self._send_body(html.encode('utf-8'), 'text/html; charset=utf-8')

            def send_current_state(self):
                """Send current app state as JSON"""
                try:
                    state = monitor.capture_state()
                except Exception as exc:
                    # Request boundary: report the failure to the client
                    # instead of dropping the connection without a reply.
                    self.send_error(500, explain=f"Could not capture state: {exc}")
                    return
                self.send_json_response(state)

            def send_state_history(self):
                """Send state history as JSON"""
                # Copy first: the monitor thread mutates the list.
                self.send_json_response(list(monitor.state_history))

            def send_state_changes(self):
                """Send state changes as JSON"""
                query = parse_qs(urlparse(self.path).query)
                widget_path = query.get('widget', [None])[0]
                changes = monitor.get_state_changes(widget_path)
                self.send_json_response(changes)

            def send_json_response(self, data):
                """Send JSON response"""
                # Serialise before any header is written so a failure here
                # cannot leave a half-sent 200 response behind.
                json_data = json.dumps(data, indent=2, default=str)
                self._send_body(
                    json_data.encode('utf-8'),
                    'application/json; charset=utf-8',
                    cors=True,
                )

            def _generate_dashboard_html(self):
                """Generate HTML dashboard"""
                return '''
<!DOCTYPE html>
<html>
<head>
    <meta charset="utf-8">
    <title>Textual State Visualizer</title>
    <style>
        body { font-family: Arial, sans-serif; margin: 20px; }
        .container { max-width: 1200px; margin: 0 auto; }
        .section { margin: 20px 0; padding: 15px; border: 1px solid #ddd; border-radius: 5px; }
        .widget-tree { font-family: monospace; }
        .widget-item { margin: 2px 0; padding: 2px; white-space: pre; }
        .widget-focused { background-color: #ffffcc; }
        .widget-hidden { opacity: 0.5; }
        .changes { background-color: #f0f8ff; }
        .timestamp { color: #666; font-size: 0.9em; }
        button { margin: 5px; padding: 8px 16px; }
        #auto-refresh { color: green; }
    </style>
</head>
<body>
    <div class="container">
        <h1>🔍 Textual State Visualizer</h1>
        <div class="section">
            <h2>Controls</h2>
            <button onclick="refreshState()">Refresh State</button>
            <button onclick="toggleAutoRefresh()" id="auto-refresh-btn">Start Auto-refresh</button>
            <button onclick="exportHistory()">Export History</button>
            <span id="auto-refresh" style="display: none;">Auto-refreshing every 2s...</span>
        </div>
        <div class="section">
            <h2>Current State</h2>
            <div id="current-state">Loading...</div>
        </div>
        <div class="section">
            <h2>Widget Tree</h2>
            <div id="widget-tree" class="widget-tree">Loading...</div>
        </div>
        <div class="section">
            <h2>Recent Changes</h2>
            <div id="recent-changes">Loading...</div>
        </div>
    </div>
    <script>
        let autoRefreshInterval = null;

        // Widget ids, labels and values come from the monitored app and may
        // contain markup characters; escape them before using innerHTML.
        function escapeHtml(value) {
            return String(value)
                .replace(/&/g, '&amp;')
                .replace(/</g, '&lt;')
                .replace(/>/g, '&gt;')
                .replace(/"/g, '&quot;');
        }

        function refreshState() {
            fetch('/api/state')
                .then(response => response.json())
                .then(data => {
                    updateCurrentState(data);
                    updateWidgetTree(data);
                })
                .catch(err => console.error('State refresh failed', err));
            fetch('/api/changes')
                .then(response => response.json())
                .then(data => updateRecentChanges(data))
                .catch(err => console.error('Changes refresh failed', err));
        }

        function updateCurrentState(state) {
            const div = document.getElementById('current-state');
            // screen_info is an empty object when the app has no screen.
            const screen = state.screen_info || {};
            const size = screen.size || {width: 0, height: 0};
            const focused = state.focused_widget
                ? `${escapeHtml(state.focused_widget.type)}#${escapeHtml(state.focused_widget.id)}`
                : 'None';
            div.innerHTML = `
                <p><strong>Timestamp:</strong> ${new Date(state.timestamp * 1000).toLocaleString()}</p>
                <p><strong>Focused Widget:</strong> ${focused}</p>
                <p><strong>Screen:</strong> ${escapeHtml(screen.type)} (${size.width}x${size.height})</p>
                <p><strong>Total Widgets:</strong> ${Object.keys(state.widgets).length}</p>
            `;
        }

        function updateWidgetTree(state) {
            const div = document.getElementById('widget-tree');
            let html = '';
            for (const [path, widget] of Object.entries(state.widgets)) {
                const indent = '  '.repeat((path.match(/\\//g) || []).length);
                const classes = widget.has_focus ? 'widget-focused' : (widget.visible ? '' : 'widget-hidden');
                html += `<div class="widget-item ${classes}">`;
                html += `${indent}📦 ${escapeHtml(widget.type)}`;
                if (widget.id) html += ` #${escapeHtml(widget.id)}`;
                if (widget.label) html += ` [${escapeHtml(widget.label)}]`;
                html += ` (${widget.size.width}x${widget.size.height})`;
                if (widget.has_focus) html += ' 🎯';
                if (!widget.visible) html += ' 👻';
                html += '</div>';
            }
            div.innerHTML = html;
        }

        function updateRecentChanges(changes) {
            const div = document.getElementById('recent-changes');
            let html = '';
            changes.slice(-10).forEach(entry => {
                html += `<div class="changes">`;
                html += `<div class="timestamp">${new Date(entry.timestamp * 1000).toLocaleString()}</div>`;
                for (const [widget, widgetChanges] of Object.entries(entry.changes)) {
                    html += `<strong>${escapeHtml(widget)}:</strong><br/>`;
                    for (const [prop, delta] of Object.entries(widgetChanges)) {
                        const oldValue = escapeHtml(JSON.stringify(delta.old));
                        const newValue = escapeHtml(JSON.stringify(delta.new));
                        html += ` ${escapeHtml(prop)}: ${oldValue} → ${newValue}<br/>`;
                    }
                }
                html += '</div>';
            });
            div.innerHTML = html || 'No recent changes';
        }

        function toggleAutoRefresh() {
            const btn = document.getElementById('auto-refresh-btn');
            const indicator = document.getElementById('auto-refresh');
            if (autoRefreshInterval) {
                clearInterval(autoRefreshInterval);
                autoRefreshInterval = null;
                btn.textContent = 'Start Auto-refresh';
                indicator.style.display = 'none';
            } else {
                autoRefreshInterval = setInterval(refreshState, 2000);
                btn.textContent = 'Stop Auto-refresh';
                indicator.style.display = 'inline';
            }
        }

        function exportHistory() {
            window.open('/api/history', '_blank');
        }

        // Initial load
        refreshState();
    </script>
</body>
</html>
'''

        return StateHandler
def create_textual_debug_setup() -> str:
    """Return a copy-pasteable snippet that adds state debugging to an app.

    The snippet defines a ``DebugMixin`` class with helpers to start and
    stop monitoring (optionally with the web interface) and to print the
    current state.  It is returned as source text and is syntactically
    valid Python, so it can be pasted into an app module as-is.
    """
    return '''
# Add this to your Textual app for easy state monitoring
from textual_state_visualizer import TextualStateMonitor, TextualStateWebServer


class DebugMixin:
    """Mixin to add debugging capabilities to Textual apps"""

    def __init__(self, *args, **kwargs):
        super().__init__(*args, **kwargs)
        self._debug_monitor = None
        self._debug_server = None

    def start_debug_monitoring(self, web_interface: bool = True, port: int = 8080):
        """Start debug monitoring with optional web interface"""
        self._debug_monitor = TextualStateMonitor(self)
        self._debug_monitor.start_monitoring()
        if web_interface:
            self._debug_server = TextualStateWebServer(self._debug_monitor, port)
            self._debug_server.start()
        print("🔍 Debug monitoring started!")
        if web_interface:
            print(f"🌐 Web interface: http://localhost:{port}")

    def stop_debug_monitoring(self):
        """Stop debug monitoring"""
        if self._debug_monitor:
            self._debug_monitor.stop_monitoring()
        if self._debug_server:
            self._debug_server.stop()

    def debug_current_state(self):
        """Print current state to console"""
        if self._debug_monitor:
            state = self._debug_monitor.capture_state()
            print("🔍 Current Textual App State:")
            print(f" Focused: {state.get('focused_widget', 'None')}")
            print(f" Widgets: {len(state.get('widgets', {}))}")
            for path, widget in state.get('widgets', {}).items():
                status = []
                if widget.get('has_focus'):
                    status.append('FOCUSED')
                if not widget.get('visible'):
                    status.append('HIDDEN')
                status_str = f" [{', '.join(status)}]" if status else ""
                print(f" {path}: {widget['type']}{status_str}")


# Usage in your app (list the mixin first so its __init__ runs):
# class MyApp(DebugMixin, App):
#     def on_mount(self):
#         self.start_debug_monitoring()
'''
def main():
    """Print the tool banner, its feature list and the app setup snippet."""
    divider = "=" * 50
    banner_lines = (
        "🔍 Textual State Visualizer",
        divider,
        "This tool provides real-time monitoring of Textual app state.",
        "\nFeatures:",
        " 📊 Real-time widget tree visualization",
        " 🔄 State change tracking",
        " 🌐 Web-based dashboard",
        " 📁 State history export",
        " 🎯 Focus tracking",
        "\n" + divider,
        "📝 Setup code for your app:",
    )
    # One print per entry keeps the output identical line for line.
    for line in banner_lines:
        print(line)
    print(create_textual_debug_setup())


if __name__ == "__main__":
    main()