CIS490/training/producers/knn.py
Max ba5ff70c14 training/producers/knn: add stream subcommand for disk-loaded loop
The fit pipeline (PCA-3 + KMeans + KNN classifier) can be expensive
to recompute every time a producer starts. `produce --fit-out` already
dumps the per-window (x, y, z, phase_int, predicted_int, cluster) to a
parquet; this commit adds a `stream` subcommand that loads that
parquet and publishes Embedding events on a loop.

Why a separate streamer:
  - The dashboard's live event stream is not replayed on browser
    reconnect (PRODUCERS.md §reconnect-gotcha). A browser that
    connects 30 s after the last cycle of the producer sees an empty
    scatter unless we re-publish.
  - The fit is deterministic given (features, seed) — no need to
    repeat it just to re-publish points. The streamer is small and
    stateless; it can run as a long-lived service.

Usage:
  python -m training.producers.knn produce \
      --window data/processed/features_window_v1.parquet \
      --schema data/processed/feature_schema_v1.json \
      --fit-out data/processed/knn_v1.parquet \
      --no-publish

  python -m training.producers.knn stream \
      --load-fit data/processed/knn_v1.parquet \
      --loop --max-points 2000

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
2026-05-08 13:13:09 -05:00

456 lines
19 KiB
Python
Raw Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

"""KNN-driven embedding events for the dashboard's scene-11 3-D scatter.
For each per-window summary feature vector we compute three things and
emit them on a single ``embedding`` event:
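    x, y, z    <- PCA-3 projection of the standardized features, rescaled
                  to [0, 1] per axis
    phase      <- ground-truth phase (the features parquet's ``phase``
                  column, originally from labels.jsonl)
    predicted  <- KNN classifier's prediction (k=10 by default,
                  distance-weighted); windows on the KNN-train side of the
                  split carry their ground-truth phase here instead
    cluster    <- KMeans cluster id (k=8 by default)
Scene 11's mode toggle (phase / predicted / cluster) recolors the same
points without having to re-publish — one event populates all three views.
Three subcommands:
    python -m training.producers.knn produce ...  fit, then emit Embedding
        events; ``--fit-out PATH`` also dumps the per-window (x, y, z,
        phase, predicted, cluster) parquet and ``--no-publish`` skips
        publishing (fit-only mode)
    python -m training.producers.knn stream ...   replay a ``--fit-out``
        parquet as Embedding events without re-fitting, optionally on a
        loop (``--loop``)
    python -m training.producers.knn metric ...   publish ModelMetric for
        the KNN classifier (held-out test macro F1)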
The fit pipeline is deterministic given (features parquet, schema, seed)
so re-running produces identical points and identical KNN predictions.
"""
from __future__ import annotations
import argparse
import asyncio
import json
import logging
import sys
import time
from pathlib import Path
import numpy as np
import pyarrow.parquet as pq
sys.path.insert(0, str(Path(__file__).resolve().parents[2]))
from training._features import PHASE_TO_INT, PHASES
from training._split import (
held_out_host, held_out_sample, held_out_time,
)
from training.producers._publish import (
PublishFn, http_publisher, null_publisher,
)
log = logging.getLogger("cis490.producers.knn")

# Phase ints map back to canonical strings -- the dashboard wants the
# string form (Phase Literal) on the wire, not the int.
INT_TO_PHASE: dict[int, str] = {i: p for p, i in PHASE_TO_INT.items()}

# The dashboard's Phase Literal is a closed enum that does NOT include
# "failed", so _safe_phase maps any phase outside this set (including
# "failed", and any int missing from INT_TO_PHASE) to "clean".
# Per the original note, the validator rejects "failed" episodes upstream,
# so this fallback shouldn't fire in practice.
# NOTE(review): an earlier version of this comment said "failed" maps to
# "dormant", but _safe_phase returns "clean" -- confirm which one the
# dashboard expects.
_DASHBOARD_PHASES = frozenset(
    {"clean", "armed", "infecting", "infected_running", "dormant"}
)
def _safe_phase(int_phase: int) -> str:
    """Translate a PHASE_TO_INT code into a phase string the dashboard accepts.

    Codes missing from ``INT_TO_PHASE``, and phases that are not in
    ``_DASHBOARD_PHASES``, both collapse to ``"clean"``.
    """
    name = INT_TO_PHASE.get(int(int_phase), "clean")
    if name not in _DASHBOARD_PHASES:
        return "clean"
    return name
# ─────────────────────────────────────────────────────────────────────
# Pipeline
# ─────────────────────────────────────────────────────────────────────
def _load_features(window_path: Path, schema_path: Path,
                   *, max_rows: int | None = None
                   ) -> tuple[np.ndarray, np.ndarray, list[str], list[str], list[str]]:
    """Load (X, y, episode_id, host_id, profile) for every window.

    Args:
        window_path: per-window features parquet.
        schema_path: JSON file whose ``feature_names`` list selects (and
            orders) the feature columns read from ``window_path``.
        max_rows: if given and the table has more rows, keep ``max_rows``
            rows evenly spaced across the table (deterministic, no RNG).

    Returns:
        X           (N, F) float32 -- feature matrix, columns in schema order
        y           (N,)   int64   -- phase labels (PHASE_TO_INT codes)
        episode_id  list[N]
        host_id     list[N]
        profile     list[N]

    Raises:
        ValueError: if the schema's ``feature_names`` list is empty.
    """
    schema = json.loads(schema_path.read_text(encoding="utf-8"))
    feat_names = schema["feature_names"]
    if not feat_names:
        # np.column_stack([]) would otherwise fail with an obscure message.
        raise ValueError(f"{schema_path}: 'feature_names' is empty")
    cols = feat_names + ["phase", "episode_id", "host_id", "profile"]
    log.info("loading %s", window_path)
    tbl = pq.read_table(window_path, columns=cols)
    if max_rows is not None and tbl.num_rows > max_rows:
        log.info("subsampling %d -> %d rows", tbl.num_rows, max_rows)
        # Evenly spaced indices keep the subsample spread over the whole
        # file instead of just its head.
        idx = np.linspace(0, tbl.num_rows - 1, num=max_rows, dtype=np.int64)
        tbl = tbl.take(idx)
    X = np.column_stack([tbl.column(n).to_numpy(zero_copy_only=False)
                         for n in feat_names]).astype(np.float32)
    y = tbl.column("phase").to_numpy(zero_copy_only=False).astype(np.int64)
    eids = tbl.column("episode_id").to_pylist()
    hosts = tbl.column("host_id").to_pylist()
    profs = tbl.column("profile").to_pylist()
    return X, y, eids, hosts, profs
def _standardize(X: np.ndarray) -> np.ndarray:
    """Return a z-scored float32 copy of ``X`` with near-constant columns removed.

    Per column (axis 0):
      1. NaNs are replaced by the column's NaN-ignoring median, or by 0.0
         when the whole column is NaN.
      2. Columns whose standard deviation is <= 1e-6 are dropped.
      3. Remaining columns are centered on their mean and divided by their
         standard deviation.
    ``X`` itself is left untouched.
    """
    col_fill = np.nanmedian(X, axis=0)
    col_fill[np.isnan(col_fill)] = 0.0      # all-NaN column -> impute zeros
    col_fill = col_fill.astype(np.float32)

    nan_mask = np.isnan(X)
    filled = X.copy()
    filled[nan_mask] = np.broadcast_to(col_fill, X.shape)[nan_mask]

    mu = filled.mean(axis=0)
    sigma = filled.std(axis=0)
    varying = sigma > 1e-6
    z = (filled[:, varying] - mu[varying]) / sigma[varying]
    return z.astype(np.float32)
def _pca3(X: np.ndarray, *, sample_for_fit: int = 50_000,
          seed: int = 0) -> np.ndarray:
    """Project ``X`` to 3-D with PCA and rescale each axis to [0, 1].

    PCA is fitted on a random subsample of at most ``sample_for_fit`` rows
    (drawn with ``seed``) and then applied to every row. Each output axis
    is min-max rescaled with the min/max of the *fit sample's* projection,
    so rows that project outside that range are clipped to 0 or 1.

    When the fit sample supports fewer than 3 components (fewer than 3
    rows or columns, e.g. after ``_standardize`` dropped constant columns),
    PCA is fitted with as many components as possible and the missing axes
    are filled with 0.5 (the centre of the [0, 1] cube) instead of raising.

    Returns:
        (N, 3) float32 array with values in [0, 1].
    """
    n_rows = X.shape[0]
    rng = np.random.default_rng(seed)
    if n_rows > sample_for_fit:
        idx = rng.choice(n_rows, size=sample_for_fit, replace=False)
        Xfit = X[idx]
    else:
        Xfit = X
    # sklearn requires n_components <= min(n_samples, n_features).
    n_comp = min(3, Xfit.shape[0], Xfit.shape[1])
    out = np.full((n_rows, 3), 0.5, dtype=np.float32)
    if n_comp == 0:
        return out
    from sklearn.decomposition import PCA
    pca = PCA(n_components=n_comp, random_state=seed)
    pca.fit(Xfit)
    P_all = pca.transform(X)
    # Per-axis min-max rescaling computed on the fit sample so the
    # transform is consistent with what the dashboard scatter expects.
    # Without subsampling the fit sample *is* X, so reuse that projection.
    Pfit = P_all if Xfit is X else pca.transform(Xfit)
    lo = Pfit.min(axis=0)
    hi = Pfit.max(axis=0)
    span = np.maximum(hi - lo, 1e-6)
    P01 = (P_all.astype(np.float32) - lo) / span
    out[:, :n_comp] = np.clip(P01, 0.0, 1.0)
    return out
def _kmeans(X: np.ndarray, *, k: int, sample_for_fit: int = 100_000,
            seed: int = 0) -> np.ndarray:
    """Cluster the rows of ``X`` with MiniBatchKMeans; return one id per row.

    The model is fitted on a random subsample of at most ``sample_for_fit``
    rows (drawn with ``seed``) and then used to label every row.

    Degenerate inputs no longer raise inside scikit-learn:
      - an empty ``X`` yields an empty label array;
      - if the fit sample has fewer rows than ``k``, ``k`` is reduced to
        that row count (with a warning), since KMeans cannot fit more
        clusters than samples.

    Returns:
        (N,) int32 cluster ids.
    """
    n_rows = X.shape[0]
    if n_rows == 0:
        return np.zeros(0, dtype=np.int32)
    n_fit = min(n_rows, sample_for_fit)
    if 0 < n_fit < k:
        log.warning("KMeans k=%d exceeds the %d fit rows; using k=%d",
                    k, n_fit, n_fit)
        k = n_fit
    from sklearn.cluster import MiniBatchKMeans
    log.info("fitting KMeans k=%d on up to %d rows", k, sample_for_fit)
    km = MiniBatchKMeans(n_clusters=k, random_state=seed, batch_size=2048,
                         n_init=4, max_iter=200)
    rng = np.random.default_rng(seed)
    if n_rows > sample_for_fit:
        idx = rng.choice(n_rows, size=sample_for_fit, replace=False)
        km.fit(X[idx])
    else:
        km.fit(X)
    return km.predict(X).astype(np.int32)
def _knn_predict(
    *, X_train: np.ndarray, y_train: np.ndarray,
    X_eval: np.ndarray,
    k: int = 10, weights: str = "distance",
) -> np.ndarray:
    """Train a KNN classifier on (X_train, y_train) and predict X_eval.

    Args:
        X_train: training feature rows.
        y_train: one integer class label per training row.
        X_eval: feature rows to classify.
        k: neighbours per query; reduced to the number of training rows
            when larger (scikit-learn rejects n_neighbors > n_samples_fit).
        weights: passed through to ``KNeighborsClassifier(weights=...)``.

    Returns:
        int64 predictions, one per row of ``X_eval``; an empty array when
        ``X_eval`` has no rows (scikit-learn would raise on such a predict).

    Raises:
        ValueError: if ``X_eval`` is non-empty but ``X_train`` has no rows.
    """
    if X_eval.shape[0] == 0:
        return np.zeros(0, dtype=np.int64)
    n_train = X_train.shape[0]
    if n_train == 0:
        raise ValueError("KNN needs at least one training row "
                         "(the train split is empty)")
    if k > n_train:
        log.warning("KNN k=%d exceeds %d train rows; using k=%d",
                    k, n_train, n_train)
        k = n_train
    from sklearn.neighbors import KNeighborsClassifier
    log.info("fitting KNN k=%d on %d train rows", k, n_train)
    clf = KNeighborsClassifier(n_neighbors=k, weights=weights,
                               algorithm="auto", n_jobs=-1)
    clf.fit(X_train, y_train)
    log.info("predicting on %d eval rows", X_eval.shape[0])
    return clf.predict(X_eval).astype(np.int64)
# ─────────────────────────────────────────────────────────────────────
# Subcommands
# ─────────────────────────────────────────────────────────────────────
async def _produce(args) -> int:
    """Fit PCA-3 / KMeans / KNN on the window features and publish embeddings.

    Steps:
      1. Load and standardize the features (``_load_features``,
         ``_standardize``), then compute [0, 1] PCA-3 coordinates
         (``_pca3``) and KMeans cluster ids (``_kmeans``) for every window.
      2. Split windows into KNN-train and KNN-eval rows: by episode via
         ``held_out_host`` for ``--split-recipe host`` (train and val
         episodes both go to the train side), otherwise by an independent
         80/20 random draw per window.
      3. Build ``y_pred``: train rows keep their ground-truth label, eval
         rows get the KNN prediction.
      4. If ``--fit-out`` is set, write the per-window fit to that parquet.
      5. Unless ``--no-publish``, publish up to ``--max-publish`` randomly
         chosen windows as ``embedding`` events, sleeping ``1/--rate-hz``
         seconds after each one when ``--rate-hz > 0``.

    Returns 0 (process exit code).
    """
    X, y, eids, hosts, profs = _load_features(args.window, args.schema,
                                              max_rows=args.max_rows)
    log.info("loaded %d windows × %d features", X.shape[0], X.shape[1])
    Xz = _standardize(X)
    XYZ = _pca3(Xz, seed=args.seed)
    cluster_ids = _kmeans(Xz, k=args.k_clusters, seed=args.seed)
    # KNN classifier. Only eval-side rows get a real KNN prediction; rows
    # on the train side keep their ground-truth label (set below). The
    # "host" recipe gives a held-out-host evaluation; "random" draws each
    # window independently, so one episode's windows can land on both sides.
    if args.split_recipe == "host":
        # Build a host-based split aligned with the supervised pipeline
        val = pq.read_table(args.validation).to_pylist()
        rows = [r for r in val if r["status"] in ("accepted", "degraded")]
        s = held_out_host(
            profiles=[r["profile"] for r in rows],
            sample_names=[r["sample_name"] for r in rows],
            host_ids=[r["host_id"] for r in rows],
            episode_ids=[r["episode_id"] for r in rows],
            train_hosts=args.train_hosts,
            seed=args.seed,
        )
        train_eps = {rows[i]["episode_id"] for i in range(len(rows))
                     if s.train[i] or s.val[i]}  # train + val episodes form the KNN fit set
        # Windows whose episode is absent from the validation table end up
        # on the eval side.
        train_mask = np.array([e in train_eps for e in eids], dtype=bool)
    else:
        # Random 80/20 split
        rng = np.random.default_rng(args.seed)
        train_mask = rng.random(len(eids)) < 0.8
    n_train = int(train_mask.sum())
    n_eval = int((~train_mask).sum())
    log.info("KNN train=%d eval=%d", n_train, n_eval)
    y_pred = np.zeros(len(y), dtype=np.int64)
    y_pred[train_mask] = y[train_mask]  # train rows: "predicted" = their own ground-truth label
    if n_eval > 0:
        y_pred[~train_mask] = _knn_predict(
            X_train=Xz[train_mask], y_train=y[train_mask],
            X_eval=Xz[~train_mask], k=args.k_neighbors,
        )
    # Optionally write the fit output to a parquet for inspection (and for
    # the `stream` subcommand to replay without re-fitting).
    if args.fit_out is not None:
        import pyarrow as pa
        out = pa.table({
            "episode_id": eids,
            "host_id": hosts,
            "profile": profs,
            "x": pa.array(XYZ[:, 0], type=pa.float32()),
            "y": pa.array(XYZ[:, 1], type=pa.float32()),
            "z": pa.array(XYZ[:, 2], type=pa.float32()),
            "phase_int": pa.array(y, type=pa.int8()),
            "predicted_int": pa.array(y_pred, type=pa.int8()),
            "cluster": pa.array(cluster_ids, type=pa.int32()),
            "is_train": pa.array(train_mask, type=pa.bool_()),
        })
        args.fit_out.parent.mkdir(parents=True, exist_ok=True)
        pq.write_table(out, args.fit_out, compression="zstd")
        log.info("wrote %s", args.fit_out)
    if args.no_publish:
        return 0
    # Publish — rate-limited so we don't drown the dashboard
    publisher = (null_publisher() if args.dry_run
                 else http_publisher(args.publish_url))
    # Subsample the publish stream so the scatter doesn't get millions of
    # points. This is a uniform random sample without replacement (no
    # stratification), deterministic given --seed.
    rng = np.random.default_rng(args.seed)
    if args.max_publish > 0 and len(eids) > args.max_publish:
        sel = rng.choice(len(eids), size=args.max_publish, replace=False)
    else:
        sel = np.arange(len(eids))
    n_emit = 0
    started = time.monotonic()
    for i in sel:
        msg = {
            "type": "embedding",
            "x": float(XYZ[i, 0]),
            "y": float(XYZ[i, 1]),
            "z": float(XYZ[i, 2]),
            "phase": _safe_phase(y[i]),
            "predicted": _safe_phase(y_pred[i]),
            "cluster": int(cluster_ids[i]),
        }
        await publisher(msg)
        n_emit += 1
        if args.rate_hz > 0:
            await asyncio.sleep(1.0 / args.rate_hz)
    elapsed = time.monotonic() - started
    log.info("published %d embedding events in %.1fs", n_emit, elapsed)
    return 0
async def _stream(args) -> int:
    """Load a previously-saved knn_v1.parquet and publish embedding
    events from it in a loop. The fit is already on disk so this
    process is small and stateless — runs as a long-lived service or
    one-shot.

    Why a separate streamer (not just `produce --loop`):
      - `produce` re-fits PCA/KMeans/KNN every invocation, which is
        expensive and unnecessary if we already have a saved fit.
      - The stream loop re-publishes the same points so a browser
        that reconnects 30 s after the last cycle still sees a
        populated scatter (the dashboard doesn't replay live events
        on reconnect — see PRODUCERS.md §reconnect-gotcha).

    Only the columns an ``embedding`` event needs are read (x, y, z,
    phase_int, predicted_int, cluster). Up to ``--max-points`` rows are
    chosen once at startup with ``--seed`` and emitted in row order on
    every cycle, with a ``--cycle-pause-s`` sleep between cycles.

    Returns:
        0 after a single cycle when ``--loop`` is not set (with ``--loop``
        it never returns normally); 1 if the parquet is missing, lacks a
        required column, or has no rows.
    """
    needed = ["x", "y", "z", "phase_int", "predicted_int", "cluster"]
    if not args.load_fit.exists():
        log.error("fit parquet not found: %s — run 'knn produce --fit-out' first",
                  args.load_fit)
        return 1
    # Validate the schema up front so a wrong file fails fast with a clear
    # message instead of a KeyError in the middle of the publish loop.
    present = set(pq.read_schema(args.load_fit).names)
    missing = [c for c in needed if c not in present]
    if missing:
        log.error("fit parquet %s is missing column(s): %s",
                  args.load_fit, ", ".join(missing))
        return 1
    log.info("streaming from %s", args.load_fit)
    # The string columns (episode_id, host_id, profile) are never
    # published, so don't read or convert them.
    tbl = pq.read_table(args.load_fit, columns=needed)
    n = tbl.num_rows
    log.info("loaded %d points", n)
    if n == 0:
        # With --loop this would otherwise cycle forever publishing nothing.
        log.error("fit parquet %s has no rows; nothing to stream", args.load_fit)
        return 1
    # Subsample once at startup so each cycle pushes the same points
    # (deterministic given seed) and the scatter stays coherent.
    rng = np.random.default_rng(args.seed)
    if args.max_points > 0 and n > args.max_points:
        sel = rng.choice(n, size=args.max_points, replace=False)
        sel.sort()  # ordered emission feels less random
    else:
        sel = np.arange(n)
    cols = {c: tbl.column(c).to_numpy(zero_copy_only=False) for c in needed}
    xs, ys, zs = cols["x"], cols["y"], cols["z"]
    phase_ints = cols["phase_int"]
    pred_ints = cols["predicted_int"]
    clusters = cols["cluster"]
    publisher = (null_publisher() if args.dry_run
                 else http_publisher(args.publish_url))
    cycle = 0
    while True:
        started = time.monotonic()
        n_emit = 0
        for i in sel:
            msg = {
                "type": "embedding",
                "x": float(xs[i]),
                "y": float(ys[i]),
                "z": float(zs[i]),
                "phase": _safe_phase(int(phase_ints[i])),
                "predicted": _safe_phase(int(pred_ints[i])),
                "cluster": int(clusters[i]),
            }
            await publisher(msg)
            n_emit += 1
            if args.rate_hz > 0:
                await asyncio.sleep(1.0 / args.rate_hz)
        cycle += 1
        elapsed = time.monotonic() - started
        log.info("cycle %d: published %d embeddings in %.1fs", cycle, n_emit, elapsed)
        if not args.loop:
            return 0
        # Pause between cycles so we're not pegging the dashboard at 100%
        await asyncio.sleep(args.cycle_pause_s)
async def _metric(args) -> int:
    """Publish ModelMetric{model: 'knn', accuracy: macro_f1}, once or on a tick.

    Always uses the held-out-host split; ``--split-recipe`` is ignored
    here. The KNN is fitted on windows of the split's *train* episodes
    only and scored on windows of its *test* episodes. Despite the field
    name, ``accuracy`` carries the test macro F1.

    With ``--interval > 0`` the same value is re-published every
    ``interval`` seconds forever; otherwise it is published once.

    Returns:
        0 on success; 1 if the train or test side of the split contains
        no windows from ``--window``.
    """
    from training.eval_._metrics import _macro_f1
    X, y, eids, _hosts, _profs = _load_features(args.window, args.schema)
    Xz = _standardize(X)
    val = pq.read_table(args.validation).to_pylist()
    rows = [r for r in val if r["status"] in ("accepted", "degraded")]
    s = held_out_host(
        profiles=[r["profile"] for r in rows],
        sample_names=[r["sample_name"] for r in rows],
        host_ids=[r["host_id"] for r in rows],
        episode_ids=[r["episode_id"] for r in rows],
        train_hosts=args.train_hosts,
        seed=args.seed,
    )
    train_eps = {rows[i]["episode_id"] for i in range(len(rows)) if s.train[i]}
    test_eps = {rows[i]["episode_id"] for i in range(len(rows)) if s.test[i]}
    train_mask = np.array([e in train_eps for e in eids], dtype=bool)
    test_mask = np.array([e in test_eps for e in eids], dtype=bool)
    n_train = int(train_mask.sum())
    n_test = int(test_mask.sum())
    log.info("KNN metric: train=%d test=%d", n_train, n_test)
    if n_train == 0 or n_test == 0:
        # An empty side would crash inside sklearn (or in y.max() below)
        # with an error that does not point at the real cause.
        log.error("KNN metric: %s split has no windows; check --train-hosts "
                  "and --validation against --window",
                  "train" if n_train == 0 else "test")
        return 1
    y_pred = _knn_predict(
        X_train=Xz[train_mask], y_train=y[train_mask],
        X_eval=Xz[test_mask], k=args.k_neighbors,
    )
    f1 = _macro_f1(y[test_mask], y_pred, n_classes=max(int(y.max()) + 1, 5))
    log.info("KNN test macro_f1 = %.4f", f1)
    publisher = (null_publisher() if args.dry_run
                 else http_publisher(args.publish_url))
    # Re-publish on a tick if interval > 0 (dashboard reconnect-warmth)
    while True:
        await publisher({"type": "model_metric", "model": "knn",
                         "accuracy": float(f1)})
        if args.interval <= 0:
            return 0
        await asyncio.sleep(args.interval)
# ─────────────────────────────────────────────────────────────────────
# CLI
# ─────────────────────────────────────────────────────────────────────
def main() -> int:
    """Parse the CLI, configure logging and run the selected subcommand.

    Returns:
        The subcommand's exit code, or 130 when interrupted with Ctrl-C
        (the usual way to stop ``stream --loop`` or ``metric --interval``).
    """
    p = argparse.ArgumentParser(
        description="Publish KNN embedding / metric events for the dashboard.")
    sub = p.add_subparsers(dest="cmd", required=True)
    # Options shared by the fitting subcommands (produce, metric).
    common = argparse.ArgumentParser(add_help=False)
    common.add_argument("--window", required=True, type=Path,
                        help="features_window_v1.parquet")
    common.add_argument("--schema", required=True, type=Path,
                        help="feature_schema_v1.json")
    common.add_argument("--validation", type=Path,
                        default=Path("data/processed/validation_v1.parquet"),
                        help="validation table used to build the host split")
    common.add_argument("--split-recipe", choices=["host", "random"],
                        default="host",
                        help="KNN train/eval split for `produce` "
                             "(`metric` always uses the host split)")
    common.add_argument("--train-hosts", nargs="+", default=["elliott-thinkpad"])
    common.add_argument("--seed", type=int, default=0)
    common.add_argument("--k-neighbors", type=int, default=10)
    common.add_argument("--publish-url", default="http://127.0.0.1:8447/publish")
    common.add_argument("--dry-run", action="store_true",
                        help="use the null publisher instead of HTTP")
    common.add_argument("--log-level", default="INFO",
                        help="logging level name (case-insensitive)")
    pp = sub.add_parser("produce", parents=[common],
                        help="emit Embedding events for scene-11 scatter")
    pp.add_argument("--k-clusters", type=int, default=8)
    pp.add_argument("--max-rows", type=int, default=None,
                    help="cap windows loaded (testing)")
    pp.add_argument("--max-publish", type=int, default=2000,
                    help="cap published events (the scatter struggles "
                         "above ~5k points)")
    pp.add_argument("--rate-hz", type=float, default=200.0,
                    help="max publish rate (events/sec); 0 = unlimited")
    pp.add_argument("--no-publish", action="store_true",
                    help="fit + dump but skip the publish step")
    pp.add_argument("--fit-out", type=Path, default=None,
                    help="optional parquet for the per-window fit")
    pp.set_defaults(func=_produce)
    pm = sub.add_parser("metric", parents=[common],
                        help="publish ModelMetric{knn} on a tick")
    pm.add_argument("--interval", type=float, default=20.0,
                    help="re-publish period (s); 0 = one-shot")
    pm.set_defaults(func=_metric)
    # ── stream subcommand: replay a saved fit parquet on a loop ──────
    ps = sub.add_parser("stream",
                        help="publish Embedding events from a saved "
                             "fit parquet (no re-fit)")
    ps.add_argument("--load-fit", required=True, type=Path,
                    help="path to a parquet from `knn produce --fit-out`")
    ps.add_argument("--publish-url", default="http://127.0.0.1:8447/publish")
    ps.add_argument("--max-points", type=int, default=2000,
                    help="cap published points per cycle (default 2000); "
                         "0 = all rows")
    ps.add_argument("--rate-hz", type=float, default=200.0,
                    help="publish rate cap (events/sec); 0 = unlimited")
    ps.add_argument("--loop", action="store_true",
                    help="cycle indefinitely so reconnecting browsers "
                         "stay populated")
    ps.add_argument("--cycle-pause-s", type=float, default=15.0,
                    help="pause between cycles when --loop is set")
    ps.add_argument("--seed", type=int, default=0)
    ps.add_argument("--dry-run", action="store_true",
                    help="use the null publisher instead of HTTP")
    ps.add_argument("--log-level", default="INFO",
                    help="logging level name (case-insensitive)")
    ps.set_defaults(func=_stream)
    args = p.parse_args()
    # logging only accepts upper-case level names; accept "info" too.
    logging.basicConfig(level=args.log_level.upper(),
                        format="%(asctime)s %(levelname)s %(name)s %(message)s")
    try:
        return asyncio.run(args.func(args))
    except KeyboardInterrupt:
        # Ctrl-C is the normal way to stop the looping subcommands; exit
        # quietly with the conventional 128 + SIGINT code.
        log.info("interrupted")
        return 130


if __name__ == "__main__":
    raise SystemExit(main())