CIS490/training/dashboard/app.py
Max Gorog a04bba6281 training/dashboard: click a db row → render the episode envelope
New endpoint GET /api/episode/<host_id>/<episode_id> in app.py.
Stream-decompresses the tarball (zstd -dc piped into tarfile),
extracts telemetry-proc.jsonl, labels.jsonl, and meta.json,
returns the parsed contents. Synchronous extract runs in
asyncio.to_thread so the event loop isn't blocked.

Frontend: clicking a row in the database explorer now fetches
the episode and draws an SVG chart matching the README's Real
Alpine VM envelope shape:
  - per-interval CPU jiffies delta (user + sys)
  - per-interval IO bytes delta (read + write)
  - colored phase bands (clean/armed/infecting/infected_running/
    dormant) overlaid by labels.jsonl
  - axis ticks for 0-peak on Y, 0-totalDuration in seconds on X
  - legend below the chart with palette-driven swatches

The detail panel that previously showed the row JSON now shows
metadata + the chart + the legend. Validated end-to-end against
a real episode (863 samples, 8 labels) extracted from
/var/lib/cis490/episodes/elliott-thinkpad/.
2026-05-08 01:16:54 -05:00

249 lines
8.6 KiB
Python

from __future__ import annotations
import asyncio
import json
import logging
import re
import subprocess
import tarfile
from contextlib import asynccontextmanager
from pathlib import Path
from typing import Any
from starlette.applications import Starlette
from starlette.requests import Request
from starlette.responses import FileResponse, JSONResponse, Response
from starlette.routing import Mount, Route, WebSocketRoute
from starlette.staticfiles import StaticFiles
from starlette.websockets import WebSocket, WebSocketDisconnect
log = logging.getLogger("cis490.dashboard")
STATIC_DIR = Path(__file__).parent / "static"
# Used to validate URL-supplied host_id / episode_id before they
# reach the filesystem. Allows the alphanumeric ULID episode IDs
# the orchestrator produces and reasonable host names. Anything
# with `..`, `/`, or other path-traversal characters is rejected.
SAFE_ID_RE = re.compile(r"^[A-Za-z0-9_-]{1,128}$")
def _load_episode_sync(
data_root: Path, host_id: str, episode_id: str
) -> dict[str, Any] | None:
"""Stream-decompress an episode tarball and parse the JSONL files
inside it. Returns ``None`` if the episode doesn't exist or the
IDs are unsafe. Synchronous; the route wraps this in
``asyncio.to_thread`` so the event loop isn't blocked by the
decompress + parse."""
if not (SAFE_ID_RE.match(host_id) and SAFE_ID_RE.match(episode_id)):
return None
path = data_root / "episodes" / host_id / f"{episode_id}.tar.zst"
if not path.is_file():
return None
samples: list[dict] = []
labels: list[dict] = []
meta: dict | None = None
proc = subprocess.Popen(
["zstd", "-dc", str(path)],
stdout=subprocess.PIPE,
stderr=subprocess.PIPE,
)
try:
with tarfile.open(fileobj=proc.stdout, mode="r|") as tar:
for member in tar:
if not member.isfile():
continue
name = member.name.rsplit("/", 1)[-1]
if name not in ("telemetry-proc.jsonl",
"labels.jsonl",
"meta.json"):
continue
f = tar.extractfile(member)
if f is None:
continue
data = f.read()
if name == "telemetry-proc.jsonl":
for line in data.splitlines():
line = line.strip()
if not line:
continue
try:
samples.append(json.loads(line))
except json.JSONDecodeError:
pass
elif name == "labels.jsonl":
for line in data.splitlines():
line = line.strip()
if not line:
continue
try:
labels.append(json.loads(line))
except json.JSONDecodeError:
pass
elif name == "meta.json":
try:
meta = json.loads(data)
except json.JSONDecodeError:
pass
finally:
if proc.stdout:
proc.stdout.close()
rc = proc.wait()
if rc != 0:
try:
err = proc.stderr.read().decode("utf-8", errors="replace")
log.warning("zstd %s exit %d: %s", path, rc, err[:200])
except Exception:
pass
if proc.stderr:
proc.stderr.close()
return {
"host_id": host_id,
"episode_id": episode_id,
"samples": samples,
"labels": labels,
"meta": meta,
}
class Broadcaster:
"""Tiny fan-out hub. Per-client async queues, oldest-message-drop
on backpressure. Also holds a ``state`` dict that is sent to every
new client on connect so reconnects don't see a cold dashboard."""
QUEUE_MAX = 64
def __init__(self) -> None:
self._clients: set[asyncio.Queue[Any]] = set()
self._lock = asyncio.Lock()
self.state: dict[str, Any] = {}
async def register(self) -> asyncio.Queue[Any]:
q: asyncio.Queue[Any] = asyncio.Queue(maxsize=self.QUEUE_MAX)
async with self._lock:
self._clients.add(q)
return q
async def unregister(self, q: asyncio.Queue[Any]) -> None:
async with self._lock:
self._clients.discard(q)
async def publish(self, msg: Any) -> int:
delivered = 0
async with self._lock:
clients = list(self._clients)
for q in clients:
try:
q.put_nowait(msg)
delivered += 1
except asyncio.QueueFull:
try: q.get_nowait()
except asyncio.QueueEmpty: pass
try:
q.put_nowait(msg)
delivered += 1
except asyncio.QueueFull:
pass
return delivered
@property
def client_count(self) -> int:
return len(self._clients)
broadcaster = Broadcaster()
def make_app(
*,
allow_publish_from: set[str] | None = None,
enable_feeders: bool = True,
data_root: Path = Path("/var/lib/cis490"),
) -> Starlette:
allowed = allow_publish_from if allow_publish_from is not None else {"127.0.0.1", "::1"}
@asynccontextmanager
async def lifespan(app):
tasks: list[asyncio.Task] = []
if enable_feeders:
from .feeder import start_feeders
try:
tasks = start_feeders(broadcaster, data_root=data_root)
log.info("started %d feeders rooted at %s", len(tasks), data_root)
except Exception:
log.exception("failed to start feeders; dashboard will run without them")
try:
yield
finally:
for t in tasks: t.cancel()
if tasks:
await asyncio.gather(*tasks, return_exceptions=True)
log.info("dashboard lifespan exiting; %d feeders cancelled", len(tasks))
async def index(request: Request) -> Response:
return FileResponse(STATIC_DIR / "index.html")
async def healthz(request: Request) -> JSONResponse:
return JSONResponse({
"status": "ok",
"clients": broadcaster.client_count,
"state_keys": sorted(broadcaster.state.keys()),
})
async def publish(request: Request) -> JSONResponse:
client_host = request.client.host if request.client else ""
if client_host not in allowed:
return JSONResponse({"error": "forbidden"}, status_code=403)
try:
body = await request.json()
except (ValueError, json.JSONDecodeError):
return JSONResponse({"error": "body must be JSON"}, status_code=400)
delivered = await broadcaster.publish(body)
return JSONResponse({"delivered": delivered})
async def ws_endpoint(ws: WebSocket) -> None:
await ws.accept()
q = await broadcaster.register()
try:
await ws.send_json({"type": "hello", "clients": broadcaster.client_count})
# Send the current snapshot so reconnects start warm.
if broadcaster.state:
await ws.send_json({"type": "snapshot", **broadcaster.state})
while True:
msg = await q.get()
await ws.send_json(msg)
except WebSocketDisconnect:
pass
except Exception:
log.exception("ws send failed")
finally:
await broadcaster.unregister(q)
async def episode(request: Request) -> JSONResponse:
host_id = request.path_params["host_id"]
episode_id = request.path_params["episode_id"]
try:
result = await asyncio.to_thread(
_load_episode_sync, data_root, host_id, episode_id
)
except Exception:
log.exception("episode load failed for %s/%s", host_id, episode_id)
return JSONResponse({"error": "load failed"}, status_code=500)
if result is None:
return JSONResponse({"error": "episode not found"}, status_code=404)
return JSONResponse(result)
routes = [
Route("/", index, methods=["GET"]),
Route("/healthz", healthz, methods=["GET"]),
Route("/publish", publish, methods=["POST"]),
Route("/api/episode/{host_id}/{episode_id}", episode, methods=["GET"]),
WebSocketRoute("/ws", ws_endpoint),
Mount("/static", app=StaticFiles(directory=str(STATIC_DIR)), name="static"),
]
return Starlette(routes=routes, lifespan=lifespan)