CIS490/training/producers/_publish.py
Max 697e36a315 training/producers: move out of dashboard/ per ownership boundary
Producers are event *sources* — the renderer is everything inside
training/dashboard/. Sibling layout makes the dependency direction
one-way (producers import from training.dashboard.events; dashboard
never reaches into producers).

  training/dashboard/producers/   →   training/producers/

Internal imports rewritten via sed; eval_/run.py and training/README.md
cross-references updated. CLI entry stays via `python -m training.producers.<sub>`
(replay / metrics / perf / profiles).

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
2026-05-08 12:06:56 -05:00

53 lines
1.6 KiB
Python

"""Transport-agnostic publish callable for dashboard producers.
Two flavors, both returning ``async def publish(msg) -> None``:
- ``http_publisher(url)`` — wraps the canonical
``training.dashboard.client.Publisher`` (stdlib-only urllib). Use
for separate-process producers (the recommended pattern in
PRODUCERS.md). Errors are swallowed via ``try_publish`` — a
momentarily dead dashboard should not kill a long-running producer.
- ``local_publisher()`` — in-process. Awaits
``training.dashboard.app.broadcaster.publish`` directly. Only use
when your code is genuinely on the dashboard's import path and
doesn't block the event loop.
- ``null_publisher()`` — no-op for unit tests.
"""
from __future__ import annotations
import asyncio
import logging
from typing import Any, Awaitable, Callable
log = logging.getLogger("cis490.dashboard.producers")
PublishFn = Callable[[dict[str, Any]], Awaitable[None]]
def local_publisher() -> PublishFn:
from training.dashboard.app import broadcaster
async def publish(msg: dict[str, Any]) -> None:
await broadcaster.publish(msg)
return publish
def http_publisher(url: str = "http://127.0.0.1:8447/publish",
timeout_s: float = 2.0) -> PublishFn:
from training.dashboard.client import Publisher
pub = Publisher(url=url, timeout=timeout_s)
async def publish(msg: dict[str, Any]) -> None:
# try_publish swallows errors and returns 0 on failure.
await asyncio.to_thread(pub.try_publish, msg)
return publish
def null_publisher() -> PublishFn:
async def publish(_msg: dict[str, Any]) -> None:
return None
return publish