CIS490/training/dashboard/app.py
Max Gorog 9e38f78379 training/dashboard(references): description sidebar + better space use
Two changes per the user's feedback that the slide had unused
horizontal space and needed per-PDF context.

Layout
- The reference scene is now a 2-column grid inside the
  metric-stack: PDF iframe at ~1.7fr on the left, description
  panel at ~0.55fr on the right (min 280px). On narrow viewports
  (<1100px) it falls back to a vertical stack with the
  description capped to 240px.
- Added #zoom=page-width to the iframe URL so the PDF's page
  fits its column width instead of leaving margins beside an
  8.5x11 page rendered in a wider iframe.
- Hide the prose card on the references scene — the description
  panel inside the stack covers what the prose was saying, and
  freeing the right edge gives the description proper room.

Description content
- Backend reads <stem>.md sidecar files alongside each PDF and
  returns the contents in the /api/references payload.
- Frontend renders them with a tiny built-in markdown subset
  (headings, bold/italic, lists, inline code, paragraphs) — no
  third-party renderer dependency.
- Initial draft sidecar .md files committed for the four PDFs
  currently in references/. Each describes how the paper informs
  a specific scene of the deck (which model row, which eval
  protocol, which channel selection). Edit them in place and the
  panel updates on the next reload.
2026-05-08 12:40:32 -05:00

298 lines
11 KiB
Python

from __future__ import annotations
import asyncio
import json
import logging
import re
import subprocess
import tarfile
from contextlib import asynccontextmanager
from pathlib import Path
from typing import Any
from urllib.parse import quote
from starlette.applications import Starlette
from starlette.requests import Request
from starlette.responses import FileResponse, JSONResponse, Response
from starlette.routing import Mount, Route, WebSocketRoute
from starlette.staticfiles import StaticFiles
from starlette.websockets import WebSocket, WebSocketDisconnect
# Module-wide logger for the dashboard service.
log = logging.getLogger("cis490.dashboard")

# Static frontend assets (index.html and the /static mount) live in a
# "static" directory next to this module.
STATIC_DIR = Path(__file__).parent / "static"

# References (PDFs the deck links to). Resolved relative to the
# install root so it works at /opt/cis490 (production) and the
# dev tree alike. The /refs URL serves files directly; /api/references
# returns the listing the frontend's tab UI iterates.
REFS_DIR = Path(__file__).parent.parent.parent / "references"

# Used to validate URL-supplied host_id / episode_id before they
# reach the filesystem. Allows the alphanumeric ULID episode IDs
# the orchestrator produces and reasonable host names (letters,
# digits, "_" and "-", 1 to 128 characters). Anything with `..`,
# `/`, or other path-traversal characters is rejected.
#
# The pattern is anchored with \Z rather than $: in Python, $ also
# matches just before a trailing newline, so "abc\n" would otherwise
# pass validation when checked with .match().
SAFE_ID_RE = re.compile(r"^[A-Za-z0-9_-]{1,128}\Z")
def _parse_jsonl(data: bytes) -> list[Any]:
    """Decode every well-formed JSON line in *data*; skip blank or bad lines."""
    records: list[Any] = []
    for raw in data.splitlines():
        raw = raw.strip()
        if not raw:
            continue
        try:
            records.append(json.loads(raw))
        except ValueError:
            # json.loads on bytes can raise UnicodeDecodeError as well as
            # JSONDecodeError; both are ValueError subclasses. A single
            # corrupt line must not abort the whole episode load.
            continue
    return records


def _load_episode_sync(
    data_root: Path, host_id: str, episode_id: str
) -> dict[str, Any] | None:
    """Stream-decompress an episode tarball and parse the JSON files in it.

    The tarball is expected at
    ``<data_root>/episodes/<host_id>/<episode_id>.tar.zst`` and is
    decompressed by piping it through the external ``zstd`` binary.
    Only members whose base name is ``telemetry-proc.jsonl``,
    ``labels.jsonl`` or ``meta.json`` are read; malformed JSON lines
    (or a malformed ``meta.json``) are skipped silently.

    Synchronous; the route wraps this in ``asyncio.to_thread`` so the
    event loop isn't blocked by the decompress + parse.

    Args:
        data_root: Base directory that contains the ``episodes`` tree.
        host_id: URL-supplied host identifier; must match ``SAFE_ID_RE``.
        episode_id: URL-supplied episode identifier; must match
            ``SAFE_ID_RE``.

    Returns:
        ``None`` if either ID is unsafe or the tarball does not exist,
        otherwise a dict with keys ``host_id``, ``episode_id``,
        ``samples``, ``labels`` and ``meta`` (``meta`` is ``None`` when
        no valid ``meta.json`` was found).

    Raises:
        OSError: If the ``zstd`` process cannot be started.
        tarfile.TarError: If the decompressed stream is not a valid tar.
    """
    # fullmatch (not match) so a trailing newline can never slip past
    # the `$` anchor and reach the filesystem path below.
    if not (SAFE_ID_RE.fullmatch(host_id) and SAFE_ID_RE.fullmatch(episode_id)):
        return None
    path = data_root / "episodes" / host_id / f"{episode_id}.tar.zst"
    if not path.is_file():
        return None

    samples: list[dict] = []
    labels: list[dict] = []
    meta: dict | None = None
    proc = subprocess.Popen(
        ["zstd", "-dc", str(path)],
        stdout=subprocess.PIPE,
        stderr=subprocess.PIPE,
    )
    try:
        # "r|" = non-seekable stream mode: each member must be consumed
        # before the iterator advances to the next one.
        with tarfile.open(fileobj=proc.stdout, mode="r|") as tar:
            for member in tar:
                if not member.isfile():
                    continue
                name = member.name.rsplit("/", 1)[-1]
                if name not in ("telemetry-proc.jsonl",
                                "labels.jsonl",
                                "meta.json"):
                    continue
                f = tar.extractfile(member)
                if f is None:
                    continue
                data = f.read()
                if name == "telemetry-proc.jsonl":
                    samples.extend(_parse_jsonl(data))
                elif name == "labels.jsonl":
                    labels.extend(_parse_jsonl(data))
                elif name == "meta.json":
                    try:
                        meta = json.loads(data)
                    except ValueError:
                        # Covers JSONDecodeError and UnicodeDecodeError.
                        pass
    finally:
        if proc.stdout:
            proc.stdout.close()
        # Drain stderr *before* wait(): waiting on a child whose stderr
        # pipe is full would block forever.
        err_bytes = b""
        if proc.stderr:
            try:
                err_bytes = proc.stderr.read()
            except OSError:
                pass
            finally:
                proc.stderr.close()
        rc = proc.wait()
        if rc != 0:
            err = err_bytes.decode("utf-8", errors="replace")
            log.warning("zstd %s exit %d: %s", path, rc, err[:200])
    return {
        "host_id": host_id,
        "episode_id": episode_id,
        "samples": samples,
        "labels": labels,
        "meta": meta,
    }
class Broadcaster:
    """Minimal fan-out hub for dashboard messages.

    Every connected client owns one bounded ``asyncio.Queue``. When a
    client's queue is full, the oldest queued message is dropped to make
    room for the new one. The ``state`` dict is sent to each new client
    on connect so a reconnecting dashboard does not start cold.
    """

    # Maximum number of pending messages per client queue.
    QUEUE_MAX = 64

    def __init__(self) -> None:
        self._clients: set[asyncio.Queue[Any]] = set()
        self._lock = asyncio.Lock()
        self.state: dict[str, Any] = {}

    async def register(self) -> asyncio.Queue[Any]:
        """Create, track and return a fresh per-client queue."""
        inbox: asyncio.Queue[Any] = asyncio.Queue(maxsize=self.QUEUE_MAX)
        async with self._lock:
            self._clients.add(inbox)
        return inbox

    async def unregister(self, q: asyncio.Queue[Any]) -> None:
        """Stop tracking *q*; unknown queues are ignored."""
        async with self._lock:
            self._clients.discard(q)

    async def publish(self, msg: Any) -> int:
        """Enqueue *msg* for every client; return how many queues took it."""
        # Snapshot under the lock, deliver outside it.
        async with self._lock:
            targets = tuple(self._clients)

        delivered = 0
        for inbox in targets:
            if inbox.full():
                # Backpressure: evict the oldest message to make room.
                try:
                    inbox.get_nowait()
                except asyncio.QueueEmpty:
                    pass
            try:
                inbox.put_nowait(msg)
            except asyncio.QueueFull:
                continue
            delivered += 1
        return delivered

    @property
    def client_count(self) -> int:
        """Number of currently registered client queues."""
        return len(self._clients)
# Module-level hub instance. The request and WebSocket handlers built by
# make_app all publish to and read from this one shared object.
broadcaster = Broadcaster()
def make_app(
    *,
    allow_publish_from: set[str] | None = None,
    enable_feeders: bool = True,
    data_root: Path = Path("/var/lib/cis490"),
) -> Starlette:
    """Build the dashboard ASGI application.

    Args:
        allow_publish_from: Client host addresses allowed to POST to
            ``/publish``. Defaults to loopback (``127.0.0.1`` and ``::1``).
        enable_feeders: When true, ``start_feeders`` from ``.feeder`` is
            called at startup and the returned tasks are cancelled at
            shutdown.
        data_root: Directory handed to the feeders and used as the base
            for episode tarball lookups.

    Returns:
        A ``Starlette`` app exposing ``/``, ``/healthz``, ``/publish``,
        ``/api/episode/{host_id}/{episode_id}``, ``/api/references``,
        ``/ws``, and the ``/static`` and ``/refs`` file mounts.
    """
    allowed = allow_publish_from if allow_publish_from is not None else {"127.0.0.1", "::1"}

    @asynccontextmanager
    async def lifespan(app):
        """Start the feeder tasks on startup and cancel them on shutdown."""
        tasks: list[asyncio.Task] = []
        if enable_feeders:
            from .feeder import start_feeders
            try:
                tasks = start_feeders(broadcaster, data_root=data_root)
                log.info("started %d feeders rooted at %s", len(tasks), data_root)
            except Exception:
                # Best effort: the dashboard is still useful without feeders.
                log.exception("failed to start feeders; dashboard will run without them")
        try:
            yield
        finally:
            for t in tasks:
                t.cancel()
            if tasks:
                # return_exceptions=True so the CancelledError from each
                # task does not propagate out of shutdown.
                await asyncio.gather(*tasks, return_exceptions=True)
            log.info("dashboard lifespan exiting; %d feeders cancelled", len(tasks))

    async def index(request: Request) -> Response:
        """Serve the single-page frontend."""
        return FileResponse(STATIC_DIR / "index.html")

    async def healthz(request: Request) -> JSONResponse:
        """Report liveness plus the client count and known state keys."""
        return JSONResponse({
            "status": "ok",
            "clients": broadcaster.client_count,
            "state_keys": sorted(broadcaster.state.keys()),
        })

    async def publish(request: Request) -> JSONResponse:
        """Fan a JSON body out to all clients; allowed hosts only."""
        client_host = request.client.host if request.client else ""
        if client_host not in allowed:
            return JSONResponse({"error": "forbidden"}, status_code=403)
        try:
            body = await request.json()
        except ValueError:
            # json.JSONDecodeError is a ValueError subclass, so this covers
            # both malformed JSON and undecodable request bytes.
            return JSONResponse({"error": "body must be JSON"}, status_code=400)
        delivered = await broadcaster.publish(body)
        return JSONResponse({"delivered": delivered})

    async def ws_endpoint(ws: WebSocket) -> None:
        """Stream broadcast messages to one WebSocket client until it drops."""
        await ws.accept()
        q = await broadcaster.register()
        try:
            await ws.send_json({"type": "hello", "clients": broadcaster.client_count})
            # Send the current snapshot so reconnects start warm.
            if broadcaster.state:
                await ws.send_json({"type": "snapshot", **broadcaster.state})
            while True:
                msg = await q.get()
                await ws.send_json(msg)
        except WebSocketDisconnect:
            pass
        except Exception:
            log.exception("ws send failed")
        finally:
            await broadcaster.unregister(q)

    def _list_references() -> list[dict[str, Any]]:
        """Scan REFS_DIR for PDFs and their optional ``.md`` sidecars.

        Synchronous (directory scan + file reads); the ``references``
        route runs it via ``asyncio.to_thread``.
        """
        items: list[dict[str, Any]] = []
        if not REFS_DIR.is_dir():
            return items
        try:
            for p in REFS_DIR.iterdir():
                if not p.is_file():
                    continue
                if p.suffix.lower() != ".pdf":
                    continue
                # Some downloaded PDFs ship with newlines in the
                # filename (badly-wrapped DOI titles). Strip them
                # for display and URL-encode for the path so the
                # iframe can fetch /refs/<encoded-name>.
                display_name = " ".join(p.stem.split())
                # Sidecar markdown: <stem>.md alongside the PDF
                # holds a free-form description of how the paper
                # was used in the project. Optional — the
                # frontend shows a placeholder if missing.
                description = None
                md_path = p.with_suffix(".md")
                if md_path.is_file():
                    try:
                        description = md_path.read_text(encoding="utf-8")
                    except (OSError, UnicodeDecodeError):
                        # A sidecar that is unreadable or not valid UTF-8
                        # must not take down the whole listing.
                        log.warning("could not read sidecar %s", md_path)
                items.append({
                    "name": display_name,
                    "path": "/refs/" + quote(p.name, safe=""),
                    "description": description,
                })
        except OSError:
            log.exception("could not list references in %s", REFS_DIR)
        items.sort(key=lambda r: r["name"].lower())
        return items

    async def references(request: Request) -> JSONResponse:
        """List available PDFs in REFS_DIR. Returns
        ``{"references": [{"name": str, "path": str,
        "description": str | None}, ...]}`` sorted alphabetically
        (case-insensitively) by name. ``description`` is the text of the
        ``<stem>.md`` sidecar, or ``None`` when it is missing or
        unreadable. Empty if the directory doesn't exist or has no
        PDFs."""
        # Filesystem work goes to a worker thread so the event loop is
        # not blocked, the same way the episode route does it.
        items = await asyncio.to_thread(_list_references)
        return JSONResponse({"references": items})

    async def episode(request: Request) -> JSONResponse:
        """Return the parsed contents of one episode tarball as JSON."""
        host_id = request.path_params["host_id"]
        episode_id = request.path_params["episode_id"]
        try:
            result = await asyncio.to_thread(
                _load_episode_sync, data_root, host_id, episode_id
            )
        except Exception:
            log.exception("episode load failed for %s/%s", host_id, episode_id)
            return JSONResponse({"error": "load failed"}, status_code=500)
        if result is None:
            return JSONResponse({"error": "episode not found"}, status_code=404)
        return JSONResponse(result)

    routes = [
        Route("/", index, methods=["GET"]),
        Route("/healthz", healthz, methods=["GET"]),
        Route("/publish", publish, methods=["POST"]),
        Route("/api/episode/{host_id}/{episode_id}", episode, methods=["GET"]),
        Route("/api/references", references, methods=["GET"]),
        WebSocketRoute("/ws", ws_endpoint),
        Mount("/static", app=StaticFiles(directory=str(STATIC_DIR)), name="static"),
        # check_dir=False: the references directory is optional and may
        # not exist at startup.
        Mount("/refs", app=StaticFiles(directory=str(REFS_DIR), check_dir=False),
              name="refs"),
    ]
    return Starlette(routes=routes, lifespan=lifespan)