CIS490/training/producers/_models.py
Max 697e36a315 training/producers: move out of dashboard/ per ownership boundary
Producers are event *sources* — the renderer is everything inside
training/dashboard/. Sibling layout makes the dependency direction
one-way (producers import from training.dashboard.events; dashboard
never reaches into producers).

  training/dashboard/producers/   →   training/producers/

Internal imports rewritten via sed; eval_/run.py and training/README.md
cross-references updated. CLI entry stays via `python -m training.producers.<sub>`
(replay / metrics / perf / profiles).

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
2026-05-08 12:06:56 -05:00

103 lines
3.6 KiB
Python

"""Loader + scoring helpers for trained models, dashboard side.
Replaces the original ad-hoc loader. Every checkpoint goes through
``training.models._checkpoint.load_checkpoint`` which verifies the
schema hash matches the live ``_features.py`` registry. If the
training-time schema doesn't match, the loader raises rather than
silently feeding mis-aligned columns to the model — that's the entire
point of the checkpoint format.
Discovery: any ``*.ckpt.json`` under ``artifacts/`` is a candidate.
We sort by ``(name, mode)`` so producers can iterate deterministically.
"""
from __future__ import annotations
import logging
import time
from pathlib import Path
import numpy as np
from training.models import BaseModel
from training.models._checkpoint import load_checkpoint, load_header
log = logging.getLogger("cis490.dashboard.producers._models")
def discover_checkpoints(artifacts_dir: Path) -> list[Path]:
"""All checkpoint JSON paths under artifacts_dir, sorted."""
return sorted(Path(artifacts_dir).glob("*.ckpt.json"))
def load_models(artifacts_dir: Path, *, device: str = "auto"
) -> list[BaseModel]:
"""Load every checkpoint we find. Skips (and logs) any whose schema
hash doesn't match the live registry — a clear signal that the
feature/channel schema changed since training.
"""
models: list[BaseModel] = []
for p in discover_checkpoints(artifacts_dir):
try:
m = load_checkpoint(p, device=device)
models.append(m)
log.info("loaded %s (kind=%s)", p.name, m.input_kind)
except Exception as e:
log.warning("skipping %s: %s", p.name, e)
return models
def model_display_name(m: BaseModel) -> str:
"""For dashboard event payloads. e.g. 'gbt_realistic'."""
name = getattr(m, "__model_name__", "model")
# Mode is in the header, but BaseModel doesn't keep it; pull from class
# via the keep_mask cardinality vs full mask is fragile. Better to
# rely on the JSON header — discover_checkpoints reads it once.
return name
def headers_for(artifacts_dir: Path) -> list[dict]:
return [load_header(p) for p in discover_checkpoints(artifacts_dir)]
def latency_us(model: BaseModel, X_one: np.ndarray, *, n_iter: int = 200,
warmup: int = 20) -> float:
"""Median microseconds per forward pass on a single window.
``X_one`` shape:
- summary: (1, F)
- tensor: (1, C, T)
"""
Xk = model.select(X_one[:1])
# Warm up
for _ in range(warmup):
_ = model.predict_proba(X_one[:1])
samples = []
for _ in range(n_iter):
t0 = time.perf_counter_ns()
_ = model.predict_proba(X_one[:1])
samples.append((time.perf_counter_ns() - t0) / 1000.0)
return float(np.median(samples))
def latency_us_batched(model: BaseModel, X: np.ndarray, *,
batch_sizes: tuple[int, ...] = (1, 8, 64, 512),
n_iter: int = 200, warmup: int = 20
) -> dict[int, float]:
"""Per-batch-size median microseconds. Reports both single-window
(worst case) and production-batch (best case) numbers — single-
window timing is misleading because Python overhead dominates."""
out: dict[int, float] = {}
for bs in batch_sizes:
if bs > X.shape[0]:
continue
Xb = X[:bs]
for _ in range(warmup):
_ = model.predict_proba(Xb)
samples = []
for _ in range(n_iter):
t0 = time.perf_counter_ns()
_ = model.predict_proba(Xb)
samples.append((time.perf_counter_ns() - t0) / 1000.0)
out[bs] = float(np.median(samples))
return out