training/dashboard: events.py — typed producer interface

Single import point for the model session to wire interactive
scenes. One @dataclass per event type, with docstrings naming the
scene each one drives and the shape of every field:

    PhaseEvent       — scene 6  (baseline phase mix)
    AttackProfile    — scene 7  (per-profile envelope thumbnails)
    Prediction       — scene 8  (10-second window timeline)
    ModelMetric      — scene 9  (model accuracy bars)
    Embedding        — scene 11 (KNN scatter)
    ModelPerf        — scene 12 (accuracy-vs-latency scatter)

Phase + Model Literal types narrow the inputs so static checkers
+ IDEs autocomplete the canonical strings.

Publisher.publish now accepts either a dataclass instance from
events.py or a plain dict, so the existing
``pub.publish({"type": "...", ...})`` pattern keeps working
untouched.

Module-level publish() / try_publish() helpers wrap a default
Publisher for one-liner usage. The PRODUCERS.md guide now leads
with a pointer to events.py so the typed interface is the first
thing producers read.
This commit is contained in:
Max Gorog 2026-05-08 11:59:03 -05:00
parent 058f2d75a9
commit f303337a1e
3 changed files with 312 additions and 10 deletions

View file

@ -5,6 +5,20 @@ contract you need to honor and the pitfalls to avoid. The dashboard
itself doesn't know anything about models — it just renders typed
events that arrive on the message bus.
> **Typed Python interface.** [`events.py`](./events.py) defines a
> dataclass per event type (`PhaseEvent`, `AttackProfile`,
> `Prediction`, `ModelMetric`, `Embedding`, `ModelPerf`) with full
> docstrings about what each one drives, plus `Phase` and `Model`
> `Literal` types for autocomplete. Import from there instead of
> hand-rolling dicts:
>
> ```python
> from training.dashboard.events import Publisher, ModelMetric
>
> pub = Publisher()
> pub.publish(ModelMetric(model="lstm", accuracy=0.928))
> ```
## The `/publish` endpoint
```

View file

@ -31,13 +31,16 @@ class Publisher:
self.url = url
self.timeout = timeout
def publish(self, msg: dict[str, Any]) -> dict[str, Any]:
"""POST ``msg`` to /publish. Returns the parsed response
body (``{"delivered": N}`` on success). Raises
``urllib.error.HTTPError`` on non-2xx; producers usually
want to log + continue rather than abort, so wrap as
appropriate."""
body = json.dumps(msg).encode("utf-8")
def publish(self, msg: Any) -> dict[str, Any]:
"""POST ``msg`` to /publish. Accepts either a plain ``dict``
or any object with a ``to_event()`` method (the dataclasses
in :mod:`training.dashboard.events` provide one). Returns the
parsed response body (``{"delivered": N}`` on success).
Raises ``urllib.error.HTTPError`` on non-2xx; producers
usually want to log + continue rather than abort, so wrap
as appropriate."""
payload = msg.to_event() if hasattr(msg, "to_event") else msg
body = json.dumps(payload).encode("utf-8")
req = urllib.request.Request(
self.url, data=body,
headers={"content-type": "application/json"},
@ -46,12 +49,15 @@ class Publisher:
with urllib.request.urlopen(req, timeout=self.timeout) as resp:
return json.loads(resp.read())
def try_publish(self, msg: dict[str, Any]) -> int:
def try_publish(self, msg: Any) -> int:
"""Like ``publish`` but swallows exceptions and returns the
delivered count (0 on failure). Convenience for producers
that want fire-and-forget semantics."""
that want fire-and-forget semantics. Same accept-either
signature as :meth:`publish`."""
try:
return int(self.publish(msg).get("delivered", 0))
except (urllib.error.URLError, OSError, ValueError):
log.exception("publish to %s failed for %r", self.url, msg.get("type"))
payload = msg.to_event() if hasattr(msg, "to_event") else msg
t = payload.get("type") if isinstance(payload, dict) else None
log.exception("publish to %s failed for %r", self.url, t)
return 0

View file

@ -0,0 +1,282 @@
"""Typed event interface for the dashboard's interactive scenes.
This module is the producer-facing contract for everything the
dashboard frontend can render. Each ``@dataclass`` below is one event
the bus carries; the docstrings spell out which scene (panel) it
drives, what shape it expects, and which UI behavior it produces.
Two ways to publish:
1. **Out-of-process** (typical) your producer code lives in its
own service / script, posts events to ``/publish`` over loopback::
from training.dashboard.events import Publisher, ModelMetric
pub = Publisher()
pub.publish(ModelMetric(model="lstm", accuracy=0.928))
``Publisher.publish`` accepts either a dataclass from this module
or a plain ``dict`` shaped the same way.
2. **In-process** your code is imported into the dashboard's own
uvicorn process. Skip the HTTP round-trip and call the broadcaster
directly::
from training.dashboard.app import broadcaster
from training.dashboard.events import ModelMetric
await broadcaster.publish(ModelMetric("lstm", 0.928).to_event())
Mind the systemd hardening see ``PRODUCERS.md`` for the
read-only-fs / writable-paths story.
Scene event(s) it consumes
============================
========================== ==================================================
Scene (in deck order) Event type(s)
========================== ==================================================
2. stack (static; no events)
3. collect ``snapshot``, ``episode`` (already wired by feeder)
4. hosts ``snapshot``, ``episode`` (already wired by feeder)
5. db ``snapshot`` (already wired by feeder)
6. baseline ``phase`` :class:`PhaseEvent`
7. attacks ``attack_profile`` :class:`AttackProfile`
8. chunking ``prediction`` :class:`Prediction`
9. models ``model_metric`` :class:`ModelMetric`
10. training-code (static; no events)
11. knn ``embedding`` :class:`Embedding`
12. perf ``model_perf`` :class:`ModelPerf`
========================== ==================================================
The reconnect gotcha
====================
Live events go *only* to currently-connected browsers. Reconnects don't
replay them. ``snapshot`` is the only event the dashboard re-sends to
new browsers (sourced from disk by the feeder, not your producer).
If you want a value to stick across reconnects, **republish on a tick**
(every 1030 s is plenty). For any event type where this matters, file
a request and we'll add per-type sticky caching to the broadcaster.
"""
from __future__ import annotations
from dataclasses import dataclass, field
from typing import Any, Literal, Optional, Sequence
from .client import Publisher
# ─────────────────────────────────────────────────────────────────────
# Vocabulary — Literal types so static checkers + IDEs autocomplete.
# Both are intentionally narrow: producers should pick from these
# rather than invent new strings, otherwise the corresponding
# widget can't paint them with the right palette colors.
# ─────────────────────────────────────────────────────────────────────
Phase = Literal[
"clean",
"armed",
"infecting",
"infected_running",
"dormant",
]
Model = Literal[
"rnn",
"gru",
"lstm",
"bert",
"knn",
]
# Base helper so every event has a uniform ``.to_event()`` that drops
# ``None`` fields (so optional values don't pollute the wire payload
# with explicit nulls when the producer doesn't have them).
class _EventBase:
type: str
def to_event(self) -> dict[str, Any]:
out: dict[str, Any] = {"type": self.type}
for k, v in self.__dict__.items():
if v is None:
continue
out[k] = list(v) if isinstance(v, tuple) else v
return out
# ─────────────────────────────────────────────────────────────────────
# Phase — scene 6 (baseline)
# ─────────────────────────────────────────────────────────────────────
@dataclass
class PhaseEvent(_EventBase):
"""One phase transition observed by the orchestrator.
The widget aggregates events into a rolling 5-minute window and
paints the proportion of each phase as a stacked horizontal bar.
Emit one whenever a labelled phase boundary is crossed (or
periodically on a tick if you want to seed the mix from a frozen
label).
:param phase: One of the canonical :data:`Phase` values.
"""
phase: Phase
type: str = field(default="phase", init=False, repr=False)
# ─────────────────────────────────────────────────────────────────────
# AttackProfile — scene 7 (attacks)
# ─────────────────────────────────────────────────────────────────────
@dataclass
class AttackProfile(_EventBase):
"""A canonical envelope thumbnail for one malware profile.
The widget renders a small sparkline per profile, normalised to
its own peak. Send one event per profile on startup; updates
overwrite the prior thumbnail with the same name.
:param name: Profile slug (e.g. ``cpu-saturate``, ``bursty-c2``).
:param curve: Sequence of values; rescaled to 01 internally.
~80 samples renders best at the thumbnail's pixel size.
:param shape: Optional one-line description shown under the name
(e.g. ``sustained 1-vCPU peg``).
"""
name: str
curve: Sequence[float]
shape: str = ""
type: str = field(default="attack_profile", init=False, repr=False)
# ─────────────────────────────────────────────────────────────────────
# Prediction — scene 8 (chunking)
# ─────────────────────────────────────────────────────────────────────
@dataclass
class Prediction(_EventBase):
"""A model's per-window prediction inside one episode.
The widget shows a six-cell timeline of 10-second windows; an
event repaints the cell at ``window_idx`` with ``predicted`` (or
``actual`` if predicted is omitted).
:param episode_id: Source episode (informational; multiple
episodes' predictions overwrite the same cells).
:param window_idx: 05; out-of-range values are ignored.
:param predicted: Model's guess for the window-centre phase.
:param actual: Ground-truth phase from ``labels.jsonl``.
"""
episode_id: str
window_idx: int
predicted: Optional[Phase] = None
actual: Optional[Phase] = None
type: str = field(default="prediction", init=False, repr=False)
# ─────────────────────────────────────────────────────────────────────
# ModelMetric — scene 9 (models)
# ─────────────────────────────────────────────────────────────────────
@dataclass
class ModelMetric(_EventBase):
"""Held-out accuracy for one sequence model.
The widget paints one horizontal bar per model name; the most
recent event for a given ``model`` overwrites that bar.
:param model: One of :data:`Model`. Other strings work but won't
get a colored fill class without a CSS update.
:param accuracy: 01. Stretched against a 0.51.0 visible scale
so cluster-around-0.9 differences show clearly.
"""
model: Model
accuracy: float
type: str = field(default="model_metric", init=False, repr=False)
# ─────────────────────────────────────────────────────────────────────
# Embedding — scene 11 (knn)
# ─────────────────────────────────────────────────────────────────────
@dataclass
class Embedding(_EventBase):
"""One projected feature vector for the KNN scatter plot.
Each event adds a single dot at ``(x, y)`` colored by ``phase``.
Run your projector (PCA / UMAP / t-SNE) on the per-window engineered
features, rescale x/y to 01, and emit one event per point.
:param x: 01, mapped to the plot's horizontal axis.
:param y: 01, mapped to the plot's vertical axis.
:param phase: Color class picks one of the phase swatches.
"""
x: float
y: float
phase: Phase
type: str = field(default="embedding", init=False, repr=False)
# ─────────────────────────────────────────────────────────────────────
# ModelPerf — scene 12 (perf)
# ─────────────────────────────────────────────────────────────────────
@dataclass
class ModelPerf(_EventBase):
"""One point on the accuracy-vs-inference-cost scatter.
Each model name maps to a single point; subsequent events update
that point's position. The label next to the dot is the model name.
:param model: One of :data:`Model`.
:param latency_us: Inference cost in microseconds per window.
Plotted on the X axis (lower-left is better).
:param accuracy: Held-out accuracy 01, plotted on the Y axis.
"""
model: Model
latency_us: float
accuracy: float
type: str = field(default="model_perf", init=False, repr=False)
# ─────────────────────────────────────────────────────────────────────
# Convenience publish helpers — typed wrappers over the HTTP client.
# ─────────────────────────────────────────────────────────────────────
EventLike = _EventBase | dict[str, Any]
def publish(event: EventLike, *, publisher: Optional[Publisher] = None) -> dict[str, Any]:
"""Send one event via a default :class:`Publisher`.
Pass either a dataclass from this module or a plain dict. Returns
the parsed response (e.g. ``{"delivered": 2}``). On HTTP error,
raises ``urllib.error.HTTPError`` wrap as appropriate.
"""
pub = publisher or Publisher()
payload = event.to_event() if isinstance(event, _EventBase) else event
return pub.publish(payload)
def try_publish(event: EventLike, *, publisher: Optional[Publisher] = None) -> int:
"""Fire-and-forget variant of :func:`publish` — swallows errors
and returns the delivered count (0 on failure)."""
pub = publisher or Publisher()
payload = event.to_event() if isinstance(event, _EventBase) else event
return pub.try_publish(payload)
__all__ = [
"Phase",
"Model",
"PhaseEvent",
"AttackProfile",
"Prediction",
"ModelMetric",
"Embedding",
"ModelPerf",
"Publisher",
"publish",
"try_publish",
]