CIS490/training/producers/perf.py
Max 697e36a315 training/producers: move out of dashboard/ per ownership boundary
Producers are event *sources* — the renderer is everything inside
training/dashboard/. Sibling layout makes the dependency direction
one-way (producers import from training.dashboard.events; dashboard
never reaches into producers).

  training/dashboard/producers/   →   training/producers/

Internal imports rewritten via sed; eval_/run.py and training/README.md
cross-references updated. CLI entry stays via `python -m training.producers.<sub>`
(replay / metrics / perf / profiles).

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
2026-05-08 12:06:56 -05:00

118 lines
4 KiB
Python

"""Emit `model_perf` events — accuracy vs inference latency per model.
Latency is measured at a production-realistic batch size (default 64 —
roughly one second of windows from a few hosts at 0.5s stride). Timings
for every measured batch size, including single-window (batch 1), are
reported in `latency_us_by_batch` for completeness; the dashboard's
scatter widget uses `latency_us`. Events are republished on a tick for
reconnects.
"""
from __future__ import annotations
import argparse
import asyncio
import json
import logging
import sys
from pathlib import Path
import numpy as np
import pyarrow.parquet as pq
sys.path.insert(0, str(Path(__file__).resolve().parents[3]))
from training._split import held_out_host
from training.producers._models import (
latency_us_batched, load_models,
)
from training.producers._publish import (
PublishFn, http_publisher, null_publisher,
)
from training.eval_._metrics import _macro_f1
log = logging.getLogger("cis490.dashboard.producers.perf")
async def emit_perf(*, publish: PublishFn, artifacts_dir: Path,
                    validation_path: Path,
                    summary_path: Path | None,
                    tensors_root: Path | None,
                    batch_for_scatter: int = 64) -> int:
    """Publish one `model_perf` event per loaded model; return the count.

    For every model returned by ``load_models(artifacts_dir)`` a test set
    is built with ``_build_test_set``, the model predicts on at most the
    first 4096 rows, and inference latency is timed at several batch
    sizes. The ``accuracy`` field of the event holds the value returned by
    ``_macro_f1``.

    Args:
        publish: Awaitable callback that receives each event dict.
        artifacts_dir: Directory handed to ``load_models``.
        validation_path: Forwarded to ``_build_test_set``.
        summary_path: Forwarded to ``_build_test_set``; may be ``None``.
        tensors_root: Forwarded to ``_build_test_set``; may be ``None``.
        batch_for_scatter: Batch size whose latency is published as
            ``latency_us``. It is always added to the timed batch sizes,
            so a non-default value is actually measured instead of
            silently reporting the smallest batch's latency.

    Returns:
        Number of events published (0 when no models were loaded).
    """
    # Function-scope import kept from the original.
    # NOTE(review): presumably avoids an import cycle with metrics — confirm.
    from training.producers.metrics import _build_test_set

    models = load_models(artifacts_dir)
    if not models:
        return 0

    # Time the standard batch sizes plus the one the scatter plot needs.
    # With the default (64) this is exactly (1, 8, 64, 512), as before.
    wanted_batches = {1, 8, 64, 512}
    if batch_for_scatter >= 1:
        wanted_batches.add(batch_for_scatter)
    batch_sizes = tuple(sorted(wanted_batches))

    n = 0
    for m in models:
        try:
            Xte, yte = _build_test_set(
                m, validation_path=validation_path,
                summary_path=summary_path, tensors_root=tensors_root,
                split_recipe="host", train_hosts=["elliott-thinkpad"],
            )
        except Exception as e:
            # Best effort: one model's missing/bad inputs must not stop
            # the remaining models from being benchmarked.
            log.warning("test set build failed for %s: %s",
                        m.__model_name__, e)
            continue
        if len(yte) == 0:
            continue
        # Sub-sample to bound runtime on perf bench
        if Xte.shape[0] > 4096:
            Xte = Xte[:4096]
            yte = yte[:4096]
        y_pred = m.predict(Xte)
        acc = _macro_f1(yte, y_pred, m.n_classes)
        lat = latency_us_batched(m, Xte,
                                 batch_sizes=batch_sizes, n_iter=100)

        if batch_for_scatter in lat:
            primary = lat[batch_for_scatter]
        else:
            # The requested batch size was not returned by the timer; fall
            # back to the smallest measured batch (0.0 if nothing was
            # measured) and say so rather than mislabel it silently.
            primary = lat[min(lat)] if lat else 0.0
            log.warning("no latency for batch size %s on %s; "
                        "falling back to smallest measured batch",
                        batch_for_scatter, m.__model_name__)

        log.info("%s acc=%.4f lat[1]=%.1fus lat[64]=%.1fus lat[512]=%.1fus",
                 m.__model_name__, acc,
                 lat.get(1, 0), lat.get(64, 0), lat.get(512, 0))
        await publish({
            "type": "model_perf",
            "model": m.__model_name__,
            "latency_us": primary,
            "accuracy": acc,
            "latency_us_by_batch": lat,
        })
        n += 1
    return n
async def _run(args: argparse.Namespace) -> int:
    """Emit perf events once, then replay them every ``args.interval`` seconds.

    Configures root logging, picks the publisher (``null_publisher`` for
    ``--dry-run``, otherwise ``http_publisher(args.publish_url)``), and
    records every event produced by ``emit_perf`` so it can be re-sent.

    Returns 0 immediately after the first pass when the interval is not
    positive or nothing was emitted. Otherwise the replay loop never exits
    on its own.
    """
    logging.basicConfig(
        level=logging.INFO,
        format="%(asctime)s %(levelname)s %(name)s %(message)s",
    )

    if args.dry_run:
        sink = null_publisher()
    else:
        sink = http_publisher(args.publish_url)

    # Every event sent during the first pass, kept for periodic replay.
    replay: list[dict] = []

    async def cached_publish(msg: dict) -> None:
        # Record before sending so the event is retained for replay.
        replay.append(msg)
        await sink(msg)

    await emit_perf(
        publish=cached_publish,
        artifacts_dir=args.artifacts,
        validation_path=args.validation,
        summary_path=args.summary,
        tensors_root=args.tensors,
    )

    # One-shot mode, or nothing to replay.
    if not replay or args.interval <= 0:
        return 0

    while True:
        await asyncio.sleep(args.interval)
        for event in replay:
            await sink(event)
def main() -> int:
    """Parse command-line options and drive ``_run`` to completion.

    Returns whatever ``_run`` returns, for use as the process exit status.
    """
    # (flags, keyword arguments) for each option, in help-output order.
    option_table = (
        (("--validation",), {"required": True, "type": Path}),
        (("--artifacts",), {"type": Path, "default": Path("artifacts")}),
        (("--summary",), {"type": Path, "default": None}),
        (("--tensors",), {"type": Path, "default": None}),
        (("--publish-url",), {"default": "http://127.0.0.1:8447/publish"}),
        (("--interval",), {"type": float, "default": 30.0}),
        (("--dry-run",), {"action": "store_true"}),
    )
    parser = argparse.ArgumentParser()
    for flags, options in option_table:
        parser.add_argument(*flags, **options)
    parsed = parser.parse_args()
    return asyncio.run(_run(parsed))
# Script entry point: exit with the status returned by main().
if __name__ == "__main__":
    sys.exit(main())