Single import point for the model session to wire interactive
scenes. One @dataclass per event type, with docstrings naming the
scene each one drives and the shape of every field:
PhaseEvent — scene 6 (baseline phase mix)
AttackProfile — scene 7 (per-profile envelope thumbnails)
Prediction — scene 8 (10-second window timeline)
ModelMetric — scene 9 (model accuracy bars)
Embedding — scene 11 (KNN scatter)
ModelPerf — scene 12 (accuracy-vs-latency scatter)
Phase + Model Literal types narrow the inputs so static checkers
+ IDEs autocomplete the canonical strings.
Publisher.publish now accepts either a dataclass instance from
events.py or a plain dict, so the existing
``pub.publish({"type": "...", ...})`` pattern keeps working
untouched.
Module-level publish() / try_publish() helpers wrap a default
Publisher for one-liner usage. The PRODUCERS.md guide now leads
with a pointer to events.py so the typed interface is the first
thing producers read.
63 lines
2.5 KiB
Python
63 lines
2.5 KiB
Python
"""Tiny HTTP client for publishing events to the dashboard.
|
|
|
|
For producer code outside the dashboard process. See PRODUCERS.md
|
|
for the event contract and the loopback-only constraint.
|
|
|
|
Stdlib-only on purpose so adding a producer doesn't pull a new
|
|
dependency into ``pyproject.toml``. Sync API; wrap in
|
|
``asyncio.to_thread`` if you're calling from an event loop.
|
|
"""
|
|
from __future__ import annotations
|
|
|
|
import json
|
|
import logging
|
|
import urllib.error
|
|
import urllib.request
|
|
from typing import Any
|
|
|
|
|
|
log = logging.getLogger("cis490.dashboard.client")
|
|
|
|
DEFAULT_URL = "http://127.0.0.1:8447/publish"
|
|
|
|
|
|
class Publisher:
|
|
"""One-shot publisher. Reuses no connection; the dashboard's
|
|
upstream is uvicorn on loopback so the per-call overhead is
|
|
sub-millisecond. If you're publishing >100 events/s, switch to
|
|
the in-process pattern documented in PRODUCERS.md instead."""
|
|
|
|
def __init__(self, url: str = DEFAULT_URL, timeout: float = 2.0) -> None:
|
|
self.url = url
|
|
self.timeout = timeout
|
|
|
|
def publish(self, msg: Any) -> dict[str, Any]:
|
|
"""POST ``msg`` to /publish. Accepts either a plain ``dict``
|
|
or any object with a ``to_event()`` method (the dataclasses
|
|
in :mod:`training.dashboard.events` provide one). Returns the
|
|
parsed response body (``{"delivered": N}`` on success).
|
|
Raises ``urllib.error.HTTPError`` on non-2xx; producers
|
|
usually want to log + continue rather than abort, so wrap
|
|
as appropriate."""
|
|
payload = msg.to_event() if hasattr(msg, "to_event") else msg
|
|
body = json.dumps(payload).encode("utf-8")
|
|
req = urllib.request.Request(
|
|
self.url, data=body,
|
|
headers={"content-type": "application/json"},
|
|
method="POST",
|
|
)
|
|
with urllib.request.urlopen(req, timeout=self.timeout) as resp:
|
|
return json.loads(resp.read())
|
|
|
|
def try_publish(self, msg: Any) -> int:
|
|
"""Like ``publish`` but swallows exceptions and returns the
|
|
delivered count (0 on failure). Convenience for producers
|
|
that want fire-and-forget semantics. Same accept-either
|
|
signature as :meth:`publish`."""
|
|
try:
|
|
return int(self.publish(msg).get("delivered", 0))
|
|
except (urllib.error.URLError, OSError, ValueError):
|
|
payload = msg.to_event() if hasattr(msg, "to_event") else msg
|
|
t = payload.get("type") if isinstance(payload, dict) else None
|
|
log.exception("publish to %s failed for %r", self.url, t)
|
|
return 0
|