CIS490/training/dashboard/app.py
Max Gorog a8157ed177 training/dashboard: live deck at dashboard.wg, fed by receiver
Starlette + WebSocket dashboard run on the Pi as cis490-dashboard.service
(127.0.0.1:8447, Caddy-fronted at dashboard.wg). Tails
/var/lib/cis490/index.jsonl for episode events, snapshots host counts
every 30s, broadcasts to every connected browser. New connections get a
warm snapshot (recent_episodes, total_bytes, host_counts) so reloads
don't see a cold dashboard.

Frontend is a 10-scene scrollytelling deck following the project
outline: intro, collect, hosts, db explorer, baseline, attacks,
chunking, models, knn, perf. Sticky full-bleed canvas with a
right-aligned prose column (matrix-explorable layout). Hotkeys (arrows,
space, j/k, c, Home/End), prev/next chevrons, FAB, and an opt-in
click-to-advance toggle. Demo toggle drives synthetic data for the
five scenes that have no real producer yet (attack envelopes,
chunking, model bars, knn scatter, perf scatter); when off, those
scenes show "awaiting <event_type> events" rather than fake data.

Producers wire in by POSTing typed JSON to 127.0.0.1:8447/publish
(loopback only; Caddy 404s it externally). Event types the widgets
subscribe to: model_metric {model, accuracy}, embedding {x, y, phase},
model_perf {model, latency_us, accuracy}, prediction {episode_id,
window_idx, predicted, actual}, attack_profile {name, shape, curve},
phase {phase}.

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
2026-05-07 21:26:07 -05:00

143 lines
4.8 KiB
Python

from __future__ import annotations
import asyncio
import json
import logging
from contextlib import asynccontextmanager
from pathlib import Path
from typing import Any
from starlette.applications import Starlette
from starlette.requests import Request
from starlette.responses import FileResponse, JSONResponse, Response
from starlette.routing import Mount, Route, WebSocketRoute
from starlette.staticfiles import StaticFiles
from starlette.websockets import WebSocket, WebSocketDisconnect
# Logger shared by everything in this module.
log = logging.getLogger("cis490.dashboard")

# Frontend assets live in the "static" directory that sits next to this file.
STATIC_DIR = Path(__file__).with_name("static")
class Broadcaster:
    """Minimal fan-out hub for dashboard messages.

    Every registered client owns a bounded ``asyncio.Queue``. When a
    client's queue is full, the oldest pending message is discarded to
    make room for the newest one. The ``state`` dict is a plain mutable
    mapping that callers may fill in and read back (for example to send
    a warm snapshot to a freshly connected client).
    """

    # Upper bound on the number of undelivered messages kept per client.
    QUEUE_MAX = 64

    def __init__(self) -> None:
        self._clients: set[asyncio.Queue[Any]] = set()
        self._lock = asyncio.Lock()
        self.state: dict[str, Any] = {}

    async def register(self) -> asyncio.Queue[Any]:
        """Create a bounded queue for a new client, track it, and return it."""
        queue: asyncio.Queue[Any] = asyncio.Queue(maxsize=self.QUEUE_MAX)
        async with self._lock:
            self._clients.add(queue)
        return queue

    async def unregister(self, q: asyncio.Queue[Any]) -> None:
        """Stop tracking ``q``; unknown queues are ignored."""
        async with self._lock:
            self._clients.discard(q)

    async def publish(self, msg: Any) -> int:
        """Enqueue ``msg`` for every client and return how many accepted it."""
        # Copy the client set under the lock, then enqueue outside of it.
        async with self._lock:
            targets = tuple(self._clients)
        return sum(1 for queue in targets if self._offer(queue, msg))

    @staticmethod
    def _offer(queue: asyncio.Queue[Any], msg: Any) -> bool:
        """Put ``msg`` on ``queue``, evicting the oldest entry if it is full.

        Returns ``True`` when the message ended up on the queue.
        """
        if queue.full():
            # Backpressure: drop the oldest message to make room.
            try:
                queue.get_nowait()
            except asyncio.QueueEmpty:
                pass
        try:
            queue.put_nowait(msg)
        except asyncio.QueueFull:
            return False
        return True

    @property
    def client_count(self) -> int:
        """Number of currently registered client queues."""
        return len(self._clients)
# Module-level hub instance; the request handlers built by make_app() below
# all publish to and read from this single object.
broadcaster = Broadcaster()
def make_app(
    *,
    allow_publish_from: set[str] | None = None,
    enable_feeders: bool = True,
    data_root: Path = Path("/var/lib/cis490"),
) -> Starlette:
    """Build the dashboard ASGI application.

    Args:
        allow_publish_from: Client host strings permitted to POST to
            ``/publish``. ``None`` means loopback only
            (``{"127.0.0.1", "::1"}``).
        enable_feeders: When true, ``.feeder.start_feeders`` is called at
            lifespan startup and the returned tasks are cancelled at
            shutdown.
        data_root: Passed through to ``start_feeders`` as ``data_root``.

    Returns:
        A ``Starlette`` app serving ``/``, ``/healthz``, ``/publish``,
        ``/ws`` and ``/static``.
    """
    allowed = allow_publish_from if allow_publish_from is not None else {"127.0.0.1", "::1"}

    @asynccontextmanager
    async def lifespan(app):
        """Start feeder tasks on startup and cancel them on shutdown."""
        tasks: list[asyncio.Task] = []
        if enable_feeders:
            try:
                # Import inside the try so a broken or missing feeder module
                # degrades to "no feeders" instead of aborting app startup.
                from .feeder import start_feeders

                tasks = start_feeders(broadcaster, data_root=data_root)
                log.info("started %d feeders rooted at %s", len(tasks), data_root)
            except Exception:
                log.exception("failed to start feeders; dashboard will run without them")
        try:
            yield
        finally:
            for t in tasks:
                t.cancel()
            if tasks:
                # Wait for the cancellations to land; swallow their results.
                await asyncio.gather(*tasks, return_exceptions=True)
            log.info("dashboard lifespan exiting; %d feeders cancelled", len(tasks))

    async def index(request: Request) -> Response:
        """Serve the single-page frontend."""
        return FileResponse(STATIC_DIR / "index.html")

    async def healthz(request: Request) -> JSONResponse:
        """Report liveness plus the client count and the snapshot's keys."""
        return JSONResponse({
            "status": "ok",
            "clients": broadcaster.client_count,
            "state_keys": sorted(broadcaster.state.keys()),
        })

    async def publish(request: Request) -> JSONResponse:
        """Broadcast a JSON body to every connected client.

        Responds 403 when the peer host is not in the allow-list, 400 when
        the body is not valid JSON, otherwise ``{"delivered": <count>}``.
        """
        # NOTE(review): if a reverse proxy on this host forwards /publish,
        # every request would appear to come from loopback and pass this
        # check -- confirm the proxy blocks the path.
        client_host = request.client.host if request.client else ""
        if client_host not in allowed:
            return JSONResponse({"error": "forbidden"}, status_code=403)
        try:
            body = await request.json()
        except (ValueError, json.JSONDecodeError):
            return JSONResponse({"error": "body must be JSON"}, status_code=400)
        delivered = await broadcaster.publish(body)
        return JSONResponse({"delivered": delivered})

    async def ws_endpoint(ws: WebSocket) -> None:
        """Stream broadcast messages to one browser until it disconnects.

        The client first receives a ``hello`` message, then a ``snapshot``
        built from ``broadcaster.state`` (if non-empty), then every message
        published afterwards.
        """
        await ws.accept()
        q = await broadcaster.register()

        async def send_loop() -> None:
            await ws.send_json({"type": "hello", "clients": broadcaster.client_count})
            # Send the current snapshot so reconnects start warm. A "type"
            # key inside state would override the message type here.
            if broadcaster.state:
                await ws.send_json({"type": "snapshot", **broadcaster.state})
            while True:
                await ws.send_json(await q.get())

        async def wait_for_disconnect() -> None:
            # Inbound frames are ignored. Receiving is the only way to notice
            # a client going away while no messages are being published.
            while True:
                message = await ws.receive()
                if message.get("type") == "websocket.disconnect":
                    return

        sender = asyncio.create_task(send_loop())
        watcher = asyncio.create_task(wait_for_disconnect())
        try:
            done, _pending = await asyncio.wait(
                {sender, watcher}, return_when=asyncio.FIRST_COMPLETED
            )
            for finished in done:
                # Re-raise whatever ended the task so it is handled below.
                finished.result()
        except WebSocketDisconnect:
            pass
        except Exception:
            log.exception("ws send failed")
        finally:
            try:
                sender.cancel()
                watcher.cancel()
                await asyncio.gather(sender, watcher, return_exceptions=True)
            finally:
                # Always drop the queue so the hub stops counting this client.
                await broadcaster.unregister(q)

    routes = [
        Route("/", index, methods=["GET"]),
        Route("/healthz", healthz, methods=["GET"]),
        Route("/publish", publish, methods=["POST"]),
        WebSocketRoute("/ws", ws_endpoint),
        Mount("/static", app=StaticFiles(directory=str(STATIC_DIR)), name="static"),
    ]
    return Starlette(routes=routes, lifespan=lifespan)