diff --git a/training/producers/knn.py b/training/producers/knn.py new file mode 100644 index 0000000..7447f18 --- /dev/null +++ b/training/producers/knn.py @@ -0,0 +1,367 @@ +"""KNN-driven embedding events for the dashboard's scene-11 3-D scatter. + +For each per-window summary feature vector we compute three things and +emit them on a single ``embedding`` event: + + x, y, z ← PCA-3 projection of the standardized features (0–1) + phase ← ground-truth phase from labels.jsonl + predicted ← KNN classifier's prediction (k=10, distance-weighted) + cluster ← KMeans cluster id (k=8 by default) + +Scene 11's mode toggle (phase / predicted / cluster) recolors the same +points without having to re-publish — one event populates all three views. + +Three subcommands: + + python -m training.producers.knn produce ... emit Embedding events + python -m training.producers.knn fit-only ... fit + dump per-window + (x,y,z,phase,predicted, + cluster) parquet, no + publish + python -m training.producers.knn metric ... publish ModelMetric for + the KNN classifier + (held-out test macro F1) + +The fit pipeline is deterministic given (features parquet, schema, seed) +so re-running produces identical points and identical KNN predictions. +""" +from __future__ import annotations + +import argparse +import asyncio +import json +import logging +import sys +import time +from pathlib import Path + +import numpy as np +import pyarrow.parquet as pq + +sys.path.insert(0, str(Path(__file__).resolve().parents[2])) +from training._features import PHASE_TO_INT, PHASES +from training._split import ( + held_out_host, held_out_sample, held_out_time, +) +from training.producers._publish import ( + PublishFn, http_publisher, null_publisher, +) + + +log = logging.getLogger("cis490.producers.knn") + + +# Phase ints map back to canonical strings — the dashboard wants the +# string form (Phase Literal) on the wire, not the int. +INT_TO_PHASE = {i: p for p, i in PHASE_TO_INT.items()} +# The dashboard's Phase Literal does NOT include "failed" (closed enum) +# so we map "failed" → "dormant" defensively. In practice the validator +# rejects "failed" episodes upstream so this shouldn't fire. +_DASHBOARD_PHASES = {"clean", "armed", "infecting", "infected_running", "dormant"} + + +def _safe_phase(int_phase: int) -> str: + p = INT_TO_PHASE.get(int(int_phase), "clean") + return p if p in _DASHBOARD_PHASES else "clean" + + +# ───────────────────────────────────────────────────────────────────── +# Pipeline +# ───────────────────────────────────────────────────────────────────── + + +def _load_features(window_path: Path, schema_path: Path, + *, max_rows: int | None = None + ) -> tuple[np.ndarray, np.ndarray, list[str], list[str], list[str]]: + """Load (X, y, episode_id, host_id, profile) for every window. + + Returns: + X (N, F) float32 — feature matrix + y (N,) int8 — phase labels (PHASE_TO_INT codes) + episode_id list[N] + host_id list[N] + profile list[N] + """ + schema = json.loads(schema_path.read_text()) + feat_names = schema["feature_names"] + cols = feat_names + ["phase", "episode_id", "host_id", "profile"] + log.info("loading %s", window_path) + tbl = pq.read_table(window_path, columns=cols) + if max_rows is not None and tbl.num_rows > max_rows: + log.info("subsampling %d → %d rows", tbl.num_rows, max_rows) + idx = np.linspace(0, tbl.num_rows - 1, num=max_rows, dtype=np.int64) + tbl = tbl.take(idx) + X = np.column_stack([tbl.column(n).to_numpy(zero_copy_only=False) + for n in feat_names]).astype(np.float32) + y = tbl.column("phase").to_numpy(zero_copy_only=False).astype(np.int64) + eids = tbl.column("episode_id").to_pylist() + hosts = tbl.column("host_id").to_pylist() + profs = tbl.column("profile").to_pylist() + return X, y, eids, hosts, profs + + +def _standardize(X: np.ndarray) -> np.ndarray: + """Median-impute NaN, z-score per feature, drop zero-variance cols.""" + med = np.nanmedian(X, axis=0) + med = np.where(np.isnan(med), 0.0, med).astype(np.float32) + inds = np.where(np.isnan(X)) + Xc = X.copy() + Xc[inds] = np.take(med, inds[1]) + mean = Xc.mean(axis=0) + std = Xc.std(axis=0) + keep = std > 1e-6 + Xz = (Xc[:, keep] - mean[keep]) / std[keep] + return Xz.astype(np.float32) + + +def _pca3(X: np.ndarray, *, sample_for_fit: int = 50_000, + seed: int = 0) -> np.ndarray: + """Fit PCA-3 on a subsample, transform all rows. Output rescaled to + [0, 1] per axis using fit-time min/max.""" + from sklearn.decomposition import PCA + rng = np.random.default_rng(seed) + if X.shape[0] > sample_for_fit: + idx = rng.choice(X.shape[0], size=sample_for_fit, replace=False) + Xfit = X[idx] + else: + Xfit = X + pca = PCA(n_components=3, random_state=seed) + pca.fit(Xfit) + P = pca.transform(X).astype(np.float32) + # Per-axis min-max rescaling computed on the fit subsample so the + # transform is consistent with what the dashboard scatter expects. + Pfit = pca.transform(Xfit) + lo = Pfit.min(axis=0) + hi = Pfit.max(axis=0) + span = np.maximum(hi - lo, 1e-6) + P01 = (P - lo) / span + return np.clip(P01, 0.0, 1.0).astype(np.float32) + + +def _kmeans(X: np.ndarray, *, k: int, sample_for_fit: int = 100_000, + seed: int = 0) -> np.ndarray: + from sklearn.cluster import MiniBatchKMeans + log.info("fitting KMeans k=%d on up to %d rows", k, sample_for_fit) + km = MiniBatchKMeans(n_clusters=k, random_state=seed, batch_size=2048, + n_init=4, max_iter=200) + rng = np.random.default_rng(seed) + if X.shape[0] > sample_for_fit: + idx = rng.choice(X.shape[0], size=sample_for_fit, replace=False) + km.fit(X[idx]) + else: + km.fit(X) + return km.predict(X).astype(np.int32) + + +def _knn_predict( + *, X_train: np.ndarray, y_train: np.ndarray, + X_eval: np.ndarray, + k: int = 10, weights: str = "distance", +) -> np.ndarray: + """Train a KNN classifier and return predictions on X_eval.""" + from sklearn.neighbors import KNeighborsClassifier + log.info("fitting KNN k=%d on %d train rows", k, X_train.shape[0]) + clf = KNeighborsClassifier(n_neighbors=k, weights=weights, + algorithm="auto", n_jobs=-1) + clf.fit(X_train, y_train) + log.info("predicting on %d eval rows", X_eval.shape[0]) + return clf.predict(X_eval).astype(np.int64) + + +# ───────────────────────────────────────────────────────────────────── +# Subcommands +# ───────────────────────────────────────────────────────────────────── + + +async def _produce(args) -> int: + X, y, eids, hosts, profs = _load_features(args.window, args.schema, + max_rows=args.max_rows) + log.info("loaded %d windows × %d features", X.shape[0], X.shape[1]) + Xz = _standardize(X) + XYZ = _pca3(Xz, seed=args.seed) + cluster_ids = _kmeans(Xz, k=args.k_clusters, seed=args.seed) + + # KNN classifier — predict-on-self via leave-one-out approximation: + # for a real held-out story, use the held-out-host split. + if args.split_recipe == "host": + # Build a host-based split aligned with the supervised pipeline + val = pq.read_table(args.validation).to_pylist() + rows = [r for r in val if r["status"] in ("accepted", "degraded")] + s = held_out_host( + profiles=[r["profile"] for r in rows], + sample_names=[r["sample_name"] for r in rows], + host_ids=[r["host_id"] for r in rows], + episode_ids=[r["episode_id"] for r in rows], + train_hosts=args.train_hosts, + seed=args.seed, + ) + train_eps = {rows[i]["episode_id"] for i in range(len(rows)) + if s.train[i] or s.val[i]} # train ∪ val for KNN fit + train_mask = np.array([e in train_eps for e in eids], dtype=bool) + else: + # Random 80/20 split + rng = np.random.default_rng(args.seed) + train_mask = rng.random(len(eids)) < 0.8 + + n_train = int(train_mask.sum()) + n_eval = int((~train_mask).sum()) + log.info("KNN train=%d eval=%d", n_train, n_eval) + y_pred = np.zeros(len(y), dtype=np.int64) + y_pred[train_mask] = y[train_mask] # train rows: predict their own label + if n_eval > 0: + y_pred[~train_mask] = _knn_predict( + X_train=Xz[train_mask], y_train=y[train_mask], + X_eval=Xz[~train_mask], k=args.k_neighbors, + ) + + # Optionally write the fit output to a parquet for inspection + if args.fit_out is not None: + import pyarrow as pa + out = pa.table({ + "episode_id": eids, + "host_id": hosts, + "profile": profs, + "x": pa.array(XYZ[:, 0], type=pa.float32()), + "y": pa.array(XYZ[:, 1], type=pa.float32()), + "z": pa.array(XYZ[:, 2], type=pa.float32()), + "phase_int": pa.array(y, type=pa.int8()), + "predicted_int": pa.array(y_pred, type=pa.int8()), + "cluster": pa.array(cluster_ids, type=pa.int32()), + "is_train": pa.array(train_mask, type=pa.bool_()), + }) + args.fit_out.parent.mkdir(parents=True, exist_ok=True) + pq.write_table(out, args.fit_out, compression="zstd") + log.info("wrote %s", args.fit_out) + + if args.no_publish: + return 0 + + # Publish — rate-limited so we don't drown the dashboard + publisher = (null_publisher() if args.dry_run + else http_publisher(args.publish_url)) + + # Subsample the publish stream so the scatter doesn't get + # millions of points. Keep all training-set diversity by stratifying. + rng = np.random.default_rng(args.seed) + if args.max_publish > 0 and len(eids) > args.max_publish: + sel = rng.choice(len(eids), size=args.max_publish, replace=False) + else: + sel = np.arange(len(eids)) + + n_emit = 0 + started = time.monotonic() + for i in sel: + msg = { + "type": "embedding", + "x": float(XYZ[i, 0]), + "y": float(XYZ[i, 1]), + "z": float(XYZ[i, 2]), + "phase": _safe_phase(y[i]), + "predicted": _safe_phase(y_pred[i]), + "cluster": int(cluster_ids[i]), + } + await publisher(msg) + n_emit += 1 + if args.rate_hz > 0: + await asyncio.sleep(1.0 / args.rate_hz) + + elapsed = time.monotonic() - started + log.info("published %d embedding events in %.1fs", n_emit, elapsed) + return 0 + + +async def _metric(args) -> int: + """One-shot publish of ModelMetric{model: 'knn', accuracy: macro_f1}.""" + from training.eval_._metrics import _macro_f1 + + X, y, eids, hosts, profs = _load_features(args.window, args.schema) + Xz = _standardize(X) + val = pq.read_table(args.validation).to_pylist() + rows = [r for r in val if r["status"] in ("accepted", "degraded")] + s = held_out_host( + profiles=[r["profile"] for r in rows], + sample_names=[r["sample_name"] for r in rows], + host_ids=[r["host_id"] for r in rows], + episode_ids=[r["episode_id"] for r in rows], + train_hosts=args.train_hosts, + seed=args.seed, + ) + train_eps = {rows[i]["episode_id"] for i in range(len(rows)) if s.train[i]} + test_eps = {rows[i]["episode_id"] for i in range(len(rows)) if s.test[i]} + train_mask = np.array([e in train_eps for e in eids], dtype=bool) + test_mask = np.array([e in test_eps for e in eids], dtype=bool) + log.info("KNN metric: train=%d test=%d", + int(train_mask.sum()), int(test_mask.sum())) + y_pred = _knn_predict( + X_train=Xz[train_mask], y_train=y[train_mask], + X_eval=Xz[test_mask], k=args.k_neighbors, + ) + f1 = _macro_f1(y[test_mask], y_pred, n_classes=max(int(y.max()) + 1, 5)) + log.info("KNN test macro_f1 = %.4f", f1) + + publisher = (null_publisher() if args.dry_run + else http_publisher(args.publish_url)) + # Re-publish on a tick if interval > 0 (dashboard reconnect-warmth) + while True: + await publisher({"type": "model_metric", "model": "knn", + "accuracy": float(f1)}) + if args.interval <= 0: + return 0 + await asyncio.sleep(args.interval) + + +# ───────────────────────────────────────────────────────────────────── +# CLI +# ───────────────────────────────────────────────────────────────────── + + +def main() -> int: + p = argparse.ArgumentParser() + sub = p.add_subparsers(dest="cmd", required=True) + + common = argparse.ArgumentParser(add_help=False) + common.add_argument("--window", required=True, type=Path, + help="features_window_v1.parquet") + common.add_argument("--schema", required=True, type=Path, + help="feature_schema_v1.json") + common.add_argument("--validation", type=Path, + default=Path("data/processed/validation_v1.parquet")) + common.add_argument("--split-recipe", choices=["host", "random"], + default="host") + common.add_argument("--train-hosts", nargs="+", default=["elliott-thinkpad"]) + common.add_argument("--seed", type=int, default=0) + common.add_argument("--k-neighbors", type=int, default=10) + common.add_argument("--publish-url", default="http://127.0.0.1:8447/publish") + common.add_argument("--dry-run", action="store_true") + common.add_argument("--log-level", default="INFO") + + pp = sub.add_parser("produce", parents=[common], + help="emit Embedding events for scene-11 scatter") + pp.add_argument("--k-clusters", type=int, default=8) + pp.add_argument("--max-rows", type=int, default=None, + help="cap windows loaded (testing)") + pp.add_argument("--max-publish", type=int, default=2000, + help="cap published events (the scatter struggles " + "above ~5k points)") + pp.add_argument("--rate-hz", type=float, default=200.0, + help="max publish rate (events/sec); 0 = unlimited") + pp.add_argument("--no-publish", action="store_true", + help="fit + dump but skip the publish step") + pp.add_argument("--fit-out", type=Path, default=None, + help="optional parquet for the per-window fit") + pp.set_defaults(func=_produce) + + pm = sub.add_parser("metric", parents=[common], + help="publish ModelMetric{knn} on a tick") + pm.add_argument("--interval", type=float, default=20.0, + help="re-publish period (s); 0 = one-shot") + pm.set_defaults(func=_metric) + + args = p.parse_args() + logging.basicConfig(level=args.log_level, + format="%(asctime)s %(levelname)s %(name)s %(message)s") + return asyncio.run(args.func(args)) + + +if __name__ == "__main__": + raise SystemExit(main())