Single import point for the model session to wire interactive
scenes. One @dataclass per event type, with docstrings naming the
scene each one drives and the shape of every field:
PhaseEvent — scene 6 (baseline phase mix)
AttackProfile — scene 7 (per-profile envelope thumbnails)
Prediction — scene 8 (10-second window timeline)
ModelMetric — scene 9 (model accuracy bars)
Embedding — scene 11 (KNN scatter)
ModelPerf — scene 12 (accuracy-vs-latency scatter)
Phase + Model Literal types narrow the inputs so static checkers
+ IDEs autocomplete the canonical strings.
Publisher.publish now accepts either a dataclass instance from
events.py or a plain dict, so the existing
``pub.publish({"type": "...", ...})`` pattern keeps working
untouched.
Module-level publish() / try_publish() helpers wrap a default
Publisher for one-liner usage. The PRODUCERS.md guide now leads
with a pointer to events.py so the typed interface is the first
thing producers read.
9.2 KiB
Producing events for the dashboard
If your code wants to drive a widget on dashboard.wg, this is the
contract you need to honor and the pitfalls to avoid. The dashboard
itself doesn't know anything about models — it just renders typed
events that arrive on the message bus.
Typed Python interface.
events.pydefines a dataclass per event type (PhaseEvent,AttackProfile,Prediction,ModelMetric,Embedding,ModelPerf) with full docstrings about what each one drives, plusPhaseandModelLiteraltypes for autocomplete. Import from there instead of hand-rolling dicts:from training.dashboard.events import Publisher, ModelMetric pub = Publisher() pub.publish(ModelMetric(model="lstm", accuracy=0.928))
The /publish endpoint
POST http://127.0.0.1:8447/publish
content-type: application/json
body: {"type": "...", ...}
Returns {"delivered": N} — the number of currently-connected
browsers that got the message. Returns 403 from anywhere other
than 127.0.0.1 / ::1 (defense in depth) and Caddy's
dashboard.wg site explicitly 404s the path so WG peers can
never reach it. Producers must run on the Pi.
That loopback-only constraint is deliberate: any process on the Pi can publish without auth, and nothing off-host can. If you need a lab host to push events, route them through the receiver pipeline, not this endpoint.
Event types the widgets already subscribe to
Each widget on the deck listens for one or more event types. Match these shapes and the corresponding scene starts updating live.
type |
Body | Drives |
|---|---|---|
phase |
{phase} |
scene 5 — rolling 5-min phase mix |
attack_profile |
{name, shape, curve: [n0, n1, ...]} |
scene 6 — per-profile envelope thumbnails |
prediction |
{episode_id, window_idx: 0-5, predicted, actual} |
scene 7 — 10-second chunking timeline |
model_metric |
{model: "rnn"|"gru"|"lstm"|"bert", accuracy} |
scene 8 — model accuracy bars |
embedding |
{x: 0-1, y: 0-1, phase} |
scene 9 — KNN scatter (one circle per event) |
model_perf |
{model, latency_us, accuracy} |
scene 10 — accuracy vs inference cost |
Phase values are
clean | armed | infecting | infected_running | dormant. The
visualization will accept any other phase string but it'll fall back
to a neutral color and won't appear in the legend.
x / y for embedding events are normalized to [0, 1]. Run
your projector (PCA, UMAP, t-SNE, whatever) and rescale before
publishing.
You're free to introduce new types — they'll flow through unharmed, they just won't drive any existing widget. If you want a new widget, ping the dashboard side.
The reconnect gotcha (read this)
The dashboard publishes a snapshot event to every new browser
connection (and every 30 s thereafter). The snapshot contains
disk-derived state only:
total_episodes, total_alerts, total_bytes, host_counts,
recent_episodes. Anything you publish via /publish is
not part of that snapshot.
Concretely: if you publish model_metric once at startup, only
browsers connected at that moment see it. A browser opened ten
seconds later sees an empty model-bars panel.
Two ways out, pick one:
-
Re-publish on a tick. Cheapest. Every 10–30 s, publish your current state again. Cheap because the bus is in-process and the serializer is small.
-
Ask the dashboard to cache it. If you want server-side persistence so browsers reconnect to a warm view, file a request and we'll add a per-type accumulator + replay-on-connect to the broadcaster. This is intentionally not built yet — wait until you know which event types need it.
Two integration patterns
A. Separate publisher process (recommended for live inference)
Your model code is its own process, packaged however you like. It POSTs events as it runs. No coupling to the dashboard's import graph, separate restart cycle, separate venv if you want.
# training/models/run_inference.py (your file, your jurisdiction)
from training.dashboard.client import Publisher
pub = Publisher() # defaults to http://127.0.0.1:8447/publish
for window_idx, (predicted, actual) in enumerate(predictions):
pub.publish({
"type": "prediction",
"episode_id": episode_id,
"window_idx": window_idx,
"predicted": predicted,
"actual": actual,
})
If you run this as a systemd unit, mind the hardening notes below.
B. In-process (only if your model is light and stateless)
Import the broadcaster directly and await its publish. This is
faster (no HTTP) but couples your code to the dashboard's lifetime
— a bug in your handler crashes the dashboard.
from training.dashboard.app import broadcaster
async def push_metric(model, accuracy):
await broadcaster.publish({"type": "model_metric",
"model": model, "accuracy": accuracy})
Only do this if your code (a) is async-friendly, (b) won't block the
event loop on heavy computation (use asyncio.to_thread for that),
and (c) is genuinely on the dashboard's import path. Otherwise stick
with pattern A.
Browser-triggered runs ("running examples on the Pi")
The WebSocket is currently one-way (server → browser). If you want a Run demo button on the dashboard that triggers your model code on the Pi, you have three options:
-
Polling-style. Your model code wakes periodically, picks the latest episode, runs inference, publishes events. Browser is a pure viewer. Simplest; no UI changes needed on the dashboard side.
-
HTTP endpoint, separate service. Run your model code as
cis490-models.servicelistening on its own port (say8448). Add a Caddy block (or extend thedashboard.wgsnippet) sohttps://dashboard.wg/api/...reverse-proxies to it. Add a button to the dashboard frontend that hits that path. Your service runs the demo and POSTs results back to/publish. -
In-process action registry. If your code lives in the dashboard process, we can add a
/trigger/<action>endpoint and a registry where you callregister_action("run_lstm_demo", fn)at import time. Not built yet — request it if (3) fits.
Pick one and we'll wire the dashboard side to match. (1) is the default — start there.
systemd hardening to watch out for
/etc/systemd/system/cis490-dashboard.service runs as user
cis490 with:
ProtectSystem=strict— entire filesystem mounted read-only except/dev,/proc,/sys, and anyReadWritePaths. There are currently noReadWritePaths. So inside the service, you can READ/var/lib/cis490but you can't write anywhere outside/tmp.ProtectHome=true—/homeis invisible. Don't reference paths there from inside this service.User=cis490— same identity as the receiver, so file permissions for everything under/var/lib/cis490work uniformly.
What this means for you:
- Reading episode tarballs from
/var/lib/cis490/episodes/is fine. Your inference code can open them. - Writing inference caches / model checkpoints needs a writable
path. If you want a cache under
/var/lib/cis490/models/we can addReadWritePaths=/var/lib/cis490to the unit. File a request with the path you need. - Loading model weights from disk works as long as they're
baked into
/opt/cis490(e.g./opt/cis490/training/models/weights/). - The
.venvat/opt/cis490/.venvis shared with the receiver and bootstrap services. New Python deps go inpyproject.tomland someone with write access to/opt/cis490re-runsuv sync(orpip install).
If your code runs as a separate cis490-models.service, copy the
hardening template from etc/cis490-dashboard.service and adjust
ReadWritePaths for your needs.
Worked examples
# Quick smoke test from a Pi shell
curl -s http://127.0.0.1:8447/publish \
-H 'content-type: application/json' \
-d '{"type":"model_metric","model":"lstm","accuracy":0.928}'
# → {"delivered": <N>}
# Re-publishing model state every 20 seconds so reconnects stay warm
import time
from training.dashboard.client import Publisher
pub = Publisher()
state = {"rnn": 0.872, "gru": 0.911, "lstm": 0.928, "bert": 0.954}
while True:
for model, acc in state.items():
pub.publish({"type": "model_metric", "model": model, "accuracy": acc})
time.sleep(20)
# Async producer streaming embeddings as a UMAP runs
import asyncio
from training.dashboard.client import Publisher
async def stream_embeddings(points):
pub = Publisher()
for x, y, phase in points:
await asyncio.to_thread(pub.publish, {
"type": "embedding", "x": x, "y": y, "phase": phase,
})
await asyncio.sleep(0.02) # let the UI breathe