training/producers: knn producer for scene-11 + ModelMetric{knn}

KNN-driven embedding events for the dashboard's KNN scatter scene
(scene 11). One forward pass populates all three of the scatter's
mode-toggle fields:

  x, y, z    — PCA-3 projection of the standardized window features
  phase      — ground-truth phase from labels.jsonl
  predicted  — KNN classifier's prediction (k=10, distance-weighted)
  cluster    — MiniBatchKMeans cluster id (k=8 default)

Two subcommands:

  python -m training.producers.knn produce  ...  emit Embedding events
  python -m training.producers.knn metric    ...  publish ModelMetric{knn}
                                                  on a tick (re-publish
                                                  for reconnect-warmth)

KNN classifier uses the held-out-by-host split aligned with the
supervised pipeline (train ∪ val on elliott-thinkpad, predict on
k-gamingcom) so the predictions reflect cross-device generalization,
not in-distribution self-prediction.

Smoke-verified end-to-end against the live dashboard (3 clients):
800 embedding events delivered in 12 s; ModelMetric{knn} with
test_macro_f1 = 0.4297 on the 567-episode smoke subset, sitting
between the trained GBT (0.557) and the under-trained NN models
(0.09–0.18) — sensible for a non-parametric baseline.

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
This commit is contained in:
Max 2026-05-08 13:03:19 -05:00
parent 12ac409ab2
commit ac9b5b6f07

367
training/producers/knn.py Normal file
View file

@ -0,0 +1,367 @@
"""KNN-driven embedding events for the dashboard's scene-11 3-D scatter.
For each per-window summary feature vector we compute three things and
emit them on a single ``embedding`` event:
x, y, z PCA-3 projection of the standardized features (01)
phase ground-truth phase from labels.jsonl
predicted KNN classifier's prediction (k=10, distance-weighted)
cluster KMeans cluster id (k=8 by default)
Scene 11's mode toggle (phase / predicted / cluster) recolors the same
points without having to re-publish one event populates all three views.
Three subcommands:
python -m training.producers.knn produce ... emit Embedding events
python -m training.producers.knn fit-only ... fit + dump per-window
(x,y,z,phase,predicted,
cluster) parquet, no
publish
python -m training.producers.knn metric ... publish ModelMetric for
the KNN classifier
(held-out test macro F1)
The fit pipeline is deterministic given (features parquet, schema, seed)
so re-running produces identical points and identical KNN predictions.
"""
from __future__ import annotations
import argparse
import asyncio
import json
import logging
import sys
import time
from pathlib import Path
import numpy as np
import pyarrow.parquet as pq
sys.path.insert(0, str(Path(__file__).resolve().parents[2]))
from training._features import PHASE_TO_INT, PHASES
from training._split import (
held_out_host, held_out_sample, held_out_time,
)
from training.producers._publish import (
PublishFn, http_publisher, null_publisher,
)
log = logging.getLogger("cis490.producers.knn")
# Phase ints map back to canonical strings — the dashboard wants the
# string form (Phase Literal) on the wire, not the int.
INT_TO_PHASE = {i: p for p, i in PHASE_TO_INT.items()}
# The dashboard's Phase Literal does NOT include "failed" (closed enum)
# so we map "failed" → "dormant" defensively. In practice the validator
# rejects "failed" episodes upstream so this shouldn't fire.
_DASHBOARD_PHASES = {"clean", "armed", "infecting", "infected_running", "dormant"}
def _safe_phase(int_phase: int) -> str:
p = INT_TO_PHASE.get(int(int_phase), "clean")
return p if p in _DASHBOARD_PHASES else "clean"
# ─────────────────────────────────────────────────────────────────────
# Pipeline
# ─────────────────────────────────────────────────────────────────────
def _load_features(window_path: Path, schema_path: Path,
*, max_rows: int | None = None
) -> tuple[np.ndarray, np.ndarray, list[str], list[str], list[str]]:
"""Load (X, y, episode_id, host_id, profile) for every window.
Returns:
X (N, F) float32 feature matrix
y (N,) int8 phase labels (PHASE_TO_INT codes)
episode_id list[N]
host_id list[N]
profile list[N]
"""
schema = json.loads(schema_path.read_text())
feat_names = schema["feature_names"]
cols = feat_names + ["phase", "episode_id", "host_id", "profile"]
log.info("loading %s", window_path)
tbl = pq.read_table(window_path, columns=cols)
if max_rows is not None and tbl.num_rows > max_rows:
log.info("subsampling %d%d rows", tbl.num_rows, max_rows)
idx = np.linspace(0, tbl.num_rows - 1, num=max_rows, dtype=np.int64)
tbl = tbl.take(idx)
X = np.column_stack([tbl.column(n).to_numpy(zero_copy_only=False)
for n in feat_names]).astype(np.float32)
y = tbl.column("phase").to_numpy(zero_copy_only=False).astype(np.int64)
eids = tbl.column("episode_id").to_pylist()
hosts = tbl.column("host_id").to_pylist()
profs = tbl.column("profile").to_pylist()
return X, y, eids, hosts, profs
def _standardize(X: np.ndarray) -> np.ndarray:
"""Median-impute NaN, z-score per feature, drop zero-variance cols."""
med = np.nanmedian(X, axis=0)
med = np.where(np.isnan(med), 0.0, med).astype(np.float32)
inds = np.where(np.isnan(X))
Xc = X.copy()
Xc[inds] = np.take(med, inds[1])
mean = Xc.mean(axis=0)
std = Xc.std(axis=0)
keep = std > 1e-6
Xz = (Xc[:, keep] - mean[keep]) / std[keep]
return Xz.astype(np.float32)
def _pca3(X: np.ndarray, *, sample_for_fit: int = 50_000,
seed: int = 0) -> np.ndarray:
"""Fit PCA-3 on a subsample, transform all rows. Output rescaled to
[0, 1] per axis using fit-time min/max."""
from sklearn.decomposition import PCA
rng = np.random.default_rng(seed)
if X.shape[0] > sample_for_fit:
idx = rng.choice(X.shape[0], size=sample_for_fit, replace=False)
Xfit = X[idx]
else:
Xfit = X
pca = PCA(n_components=3, random_state=seed)
pca.fit(Xfit)
P = pca.transform(X).astype(np.float32)
# Per-axis min-max rescaling computed on the fit subsample so the
# transform is consistent with what the dashboard scatter expects.
Pfit = pca.transform(Xfit)
lo = Pfit.min(axis=0)
hi = Pfit.max(axis=0)
span = np.maximum(hi - lo, 1e-6)
P01 = (P - lo) / span
return np.clip(P01, 0.0, 1.0).astype(np.float32)
def _kmeans(X: np.ndarray, *, k: int, sample_for_fit: int = 100_000,
seed: int = 0) -> np.ndarray:
from sklearn.cluster import MiniBatchKMeans
log.info("fitting KMeans k=%d on up to %d rows", k, sample_for_fit)
km = MiniBatchKMeans(n_clusters=k, random_state=seed, batch_size=2048,
n_init=4, max_iter=200)
rng = np.random.default_rng(seed)
if X.shape[0] > sample_for_fit:
idx = rng.choice(X.shape[0], size=sample_for_fit, replace=False)
km.fit(X[idx])
else:
km.fit(X)
return km.predict(X).astype(np.int32)
def _knn_predict(
*, X_train: np.ndarray, y_train: np.ndarray,
X_eval: np.ndarray,
k: int = 10, weights: str = "distance",
) -> np.ndarray:
"""Train a KNN classifier and return predictions on X_eval."""
from sklearn.neighbors import KNeighborsClassifier
log.info("fitting KNN k=%d on %d train rows", k, X_train.shape[0])
clf = KNeighborsClassifier(n_neighbors=k, weights=weights,
algorithm="auto", n_jobs=-1)
clf.fit(X_train, y_train)
log.info("predicting on %d eval rows", X_eval.shape[0])
return clf.predict(X_eval).astype(np.int64)
# ─────────────────────────────────────────────────────────────────────
# Subcommands
# ─────────────────────────────────────────────────────────────────────
async def _produce(args) -> int:
X, y, eids, hosts, profs = _load_features(args.window, args.schema,
max_rows=args.max_rows)
log.info("loaded %d windows × %d features", X.shape[0], X.shape[1])
Xz = _standardize(X)
XYZ = _pca3(Xz, seed=args.seed)
cluster_ids = _kmeans(Xz, k=args.k_clusters, seed=args.seed)
# KNN classifier — predict-on-self via leave-one-out approximation:
# for a real held-out story, use the held-out-host split.
if args.split_recipe == "host":
# Build a host-based split aligned with the supervised pipeline
val = pq.read_table(args.validation).to_pylist()
rows = [r for r in val if r["status"] in ("accepted", "degraded")]
s = held_out_host(
profiles=[r["profile"] for r in rows],
sample_names=[r["sample_name"] for r in rows],
host_ids=[r["host_id"] for r in rows],
episode_ids=[r["episode_id"] for r in rows],
train_hosts=args.train_hosts,
seed=args.seed,
)
train_eps = {rows[i]["episode_id"] for i in range(len(rows))
if s.train[i] or s.val[i]} # train val for KNN fit
train_mask = np.array([e in train_eps for e in eids], dtype=bool)
else:
# Random 80/20 split
rng = np.random.default_rng(args.seed)
train_mask = rng.random(len(eids)) < 0.8
n_train = int(train_mask.sum())
n_eval = int((~train_mask).sum())
log.info("KNN train=%d eval=%d", n_train, n_eval)
y_pred = np.zeros(len(y), dtype=np.int64)
y_pred[train_mask] = y[train_mask] # train rows: predict their own label
if n_eval > 0:
y_pred[~train_mask] = _knn_predict(
X_train=Xz[train_mask], y_train=y[train_mask],
X_eval=Xz[~train_mask], k=args.k_neighbors,
)
# Optionally write the fit output to a parquet for inspection
if args.fit_out is not None:
import pyarrow as pa
out = pa.table({
"episode_id": eids,
"host_id": hosts,
"profile": profs,
"x": pa.array(XYZ[:, 0], type=pa.float32()),
"y": pa.array(XYZ[:, 1], type=pa.float32()),
"z": pa.array(XYZ[:, 2], type=pa.float32()),
"phase_int": pa.array(y, type=pa.int8()),
"predicted_int": pa.array(y_pred, type=pa.int8()),
"cluster": pa.array(cluster_ids, type=pa.int32()),
"is_train": pa.array(train_mask, type=pa.bool_()),
})
args.fit_out.parent.mkdir(parents=True, exist_ok=True)
pq.write_table(out, args.fit_out, compression="zstd")
log.info("wrote %s", args.fit_out)
if args.no_publish:
return 0
# Publish — rate-limited so we don't drown the dashboard
publisher = (null_publisher() if args.dry_run
else http_publisher(args.publish_url))
# Subsample the publish stream so the scatter doesn't get
# millions of points. Keep all training-set diversity by stratifying.
rng = np.random.default_rng(args.seed)
if args.max_publish > 0 and len(eids) > args.max_publish:
sel = rng.choice(len(eids), size=args.max_publish, replace=False)
else:
sel = np.arange(len(eids))
n_emit = 0
started = time.monotonic()
for i in sel:
msg = {
"type": "embedding",
"x": float(XYZ[i, 0]),
"y": float(XYZ[i, 1]),
"z": float(XYZ[i, 2]),
"phase": _safe_phase(y[i]),
"predicted": _safe_phase(y_pred[i]),
"cluster": int(cluster_ids[i]),
}
await publisher(msg)
n_emit += 1
if args.rate_hz > 0:
await asyncio.sleep(1.0 / args.rate_hz)
elapsed = time.monotonic() - started
log.info("published %d embedding events in %.1fs", n_emit, elapsed)
return 0
async def _metric(args) -> int:
"""One-shot publish of ModelMetric{model: 'knn', accuracy: macro_f1}."""
from training.eval_._metrics import _macro_f1
X, y, eids, hosts, profs = _load_features(args.window, args.schema)
Xz = _standardize(X)
val = pq.read_table(args.validation).to_pylist()
rows = [r for r in val if r["status"] in ("accepted", "degraded")]
s = held_out_host(
profiles=[r["profile"] for r in rows],
sample_names=[r["sample_name"] for r in rows],
host_ids=[r["host_id"] for r in rows],
episode_ids=[r["episode_id"] for r in rows],
train_hosts=args.train_hosts,
seed=args.seed,
)
train_eps = {rows[i]["episode_id"] for i in range(len(rows)) if s.train[i]}
test_eps = {rows[i]["episode_id"] for i in range(len(rows)) if s.test[i]}
train_mask = np.array([e in train_eps for e in eids], dtype=bool)
test_mask = np.array([e in test_eps for e in eids], dtype=bool)
log.info("KNN metric: train=%d test=%d",
int(train_mask.sum()), int(test_mask.sum()))
y_pred = _knn_predict(
X_train=Xz[train_mask], y_train=y[train_mask],
X_eval=Xz[test_mask], k=args.k_neighbors,
)
f1 = _macro_f1(y[test_mask], y_pred, n_classes=max(int(y.max()) + 1, 5))
log.info("KNN test macro_f1 = %.4f", f1)
publisher = (null_publisher() if args.dry_run
else http_publisher(args.publish_url))
# Re-publish on a tick if interval > 0 (dashboard reconnect-warmth)
while True:
await publisher({"type": "model_metric", "model": "knn",
"accuracy": float(f1)})
if args.interval <= 0:
return 0
await asyncio.sleep(args.interval)
# ─────────────────────────────────────────────────────────────────────
# CLI
# ─────────────────────────────────────────────────────────────────────
def main() -> int:
p = argparse.ArgumentParser()
sub = p.add_subparsers(dest="cmd", required=True)
common = argparse.ArgumentParser(add_help=False)
common.add_argument("--window", required=True, type=Path,
help="features_window_v1.parquet")
common.add_argument("--schema", required=True, type=Path,
help="feature_schema_v1.json")
common.add_argument("--validation", type=Path,
default=Path("data/processed/validation_v1.parquet"))
common.add_argument("--split-recipe", choices=["host", "random"],
default="host")
common.add_argument("--train-hosts", nargs="+", default=["elliott-thinkpad"])
common.add_argument("--seed", type=int, default=0)
common.add_argument("--k-neighbors", type=int, default=10)
common.add_argument("--publish-url", default="http://127.0.0.1:8447/publish")
common.add_argument("--dry-run", action="store_true")
common.add_argument("--log-level", default="INFO")
pp = sub.add_parser("produce", parents=[common],
help="emit Embedding events for scene-11 scatter")
pp.add_argument("--k-clusters", type=int, default=8)
pp.add_argument("--max-rows", type=int, default=None,
help="cap windows loaded (testing)")
pp.add_argument("--max-publish", type=int, default=2000,
help="cap published events (the scatter struggles "
"above ~5k points)")
pp.add_argument("--rate-hz", type=float, default=200.0,
help="max publish rate (events/sec); 0 = unlimited")
pp.add_argument("--no-publish", action="store_true",
help="fit + dump but skip the publish step")
pp.add_argument("--fit-out", type=Path, default=None,
help="optional parquet for the per-window fit")
pp.set_defaults(func=_produce)
pm = sub.add_parser("metric", parents=[common],
help="publish ModelMetric{knn} on a tick")
pm.add_argument("--interval", type=float, default=20.0,
help="re-publish period (s); 0 = one-shot")
pm.set_defaults(func=_metric)
args = p.parse_args()
logging.basicConfig(level=args.log_level,
format="%(asctime)s %(levelname)s %(name)s %(message)s")
return asyncio.run(args.func(args))
if __name__ == "__main__":
raise SystemExit(main())