CIS490/training/dashboard/client.py
Max Gorog f303337a1e training/dashboard: events.py — typed producer interface
Single import point for the model session to wire interactive
scenes. One @dataclass per event type, with docstrings naming the
scene each one drives and the shape of every field:

    PhaseEvent       — scene 6  (baseline phase mix)
    AttackProfile    — scene 7  (per-profile envelope thumbnails)
    Prediction       — scene 8  (10-second window timeline)
    ModelMetric      — scene 9  (model accuracy bars)
    Embedding        — scene 11 (KNN scatter)
    ModelPerf        — scene 12 (accuracy-vs-latency scatter)

Phase + Model Literal types narrow the inputs so static checkers
+ IDEs autocomplete the canonical strings.

Publisher.publish now accepts either a dataclass instance from
events.py or a plain dict, so the existing
``pub.publish({"type": "...", ...})`` pattern keeps working
untouched.

Module-level publish() / try_publish() helpers wrap a default
Publisher for one-liner usage. The PRODUCERS.md guide now leads
with a pointer to events.py so the typed interface is the first
thing producers read.
2026-05-08 11:59:03 -05:00

63 lines
2.5 KiB
Python

"""Tiny HTTP client for publishing events to the dashboard.
For producer code outside the dashboard process. See PRODUCERS.md
for the event contract and the loopback-only constraint.
Stdlib-only on purpose so adding a producer doesn't pull a new
dependency into ``pyproject.toml``. Sync API; wrap in
``asyncio.to_thread`` if you're calling from an event loop.
"""
from __future__ import annotations
import json
import logging
import urllib.error
import urllib.request
from typing import Any
log = logging.getLogger("cis490.dashboard.client")
DEFAULT_URL = "http://127.0.0.1:8447/publish"
class Publisher:
"""One-shot publisher. Reuses no connection; the dashboard's
upstream is uvicorn on loopback so the per-call overhead is
sub-millisecond. If you're publishing >100 events/s, switch to
the in-process pattern documented in PRODUCERS.md instead."""
def __init__(self, url: str = DEFAULT_URL, timeout: float = 2.0) -> None:
self.url = url
self.timeout = timeout
def publish(self, msg: Any) -> dict[str, Any]:
"""POST ``msg`` to /publish. Accepts either a plain ``dict``
or any object with a ``to_event()`` method (the dataclasses
in :mod:`training.dashboard.events` provide one). Returns the
parsed response body (``{"delivered": N}`` on success).
Raises ``urllib.error.HTTPError`` on non-2xx; producers
usually want to log + continue rather than abort, so wrap
as appropriate."""
payload = msg.to_event() if hasattr(msg, "to_event") else msg
body = json.dumps(payload).encode("utf-8")
req = urllib.request.Request(
self.url, data=body,
headers={"content-type": "application/json"},
method="POST",
)
with urllib.request.urlopen(req, timeout=self.timeout) as resp:
return json.loads(resp.read())
def try_publish(self, msg: Any) -> int:
"""Like ``publish`` but swallows exceptions and returns the
delivered count (0 on failure). Convenience for producers
that want fire-and-forget semantics. Same accept-either
signature as :meth:`publish`."""
try:
return int(self.publish(msg).get("delivered", 0))
except (urllib.error.URLError, OSError, ValueError):
payload = msg.to_event() if hasattr(msg, "to_event") else msg
t = payload.get("type") if isinstance(payload, dict) else None
log.exception("publish to %s failed for %r", self.url, t)
return 0