CIS490/training/dashboard/events.py
Max Gorog 3783fabe86 live scene: per-host swim lanes + latest-detection callout
New scene 13 (between perf and references) for fleet-wide live
predictions. Each host gets a row of recent prediction cells
(capped at 60), painted by predicted phase; mismatch with ground
truth shows a hatched overlay. A callout below the lanes holds
the most recent detection with model, profile, confidence, and
latency.

Producer contract is the new LiveDetection dataclass in events.py.
The dashboard side is producer-agnostic — the inference loop can
run locally or offload to A100 (or any GPU/host); just POST events
back. No rate-limiting needed; the swim-lane DOM does the capping.

Demo synthesizes 5 hosts walking through phases at ~92% accuracy
so the scene reads as live the moment the deck loads.

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
2026-05-08 14:03:32 -05:00

352 lines
16 KiB
Python
Raw Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

"""Typed event interface for the dashboard's interactive scenes.
This module is the producer-facing contract for everything the
dashboard frontend can render. Each ``@dataclass`` below is one event
the bus carries; the docstrings spell out which scene (panel) it
drives, what shape it expects, and which UI behavior it produces.
Two ways to publish:
1. **Out-of-process** (typical) — your producer code lives in its
own service / script, posts events to ``/publish`` over loopback::
from training.dashboard.events import Publisher, ModelMetric
pub = Publisher()
pub.publish(ModelMetric(model="lstm", accuracy=0.928))
``Publisher.publish`` accepts either a dataclass from this module
or a plain ``dict`` shaped the same way.
2. **In-process** — your code is imported into the dashboard's own
uvicorn process. Skip the HTTP round-trip and call the broadcaster
directly::
from training.dashboard.app import broadcaster
from training.dashboard.events import ModelMetric
await broadcaster.publish(ModelMetric("lstm", 0.928).to_event())
Mind the systemd hardening — see ``PRODUCERS.md`` for the
read-only-fs / writable-paths story.
Scene → event(s) it consumes
============================
========================== ==================================================
Scene (in deck order) Event type(s)
========================== ==================================================
2. stack (static; no events)
3. collect ``snapshot``, ``episode`` (already wired by feeder)
4. hosts ``snapshot``, ``episode`` (already wired by feeder)
5. db ``snapshot``, ``phase_mix``(already wired by feeder)
6. baseline ``phase_mix`` (feeder; see ``feeder.py``)
7. attacks ``attack_profile`` → :class:`AttackProfile`
8. chunking ``prediction`` → :class:`Prediction`
9. models ``model_metric`` → :class:`ModelMetric`
10. training-code (static; no events)
11. knn ``embedding`` → :class:`Embedding`
12. perf ``model_perf`` → :class:`ModelPerf`
13. live ``live_detection`` → :class:`LiveDetection`
========================== ==================================================
The reconnect gotcha
====================
Live events go *only* to currently-connected browsers. Reconnects don't
replay them. ``snapshot`` is the only event the dashboard re-sends to
new browsers (sourced from disk by the feeder, not your producer).
If you want a value to stick across reconnects, **republish on a tick**
(every 1030 s is plenty). For any event type where this matters, file
a request and we'll add per-type sticky caching to the broadcaster.
"""
from __future__ import annotations
from dataclasses import dataclass, field
from typing import Any, Literal, Optional, Sequence
from .client import Publisher
# ─────────────────────────────────────────────────────────────────────
# Vocabulary — Literal types so static checkers + IDEs autocomplete.
# Both are intentionally narrow: producers should pick from these
# rather than invent new strings, otherwise the corresponding
# widget can't paint them with the right palette colors.
# ─────────────────────────────────────────────────────────────────────
Phase = Literal[
"clean",
"armed",
"infecting",
"infected_running",
"dormant",
]
Model = Literal[
"rnn",
"gru",
"lstm",
"bert",
"knn",
]
# Base helper so every event has a uniform ``.to_event()`` that drops
# ``None`` fields (so optional values don't pollute the wire payload
# with explicit nulls when the producer doesn't have them).
class _EventBase:
type: str
def to_event(self) -> dict[str, Any]:
out: dict[str, Any] = {"type": self.type}
for k, v in self.__dict__.items():
if v is None:
continue
out[k] = list(v) if isinstance(v, tuple) else v
return out
# ─────────────────────────────────────────────────────────────────────
# Phase — scene 6 (baseline)
# ─────────────────────────────────────────────────────────────────────
@dataclass
class PhaseEvent(_EventBase):
"""One phase transition observed by the orchestrator.
The widget aggregates events into a rolling 5-minute window and
paints the proportion of each phase as a stacked horizontal bar.
Emit one whenever a labelled phase boundary is crossed (or
periodically on a tick if you want to seed the mix from a frozen
label).
:param phase: One of the canonical :data:`Phase` values.
"""
phase: Phase
type: str = field(default="phase", init=False, repr=False)
# ─────────────────────────────────────────────────────────────────────
# AttackProfile — scene 7 (attacks)
# ─────────────────────────────────────────────────────────────────────
@dataclass
class AttackProfile(_EventBase):
"""A canonical envelope thumbnail for one malware profile.
The widget renders a small sparkline per profile, normalised to
its own peak. Send one event per profile on startup; updates
overwrite the prior thumbnail with the same name.
:param name: Profile slug (e.g. ``cpu-saturate``, ``bursty-c2``).
:param curve: Sequence of values; rescaled to 01 internally.
~80 samples renders best at the thumbnail's pixel size.
:param shape: Optional one-line description shown under the name
(e.g. ``sustained 1-vCPU peg``).
"""
name: str
curve: Sequence[float]
shape: str = ""
type: str = field(default="attack_profile", init=False, repr=False)
# ─────────────────────────────────────────────────────────────────────
# Prediction — scene 8 (chunking)
# ─────────────────────────────────────────────────────────────────────
@dataclass
class Prediction(_EventBase):
"""A model's per-window prediction inside one episode.
The widget shows a six-cell timeline of 10-second windows; an
event repaints the cell at ``window_idx`` with ``predicted`` (or
``actual`` if predicted is omitted).
:param episode_id: Source episode (informational; multiple
episodes' predictions overwrite the same cells).
:param window_idx: 05; out-of-range values are ignored.
:param predicted: Model's guess for the window-centre phase.
:param actual: Ground-truth phase from ``labels.jsonl``.
"""
episode_id: str
window_idx: int
predicted: Optional[Phase] = None
actual: Optional[Phase] = None
type: str = field(default="prediction", init=False, repr=False)
# ─────────────────────────────────────────────────────────────────────
# ModelMetric — scene 9 (models)
# ─────────────────────────────────────────────────────────────────────
@dataclass
class ModelMetric(_EventBase):
"""Held-out accuracy for one sequence model.
The widget paints one horizontal bar per model name; the most
recent event for a given ``model`` overwrites that bar.
:param model: One of :data:`Model`. Other strings work but won't
get a colored fill class without a CSS update.
:param accuracy: 01. Stretched against a 0.51.0 visible scale
so cluster-around-0.9 differences show clearly.
"""
model: Model
accuracy: float
type: str = field(default="model_metric", init=False, repr=False)
# ─────────────────────────────────────────────────────────────────────
# Embedding — scene 11 (knn)
# ─────────────────────────────────────────────────────────────────────
@dataclass
class Embedding(_EventBase):
"""One projected feature vector for the KNN 3-D scatter plot.
Each event adds a single dot at ``(x, y, z)``. The scene has a
mode toggle that picks which categorical field colors the dot:
``phase`` (ground truth), ``predicted`` (the KNN/ML output), or
``cluster`` (an unsupervised cluster id).
Run your projector (PCA / UMAP / t-SNE in 3 components) on the
per-window engineered features, rescale each axis to 01, and
emit one event per point.
:param x: 01, mapped to the plot's first projected axis.
:param y: 01, mapped to the plot's second projected axis.
:param z: 01, mapped to the plot's third projected axis.
Optional; omit for a 2-D event (renders at z=0.5).
:param phase: Ground-truth phase. Color when mode = ``phase``.
:param predicted: Model's predicted phase. Color when mode =
``predicted``. Must be a canonical :data:`Phase`
string for the swatches to match.
:param cluster: Unsupervised cluster id (any non-negative int).
Color when mode = ``cluster``; the legend
auto-builds palette swatches per id.
"""
x: float
y: float
z: Optional[float] = None
phase: Optional[Phase] = None
predicted: Optional[Phase] = None
cluster: Optional[int] = None
type: str = field(default="embedding", init=False, repr=False)
# ─────────────────────────────────────────────────────────────────────
# LiveDetection — scene 13 (live)
# ─────────────────────────────────────────────────────────────────────
@dataclass
class LiveDetection(_EventBase):
"""One model inference on a freshly-received window from a fleet host.
Drives the live-detections scene. Each event lights up one cell on
the host's swim lane (cells push in from the right; lane caps at
60) and overwrites the "latest detection" callout below the lanes.
When ``actual`` is also supplied, the cell renders with a hatched
overlay if the model disagrees with ground truth, and the running
hit-rate ticks up on the stats line.
The dashboard side is producer-agnostic — your inference loop can
run locally or offload to an A100 (or any GPU); just POST events
back. Send them as fast as the fleet emits windows, no need to
rate-limit.
:param host_id: Source host. Each unique value gets its own lane.
:param predicted: The model's per-window phase prediction.
:param actual: Optional ground truth from labels.jsonl. Often
arrives later than the prediction — that's fine,
send a follow-up event with both fields filled in
when truth is known.
:param confidence: 01 (softmax max). Optional; shown as a percent
on the latest-detection callout.
:param model: Which model produced this. Drives the "model: …"
badge on the stats line.
:param profile: Malware profile that's running on this host.
Shown on the latest-detection callout.
:param episode_id: Source episode id (used for tooltips, grouping).
:param window_idx: 0-based window index inside the episode.
:param latency_ms: Inference round-trip in ms. If you're offloading
to a remote A100, include the network leg —
that's the operationally honest number.
:param t_wall: Wall-clock seconds since epoch. Optional.
"""
host_id: str
predicted: Phase
actual: Optional[Phase] = None
confidence: Optional[float] = None
model: Optional[Model] = None
profile: Optional[str] = None
episode_id: Optional[str] = None
window_idx: Optional[int] = None
latency_ms: Optional[float] = None
t_wall: Optional[float] = None
type: str = field(default="live_detection", init=False, repr=False)
# ─────────────────────────────────────────────────────────────────────
# ModelPerf — scene 12 (perf)
# ─────────────────────────────────────────────────────────────────────
@dataclass
class ModelPerf(_EventBase):
"""One point on the accuracy-vs-inference-cost scatter.
Each model name maps to a single point; subsequent events update
that point's position. The label next to the dot is the model name.
:param model: One of :data:`Model`.
:param latency_us: Inference cost in microseconds per window.
Plotted on the X axis (lower-left is better).
:param accuracy: Held-out accuracy 01, plotted on the Y axis.
"""
model: Model
latency_us: float
accuracy: float
type: str = field(default="model_perf", init=False, repr=False)
# ─────────────────────────────────────────────────────────────────────
# Convenience publish helpers — typed wrappers over the HTTP client.
# ─────────────────────────────────────────────────────────────────────
EventLike = _EventBase | dict[str, Any]
def publish(event: EventLike, *, publisher: Optional[Publisher] = None) -> dict[str, Any]:
"""Send one event via a default :class:`Publisher`.
Pass either a dataclass from this module or a plain dict. Returns
the parsed response (e.g. ``{"delivered": 2}``). On HTTP error,
raises ``urllib.error.HTTPError`` — wrap as appropriate.
"""
pub = publisher or Publisher()
payload = event.to_event() if isinstance(event, _EventBase) else event
return pub.publish(payload)
def try_publish(event: EventLike, *, publisher: Optional[Publisher] = None) -> int:
"""Fire-and-forget variant of :func:`publish` — swallows errors
and returns the delivered count (0 on failure)."""
pub = publisher or Publisher()
payload = event.to_event() if isinstance(event, _EventBase) else event
return pub.try_publish(payload)
__all__ = [
"Phase",
"Model",
"PhaseEvent",
"AttackProfile",
"Prediction",
"ModelMetric",
"Embedding",
"ModelPerf",
"LiveDetection",
"Publisher",
"publish",
"try_publish",
]