bookmark - UI Priority 1 Implemented

This commit is contained in:
2026-04-10 15:34:06 -04:00
parent 4253aa01b8
commit 7efea55ddc
27 changed files with 3293 additions and 585 deletions

View File

@@ -1 +1,18 @@
"""Dash web application."""
"""Dash web application.
The web module provides an interactive browser-based UI for Impakt, built
on Plotly Dash with Bootstrap components.
Structure:
app.py - Application factory
state.py - Server-side state management (AppState)
layout.py - Top-level layout assembly
callbacks.py - Callback registration hub
components/ - Reusable layout components
callbacks/ - Feature-specific callback modules
assets/ - CSS and static files
"""
from impakt.web.app import create_app, serve
__all__ = ["create_app", "serve"]

View File

@@ -1,7 +1,12 @@
"""Dash web application factory."""
"""Dash web application factory.
Creates the Dash app with all layout components and callbacks registered.
The AppState holds server-side data; Dash stores hold lightweight UI state.
"""
from __future__ import annotations
from pathlib import Path
from typing import TYPE_CHECKING
import dash
@@ -11,30 +16,43 @@ from impakt.channel.model import TestData
from impakt.template.library import TemplateLibrary
from impakt.web.callbacks import register_callbacks
from impakt.web.layout import build_layout
from impakt.web.state import AppState
if TYPE_CHECKING:
from impakt.script.api import Session
def create_app(
session_or_data: Session | TestData,
session_or_data: Session | TestData | None = None,
template_names: list[str] | None = None,
app_state: AppState | None = None,
) -> dash.Dash:
"""Create the Dash application.
Args:
session_or_data: A Session or TestData object.
session_or_data: A Session, TestData, or None (empty app).
template_names: Available template names for the selector.
app_state: Pre-configured AppState. If None, one is created.
Returns:
Configured Dash app ready to run.
"""
# Resolve test data
# Build or use provided AppState
if app_state is None:
app_state = AppState()
if session_or_data is not None:
if hasattr(session_or_data, "data"):
test_data = session_or_data.data # type: ignore[union-attr]
test_data: TestData = session_or_data.data # type: ignore[union-attr]
else:
test_data = session_or_data # type: ignore[assignment]
# Create a LoadedTest from TestData directly
from impakt.web.state import LoadedTest
loaded = LoadedTest(test_data)
app_state._tests[test_data.test_id] = loaded
app_state._test_order.append(test_data.test_id)
# Discover templates
if template_names is None:
try:
@@ -43,21 +61,26 @@ def create_app(
except Exception:
template_names = []
# Title
title = "Impakt"
if app_state.primary_test:
title = f"Impakt — {app_state.primary_test.test_id}"
app = dash.Dash(
__name__,
external_stylesheets=[dbc.themes.FLATLY],
title=f"Impakt — {test_data.test_id}",
title=title,
suppress_callback_exceptions=True,
)
app.layout = build_layout(test_data, template_names)
register_callbacks(app, test_data)
app.layout = build_layout(app_state, template_names)
register_callbacks(app, app_state)
return app
def serve(
session_or_data: Session | TestData,
session_or_data: Session | TestData | None = None,
template: str | None = None,
port: int = 8050,
debug: bool = False,
@@ -65,12 +88,12 @@ def serve(
"""Convenience function to create and run the web UI.
Args:
session_or_data: Session or TestData to visualize.
session_or_data: Session or TestData to visualize, or None for empty app.
template: Template name to pre-apply.
port: Server port.
debug: Enable Dash debug mode.
"""
if template and hasattr(session_or_data, "apply_template"):
if template and session_or_data and hasattr(session_or_data, "apply_template"):
session_or_data.apply_template(template) # type: ignore[union-attr]
app = create_app(session_or_data)

View File

@@ -0,0 +1,84 @@
/**
* Draggable splitter handle for resizing the left panel.
*
* Attaches mousedown/mousemove/mouseup listeners to the splitter handle
* element. On drag, adjusts the width of the left panel. No Dash callback
* needed — this runs entirely in the browser.
*/
(function () {
"use strict";
function initSplitter() {
var handle = document.getElementById("splitter-handle");
var leftPanel = document.getElementById("left-panel");
if (!handle || !leftPanel) {
// Elements not yet in DOM — retry shortly
setTimeout(initSplitter, 200);
return;
}
var isDragging = false;
var startX = 0;
var startWidth = 0;
handle.addEventListener("mousedown", function (e) {
isDragging = true;
startX = e.clientX;
startWidth = leftPanel.offsetWidth;
// Prevent text selection during drag
document.body.style.userSelect = "none";
document.body.style.cursor = "col-resize";
handle.style.backgroundColor = "#adb5bd";
e.preventDefault();
});
document.addEventListener("mousemove", function (e) {
if (!isDragging) return;
var delta = e.clientX - startX;
var newWidth = startWidth + delta;
// Respect min/max from the CSS
var minWidth = parseInt(leftPanel.style.minWidth) || 200;
var maxWidth = parseInt(leftPanel.style.maxWidth) || 600;
newWidth = Math.max(minWidth, Math.min(maxWidth, newWidth));
leftPanel.style.width = newWidth + "px";
});
document.addEventListener("mouseup", function () {
if (!isDragging) return;
isDragging = false;
document.body.style.userSelect = "";
document.body.style.cursor = "";
handle.style.backgroundColor = "#e9ecef";
// Trigger a window resize event so Plotly graphs reflow
window.dispatchEvent(new Event("resize"));
});
// Hover effect
handle.addEventListener("mouseenter", function () {
if (!isDragging) {
handle.style.backgroundColor = "#ced4da";
}
});
handle.addEventListener("mouseleave", function () {
if (!isDragging) {
handle.style.backgroundColor = "#e9ecef";
}
});
}
// Wait for DOM ready
if (document.readyState === "loading") {
document.addEventListener("DOMContentLoaded", initSplitter);
} else {
initSplitter();
}
})();

View File

@@ -0,0 +1,134 @@
/* Impakt custom styles */
/* Tighter accordion spacing */
.accordion-button {
padding: 6px 12px;
font-size: 12px;
}
.accordion-body {
padding: 4px 12px;
}
/* Channel items */
.channel-item {
padding: 1px 0;
}
.channel-item .form-check {
margin-bottom: 0;
min-height: 0;
}
.channel-item .form-check-label {
font-size: 12px;
line-height: 1.3;
}
.channel-item .form-check-input {
margin-top: 3px;
}
/* Card headers */
.card-header {
padding: 6px 12px;
font-size: 13px;
}
/* Compact card body */
.card-body {
padding: 8px 12px;
}
/* Selected channel badges */
.badge {
font-weight: 500;
}
/* Table styling */
.table-sm th,
.table-sm td {
padding: 3px 6px;
}
/* Resizable DataTable columns */
.dash-spreadsheet .dash-header th {
resize: horizontal;
overflow: hidden;
}
/* Don't make the tiny # column or the checkbox column resizable */
.dash-spreadsheet .dash-header th:first-child {
resize: none;
}
/* Scrollbar styling */
::-webkit-scrollbar {
width: 6px;
}
::-webkit-scrollbar-track {
background: #f1f1f1;
border-radius: 3px;
}
::-webkit-scrollbar-thumb {
background: #ccc;
border-radius: 3px;
}
::-webkit-scrollbar-thumb:hover {
background: #aaa;
}
/* Layout button active state */
.btn-group .btn.active {
font-weight: bold;
}
/* Criteria row hover */
tr[id*="criteria-row"]:hover {
background-color: #f0f7ff;
cursor: pointer;
}
/* Splitter handle */
#splitter-handle {
display: flex;
align-items: center;
justify-content: center;
}
#splitter-handle::after {
content: "";
display: block;
width: 2px;
height: 32px;
background-color: #ced4da;
border-radius: 1px;
}
#splitter-handle:hover::after {
background-color: #6c757d;
}
/* Prevent layout shift during splitter drag */
#main-content {
user-select: none;
}
/* Navbar tight */
.navbar {
padding: 4px 0;
}
/* Remove input spinners */
input[type=number]::-webkit-inner-spin-button,
input[type=number]::-webkit-outer-spin-button {
-webkit-appearance: none;
margin: 0;
}
input[type=number] {
-moz-appearance: textfield;
}

View File

@@ -1,256 +0,0 @@
"""Dash callbacks for the Impakt web UI.
Connects UI components to the Impakt engine: channel selection triggers
plot updates, transform controls modify the display, and cursors update
the values table.
"""
from __future__ import annotations
from typing import Any
import dash
import numpy as np
import plotly.graph_objects as go
from dash import Input, Output, State, callback_context, html
from dash.exceptions import PreventUpdate
from impakt.channel.model import Channel, TestData
from impakt.plot.engine import DEFAULT_COLORS
from impakt.transform.align import YAlign
from impakt.transform.cfc import CFCFilter
def register_callbacks(app: dash.Dash, test_data: TestData) -> None:
"""Register all callbacks with the Dash app."""
@app.callback(
Output("main-plot", "figure"),
[
Input("channel-select", "value"),
Input("cfc-select", "value"),
Input("show-resultant", "value"),
Input("y-align-check", "value"),
Input("cursor-update-btn", "n_clicks"),
],
[
State("cursor-x1", "value"),
State("cursor-x2", "value"),
],
)
def update_plot(
selected_channels: list[str],
cfc_value: str,
show_resultant: bool,
y_align: bool,
n_clicks: int | None,
cursor_x1: float | None,
cursor_x2: float | None,
) -> go.Figure:
if not selected_channels:
return go.Figure().update_layout(
template="plotly_white",
annotations=[
{
"text": "Select channels from the left panel",
"xref": "paper",
"yref": "paper",
"x": 0.5,
"y": 0.5,
"showarrow": False,
"font": {"size": 16, "color": "#999"},
}
],
)
fig = go.Figure()
# Process channels
channels_to_plot: list[tuple[str, Channel]] = []
for i, name in enumerate(selected_channels):
try:
ch = test_data.get(name)
except KeyError:
continue
# Apply transforms
if cfc_value != "none":
try:
ch = CFCFilter(cfc_class=int(cfc_value)).apply(ch)
except (ValueError, Exception):
pass
if y_align:
ch = YAlign().apply(ch)
label = ch.code.short_label if ch.code.is_valid else ch.name
channels_to_plot.append((label, ch))
# Compute resultant if requested
if show_resultant and len(channels_to_plot) >= 2:
# Group by group_key and compute resultants
from collections import defaultdict
groups: dict[str, list[Channel]] = defaultdict(list)
for _, ch in channels_to_plot:
if ch.code.is_valid and ch.code.is_component():
groups[ch.code.group_key()].append(ch)
for gkey, comps in groups.items():
if len(comps) >= 2:
from impakt.transform.resultant import resultant_from_channels
try:
res = resultant_from_channels(*comps)
channels_to_plot.append(("Resultant", res))
except Exception:
pass
# Add traces
for i, (label, ch) in enumerate(channels_to_plot):
color = DEFAULT_COLORS[i % len(DEFAULT_COLORS)]
fig.add_trace(
go.Scatter(
x=ch.time.tolist(),
y=ch.data.tolist(),
mode="lines",
name=label,
line=dict(color=color, width=1.5),
hovertemplate=f"{label}<br>t=%{{x:.6f}}s<br>%{{y:.4f}} {ch.unit}<extra></extra>",
)
)
# Add cursor lines
if cursor_x1 is not None and cursor_x2 is not None:
for x_val, lbl in [(cursor_x1, "x1"), (cursor_x2, "x2")]:
fig.add_vline(
x=x_val,
line_dash="dash",
line_color="gray",
line_width=1,
annotation_text=f"{lbl}={x_val:.4f}s",
)
# Layout
y_label = ""
if channels_to_plot:
y_label = channels_to_plot[0][1].unit
cfc_label = f" (CFC {cfc_value})" if cfc_value != "none" else ""
fig.update_layout(
title=f"Channel Plot{cfc_label}",
xaxis_title="Time (s)",
yaxis_title=y_label,
template="plotly_white",
hovermode="x unified",
legend=dict(orientation="h", yanchor="bottom", y=-0.25, xanchor="center", x=0.5),
margin=dict(l=60, r=20, t=40, b=80),
)
return fig
@app.callback(
Output("cursor-values-table", "children"),
[Input("cursor-update-btn", "n_clicks")],
[
State("channel-select", "value"),
State("cfc-select", "value"),
State("y-align-check", "value"),
State("cursor-x1", "value"),
State("cursor-x2", "value"),
],
)
def update_cursor_table(
n_clicks: int | None,
selected_channels: list[str],
cfc_value: str,
y_align: bool,
cursor_x1: float | None,
cursor_x2: float | None,
) -> Any:
if not n_clicks or not selected_channels:
return html.Div("Click 'Update' with channels selected", className="text-muted small")
if cursor_x1 is None or cursor_x2 is None:
return html.Div("Set both cursor positions", className="text-muted small")
rows = []
for name in selected_channels:
try:
ch = test_data.get(name)
if cfc_value != "none":
try:
ch = CFCFilter(cfc_class=int(cfc_value)).apply(ch)
except Exception:
pass
if y_align:
ch = YAlign().apply(ch)
v1 = float(np.interp(cursor_x1, ch.time, ch.data))
v2 = float(np.interp(cursor_x2, ch.time, ch.data))
delta = v2 - v1
label = ch.code.short_label if ch.code.is_valid else ch.name
rows.append(
html.Tr(
[
html.Td(label, style={"fontSize": "12px"}),
html.Td(
f"{v1:.4f}", style={"fontSize": "12px", "fontFamily": "monospace"}
),
html.Td(
f"{v2:.4f}", style={"fontSize": "12px", "fontFamily": "monospace"}
),
html.Td(
f"{delta:+.4f}",
style={"fontSize": "12px", "fontFamily": "monospace"},
),
html.Td(ch.unit, style={"fontSize": "12px"}),
]
)
)
except Exception:
continue
if not rows:
return html.Div("No values computed", className="text-muted small")
return html.Table(
[
html.Thead(
html.Tr(
[
html.Th("Channel", style={"fontSize": "11px"}),
html.Th(f"@ {cursor_x1:.4f}s", style={"fontSize": "11px"}),
html.Th(f"@ {cursor_x2:.4f}s", style={"fontSize": "11px"}),
html.Th("Delta", style={"fontSize": "11px"}),
html.Th("Unit", style={"fontSize": "11px"}),
]
)
),
html.Tbody(rows),
],
className="table table-sm table-hover",
style={"marginBottom": 0},
)
@app.callback(
Output("channel-select", "options"),
[Input("channel-search", "value")],
)
def filter_channels(search: str | None) -> list[dict[str, str]]:
tree_items = []
for ch in test_data:
label = ch.code.short_label if ch.code.is_valid else ch.name
tree_items.append({"label": label, "value": ch.name})
if not search:
return tree_items
search_lower = search.lower()
return [
item
for item in tree_items
if search_lower in item["label"].lower() or search_lower in item["value"].lower()
]

View File

@@ -0,0 +1,28 @@
"""Dash callback registrations for the Impakt web UI.
This package contains feature-specific callback modules. The
register_callbacks() function here is the single entry point
that registers all of them.
"""
from __future__ import annotations
import dash
from impakt.web.callbacks.channel_callbacks import register_channel_callbacks
from impakt.web.callbacks.criteria_callbacks import register_criteria_callbacks
from impakt.web.callbacks.cursor_callbacks import register_cursor_callbacks
from impakt.web.callbacks.export_callbacks import register_export_callbacks
from impakt.web.callbacks.file_callbacks import register_file_callbacks
from impakt.web.callbacks.plot_callbacks import register_plot_callbacks
from impakt.web.state import AppState
def register_callbacks(app: dash.Dash, app_state: AppState) -> None:
"""Register all callbacks from all feature modules."""
register_channel_callbacks(app, app_state)
register_plot_callbacks(app, app_state)
register_cursor_callbacks(app, app_state)
register_criteria_callbacks(app, app_state)
register_file_callbacks(app, app_state)
register_export_callbacks(app, app_state)

View File

@@ -0,0 +1,102 @@
"""Channel selection and filtering callbacks.
Handles:
- DataTable row selection -> updates selected channels store
- Wildcard filter + facet dropdowns -> filters table rows
- Selected channels badge display
- Filter clear button
"""
from __future__ import annotations
from typing import Any
import dash
from dash import Input, Output, State, html, no_update
from dash.exceptions import PreventUpdate
from impakt.web.components.channel_grid import (
build_selected_channels_badges,
filter_rows,
)
from impakt.web.state import AppState
def register_channel_callbacks(app: dash.Dash, app_state: AppState) -> None:
"""Register all channel selection and filtering callbacks."""
@app.callback(
Output("selected-channels-store", "data"),
[Input("channel-grid", "selected_rows")],
[State("channel-grid", "data")],
)
def sync_selection_to_store(
selected_row_indices: list[int] | None,
current_data: list[dict[str, Any]] | None,
) -> list[str]:
"""When user checks/unchecks rows in the DataTable, update the store."""
if not selected_row_indices or not current_data:
return []
keys = []
for idx in selected_row_indices:
if 0 <= idx < len(current_data):
keys.append(current_data[idx]["key"])
return keys
@app.callback(
Output("selected-channels-badges", "children"),
[Input("selected-channels-store", "data")],
)
def update_badges(selected_keys: list[str] | None) -> list:
return build_selected_channels_badges(selected_keys or [], app_state)
@app.callback(
Output("channel-grid", "data"),
[
Input("channel-filter-input", "value"),
Input("channel-filter-clear", "n_clicks"),
Input("facet-body", "value"),
Input("facet-meas", "value"),
Input("facet-direction", "value"),
],
[State("channel-grid-all-rows", "data")],
)
def apply_filters(
pattern: str | None,
clear_clicks: int | None,
body: str | None,
meas: str | None,
direction: str | None,
all_rows: list[dict[str, Any]] | None,
) -> list[dict[str, Any]]:
"""Filter the channel grid based on wildcard pattern and facets."""
if not all_rows:
return []
# If clear was clicked, return all rows
trigger = dash.ctx.triggered_id
if trigger == "channel-filter-clear":
return all_rows
return filter_rows(
all_rows,
pattern=pattern or "",
body=body or "",
meas=meas or "",
direction=direction or "",
)
@app.callback(
[
Output("channel-filter-input", "value"),
Output("facet-body", "value"),
Output("facet-meas", "value"),
Output("facet-direction", "value"),
],
[Input("channel-filter-clear", "n_clicks")],
prevent_initial_call=True,
)
def clear_filters(n_clicks: int | None) -> tuple[str, str, str, str]:
"""Clear all filter inputs."""
return "", "", "", ""

View File

@@ -0,0 +1,57 @@
"""Criteria computation callbacks.
Handles:
- Compute All button -> runs auto_compute_criteria
- Protocol scoring
- Results display
"""
from __future__ import annotations
from typing import Any
import dash
from dash import Input, Output, State, html
from dash.exceptions import PreventUpdate
from impakt.web.components.criteria import (
auto_compute_criteria,
build_criteria_results_display,
score_protocol,
)
from impakt.web.state import AppState
def register_criteria_callbacks(app: dash.Dash, app_state: AppState) -> None:
"""Register criteria-related callbacks."""
@app.callback(
Output("criteria-results", "children"),
[Input("compute-criteria-btn", "n_clicks")],
[State("protocol-select", "value")],
)
def compute_criteria(n_clicks: int | None, protocol: str) -> Any:
if not n_clicks:
return html.Div(
"Click 'Compute All' to auto-detect channels and calculate injury criteria.",
className="text-muted small",
)
primary = app_state.primary_test
if primary is None:
return html.Div("No test loaded.", className="text-danger small")
# Compute criteria
criteria = auto_compute_criteria(primary.data)
if not criteria:
return html.Div(
"No criteria could be computed. Check that the test has the required channels "
"(head acceleration, neck force/moment, chest deflection, femur load).",
className="text-warning small",
)
# Score against protocol
protocol_result = score_protocol(criteria, protocol)
return build_criteria_results_display(criteria, protocol_result)

View File

@@ -0,0 +1,68 @@
"""Cursor value callbacks.
Handles:
- Cursor update button -> recompute interpolated values
- Cursor values table rendering
"""
from __future__ import annotations
from typing import Any
import dash
from dash import Input, Output, State, html
from dash.exceptions import PreventUpdate
from impakt.web.callbacks.plot_callbacks import _resolve_channels
from impakt.web.components.cursors import build_cursor_values_table
from impakt.web.state import AppState
def register_cursor_callbacks(app: dash.Dash, app_state: AppState) -> None:
"""Register cursor-related callbacks."""
@app.callback(
Output("cursor-values-table", "children"),
[Input("cursor-update-btn", "n_clicks")],
[
State("selected-channels-store", "data"),
State("cfc-select", "value"),
State("y-align-check", "value"),
State("x-align-method", "value"),
State("x-align-value", "value"),
State("show-resultant", "value"),
State("cursor-x1", "value"),
State("cursor-x2", "value"),
],
)
def update_cursor_table(
n_clicks: int | None,
selected_keys: list[str] | None,
cfc_value: str,
y_align: bool,
x_align_method: str,
x_align_value: float | None,
show_resultant: bool,
cursor_x1: float | None,
cursor_x2: float | None,
) -> Any:
if not n_clicks:
return html.Div("Select channels and click 'Update'", className="text-muted small")
if cursor_x1 is None or cursor_x2 is None:
return html.Div("Set both cursor positions", className="text-muted small")
if not selected_keys:
return html.Div("No channels selected", className="text-muted small")
channels = _resolve_channels(
selected_keys,
app_state,
cfc_value,
y_align,
x_align_method or "none",
x_align_value,
show_resultant,
)
return build_cursor_values_table(channels, cursor_x1, cursor_x2)

View File

@@ -0,0 +1,136 @@
"""Export callbacks.
Handles:
- Plot image export (PNG, SVG, PDF)
- Data CSV export
- Protocol report generation
"""
from __future__ import annotations
import io
import tempfile
from typing import Any
import dash
import numpy as np
import pandas as pd
from dash import Input, Output, State, dcc, html, no_update
from dash.exceptions import PreventUpdate
from impakt.web.callbacks.plot_callbacks import _resolve_channels
from impakt.web.state import AppState
def register_export_callbacks(app: dash.Dash, app_state: AppState) -> None:
"""Register export-related callbacks."""
@app.callback(
Output("download-export", "data"),
[
Input("export-csv-btn", "n_clicks"),
],
[
State("selected-channels-store", "data"),
State("cfc-select", "value"),
State("y-align-check", "value"),
State("x-align-method", "value"),
State("x-align-value", "value"),
State("show-resultant", "value"),
],
prevent_initial_call=True,
)
def export_csv(
n_clicks: int | None,
selected_keys: list[str] | None,
cfc_value: str,
y_align: bool,
x_align_method: str,
x_align_value: float | None,
show_resultant: bool,
) -> Any:
if not n_clicks or not selected_keys:
raise PreventUpdate
channels = _resolve_channels(
selected_keys,
app_state,
cfc_value,
y_align,
x_align_method or "none",
x_align_value,
show_resultant,
)
if not channels:
raise PreventUpdate
# Build DataFrame: first column is time from the first channel,
# then one column per channel
ref_time = channels[0][1].time
data: dict[str, Any] = {"time_s": ref_time}
for label, ch in channels:
# Interpolate to common time base if needed
if len(ch.time) == len(ref_time) and np.allclose(ch.time, ref_time):
data[f"{label} [{ch.unit}]"] = ch.data
else:
data[f"{label} [{ch.unit}]"] = np.interp(ref_time, ch.time, ch.data)
df = pd.DataFrame(data)
# Generate filename
primary = app_state.primary_test
test_id = primary.test_id if primary else "export"
cfc_suffix = f"_CFC{cfc_value}" if cfc_value != "none" else ""
return dcc.send_data_frame(
df.to_csv,
f"{test_id}{cfc_suffix}_channels.csv",
index=False,
)
@app.callback(
Output("report-status", "children"),
[Input("generate-report-btn", "n_clicks")],
[State("protocol-select", "value")],
prevent_initial_call=True,
)
def generate_report(n_clicks: int | None, protocol: str) -> Any:
if not n_clicks:
raise PreventUpdate
primary = app_state.primary_test
if primary is None:
return html.Div("No test loaded.", className="text-danger small")
from impakt.web.components.criteria import auto_compute_criteria, score_protocol
criteria = auto_compute_criteria(primary.data)
if not criteria:
return html.Div("No criteria to report.", className="text-warning small")
protocol_result = score_protocol(criteria, protocol)
if protocol_result is None:
return html.Div("Protocol scoring failed.", className="text-danger small")
# Generate report
try:
from impakt.report.engine import generate_protocol_report
import tempfile
with tempfile.NamedTemporaryFile(suffix=".html", delete=False, mode="w") as f:
from impakt.report.engine import _fallback_protocol_html
html_content = _fallback_protocol_html(protocol_result, primary.data.metadata)
f.write(html_content)
report_path = f.name
return html.Div(
[
html.Span("Report generated: ", className="small text-success"),
html.Code(report_path, style={"fontSize": "10px"}),
]
)
except Exception as e:
return html.Div(f"Report generation failed: {e}", className="text-danger small")

View File

@@ -0,0 +1,120 @@
"""File/test management callbacks.
Handles:
- Open test modal
- Add overlay modal
- Test loading
- Page refresh after loading
"""
from __future__ import annotations
from pathlib import Path
from typing import Any
import dash
from dash import Input, Output, State, html, no_update
from dash.exceptions import PreventUpdate
from impakt.web.state import AppState
def register_file_callbacks(app: dash.Dash, app_state: AppState) -> None:
"""Register file management callbacks."""
@app.callback(
Output("open-test-modal", "is_open"),
[
Input("open-test-btn", "n_clicks"),
Input("open-test-cancel", "n_clicks"),
Input("open-test-confirm", "n_clicks"),
],
[State("open-test-modal", "is_open")],
)
def toggle_open_modal(
open_clicks: int | None,
cancel_clicks: int | None,
confirm_clicks: int | None,
is_open: bool,
) -> bool:
trigger = dash.ctx.triggered_id
if trigger == "open-test-btn":
return True
elif trigger in ("open-test-cancel", "open-test-confirm"):
return False
return is_open
@app.callback(
Output("overlay-test-modal", "is_open"),
[
Input("add-overlay-btn", "n_clicks"),
Input("overlay-test-cancel", "n_clicks"),
Input("overlay-test-confirm", "n_clicks"),
],
[State("overlay-test-modal", "is_open")],
)
def toggle_overlay_modal(
open_clicks: int | None,
cancel_clicks: int | None,
confirm_clicks: int | None,
is_open: bool,
) -> bool:
trigger = dash.ctx.triggered_id
if trigger == "add-overlay-btn":
return True
elif trigger in ("overlay-test-cancel", "overlay-test-confirm"):
return False
return is_open
@app.callback(
[
Output("open-test-error", "children"),
Output("page-refresh-trigger", "data"),
],
[Input("open-test-confirm", "n_clicks")],
[State("open-test-path", "value")],
)
def load_test(n_clicks: int | None, path_str: str | None) -> tuple[str, Any]:
if not n_clicks or not path_str:
raise PreventUpdate
path = Path(path_str.strip()).expanduser().resolve()
if not path.exists():
return f"Path does not exist: {path}", no_update
if not path.is_dir():
return f"Path is not a directory: {path}", no_update
try:
# Clear existing tests and load new one
for tid in list(app_state.test_ids):
app_state.remove_test(tid)
app_state.load_test(path)
return "", {"action": "reload"}
except Exception as e:
return f"Failed to load: {e}", no_update
@app.callback(
[
Output("overlay-test-error", "children"),
Output("page-refresh-trigger", "data", allow_duplicate=True),
],
[Input("overlay-test-confirm", "n_clicks")],
[State("overlay-test-path", "value")],
prevent_initial_call=True,
)
def load_overlay(n_clicks: int | None, path_str: str | None) -> tuple[str, Any]:
if not n_clicks or not path_str:
raise PreventUpdate
path = Path(path_str.strip()).expanduser().resolve()
if not path.exists():
return f"Path does not exist: {path}", no_update
try:
app_state.load_test(path)
return "", {"action": "reload"}
except Exception as e:
return f"Failed to load: {e}", no_update

View File

@@ -0,0 +1,224 @@
"""Plot rendering callbacks.
Handles:
- Updating plot figures when channels/transforms change
- Cursor line rendering
- Layout switching
"""
from __future__ import annotations
from collections import defaultdict
from typing import Any
import dash
import numpy as np
import plotly.graph_objects as go
from dash import ALL, Input, Output, State, ctx
from dash.exceptions import PreventUpdate
from impakt.channel.model import Channel
from impakt.plot.engine import DEFAULT_COLORS
from impakt.transform.align import XAlign, YAlign
from impakt.transform.cfc import CFCFilter
from impakt.transform.resultant import resultant_from_channels
from impakt.web.state import AppState
def _resolve_channels(
selected_keys: list[str],
app_state: AppState,
cfc_value: str,
y_align: bool,
x_align_method: str,
x_align_value: float | None,
show_resultant: bool,
) -> list[tuple[str, Channel]]:
"""Resolve selected channel keys to Channel objects with transforms applied."""
channels: list[tuple[str, Channel]] = []
for key in selected_keys:
if "::" in key:
test_id, ch_name = key.split("::", 1)
elif app_state.primary_test:
test_id = app_state.primary_test.test_id
ch_name = key
else:
continue
ch = app_state.get_channel(test_id, ch_name)
if ch is None:
continue
# Apply CFC filter
if cfc_value != "none":
try:
ch = CFCFilter(cfc_class=int(cfc_value)).apply(ch)
except (ValueError, Exception):
pass
# Apply Y-align
if y_align:
ch = YAlign().apply(ch)
# Apply X-align
if x_align_method == "manual" and x_align_value is not None:
ch = XAlign(method="manual", reference_time=x_align_value).apply(ch)
elif x_align_method == "threshold" and x_align_value is not None:
ch = XAlign(method="threshold", threshold_value=x_align_value).apply(ch)
# Build label
multi_test = len(app_state.tests) > 1
label = ch.code.short_label if ch.code.is_valid else ch.name
if multi_test:
label = f"[{test_id}] {label}"
channels.append((label, ch))
# Compute resultants if requested
if show_resultant and len(channels) >= 2:
groups: dict[str, list[tuple[str, Channel]]] = defaultdict(list)
for label, ch in channels:
if ch.code.is_valid and ch.code.is_component():
groups[ch.code.group_key()].append((label, ch))
for gkey, group_channels in groups.items():
if len(group_channels) >= 2:
try:
comps = [ch for _, ch in group_channels]
res = resultant_from_channels(*comps)
res_label = (
f"{res.code.location_label} Resultant" if res.code.is_valid else "Resultant"
)
if len(app_state.tests) > 1:
res_label = f"[{comps[0].source_test_id}] {res_label}"
channels.append((res_label, res))
except Exception:
pass
return channels
def _build_figure(
channels: list[tuple[str, Channel]],
cursor_x1: float | None,
cursor_x2: float | None,
cfc_value: str,
) -> go.Figure:
"""Build a Plotly figure from resolved channels."""
fig = go.Figure()
if not channels:
fig.update_layout(
template="plotly_white",
annotations=[
{
"text": "Select channels from the tree to plot",
"xref": "paper",
"yref": "paper",
"x": 0.5,
"y": 0.5,
"showarrow": False,
"font": {"size": 14, "color": "#bbb"},
}
],
xaxis={"visible": False},
yaxis={"visible": False},
margin={"l": 40, "r": 20, "t": 30, "b": 40},
)
return fig
# Add traces
for i, (label, ch) in enumerate(channels):
color = DEFAULT_COLORS[i % len(DEFAULT_COLORS)]
fig.add_trace(
go.Scatter(
x=ch.time.tolist(),
y=ch.data.tolist(),
mode="lines",
name=label,
line=dict(color=color, width=1.5),
hovertemplate=f"{label}<br>t=%{{x:.6f}}s<br>%{{y:.4f}} {ch.unit}<extra></extra>",
)
)
# Add cursor lines
if cursor_x1 is not None and cursor_x2 is not None:
for x_val, lbl in [(cursor_x1, "x1"), (cursor_x2, "x2")]:
fig.add_vline(
x=x_val,
line_dash="dash",
line_color="rgba(100,100,100,0.5)",
line_width=1,
annotation_text=f"{lbl}={x_val:.4f}s",
annotation_font_size=10,
)
# Layout
y_label = channels[0][1].unit if channels else ""
cfc_label = f" (CFC {cfc_value})" if cfc_value != "none" else ""
fig.update_layout(
xaxis_title="Time (s)",
yaxis_title=y_label,
template="plotly_white",
hovermode="x unified",
showlegend=True,
legend=dict(
orientation="h",
yanchor="bottom",
y=-0.2,
xanchor="center",
x=0.5,
font=dict(size=10),
),
margin=dict(l=55, r=15, t=10, b=60),
)
return fig
def register_plot_callbacks(app: dash.Dash, app_state: AppState) -> None:
"""Register all plot-related callbacks."""
@app.callback(
Output({"type": "plot-graph", "index": 0}, "figure"),
[
Input("selected-channels-store", "data"),
Input("cfc-select", "value"),
Input("show-resultant", "value"),
Input("y-align-check", "value"),
Input("x-align-method", "value"),
Input("cursor-update-btn", "n_clicks"),
],
[
State("cursor-x1", "value"),
State("cursor-x2", "value"),
State("x-align-value", "value"),
],
)
def update_main_plot(
selected_keys: list[str] | None,
cfc_value: str,
show_resultant: bool,
y_align: bool,
x_align_method: str,
n_clicks: int | None,
cursor_x1: float | None,
cursor_x2: float | None,
x_align_value: float | None,
) -> go.Figure:
if not selected_keys:
selected_keys = []
channels = _resolve_channels(
selected_keys,
app_state,
cfc_value,
y_align,
x_align_method or "none",
x_align_value,
show_resultant,
)
return _build_figure(channels, cursor_x1, cursor_x2, cfc_value)

View File

@@ -0,0 +1 @@
"""Reusable Dash layout components for the Impakt web UI."""

View File

@@ -0,0 +1,375 @@
"""Flat channel grid component.
A compact, sortable data table of all channels with:
- Wildcard filter bar (ISO code pattern matching)
- Facet dropdowns (body region, measurement type, direction)
- Checkbox selection with color dots matching plot traces
- Columns: selected, ISO code, description, unit, min, max
Channels are displayed in file load order (preserving the user's
original channel ordering). Filtering reduces the visible set
without reordering.
"""
from __future__ import annotations
import fnmatch
import re
from typing import Any
import dash_bootstrap_components as dbc
from dash import dash_table, dcc, html
from impakt.channel.lookup import DIRECTIONS, MAIN_LOCATIONS, MEASUREMENTS
from impakt.plot.engine import DEFAULT_COLORS
from impakt.web.state import AppState
def _build_channel_rows(app_state: AppState) -> list[dict[str, Any]]:
"""Build flat row data for the channel grid.
Each row is a dict with fields for the DataTable. Channels appear
in their original file order.
"""
rows: list[dict[str, Any]] = []
multi_test = len(app_state.tests) > 1
for loaded in app_state.tests:
test_id = loaded.test_id
ch_num = 0
for ch in loaded.data:
ch_num += 1
code = ch.code
# Build human-readable description
if code.is_valid:
desc = code.short_label
body = MAIN_LOCATIONS.get(code.main_location, code.main_location)
meas = code.measurement
meas_label = MEASUREMENTS.get(meas, (meas,))[0] if meas in MEASUREMENTS else meas
direction = code.direction
occupant = code.test_object
else:
desc = ch.name
body = ""
meas = ""
meas_label = ""
direction = ""
occupant = ""
rows.append(
{
"key": f"{test_id}::{ch.name}",
"ch_num": ch_num,
"test_id": test_id if multi_test else "",
"iso_code": ch.name,
"description": desc,
"unit": ch.unit,
"min": f"{float(ch.data.min()):.2f}",
"max": f"{float(ch.data.max()):.2f}",
"rate": f"{ch.sample_rate:.0f}",
# Facet fields (for filtering, not necessarily displayed)
"body": body,
"meas": meas_label,
"direction": direction,
"occupant": occupant,
}
)
return rows
def _extract_facet_options(rows: list[dict[str, Any]], field: str) -> list[dict[str, str]]:
"""Extract unique values for a facet dropdown."""
values = sorted({r[field] for r in rows if r[field]})
return [{"label": v, "value": v} for v in values]
def build_channel_grid(app_state: AppState) -> dbc.Card:
"""Build the channel grid panel."""
if app_state.is_empty:
return dbc.Card(
[
dbc.CardHeader(
[
html.Span("Channels", className="fw-bold"),
],
className="py-2",
),
dbc.CardBody(
html.Div("No test loaded", className="text-muted small text-center py-3"),
),
]
)
rows = _build_channel_rows(app_state)
multi_test = len(app_state.tests) > 1
# Facet options
body_options = _extract_facet_options(rows, "body")
meas_options = _extract_facet_options(rows, "meas")
dir_options = _extract_facet_options(rows, "direction")
# Columns for the DataTable
columns = [
{"name": "#", "id": "ch_num", "type": "numeric"},
{"name": "ISO Code", "id": "iso_code"},
{"name": "Description", "id": "description"},
{"name": "Unit", "id": "unit"},
{"name": "Min", "id": "min", "type": "numeric"},
{"name": "Max", "id": "max", "type": "numeric"},
]
if multi_test:
columns.insert(1, {"name": "Test", "id": "test_id"})
return dbc.Card(
[
dbc.CardHeader(
[
html.Span("Channels", className="fw-bold"),
html.Span(f" ({len(rows)})", style={"fontSize": "11px", "color": "#999"}),
],
className="py-2",
),
dbc.CardBody(
[
# Wildcard filter bar
dbc.InputGroup(
[
dbc.Input(
id="channel-filter-input",
placeholder="Filter: *HEAD*AC*, 11*FO*Z*, ...",
type="text",
size="sm",
debounce=True,
style={"fontSize": "12px", "fontFamily": "monospace"},
),
dbc.Button(
"Clear",
id="channel-filter-clear",
color="outline-secondary",
size="sm",
style={"fontSize": "11px"},
),
],
size="sm",
className="mb-2",
),
# Facet dropdowns
dbc.Row(
[
dbc.Col(
[
dbc.Select(
id="facet-body",
options=[{"label": "Body region...", "value": ""}]
+ body_options,
value="",
size="sm",
style={"fontSize": "11px"},
),
],
width=4,
),
dbc.Col(
[
dbc.Select(
id="facet-meas",
options=[{"label": "Measurement...", "value": ""}]
+ meas_options,
value="",
size="sm",
style={"fontSize": "11px"},
),
],
width=4,
),
dbc.Col(
[
dbc.Select(
id="facet-direction",
options=[{"label": "Dir...", "value": ""}] + dir_options,
value="",
size="sm",
style={"fontSize": "11px"},
),
],
width=4,
),
],
className="mb-2",
),
# Selected channels badge row
html.Div(id="selected-channels-badges", className="mb-2"),
# Channel DataTable
html.Div(
dash_table.DataTable(
id="channel-grid",
columns=columns,
data=rows,
row_selectable="multi",
selected_rows=[],
sort_action="native",
sort_mode="multi",
page_action="none",
style_table={
"overflowY": "auto",
"maxHeight": "400px",
},
style_header={
"backgroundColor": "#f8f9fa",
"fontWeight": "bold",
"fontSize": "11px",
"padding": "4px 8px",
"borderBottom": "2px solid #dee2e6",
},
style_cell={
"fontSize": "11px",
"fontFamily": "'SF Mono', 'Menlo', 'Monaco', monospace",
"padding": "3px 8px",
"textAlign": "left",
"border": "none",
"borderBottom": "1px solid #f0f0f0",
"overflow": "hidden",
"textOverflow": "ellipsis",
"whiteSpace": "nowrap",
},
style_cell_conditional=[
{
"if": {"column_id": "ch_num"},
"width": "36px",
"textAlign": "right",
"color": "#999",
},
{
"if": {"column_id": "iso_code"},
"minWidth": "140px",
},
{
"if": {"column_id": "description"},
"minWidth": "80px",
"fontFamily": "inherit",
},
{
"if": {"column_id": "unit"},
"width": "50px",
"textAlign": "center",
},
{"if": {"column_id": "min"}, "width": "65px", "textAlign": "right"},
{"if": {"column_id": "max"}, "width": "65px", "textAlign": "right"},
{"if": {"column_id": "test_id"}, "width": "60px"},
],
style_data_conditional=[
{
"if": {"state": "selected"},
"backgroundColor": "#e8f4fd",
"border": "none",
"borderBottom": "1px solid #b8daff",
},
],
style_as_list_view=True,
fixed_rows={"headers": True},
),
),
# Hidden store for full row data (used by callbacks to resolve keys)
dcc.Store(id="channel-grid-all-rows", data=rows),
],
className="py-2",
),
]
)
def build_selected_channels_badges(
selected_keys: list[str],
app_state: AppState,
) -> list:
"""Build badge pills showing currently selected channels."""
if not selected_keys:
return [
html.Span("No channels selected", className="text-muted", style={"fontSize": "11px"})
]
badges = []
for i, key in enumerate(selected_keys[:15]):
# Resolve label
if "::" in key:
test_id, ch_name = key.split("::", 1)
ch = app_state.get_channel(test_id, ch_name)
else:
ch = None
ch_name = key
if ch and ch.code.is_valid:
label = ch.code.short_label
else:
label = ch_name
if len(app_state.tests) > 1 and "::" in key:
label = f"{key.split('::')[0]}: {label}"
# Color dot matching plot trace
color = DEFAULT_COLORS[i % len(DEFAULT_COLORS)]
badges.append(
html.Span(
[
html.Span("\u25cf ", style={"color": color, "fontSize": "10px"}),
html.Span(label, style={"fontSize": "10px"}),
],
className="badge bg-light text-dark border me-1 mb-1",
style={"fontWeight": "normal"},
),
)
if len(selected_keys) > 15:
badges.append(
html.Span(
f"+{len(selected_keys) - 15} more",
className="text-muted",
style={"fontSize": "10px"},
)
)
return badges
def filter_rows(
all_rows: list[dict[str, Any]],
pattern: str = "",
body: str = "",
meas: str = "",
direction: str = "",
) -> list[dict[str, Any]]:
"""Filter channel rows by wildcard pattern and facets.
Args:
all_rows: Full list of channel rows.
pattern: Wildcard pattern matched against ISO code (e.g., '*HEAD*AC*').
body: Body region facet filter.
meas: Measurement type facet filter.
direction: Direction facet filter.
Returns:
Filtered list of rows.
"""
result = all_rows
if pattern:
# Normalize: if the user didn't use wildcards, wrap in *...*
p = pattern.strip().upper()
if "*" not in p and "?" not in p:
p = f"*{p}*"
result = [r for r in result if fnmatch.fnmatch(r["iso_code"].upper(), p)]
if body:
result = [r for r in result if r["body"] == body]
if meas:
result = [r for r in result if r["meas"] == meas]
if direction:
result = [r for r in result if r["direction"] == direction]
return result

View File

@@ -0,0 +1,221 @@
"""Collapsible channel tree component.
Renders channels in a hierarchical accordion:
Test Object > Body Region > Measurement Type > [channels]
Supports search filtering, select-all per group, and channel preview
(peak, unit, sample rate) on hover/expand.
"""
from __future__ import annotations
from typing import Any
import dash_bootstrap_components as dbc
from dash import dcc, html
from impakt.web.state import AppState
def _make_channel_item(ch_info: dict[str, str], show_test_prefix: bool = False) -> html.Div:
"""Build a single channel item with checkbox and preview info."""
label = ch_info["label"]
key = ch_info["key"]
peak = ch_info.get("peak", "")
unit = ch_info.get("unit", "")
preview = f" ({peak} {unit})" if peak else ""
return html.Div(
[
dbc.Checkbox(
id={"type": "channel-check", "index": key},
label=html.Span(
[
html.Span(label, style={"fontSize": "12px"}),
html.Span(
preview,
style={"fontSize": "10px", "color": "#999", "marginLeft": "4px"},
),
]
),
value=False,
style={"marginBottom": "1px"},
),
],
className="channel-item",
)
def _make_group_header(
group_label: str,
group_id: str,
channel_keys: list[str],
) -> html.Div:
"""Build a measurement group header with select-all button."""
return html.Div(
[
html.Span(group_label, style={"fontSize": "12px", "fontWeight": "500"}),
dbc.Button(
"All",
id={"type": "select-group-btn", "index": group_id},
color="link",
size="sm",
style={"fontSize": "10px", "padding": "0 4px", "textDecoration": "none"},
),
# Hidden store for this group's channel keys
dcc.Store(
id={"type": "group-channels", "index": group_id},
data=channel_keys,
),
],
className="d-flex align-items-center justify-content-between",
)
def build_channel_tree(app_state: AppState) -> dbc.Card:
"""Build the full channel tree panel with search and accordion."""
if app_state.is_empty:
return dbc.Card(
[
dbc.CardHeader("Channels", className="fw-bold py-2"),
dbc.CardBody(
html.Div("No test loaded", className="text-muted small text-center py-3"),
),
]
)
multi_test = len(app_state.tests) > 1
full_tree = app_state.build_channel_tree()
# Build accordion items
accordion_items = []
group_counter = 0
for test_id, test_tree in full_tree.items():
test_label = ""
loaded = app_state.get_test(test_id)
if loaded and multi_test:
test_label = f"[{test_id}] "
for obj_label, locations in test_tree.items():
# Top-level accordion: test object (Driver, Vehicle Structure, etc.)
obj_content = []
for loc_label, measurements in locations.items():
for meas_label, channels in measurements.items():
group_id = f"grp_{group_counter}"
group_counter += 1
channel_keys = [ch["key"] for ch in channels]
header_label = (
f"{loc_label}{meas_label}" if loc_label != meas_label else meas_label
)
obj_content.append(_make_group_header(header_label, group_id, channel_keys))
for ch in channels:
obj_content.append(_make_channel_item(ch, show_test_prefix=multi_test))
obj_content.append(html.Hr(style={"margin": "4px 0"}))
if obj_content:
# Remove trailing hr
if obj_content and isinstance(obj_content[-1], html.Hr):
obj_content = obj_content[:-1]
display_label = f"{test_label}{obj_label}" if test_label else obj_label
accordion_items.append(
dbc.AccordionItem(
html.Div(obj_content),
title=display_label,
style={"fontSize": "12px"},
)
)
return dbc.Card(
[
dbc.CardHeader(
[
html.Span("Channels", className="fw-bold"),
html.Span(
f" ({app_state.total_channels})",
style={"fontSize": "11px", "color": "#999"},
),
],
className="py-2",
),
dbc.CardBody(
[
# Search
dbc.Input(
id="channel-search",
placeholder="Search channels...",
type="text",
size="sm",
className="mb-2",
debounce=True,
),
# Selected channels display
html.Div(id="selected-channels-badges", className="mb-2"),
# Tree
html.Div(
dbc.Accordion(
accordion_items,
flush=True,
always_open=True,
start_collapsed=True,
id="channel-accordion",
),
style={"maxHeight": "450px", "overflowY": "auto"},
id="channel-tree-container",
),
],
className="py-2",
),
]
)
def build_selected_channels_badges(selected_keys: list[str], app_state: AppState) -> list:
"""Build badge pills showing currently selected channels."""
if not selected_keys:
return [
html.Span("No channels selected", className="text-muted", style={"fontSize": "11px"})
]
badges = []
for key in selected_keys[:20]: # Limit display to 20
# Extract label
if "::" in key:
test_id, ch_name = key.split("::", 1)
ch = app_state.get_channel(test_id, ch_name)
if ch and ch.code.is_valid:
label = ch.code.short_label
else:
label = ch_name
if len(app_state.tests) > 1:
label = f"{test_id}: {label}"
else:
label = key
badges.append(
dbc.Badge(
label,
id={"type": "channel-badge", "index": key},
color="primary",
pill=True,
className="me-1 mb-1",
style={"fontSize": "10px", "cursor": "pointer"},
)
)
if len(selected_keys) > 20:
badges.append(
html.Span(
f"+{len(selected_keys) - 20} more",
className="text-muted",
style={"fontSize": "10px"},
)
)
return badges

View File

@@ -0,0 +1,343 @@
"""Injury criteria panel component.
Provides the criteria computation button, auto-detection of channels,
results table with color-coded ratings, and protocol scoring.
"""
from __future__ import annotations
import logging
from typing import Any
import dash_bootstrap_components as dbc
from dash import html
from impakt.channel.model import Channel, TestData
from impakt.criteria import (
CriterionResult,
chest_deflection,
clip_3ms,
femur_load,
hic15,
nij,
tibia_index,
viscous_criterion,
)
from impakt.protocol.base import Color, ProtocolResult, Rating
logger = logging.getLogger(__name__)
def build_criteria_panel() -> dbc.Card:
"""Build the injury criteria panel."""
return dbc.Card(
[
dbc.CardHeader("Injury Criteria", className="fw-bold py-2"),
dbc.CardBody(
[
dbc.Button(
"Compute All",
id="compute-criteria-btn",
color="primary",
size="sm",
className="mb-2 w-100",
),
dbc.Select(
id="protocol-select",
options=[
{"label": "Euro NCAP 2024", "value": "euro_ncap"},
{"label": "US NCAP 2023", "value": "us_ncap"},
{"label": "IIHS 2024", "value": "iihs"},
],
value="euro_ncap",
size="sm",
className="mb-2",
),
html.Div(id="criteria-results"),
],
className="py-2",
),
],
className="mb-2",
)
def auto_compute_criteria(test_data: TestData) -> dict[str, CriterionResult]:
"""Auto-detect channels and compute all applicable injury criteria.
Uses ISO channel naming to find the right channels for each criterion.
"""
results: dict[str, CriterionResult] = {}
# --- HIC15 ---
head_accel = test_data.find("*HEAD0000*AC{X,Y,Z}*")
if not head_accel:
head_accel = test_data.find("*HEAD*AC*")
if len(head_accel) >= 2:
try:
from impakt.transform.resultant import resultant_from_channels
# Filter to only X/Y/Z components for the first test object found
first_obj = head_accel[0].code.test_object
comps = [
ch
for ch in head_accel
if ch.code.test_object == first_obj and ch.code.is_component()
]
if len(comps) >= 2:
results["HIC15"] = hic15(resultant_from_channels(*comps))
except Exception as e:
logger.warning("HIC15 computation failed: %s", e)
# --- 3ms Clip ---
chest_accel = test_data.find("*CHST0000*AC{X,Y,Z}*")
if not chest_accel:
chest_accel = test_data.find("*CHST*AC*")
if len(chest_accel) >= 2:
try:
from impakt.transform.resultant import resultant_from_channels
first_obj = chest_accel[0].code.test_object
comps = [
ch
for ch in chest_accel
if ch.code.test_object == first_obj and ch.code.is_component()
]
if len(comps) >= 2:
results["3ms Clip"] = clip_3ms(resultant_from_channels(*comps))
except Exception as e:
logger.warning("3ms Clip computation failed: %s", e)
# --- Nij ---
neck_fz = test_data.find("*NECKUP*FO*Z*")
neck_my = test_data.find("*NECKUP*MO*Y*")
if neck_fz and neck_my:
try:
results["Nij"] = nij(
fz_channel=neck_fz[0],
my_channel=neck_my[0],
dummy=test_data.metadata.dummy,
)
except Exception as e:
logger.warning("Nij computation failed: %s", e)
# --- Chest Deflection ---
# Look for DC (deflection/compression) channels — these are the actual chest
# deflection sensors (not DS which is general displacement and may be steering column)
chest_defl = [ch for ch in test_data.find("*CHST*DC*") if ch.code.measurement == "DC"]
# Only use DS as fallback if it looks like actual chest deflection
# (NHTSA DS channels are often steering column displacement, not chest defl)
if not chest_defl:
for ch in test_data.find("*CHST*DS*"):
if ch.code.measurement != "DS":
continue
# Chest deflection should be in mm range (0-100mm typical)
# If unit is 'm' and peak > 0.1m = 100mm, it's not chest deflection
peak_mm = ch.peak * 1000.0 if ch.unit in ("m",) else ch.peak
if peak_mm < 150.0:
chest_defl.append(ch)
if chest_defl:
try:
results["Chest Deflection"] = chest_deflection(channel=chest_defl[0])
except Exception as e:
logger.warning("Chest deflection computation failed: %s", e)
# --- Femur Load ---
femur_left = test_data.find("*FEMRLE*FO*Z*")
if femur_left:
try:
results["Femur Load Left"] = femur_load(channel=femur_left[0], side="left")
except Exception as e:
logger.warning("Femur left computation failed: %s", e)
femur_right = test_data.find("*FEMRRI*FO*Z*")
if femur_right:
try:
results["Femur Load Right"] = femur_load(channel=femur_right[0], side="right")
except Exception as e:
logger.warning("Femur right computation failed: %s", e)
# --- Tibia Index ---
tibia_fz = test_data.find("*TIBILI*FO*Z*")
if not tibia_fz:
tibia_fz = test_data.find("*TIBI*FO*Z*")
tibia_mx = test_data.find("*TIBILI*MO*X*")
if not tibia_mx:
tibia_mx = test_data.find("*TIBI*MO*X*")
tibia_my = test_data.find("*TIBILI*MO*Y*")
if not tibia_my:
tibia_my = test_data.find("*TIBI*MO*Y*")
if tibia_fz:
try:
results["Tibia Index"] = tibia_index(
fz_channel=tibia_fz[0],
mx_channel=tibia_mx[0] if tibia_mx else None,
my_channel=tibia_my[0] if tibia_my else None,
)
except Exception as e:
logger.warning("Tibia index computation failed: %s", e)
return results
def score_protocol(
criteria: dict[str, CriterionResult],
protocol: str = "euro_ncap",
) -> ProtocolResult | None:
"""Score criteria results against a protocol."""
try:
if protocol == "euro_ncap":
from impakt.protocol.euro_ncap import evaluate
return evaluate(criteria)
elif protocol == "us_ncap":
from impakt.protocol.us_ncap import evaluate
return evaluate(criteria)
elif protocol == "iihs":
from impakt.protocol.iihs import evaluate
return evaluate(criteria)
except Exception as e:
logger.warning("Protocol scoring failed: %s", e)
return None
def _rating_color(rating: Rating | None, color: Color | None) -> str:
"""Map a rating or color to a CSS color."""
if color:
return {
Color.GREEN: "#2ecc71",
Color.YELLOW: "#f1c40f",
Color.ORANGE: "#e67e22",
Color.BROWN: "#8B4513",
Color.RED: "#e74c3c",
}.get(color, "#bdc3c7")
if rating:
return {
Rating.GOOD: "#2ecc71",
Rating.ACCEPTABLE: "#f1c40f",
Rating.MARGINAL: "#e67e22",
Rating.POOR: "#e74c3c",
}.get(rating, "#bdc3c7")
return "#bdc3c7"
def build_criteria_results_display(
criteria: dict[str, CriterionResult],
protocol_result: ProtocolResult | None = None,
) -> html.Div:
"""Build the criteria results display."""
elements: list = []
# Protocol summary
if protocol_result:
stars_str = ""
if protocol_result.stars is not None:
stars_str = "" * protocol_result.stars + "" * (5 - protocol_result.stars)
elements.append(
html.Div(
[
html.Div(
[
html.Span(
protocol_result.protocol,
style={"fontWeight": "bold", "fontSize": "13px"},
),
html.Span(
f" {protocol_result.version}",
style={"fontSize": "11px", "color": "#999"},
),
]
),
html.Div(
stars_str,
style={"fontSize": "20px", "color": "#f1c40f", "letterSpacing": "2px"},
)
if stars_str
else None,
html.Div(
protocol_result.overall_rating,
style={"fontSize": "12px", "fontWeight": "bold"},
),
html.Hr(style={"margin": "6px 0"}),
]
)
)
# Criteria table
rows = []
for name, result in criteria.items():
# Find matching protocol region score for color
bg_color = "#fff"
if protocol_result:
for rs in protocol_result.region_scores:
if rs.criterion == name or name.lower() in rs.criterion.lower():
bg_color = _rating_color(rs.rating, rs.color)
break
time_str = f"{result.time_of_peak:.4f}s" if result.time_of_peak is not None else ""
rows.append(
html.Tr(
[
html.Td(result.criterion, style={"fontSize": "11px", "fontWeight": "500"}),
html.Td(
f"{result.value:.2f}",
style={
"fontSize": "11px",
"fontFamily": "monospace",
"fontWeight": "bold",
"textAlign": "right",
},
),
html.Td(result.unit, style={"fontSize": "11px"}),
html.Td(time_str, style={"fontSize": "10px", "color": "#999"}),
html.Td(
html.Div(
style={
"width": "12px",
"height": "12px",
"borderRadius": "50%",
"backgroundColor": bg_color,
"display": "inline-block",
}
),
),
],
id={"type": "criteria-row", "index": name},
style={"cursor": "pointer"},
)
)
elements.append(
html.Table(
[
html.Thead(
html.Tr(
[
html.Th("Criterion", style={"fontSize": "10px"}),
html.Th("Value", style={"fontSize": "10px", "textAlign": "right"}),
html.Th("", style={"fontSize": "10px"}),
html.Th("Peak", style={"fontSize": "10px"}),
html.Th("", style={"fontSize": "10px"}),
]
)
),
html.Tbody(rows),
],
className="table table-sm table-hover mb-0",
)
)
if not criteria:
elements.append(
html.Div(
"No criteria computed. Check that channels are available.",
className="text-muted small",
)
)
return html.Div(elements)

View File

@@ -0,0 +1,165 @@
"""Cursor controls and values table component.
Provides dual X-axis cursor inputs and a table showing interpolated
values at both cursor positions for all plotted channels.
"""
from __future__ import annotations
from typing import Any
import dash_bootstrap_components as dbc
from dash import html
from impakt.channel.model import Channel
def build_cursor_panel() -> dbc.Card:
"""Build the cursor controls and values display card."""
return dbc.Card(
[
dbc.CardHeader(
[
html.Span("X-Axis Cursors", className="fw-bold"),
],
className="py-2",
),
dbc.CardBody(
[
dbc.Row(
[
dbc.Col(
[
dbc.InputGroup(
[
dbc.InputGroupText("x1", style={"fontSize": "12px"}),
dbc.Input(
id="cursor-x1",
type="number",
step=0.001,
value=0.0,
size="sm",
style={"fontSize": "12px"},
),
dbc.InputGroupText("s", style={"fontSize": "12px"}),
],
size="sm",
),
],
width=5,
),
dbc.Col(
[
dbc.InputGroup(
[
dbc.InputGroupText("x2", style={"fontSize": "12px"}),
dbc.Input(
id="cursor-x2",
type="number",
step=0.001,
value=0.1,
size="sm",
style={"fontSize": "12px"},
),
dbc.InputGroupText("s", style={"fontSize": "12px"}),
],
size="sm",
),
],
width=5,
),
dbc.Col(
[
dbc.Button(
"Update",
id="cursor-update-btn",
color="primary",
size="sm",
className="w-100",
style={"fontSize": "12px"},
),
],
width=2,
),
],
className="mb-2",
),
html.Div(id="cursor-values-table"),
],
className="py-2",
),
]
)
def build_cursor_values_table(
channels: list[tuple[str, Channel]],
x1: float,
x2: float,
) -> html.Table | html.Div:
"""Build the cursor values table from resolved channels and positions.
Args:
channels: List of (label, channel) tuples.
x1: First cursor position.
x2: Second cursor position.
Returns:
An HTML table or a placeholder div.
"""
if not channels:
return html.Div("No channels plotted", className="text-muted small")
rows = []
for label, ch in channels:
v1 = ch.value_at(x1)
v2 = ch.value_at(x2)
delta = v2 - v1
rows.append(
html.Tr(
[
html.Td(
label,
style={
"fontSize": "11px",
"maxWidth": "180px",
"overflow": "hidden",
"textOverflow": "ellipsis",
"whiteSpace": "nowrap",
},
),
html.Td(
f"{v1:.4f}",
style={"fontSize": "11px", "fontFamily": "monospace", "textAlign": "right"},
),
html.Td(
f"{v2:.4f}",
style={"fontSize": "11px", "fontFamily": "monospace", "textAlign": "right"},
),
html.Td(
f"{delta:+.4f}",
style={"fontSize": "11px", "fontFamily": "monospace", "textAlign": "right"},
),
html.Td(ch.unit, style={"fontSize": "11px"}),
]
)
)
return html.Table(
[
html.Thead(
html.Tr(
[
html.Th("Channel", style={"fontSize": "10px"}),
html.Th(f"@ {x1:.4f}s", style={"fontSize": "10px", "textAlign": "right"}),
html.Th(f"@ {x2:.4f}s", style={"fontSize": "10px", "textAlign": "right"}),
html.Th("Delta", style={"fontSize": "10px", "textAlign": "right"}),
html.Th("Unit", style={"fontSize": "10px"}),
]
)
),
html.Tbody(rows),
],
className="table table-sm table-hover mb-0",
)

View File

@@ -0,0 +1,239 @@
"""Header/navbar component.
Contains the Impakt branding, test info summary, file open button,
and template selector.
"""
from __future__ import annotations
import dash_bootstrap_components as dbc
from dash import dcc, html
from impakt.web.state import AppState
def build_header(app_state: AppState, template_names: list[str] | None = None) -> dbc.Navbar:
"""Build the top navigation bar."""
# Test info summary
test_info = ""
if not app_state.is_empty:
primary = app_state.primary_test
if primary:
meta = primary.data.metadata
parts = [f"Test: {meta.test_number}"]
if meta.vehicle.make:
parts.append(f"{meta.vehicle.make} {meta.vehicle.model}".strip())
if meta.dummy.dummy_type:
parts.append(meta.dummy.dummy_type)
if meta.impact.test_type:
parts.append(meta.impact.test_type)
if meta.impact.speed_kmh > 0:
parts.append(f"{meta.impact.speed_kmh:.1f} km/h")
test_info = " | ".join(parts)
if len(app_state.tests) > 1:
test_info += f" (+{len(app_state.tests) - 1} overlay)"
return dbc.Navbar(
dbc.Container(
[
# Brand
dbc.NavbarBrand(
"Impakt", className="ms-2", style={"fontWeight": "bold", "fontSize": "18px"}
),
# Test info
html.Span(
test_info,
className="text-light ms-3",
style={"fontSize": "12px", "opacity": "0.9"},
),
# Right-side controls
dbc.Nav(
[
# Open test button
dbc.NavItem(
dbc.Button(
"Open Test",
id="open-test-btn",
color="light",
size="sm",
outline=True,
className="me-2",
),
),
# Add overlay button
dbc.NavItem(
dbc.Button(
"Add Overlay",
id="add-overlay-btn",
color="light",
size="sm",
outline=True,
className="me-2",
disabled=app_state.is_empty,
),
),
# Template selector
dbc.NavItem(
dbc.Select(
id="template-select",
options=[{"label": t, "value": t} for t in (template_names or [])],
placeholder="Template...",
style={"width": "180px", "fontSize": "12px"},
),
),
],
className="ms-auto",
navbar=True,
),
],
fluid=True,
),
color="dark",
dark=True,
)
def build_test_info_panel(app_state: AppState) -> dbc.Card:
"""Build a collapsible test info summary card."""
if app_state.is_empty:
return dbc.Card(
dbc.CardBody(
html.Div(
"No test loaded. Click 'Open Test' to begin.",
className="text-muted text-center py-3",
),
),
)
rows = []
for loaded in app_state.tests:
meta = loaded.data.metadata
badge_color = "primary" if loaded == app_state.primary_test else "secondary"
rows.append(
html.Tr(
[
html.Td(
dbc.Badge(meta.test_number, color=badge_color, className="me-1"),
style={"fontSize": "12px"},
),
html.Td(
f"{meta.vehicle.make} {meta.vehicle.model}".strip() or "",
style={"fontSize": "12px"},
),
html.Td(meta.dummy.dummy_type or "", style={"fontSize": "12px"}),
html.Td(meta.impact.test_type or "", style={"fontSize": "12px"}),
html.Td(
f"{meta.impact.speed_kmh:.1f}" if meta.impact.speed_kmh else "",
style={"fontSize": "12px"},
),
html.Td(str(loaded.channel_count), style={"fontSize": "12px"}),
html.Td(
dbc.Button(
"x",
id={"type": "remove-test-btn", "index": loaded.test_id},
color="danger",
size="sm",
outline=True,
style={"padding": "0 6px", "fontSize": "10px", "lineHeight": "1.4"},
)
if loaded != app_state.primary_test
else "",
style={"width": "30px"},
),
]
)
)
return dbc.Card(
dbc.CardBody(
[
html.Table(
[
html.Thead(
html.Tr(
[
html.Th("Test", style={"fontSize": "11px"}),
html.Th("Vehicle", style={"fontSize": "11px"}),
html.Th("Dummy", style={"fontSize": "11px"}),
html.Th("Type", style={"fontSize": "11px"}),
html.Th("km/h", style={"fontSize": "11px"}),
html.Th("Ch.", style={"fontSize": "11px"}),
html.Th("", style={"fontSize": "11px"}),
]
)
),
html.Tbody(rows),
],
className="table table-sm table-borderless mb-0",
),
],
className="py-1",
),
className="mb-2",
style={"backgroundColor": "#f8f9fa"},
)
def build_open_test_modal() -> dbc.Modal:
"""Build the modal dialog for opening a test."""
return dbc.Modal(
[
dbc.ModalHeader(dbc.ModalTitle("Open Test Data")),
dbc.ModalBody(
[
dbc.Label("Path to test directory:", size="sm"),
dbc.Input(
id="open-test-path",
type="text",
placeholder="/path/to/test_data",
size="sm",
className="mb-2",
),
html.Div(id="open-test-error", className="text-danger small"),
]
),
dbc.ModalFooter(
[
dbc.Button("Cancel", id="open-test-cancel", color="secondary", size="sm"),
dbc.Button("Open", id="open-test-confirm", color="primary", size="sm"),
]
),
],
id="open-test-modal",
is_open=False,
centered=True,
)
def build_overlay_modal() -> dbc.Modal:
"""Build the modal dialog for adding an overlay test."""
return dbc.Modal(
[
dbc.ModalHeader(dbc.ModalTitle("Add Overlay Test")),
dbc.ModalBody(
[
dbc.Label("Path to overlay test directory:", size="sm"),
dbc.Input(
id="overlay-test-path",
type="text",
placeholder="/path/to/overlay_test",
size="sm",
className="mb-2",
),
html.Div(id="overlay-test-error", className="text-danger small"),
]
),
dbc.ModalFooter(
[
dbc.Button("Cancel", id="overlay-test-cancel", color="secondary", size="sm"),
dbc.Button("Add", id="overlay-test-confirm", color="primary", size="sm"),
]
),
],
id="overlay-test-modal",
is_open=False,
centered=True,
)

View File

@@ -0,0 +1,133 @@
"""Multi-pane plot grid component.
Supports configurable layouts (1x1, 2x1, 1x2, 2x2) with independent
plot panes. Each pane has its own channel selection and axis labels.
"""
from __future__ import annotations
import dash_bootstrap_components as dbc
from dash import dcc, html
# Layout presets: (rows, cols)
LAYOUT_PRESETS: dict[str, tuple[int, int]] = {
"1x1": (1, 1),
"2x1": (2, 1),
"1x2": (1, 2),
"2x2": (2, 2),
"3x1": (3, 1),
}
MAX_PANES = 6
def build_plot_grid(layout: str = "1x1") -> html.Div:
"""Build the plot grid with layout selector and plot panes."""
return html.Div(
[
# Layout selector row
html.Div(
[
dbc.ButtonGroup(
[
dbc.Button(
label,
id={"type": "layout-btn", "index": label},
color="outline-secondary",
size="sm",
active=(label == layout),
style={"fontSize": "11px", "padding": "2px 8px"},
)
for label in LAYOUT_PRESETS
],
className="me-3",
),
html.Span(
id="plot-grid-info", className="text-muted", style={"fontSize": "11px"}
),
],
className="d-flex align-items-center mb-2",
),
# Plot panes container
html.Div(id="plot-grid-container", children=_build_panes(layout)),
# Hidden store for layout state
dcc.Store(id="plot-layout-store", data={"layout": layout, "pane_count": 1}),
]
)
def _build_panes(layout: str) -> list:
"""Build plot pane elements for a given layout."""
rows, cols = LAYOUT_PRESETS.get(layout, (1, 1))
pane_count = rows * cols
panes = []
for pane_idx in range(pane_count):
pane = _build_single_pane(pane_idx, pane_count)
panes.append(pane)
# Arrange in rows
if cols == 1:
return panes
else:
result = []
for row in range(rows):
row_panes = []
for col in range(cols):
idx = row * cols + col
if idx < len(panes):
row_panes.append(dbc.Col(panes[idx], width=12 // cols))
result.append(dbc.Row(row_panes, className="mb-2"))
return result
def _build_single_pane(pane_idx: int, total_panes: int) -> dbc.Card:
"""Build a single plot pane with its graph and controls."""
height = "500px" if total_panes == 1 else "320px" if total_panes <= 2 else "250px"
return dbc.Card(
[
dbc.CardBody(
[
dcc.Graph(
id={"type": "plot-graph", "index": pane_idx},
config={
"displayModeBar": True,
"scrollZoom": True,
"displaylogo": False,
"modeBarButtonsToRemove": ["select2d", "lasso2d"],
"modeBarButtonsToAdd": ["drawline", "eraseshape"],
},
style={"height": height},
),
],
className="p-2",
),
],
className="mb-2",
)
def build_empty_plot_figure() -> dict:
"""Build an empty plot figure with instruction text."""
return {
"data": [],
"layout": {
"template": "plotly_white",
"annotations": [
{
"text": "Select channels from the tree to plot",
"xref": "paper",
"yref": "paper",
"x": 0.5,
"y": 0.5,
"showarrow": False,
"font": {"size": 14, "color": "#bbb"},
}
],
"xaxis": {"visible": False},
"yaxis": {"visible": False},
"margin": {"l": 40, "r": 20, "t": 30, "b": 40},
},
}

View File

@@ -0,0 +1,73 @@
"""Report generation panel component."""
from __future__ import annotations
import dash_bootstrap_components as dbc
from dash import dcc, html
def build_report_panel() -> dbc.Card:
"""Build the report generation card."""
return dbc.Card(
[
dbc.CardHeader("Export", className="fw-bold py-2"),
dbc.CardBody(
[
# Plot export
dbc.Label("Plot Export", size="sm", className="mb-1"),
dbc.ButtonGroup(
[
dbc.Button(
"PNG",
id="export-png-btn",
color="outline-secondary",
size="sm",
style={"fontSize": "11px"},
),
dbc.Button(
"SVG",
id="export-svg-btn",
color="outline-secondary",
size="sm",
style={"fontSize": "11px"},
),
dbc.Button(
"PDF",
id="export-pdf-btn",
color="outline-secondary",
size="sm",
style={"fontSize": "11px"},
),
],
className="mb-2 w-100",
),
html.Hr(style={"margin": "6px 0"}),
# Data export
dbc.Label("Data Export", size="sm", className="mb-1"),
dbc.Button(
"Export CSV",
id="export-csv-btn",
color="outline-secondary",
size="sm",
className="w-100 mb-2",
style={"fontSize": "11px"},
),
html.Hr(style={"margin": "6px 0"}),
# Report generation
dbc.Label("Protocol Report", size="sm", className="mb-1"),
dbc.Button(
"Generate Report",
id="generate-report-btn",
color="success",
size="sm",
className="w-100",
style={"fontSize": "11px"},
),
html.Div(id="report-status", className="mt-2"),
# Download component (hidden, triggered by callbacks)
dcc.Download(id="download-export"),
],
className="py-2",
),
]
)

View File

@@ -0,0 +1,72 @@
"""Transform controls panel component."""
from __future__ import annotations
import dash_bootstrap_components as dbc
from dash import html
def build_transform_panel() -> dbc.Card:
"""Build the transform controls card."""
return dbc.Card(
[
dbc.CardHeader("Transforms", className="fw-bold py-2"),
dbc.CardBody(
[
# CFC Filter
dbc.Label("CFC Filter", size="sm", className="mb-1"),
dbc.Select(
id="cfc-select",
options=[
{"label": "None (raw)", "value": "none"},
{"label": "CFC 60 (100 Hz)", "value": "60"},
{"label": "CFC 180 (300 Hz)", "value": "180"},
{"label": "CFC 600 (1000 Hz)", "value": "600"},
{"label": "CFC 1000 (1650 Hz)", "value": "1000"},
],
value="none",
size="sm",
className="mb-2",
),
# Resultant toggle
dbc.Checkbox(
id="show-resultant",
label="Compute resultant",
value=False,
className="mb-1",
),
# Y-align toggle
dbc.Checkbox(
id="y-align-check",
label="Y-axis zero correction",
value=False,
className="mb-2",
),
html.Hr(style={"margin": "8px 0"}),
# X-align controls
dbc.Label("X-Axis Alignment", size="sm", className="mb-1"),
dbc.Select(
id="x-align-method",
options=[
{"label": "None", "value": "none"},
{"label": "Manual offset", "value": "manual"},
{"label": "Threshold crossing", "value": "threshold"},
],
value="none",
size="sm",
className="mb-1",
),
dbc.Input(
id="x-align-value",
type="number",
placeholder="Time offset (s) or threshold",
size="sm",
step=0.001,
className="mb-1",
style={"display": "none"},
),
],
className="py-2",
),
]
)

View File

@@ -1,7 +1,8 @@
"""Dash layout components.
"""Top-level Dash layout builder.
Builds the UI structure: header, channel tree, plot area, cursor table,
transform controls, and report panel.
Assembles all component modules into the complete page layout.
Uses a flex layout with a draggable splitter between the left
sidebar and the main content area.
"""
from __future__ import annotations
@@ -9,337 +10,120 @@ from __future__ import annotations
from typing import Any
import dash_bootstrap_components as dbc
from dash import dcc, html
from dash import ClientsideFunction, clientside_callback, dcc, html
from impakt.channel.model import TestData
from impakt.web.components.channel_grid import build_channel_grid
from impakt.web.components.criteria import build_criteria_panel
from impakt.web.components.cursors import build_cursor_panel
from impakt.web.components.header import (
build_header,
build_open_test_modal,
build_overlay_modal,
build_test_info_panel,
)
from impakt.web.components.plot_grid import build_plot_grid
from impakt.web.components.report import build_report_panel
from impakt.web.components.transforms import build_transform_panel
from impakt.web.state import AppState
def build_channel_tree_items(test_data: TestData) -> list[dict[str, Any]]:
"""Build checklist items from the channel tree hierarchy."""
tree = test_data.channel_tree()
items = []
for obj, locations in sorted(tree.items()):
for loc, measurements in sorted(locations.items()):
for meas, channels in sorted(measurements.items()):
for ch in channels:
label = f"{ch.code.short_label}" if ch.code.is_valid else ch.name
items.append(
{
"label": label,
"value": ch.name,
"group": f"{obj} / {loc}",
}
)
return items
def build_layout(test_data: TestData, template_names: list[str] | None = None) -> html.Div:
"""Build the complete Dash layout."""
tree_items = build_channel_tree_items(test_data)
meta = test_data.metadata
# Header info
test_info_parts = [f"Test: {meta.test_number}"]
if meta.vehicle.make:
test_info_parts.append(f"{meta.vehicle.year} {meta.vehicle.make} {meta.vehicle.model}")
if meta.dummy.dummy_type:
test_info_parts.append(f"Dummy: {meta.dummy.dummy_type}")
if meta.impact.test_type:
test_info_parts.append(f"{meta.impact.test_type}")
test_info = " | ".join(test_info_parts)
channel_options = [{"label": item["label"], "value": item["value"]} for item in tree_items]
def build_layout(app_state: AppState, template_names: list[str] | None = None) -> html.Div:
"""Build the complete page layout from components."""
return html.Div(
[
# Header
dbc.Navbar(
dbc.Container(
# --- Header ---
build_header(app_state, template_names),
# --- Modals ---
build_open_test_modal(),
build_overlay_modal(),
# --- Test info bar ---
html.Div(
build_test_info_panel(app_state),
className="px-3",
),
# --- Main content: flex row with splitter ---
html.Div(
[
dbc.NavbarBrand("Impakt", className="ms-2", style={"fontWeight": "bold"}),
html.Span(
test_info, className="text-light ms-3", style={"fontSize": "13px"}
),
dbc.Nav(
# === Left panel: Channels + Transforms ===
html.Div(
[
dbc.NavItem(
dbc.Select(
id="template-select",
options=[
{"label": t, "value": t} for t in (template_names or [])
build_channel_grid(app_state),
html.Div(style={"height": "8px"}),
build_transform_panel(),
],
placeholder="Select template...",
style={"width": "200px", "fontSize": "13px"},
)
id="left-panel",
style={
"width": "320px",
"minWidth": "200px",
"maxWidth": "600px",
"overflowY": "auto",
"flexShrink": "0",
"padding": "0 8px",
},
),
],
className="ms-auto",
# === Splitter handle ===
html.Div(
id="splitter-handle",
style={
"width": "6px",
"cursor": "col-resize",
"backgroundColor": "#e9ecef",
"flexShrink": "0",
"transition": "background-color 0.15s",
"position": "relative",
"zIndex": "10",
},
# The hover/active styling is in CSS
),
],
fluid=True,
),
color="dark",
dark=True,
className="mb-3",
),
# Main content
dbc.Container(
# === Center + Right: fills remaining space ===
html.Div(
[
dbc.Row(
[
# Left panel — Channel selection & transforms
# Center: Plot grid + Cursors
dbc.Col(
[
dbc.Card(
[
dbc.CardHeader("Channels", className="fw-bold"),
dbc.CardBody(
[
dbc.Input(
id="channel-search",
placeholder="Search channels...",
type="text",
size="sm",
className="mb-2",
),
html.Div(
dcc.Checklist(
id="channel-select",
options=channel_options,
value=[],
labelStyle={
"display": "block",
"fontSize": "12px",
"padding": "2px 0",
},
inputStyle={"marginRight": "6px"},
build_plot_grid("1x1"),
build_cursor_panel(),
],
width=8,
),
# Right: Criteria + Report
dbc.Col(
[
build_criteria_panel(),
build_report_panel(),
],
width=4,
style={
"maxHeight": "350px",
"maxHeight": "calc(100vh - 160px)",
"overflowY": "auto",
},
),
]
),
],
className="mb-3",
),
dbc.Card(
[
dbc.CardHeader("Transforms", className="fw-bold"),
dbc.CardBody(
[
dbc.Label("CFC Filter", size="sm"),
dbc.Select(
id="cfc-select",
options=[
{"label": "None", "value": "none"},
{
"label": "CFC 60 (100 Hz)",
"value": "60",
style={
"flex": "1",
"minWidth": "0",
"overflowX": "hidden",
"padding": "0 8px",
},
{
"label": "CFC 180 (300 Hz)",
"value": "180",
),
],
id="main-content",
style={
"display": "flex",
"flexDirection": "row",
"height": "calc(100vh - 130px)",
"overflow": "hidden",
},
{
"label": "CFC 600 (1000 Hz)",
"value": "600",
},
{
"label": "CFC 1000 (1650 Hz)",
"value": "1000",
},
],
value="none",
size="sm",
className="mb-2",
),
dbc.Checkbox(
id="show-resultant",
label="Show resultant",
value=False,
className="mb-2",
),
dbc.Checkbox(
id="y-align-check",
label="Y-axis zero correction",
value=False,
),
]
),
]
),
],
width=3,
),
# Center — Plot area + cursor table
dbc.Col(
[
dbc.Card(
[
dbc.CardBody(
[
dcc.Graph(
id="main-plot",
config={
"displayModeBar": True,
"scrollZoom": True,
"modeBarButtonsToAdd": [
"drawline",
"eraseshape",
],
},
style={"height": "500px"},
),
]
),
],
className="mb-3",
),
# Cursor controls & values table
dbc.Card(
[
dbc.CardHeader(
[
html.Span(
"X-Axis Cursors", className="fw-bold"
),
]
),
dbc.CardBody(
[
dbc.Row(
[
dbc.Col(
[
dbc.InputGroup(
[
dbc.InputGroupText(
"x1"
),
dbc.Input(
id="cursor-x1",
type="number",
step=0.001,
value=0.0,
size="sm",
),
dbc.InputGroupText("s"),
],
size="sm",
),
],
width=4,
),
dbc.Col(
[
dbc.InputGroup(
[
dbc.InputGroupText(
"x2"
),
dbc.Input(
id="cursor-x2",
type="number",
step=0.001,
value=0.1,
size="sm",
),
dbc.InputGroupText("s"),
],
size="sm",
),
],
width=4,
),
dbc.Col(
[
dbc.Button(
"Update",
id="cursor-update-btn",
color="primary",
size="sm",
),
],
width=2,
),
],
className="mb-2",
),
html.Div(id="cursor-values-table"),
]
),
]
),
],
width=6,
),
# Right panel — Criteria & report
dbc.Col(
[
dbc.Card(
[
dbc.CardHeader("Injury Criteria", className="fw-bold"),
dbc.CardBody(
[
dbc.Button(
"Compute All",
id="compute-criteria-btn",
color="primary",
size="sm",
className="mb-2 w-100",
),
html.Div(id="criteria-results"),
]
),
],
className="mb-3",
),
dbc.Card(
[
dbc.CardHeader("Report", className="fw-bold"),
dbc.CardBody(
[
dbc.Label("Protocol", size="sm"),
dbc.Select(
id="protocol-select",
options=[
{
"label": "Euro NCAP",
"value": "euro_ncap",
},
{
"label": "US NCAP",
"value": "us_ncap",
},
{"label": "IIHS", "value": "iihs"},
],
value="euro_ncap",
size="sm",
className="mb-2",
),
dbc.Button(
"Generate PDF",
id="generate-report-btn",
color="success",
size="sm",
className="w-100",
),
html.Div(id="report-status", className="mt-2"),
]
),
]
),
],
width=3,
),
]
),
],
fluid=True,
),
# Hidden stores
# --- Hidden stores ---
dcc.Store(id="selected-channels-store", data=[]),
dcc.Store(id="session-store", data={}),
dcc.Store(id="page-refresh-trigger", data={}),
dcc.Store(id="splitter-state", data={"width": 320}),
]
)

234
src/impakt/web/state.py Normal file
View File

@@ -0,0 +1,234 @@
"""Application state management.
AppState is the central data store for the web UI. It holds all loaded tests,
manages channel transforms, and provides the data that callbacks need to
render plots, compute criteria, and generate reports.
AppState is NOT a Dash Store — it lives server-side in Python memory. The Dash
stores hold lightweight references (test IDs, selected channels, plot config)
that callbacks use to look up data from AppState.
This design keeps large numpy arrays out of the browser and avoids serialization.
"""
from __future__ import annotations
import logging
from pathlib import Path
from typing import Any
from impakt.channel.model import Channel, ChannelGroup, TestData
from impakt.io.reader import get_registry
from impakt.transform.align import YAlign
from impakt.transform.cfc import CFCFilter
logger = logging.getLogger(__name__)
class LoadedTest:
"""A single loaded test with its metadata and channel access."""
def __init__(self, test_data: TestData, color_offset: int = 0) -> None:
self.data = test_data
self.color_offset = color_offset
@property
def test_id(self) -> str:
return self.data.test_id
@property
def label(self) -> str:
"""Short label for UI display."""
meta = self.data.metadata
parts = [meta.test_number]
if meta.vehicle.make:
parts.append(f"{meta.vehicle.make} {meta.vehicle.model}".strip())
return "".join(parts) if len(parts) > 1 else parts[0]
@property
def channel_count(self) -> int:
return len(self.data)
def __len__(self) -> int:
return len(self.data)
class AppState:
"""Server-side application state.
Manages multiple loaded tests and provides channel resolution
for the UI callbacks.
"""
def __init__(self) -> None:
self._tests: dict[str, LoadedTest] = {}
self._test_order: list[str] = []
self._color_counter: int = 0
def load_test(self, path: str | Path) -> LoadedTest:
"""Load a test from a path and add it to the state.
If a test with the same ID is already loaded, replaces it.
Returns:
The loaded test.
Raises:
ValueError: If the path is not a valid test directory.
"""
path = Path(path).resolve()
registry = get_registry()
test_data = registry.read(path)
loaded = LoadedTest(test_data, color_offset=self._color_counter)
self._color_counter += len(test_data)
test_id = test_data.test_id
if test_id in self._tests:
self._tests[test_id] = loaded
else:
self._tests[test_id] = loaded
self._test_order.append(test_id)
logger.info("Loaded test '%s' from %s (%d channels)", test_id, path, len(test_data))
return loaded
def remove_test(self, test_id: str) -> None:
"""Remove a loaded test."""
if test_id in self._tests:
del self._tests[test_id]
self._test_order.remove(test_id)
@property
def tests(self) -> list[LoadedTest]:
"""All loaded tests in load order."""
return [self._tests[tid] for tid in self._test_order if tid in self._tests]
@property
def test_ids(self) -> list[str]:
return list(self._test_order)
@property
def primary_test(self) -> LoadedTest | None:
"""The first loaded test (primary for criteria, etc.)."""
if self._test_order:
return self._tests.get(self._test_order[0])
return None
def get_test(self, test_id: str) -> LoadedTest | None:
return self._tests.get(test_id)
def get_channel(self, test_id: str, channel_name: str) -> Channel | None:
"""Resolve a channel from a test ID and channel name."""
test = self._tests.get(test_id)
if test is None:
return None
try:
return test.data.get(channel_name)
except KeyError:
return None
def resolve_channel(
self,
channel_key: str,
cfc_class: int | None = None,
y_align: bool = False,
) -> Channel | None:
"""Resolve a channel key (test_id::channel_name) and apply transforms.
Channel keys use the format 'test_id::channel_name'. If no '::'
separator, assumes the primary test.
"""
if "::" in channel_key:
test_id, ch_name = channel_key.split("::", 1)
else:
if self.primary_test is None:
return None
test_id = self.primary_test.test_id
ch_name = channel_key
ch = self.get_channel(test_id, ch_name)
if ch is None:
return None
# Apply transforms
if cfc_class is not None:
try:
ch = CFCFilter(cfc_class=cfc_class).apply(ch)
except (ValueError, Exception):
pass
if y_align:
ch = YAlign().apply(ch)
return ch
def build_channel_tree(
self,
) -> dict[str, dict[str, dict[str, dict[str, list[dict[str, str]]]]]]:
"""Build a hierarchical channel tree across all loaded tests.
Returns:
{test_id: {object_label: {location_label: {measurement_label: [{name, label, key}]}}}}
"""
result: dict[str, dict[str, dict[str, dict[str, list[dict[str, str]]]]]] = {}
for loaded in self.tests:
tree = loaded.data.channel_tree()
test_tree: dict[str, dict[str, dict[str, list[dict[str, str]]]]] = {}
for obj, locations in sorted(tree.items()):
obj_tree: dict[str, dict[str, list[dict[str, str]]]] = {}
for loc, measurements in sorted(locations.items()):
loc_tree: dict[str, list[dict[str, str]]] = {}
for meas, channels in sorted(measurements.items()):
ch_list = []
for ch in channels:
label = ch.code.short_label if ch.code.is_valid else ch.name
ch_list.append(
{
"name": ch.name,
"label": label,
"key": f"{loaded.test_id}::{ch.name}",
"unit": ch.unit,
"peak": f"{ch.peak:.2f}",
"samples": str(ch.n_samples),
"rate": f"{ch.sample_rate:.0f}",
}
)
loc_tree[meas] = ch_list
obj_tree[loc] = loc_tree
test_tree[obj] = obj_tree
result[loaded.test_id] = test_tree
return result
def flat_channel_list(self) -> list[dict[str, str]]:
"""Flat list of all channels across all tests, for dropdown/checklist options."""
items: list[dict[str, str]] = []
multi = len(self._tests) > 1
for loaded in self.tests:
prefix = f"[{loaded.test_id}] " if multi else ""
for ch in sorted(loaded.data, key=lambda c: c.name):
label = ch.code.short_label if ch.code.is_valid else ch.name
items.append(
{
"label": f"{prefix}{label}",
"value": f"{loaded.test_id}::{ch.name}",
}
)
return items
@property
def is_empty(self) -> bool:
return len(self._tests) == 0
@property
def total_channels(self) -> int:
return sum(t.channel_count for t in self.tests)
def __repr__(self) -> str:
test_info = ", ".join(f"{t.test_id}({t.channel_count}ch)" for t in self.tests)
return f"AppState([{test_info}])"

View File

View File

@@ -0,0 +1,83 @@
"""Tests for web app creation."""
from pathlib import Path
import pytest
from impakt.io.mme import MMEReader
from impakt.web.app import create_app
from impakt.web.state import AppState
FIXTURE_DATA = Path(__file__).parent.parent / "fixtures" / "sample_mme"
MME_DATA = Path(__file__).parent.parent / "mme_data"
class TestAppCreation:
def test_create_empty_app(self):
app = create_app()
assert app is not None
assert app.title == "Impakt"
def test_create_app_with_test_data(self):
reader = MMEReader()
data = reader.read(FIXTURE_DATA)
app = create_app(data)
assert app is not None
assert "IMPAKT_SYNTH_001" in app.title
def test_create_app_with_app_state(self):
state = AppState()
state.load_test(FIXTURE_DATA)
app = create_app(app_state=state)
assert app is not None
assert "IMPAKT_SYNTH_001" in app.title
@pytest.mark.skipif(not (MME_DATA / "3239").exists(), reason="Real data not available")
def test_create_app_with_real_data(self):
state = AppState()
state.load_test(MME_DATA / "3239")
app = create_app(app_state=state)
assert app is not None
assert "3239" in app.title
class TestCriteriaAutoCompute:
def test_auto_compute_on_synthetic_data(self):
from impakt.web.components.criteria import auto_compute_criteria
reader = MMEReader()
data = reader.read(FIXTURE_DATA)
criteria = auto_compute_criteria(data)
# Synthetic data should have head accel -> HIC15
assert len(criteria) > 0
# Should at least get HIC15 and chest deflection
criterion_names = set(criteria.keys())
assert "HIC15" in criterion_names or "Chest Deflection" in criterion_names
@pytest.mark.skipif(not (MME_DATA / "3239").exists(), reason="Real data not available")
def test_auto_compute_on_real_data(self):
from impakt.web.components.criteria import auto_compute_criteria
reader = MMEReader()
data = reader.read(MME_DATA / "3239")
criteria = auto_compute_criteria(data)
# Real NHTSA data should yield multiple criteria
assert len(criteria) >= 3
criterion_names = set(criteria.keys())
# 3239 has head, chest, neck, femur channels
assert "HIC15" in criterion_names
@pytest.mark.skipif(not (MME_DATA / "3239").exists(), reason="Real data not available")
def test_protocol_scoring_on_real_data(self):
from impakt.web.components.criteria import auto_compute_criteria, score_protocol
reader = MMEReader()
data = reader.read(MME_DATA / "3239")
criteria = auto_compute_criteria(data)
for protocol in ["euro_ncap", "us_ncap", "iihs"]:
result = score_protocol(criteria, protocol)
assert result is not None
assert len(result.region_scores) > 0

View File

@@ -0,0 +1,140 @@
"""Tests for channel grid component."""
from pathlib import Path
import pytest
from impakt.io.mme import MMEReader
from impakt.web.components.channel_grid import (
_build_channel_rows,
build_selected_channels_badges,
filter_rows,
)
from impakt.web.state import AppState
FIXTURE_DATA = Path(__file__).parent.parent / "fixtures" / "sample_mme"
MME_DATA = Path(__file__).parent.parent / "mme_data"
class TestChannelRows:
def test_build_rows_from_synthetic(self):
state = AppState()
state.load_test(FIXTURE_DATA)
rows = _build_channel_rows(state)
assert len(rows) == 26
# Each row should have required fields
for r in rows:
assert "key" in r
assert "iso_code" in r
assert "unit" in r
assert "min" in r
assert "max" in r
assert "::" in r["key"]
@pytest.mark.skipif(not (MME_DATA / "3239").exists(), reason="Real data not available")
def test_build_rows_from_real_data(self):
state = AppState()
state.load_test(MME_DATA / "3239")
rows = _build_channel_rows(state)
assert len(rows) == 133
def test_multi_test_rows_have_test_id(self):
state = AppState()
state.load_test(FIXTURE_DATA)
if (MME_DATA / "VW1FGS15").exists():
state.load_test(MME_DATA / "VW1FGS15")
rows = _build_channel_rows(state)
assert len(rows) == 36 # 26 + 10
# Multi-test rows should have test_id filled
test_ids = {r["test_id"] for r in rows}
assert len(test_ids) == 2
def test_rows_preserve_load_order(self):
state = AppState()
state.load_test(FIXTURE_DATA)
rows = _build_channel_rows(state)
# First channel in fixture should be first row
# (this depends on how MMEReader yields channels — sorted by name)
iso_codes = [r["iso_code"] for r in rows]
assert len(iso_codes) == 26
class TestFilterRows:
@pytest.fixture
def rows(self):
state = AppState()
state.load_test(FIXTURE_DATA)
return _build_channel_rows(state)
def test_no_filter_returns_all(self, rows):
result = filter_rows(rows)
assert len(result) == len(rows)
def test_wildcard_filter_head(self, rows):
result = filter_rows(rows, pattern="*HEAD*")
assert len(result) > 0
assert all("HEAD" in r["iso_code"] for r in result)
def test_wildcard_filter_accel(self, rows):
result = filter_rows(rows, pattern="*AC*")
assert len(result) > 0
assert all("AC" in r["iso_code"] for r in result)
def test_partial_text_auto_wrapped(self, rows):
"""Typing 'HEAD' without wildcards should auto-wrap to *HEAD*."""
result = filter_rows(rows, pattern="HEAD")
assert len(result) > 0
assert all("HEAD" in r["iso_code"] for r in result)
def test_filter_by_body_facet(self, rows):
result = filter_rows(rows, body="Head")
assert len(result) > 0
assert all(r["body"] == "Head" for r in result)
def test_filter_by_direction_facet(self, rows):
result = filter_rows(rows, direction="X")
assert len(result) > 0
assert all(r["direction"] == "X" for r in result)
def test_combined_filter(self, rows):
result = filter_rows(rows, pattern="*AC*", direction="X")
assert len(result) > 0
for r in result:
assert "AC" in r["iso_code"]
assert r["direction"] == "X"
def test_no_match_returns_empty(self, rows):
result = filter_rows(rows, pattern="NONEXISTENT")
assert len(result) == 0
@pytest.mark.skipif(not (MME_DATA / "3239").exists(), reason="Real data not available")
def test_wildcard_filter_real_data(self):
state = AppState()
state.load_test(MME_DATA / "3239")
rows = _build_channel_rows(state)
# Filter for head channels
head = filter_rows(rows, pattern="*HEAD*")
assert len(head) > 0
# Filter for barrier load cells
barrier = filter_rows(rows, pattern="B0FBAR*")
assert len(barrier) > 0
# Filter for femur
femur = filter_rows(rows, pattern="*FEMR*")
assert len(femur) >= 2 # Left and right
class TestSelectedBadges:
def test_empty_selection(self):
state = AppState()
badges = build_selected_channels_badges([], state)
assert len(badges) == 1 # "No channels selected"
def test_with_selection(self):
state = AppState()
state.load_test(FIXTURE_DATA)
keys = ["IMPAKT_SYNTH_001::11HEAD0000ACXA", "IMPAKT_SYNTH_001::11HEAD0000ACYA"]
badges = build_selected_channels_badges(keys, state)
assert len(badges) == 2

View File

@@ -0,0 +1,108 @@
"""Tests for web AppState."""
from pathlib import Path
import pytest
from impakt.web.state import AppState
MME_DATA = Path(__file__).parent.parent / "mme_data"
FIXTURE_DATA = Path(__file__).parent.parent / "fixtures" / "sample_mme"
class TestAppState:
def test_initially_empty(self):
state = AppState()
assert state.is_empty
assert state.total_channels == 0
assert state.primary_test is None
def test_load_test(self):
state = AppState()
loaded = state.load_test(FIXTURE_DATA)
assert not state.is_empty
assert loaded.test_id == "IMPAKT_SYNTH_001"
assert loaded.channel_count == 26
assert state.primary_test is loaded
def test_load_multiple_tests(self):
state = AppState()
t1 = state.load_test(FIXTURE_DATA)
if (MME_DATA / "VW1FGS15").exists():
t2 = state.load_test(MME_DATA / "VW1FGS15")
assert len(state.tests) == 2
assert state.primary_test is t1 # First loaded is primary
assert state.total_channels == 26 + 10
def test_remove_test(self):
state = AppState()
loaded = state.load_test(FIXTURE_DATA)
state.remove_test(loaded.test_id)
assert state.is_empty
def test_get_channel(self):
state = AppState()
state.load_test(FIXTURE_DATA)
ch = state.get_channel("IMPAKT_SYNTH_001", "11HEAD0000ACXA")
assert ch is not None
assert ch.name == "11HEAD0000ACXA"
def test_get_channel_missing(self):
state = AppState()
state.load_test(FIXTURE_DATA)
ch = state.get_channel("IMPAKT_SYNTH_001", "NONEXISTENT")
assert ch is None
def test_resolve_channel_with_key(self):
state = AppState()
state.load_test(FIXTURE_DATA)
ch = state.resolve_channel("IMPAKT_SYNTH_001::11HEAD0000ACXA")
assert ch is not None
def test_resolve_channel_primary_default(self):
state = AppState()
state.load_test(FIXTURE_DATA)
ch = state.resolve_channel("11HEAD0000ACXA")
assert ch is not None
def test_resolve_channel_with_cfc(self):
state = AppState()
state.load_test(FIXTURE_DATA)
ch = state.resolve_channel("11HEAD0000ACXA", cfc_class=600)
assert ch is not None
assert ch.cfc_class == 600
def test_flat_channel_list(self):
state = AppState()
state.load_test(FIXTURE_DATA)
items = state.flat_channel_list()
assert len(items) == 26
assert all("value" in item and "label" in item for item in items)
def test_build_channel_tree(self):
state = AppState()
state.load_test(FIXTURE_DATA)
tree = state.build_channel_tree()
assert "IMPAKT_SYNTH_001" in tree
# Should have hierarchical structure
test_tree = tree["IMPAKT_SYNTH_001"]
assert len(test_tree) > 0
@pytest.mark.skipif(not (MME_DATA / "3239").exists(), reason="Real MME data not available")
class TestAppStateRealData:
def test_load_real_mme(self):
state = AppState()
loaded = state.load_test(MME_DATA / "3239")
assert loaded.test_id == "3239"
assert loaded.channel_count == 133
def test_channel_tree_real_data(self):
state = AppState()
state.load_test(MME_DATA / "3239")
tree = state.build_channel_tree()
assert "3239" in tree
# Should contain "Driver" in some key
test_tree = tree["3239"]
assert any("Driver" in k for k in test_tree)