profiles.py — non-shortcut fit:
Old: pick one accepted episode per profile, emit its raw
fraction-of-duration curve. Confounded by single-episode noise,
phase-budget timing variance, and the cumulative-counter
startup-spike artifact.
New: aggregate up to N=100 accepted episodes per profile, slice each
by labels.jsonl phase events, resample EACH PHASE to a fixed
budget so the median across episodes captures the canonical
per-phase shape rather than smearing peaks across the timeline.
Save median + p25/p75 band to data/processed/attack_profiles_v1.parquet.
Per-phase point budget (sums to 80):
clean_lead 10, armed 5, infecting 10, infected_running 40,
clean_tail 15. dormant (when present) folded into infected_running.
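The per-phase resampling described above can be sketched as follows. This is a minimal illustration, not the code in profiles.py: the function names, the dict-of-phases episode layout, and the choice of linear interpolation (np.interp) are assumptions; only the phase budget is taken from this message.

```python
import numpy as np

# Point budget per phase, as listed in the commit message (sums to 80).
PHASE_BUDGET = {
    "clean_lead": 10, "armed": 5, "infecting": 10,
    "infected_running": 40, "clean_tail": 15,
}


def resample_phase(values, n_points):
    """Linearly resample one phase's samples onto n_points even positions."""
    values = np.asarray(values, dtype=np.float64)
    if values.size == 0:
        return np.full(n_points, np.nan)  # phase missing in this episode
    src = np.linspace(0.0, 1.0, num=values.size)
    dst = np.linspace(0.0, 1.0, num=n_points)
    return np.interp(dst, src, values)


def episode_curve(phases):
    """phases: dict of phase name -> 1-D samples. Returns an 80-point curve."""
    parts = [resample_phase(phases.get(name, []), n)
             for name, n in PHASE_BUDGET.items()]
    return np.concatenate(parts)


def profile_band(episodes):
    """Median and p25/p75 across episodes at each of the 80 positions."""
    stack = np.stack([episode_curve(ep) for ep in episodes])
    p25, med, p75 = np.nanpercentile(stack, [25, 50, 75], axis=0)
    return med, p25, p75
```

Because every episode contributes exactly 40 points for infected_running regardless of how long that phase lasted, a plateau in that phase lines up across episodes before the median is taken.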
Channel swap: io-walk uses proc.cpu_sys_jiffies, NOT
proc.io_write_bytes. Host /proc on QEMU doesn't see virtio-blk
writes via io.write_bytes (writes go through KVM's I/O path, not
write() syscalls); cpu_sys_jiffies tracks kernel time which spikes
during heavy I/O scheduling.
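For readers unfamiliar with the counter, here is a hedged sketch of how a kernel-time channel could be read and turned into a rate. It assumes that proc.cpu_sys_jiffies is derived from the stime field (field 15) of /proc/<pid>/stat; that mapping, and both function names, are assumptions for illustration and are not confirmed by this commit.

```python
def parse_stime_jiffies(stat_line: str) -> int:
    """Return stime (field 15 of /proc/<pid>/stat), in clock ticks."""
    # comm (field 2) is parenthesized and may contain spaces, so split
    # after the last ')' rather than splitting the whole line.
    rest = stat_line.rsplit(")", 1)[1].split()
    # rest[0] is field 3 (state), so field 15 is rest[12].
    return int(rest[12])


def jiffies_per_second(readings):
    """readings: list of (t_seconds, cumulative_jiffies) pairs.

    Returns one rate per consecutive pair. The first reading has no
    predecessor and therefore yields no rate, which is one way to avoid
    a spurious spike at index 0 from a cumulative counter.
    """
    rates = []
    for (t0, c0), (t1, c1) in zip(readings, readings[1:]):
        dt = t1 - t0
        rates.append((c1 - c0) / dt if dt > 0 else 0.0)
    return rates
```

Differencing consecutive readings, instead of reporting the raw cumulative value, is also the usual remedy for the startup-spike artifact mentioned under "Old" above.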
Concrete result: cpu-saturate now shows the proper plateau-during-
infected_running with peak at 100 j/s (was 30 j/s spike at idx 0
then mostly zero); low-and-slow shows its distinctive low-amplitude
profile (peak 21 vs cpu-saturate's 100); io-walk shows the
rapid-rise-then-decay shape consistent with dd finishing mid-phase.
knn.py — sticky model_metric / model_perf:
Stream subcommand gains --also-metric / --also-perf-latency-us
flags. When set, each cycle publishes a model_metric event
(tagged model=knn) for scene-8 (model bars) and a model_perf
event for scene-12 (accuracy vs inference cost). Republishing on
the cycle keeps reconnecting browsers populated without depending
on the dashboard's not-yet-built sticky-event cache.
Measured KNN inference latency on the 150k-trained classifier:
single-window predict: 61.5 ms (sklearn brute-force at 230 D)
per-window in batch=64: 3.4 ms (the production-realistic number)
Streamer published: model_metric{knn, 0.762} +
model_perf{knn, latency_us=3410, accuracy=0.762}.
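The two latency figures differ only in how the wall time is divided: the batched number is the time for one predict call on 64 windows divided by 64, then converted to microseconds (3.41 ms per window gives latency_us=3410). A minimal sketch of that measurement follows. The helper names and the NumPy 1-nearest-neighbour stand-in are assumptions for illustration; the figures above were measured on the sklearn classifier, not on this toy.

```python
import time

import numpy as np


def per_window_latency_us(predict, X, *, batch_size, repeats=5):
    """Best-of-N wall time of predict() on one batch, per window, in µs."""
    batch = X[:batch_size]
    best = float("inf")
    for _ in range(repeats):
        t0 = time.perf_counter()
        predict(batch)
        best = min(best, time.perf_counter() - t0)
    return best / len(batch) * 1e6


def brute_force_1nn(train_X, train_y):
    """Stand-in classifier: 1-nearest-neighbour by squared Euclidean distance."""
    def predict(batch):
        d = ((batch[:, None, :] - train_X[None, :, :]) ** 2).sum(axis=2)
        return train_y[d.argmin(axis=1)]
    return predict


rng = np.random.default_rng(0)
train_X = rng.normal(size=(500, 230)).astype(np.float32)  # 230-D, as above
train_y = rng.integers(0, 5, size=500)
predict = brute_force_1nn(train_X, train_y)
queries = rng.normal(size=(64, 230)).astype(np.float32)

single_us = per_window_latency_us(predict, queries, batch_size=1)
batched_us = per_window_latency_us(predict, queries, batch_size=64)
```

Batching amortizes the fixed per-call overhead across the windows in the batch, which is why the per-window batched figure is the one passed to --also-perf-latency-us.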
Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
628 lines
28 KiB
Python
"""KNN-driven embedding events for the dashboard's scene-11 3-D scatter.

For each per-window summary feature vector we compute a 3-D projection,
a KNN prediction, and a cluster id, and emit them together with the
ground-truth phase on a single ``embedding`` event:

    x, y, z   ← 3-D projection of the standardized features, rescaled
                to 0–1 (LDA by default; PCA or UMAP via --projector)
    phase     ← ground-truth phase from labels.jsonl
    predicted ← KNN classifier's prediction (k=10, distance-weighted)
    cluster   ← KMeans cluster id (k=8 by default)

Scene 11's mode toggle (phase / predicted / cluster) recolors the same
points without having to re-publish — one event populates all three views.

Three subcommands:

    python -m training.producers.knn produce ...   fit, then emit Embedding
                                                   events; --fit-out also
                                                   dumps the per-window
                                                   (x,y,z,phase,predicted,
                                                   cluster) parquet, and
                                                   --no-publish skips the
                                                   publish step
    python -m training.producers.knn metric  ...   publish ModelMetric for
                                                   the KNN classifier
                                                   (held-out test macro F1)
    python -m training.producers.knn stream  ...   replay a saved fit
                                                   parquet (no re-fit),
                                                   optionally in a loop

The fit pipeline is deterministic given (features parquet, schema, seed)
so re-running produces identical points and identical KNN predictions.
"""
from __future__ import annotations

import argparse
import asyncio
import json
import logging
import sys
import time
from pathlib import Path

import numpy as np
import pyarrow.parquet as pq

sys.path.insert(0, str(Path(__file__).resolve().parents[2]))
from training._features import PHASE_TO_INT, PHASES
from training._split import (
    held_out_host, held_out_sample, held_out_time,
)
from training.producers._publish import (
    PublishFn, http_publisher, null_publisher,
)


log = logging.getLogger("cis490.producers.knn")


# Phase ints map back to canonical strings — the dashboard wants the
# string form (Phase Literal) on the wire, not the int.
INT_TO_PHASE = {i: p for p, i in PHASE_TO_INT.items()}
# The dashboard's Phase Literal does NOT include "failed" (closed enum),
# so _safe_phase maps "failed" (and any unknown code) to "clean"
# defensively. In practice the validator rejects "failed" episodes
# upstream so this shouldn't fire.
_DASHBOARD_PHASES = {"clean", "armed", "infecting", "infected_running", "dormant"}


def _safe_phase(int_phase: int) -> str:
    p = INT_TO_PHASE.get(int(int_phase), "clean")
    return p if p in _DASHBOARD_PHASES else "clean"


# ─────────────────────────────────────────────────────────────────────
# Pipeline
# ─────────────────────────────────────────────────────────────────────


def _load_features(window_path: Path, schema_path: Path,
                   *, max_rows: int | None = None
                   ) -> tuple[np.ndarray, np.ndarray, list[str], list[str], list[str]]:
    """Load (X, y, episode_id, host_id, profile) for every window.

    Returns:
        X          (N, F) float32 — feature matrix
        y          (N,)   int64   — phase labels (PHASE_TO_INT codes)
        episode_id list[N]
        host_id    list[N]
        profile    list[N]
    """
    schema = json.loads(schema_path.read_text())
    feat_names = schema["feature_names"]
    cols = feat_names + ["phase", "episode_id", "host_id", "profile"]
    log.info("loading %s", window_path)
    tbl = pq.read_table(window_path, columns=cols)
    if max_rows is not None and tbl.num_rows > max_rows:
        # Evenly spaced row indices, so the subsample is deterministic.
        log.info("subsampling %d → %d rows", tbl.num_rows, max_rows)
        idx = np.linspace(0, tbl.num_rows - 1, num=max_rows, dtype=np.int64)
        tbl = tbl.take(idx)
    X = np.column_stack([tbl.column(n).to_numpy(zero_copy_only=False)
                         for n in feat_names]).astype(np.float32)
    y = tbl.column("phase").to_numpy(zero_copy_only=False).astype(np.int64)
    eids = tbl.column("episode_id").to_pylist()
    hosts = tbl.column("host_id").to_pylist()
    profs = tbl.column("profile").to_pylist()
    return X, y, eids, hosts, profs


def _standardize(X: np.ndarray) -> np.ndarray:
    """Median-impute NaN, z-score per feature, drop zero-variance cols."""
    med = np.nanmedian(X, axis=0)
    med = np.where(np.isnan(med), 0.0, med).astype(np.float32)
    inds = np.where(np.isnan(X))
    Xc = X.copy()
    Xc[inds] = np.take(med, inds[1])
    mean = Xc.mean(axis=0)
    std = Xc.std(axis=0)
    keep = std > 1e-6
    Xz = (Xc[:, keep] - mean[keep]) / std[keep]
    return Xz.astype(np.float32)


def _project3(X: np.ndarray, *, method: str = "lda",
              y: np.ndarray | None = None,
              sample_for_fit: int = 50_000,
              seed: int = 0) -> tuple[np.ndarray, dict]:
    """Project standardized features X to 3 dimensions, rescaled to
    [0, 1] per axis. Three methods:

      method="pca"   unsupervised; picks axes of maximum variance.
                     Works without labels but lumps classes together
                     when the dominant variance is class-orthogonal.

      method="lda"   supervised (Fisher Linear Discriminant); picks
                     axes that maximize between-class scatter relative
                     to within-class scatter. Requires y. Limited to
                     min(n_classes - 1, n_features) components — for
                     our 5-phase task that's 4, of which we keep 3.
                     This is the right default when phase labels are
                     available; PCA's variance-greedy axes do not
                     separate classes.

      method="umap"  nonlinear manifold embedding, supervised when y
                     is given. Fit on a subsample of at most 20k
                     points, then every point is transformed.

    Returns (XYZ, info) where info captures fit diagnostics:
      info["explained_variance_ratio"]  — for PCA
      info["explained_class_variance"]  — for LDA
    """
    # UMAP fit is O(N log N) and gets expensive past ~30k on a Pi-class
    # CPU. PCA/LDA scale linearly and are happy at 50k. Use a per-method
    # cap so we don't pay UMAP's quadratic-feeling cost unnecessarily.
    if method == "umap":
        sample_for_fit = min(sample_for_fit, 20_000)
    rng = np.random.default_rng(seed)
    if X.shape[0] > sample_for_fit:
        idx = rng.choice(X.shape[0], size=sample_for_fit, replace=False)
        Xfit = X[idx]
        yfit = y[idx] if y is not None else None
    else:
        Xfit = X
        yfit = y
    info: dict = {"method": method}
    if method == "pca":
        from sklearn.decomposition import PCA
        proj = PCA(n_components=3, random_state=seed)
        proj.fit(Xfit)
        info["explained_variance_ratio"] = proj.explained_variance_ratio_.tolist()
        log.info("PCA-3 explained variance: %s",
                 [f"{v:.3f}" for v in proj.explained_variance_ratio_])
    elif method == "lda":
        if yfit is None:
            raise ValueError("LDA needs class labels y")
        from sklearn.discriminant_analysis import LinearDiscriminantAnalysis
        unique_y = np.unique(yfit)
        n_components = min(3, len(unique_y) - 1, Xfit.shape[1])
        if n_components < 3:
            # The missing axes are zero-padded after transform() below.
            log.warning("LDA can only produce %d components for %d classes; "
                        "zero-padding the missing axes", n_components, len(unique_y))
        proj = LinearDiscriminantAnalysis(n_components=n_components)
        proj.fit(Xfit, yfit)
        evr = getattr(proj, "explained_variance_ratio_", None)
        if evr is not None:
            info["explained_class_variance"] = list(evr)
            log.info("LDA-3 between-class variance ratio: %s",
                     [f"{v:.3f}" for v in evr])
        else:
            info["explained_class_variance"] = []
    elif method == "umap":
        # Supervised UMAP: passes y to UMAP.fit so points of the same
        # phase are pulled closer in the embedding space. Nonlinear,
        # so unlike PCA/LDA there is no projector matrix to save and
        # apply later. We fit on the subsample (capped at 20k above)
        # and then push every point through transform() on the learned
        # manifold. Cost: ~60–120 s for a 50k subsample on a Pi-class CPU.
        try:
            import umap
        except ImportError as e:
            raise RuntimeError(
                "umap-learn not installed. pip install umap-learn"
            ) from e
        log.info("UMAP fit on %d sampled points (this can take 1–2 min)…",
                 Xfit.shape[0])
        reducer = umap.UMAP(
            n_components=3,
            n_neighbors=30,
            min_dist=0.1,
            metric="euclidean",
            random_state=seed,
            n_jobs=1,  # n_jobs > 1 disables determinism in UMAP
            target_weight=0.5,
        )
        if yfit is not None:
            reducer.fit(Xfit, yfit)
            log.info("UMAP fit done; transforming remaining points…")
        else:
            reducer.fit(Xfit)
        # transform() in UMAP can be slow (~few sec / 1k points). For
        # 150k we do it in chunks so the log shows progress.
        chunks = []
        chunk_size = 5000
        for i in range(0, X.shape[0], chunk_size):
            chunks.append(reducer.transform(X[i:i + chunk_size]))
        P_full = np.concatenate(chunks, axis=0).astype(np.float32)
        # Also need Pfit for min/max rescaling; reuse what reducer
        # remembers about the fit.
        Pfit_full = reducer.embedding_.astype(np.float32)
        info["n_neighbors"] = 30
        info["min_dist"] = 0.1
        info["target_weight"] = 0.5

        # Bypass the generic .transform path below since we already
        # have P_full + Pfit_full
        lo = Pfit_full.min(axis=0)
        hi = Pfit_full.max(axis=0)
        span = np.maximum(hi - lo, 1e-6)
        P01 = (P_full - lo) / span
        return np.clip(P01, 0.0, 1.0).astype(np.float32), info
    else:
        raise ValueError(f"unknown projector: {method!r}")

    P = proj.transform(X).astype(np.float32)
    if P.shape[1] < 3:
        # Pad missing dims with zeros so the dashboard's 3D viewer still works
        pad = np.zeros((P.shape[0], 3 - P.shape[1]), dtype=np.float32)
        P = np.concatenate([P, pad], axis=1)

    Pfit = proj.transform(Xfit)
    if Pfit.shape[1] < 3:
        Pfit = np.concatenate(
            [Pfit, np.zeros((Pfit.shape[0], 3 - Pfit.shape[1]))], axis=1
        )
    lo = Pfit[:, :3].min(axis=0)
    hi = Pfit[:, :3].max(axis=0)
    span = np.maximum(hi - lo, 1e-6)
    P01 = (P[:, :3] - lo) / span
    return np.clip(P01, 0.0, 1.0).astype(np.float32), info


# Backwards-compat: produce() used to call _pca3 directly.
def _pca3(X: np.ndarray, *, sample_for_fit: int = 50_000, seed: int = 0
          ) -> np.ndarray:
    XYZ, _ = _project3(X, method="pca", sample_for_fit=sample_for_fit, seed=seed)
    return XYZ


def _kmeans(X: np.ndarray, *, k: int, sample_for_fit: int = 100_000,
            seed: int = 0) -> np.ndarray:
    from sklearn.cluster import MiniBatchKMeans
    log.info("fitting KMeans k=%d on up to %d rows", k, sample_for_fit)
    km = MiniBatchKMeans(n_clusters=k, random_state=seed, batch_size=2048,
                         n_init=4, max_iter=200)
    rng = np.random.default_rng(seed)
    if X.shape[0] > sample_for_fit:
        idx = rng.choice(X.shape[0], size=sample_for_fit, replace=False)
        km.fit(X[idx])
    else:
        km.fit(X)
    return km.predict(X).astype(np.int32)


def _knn_predict(
    *, X_train: np.ndarray, y_train: np.ndarray,
    X_eval: np.ndarray,
    k: int = 10, weights: str = "distance",
) -> np.ndarray:
    """Train a KNN classifier and return predictions on X_eval."""
    from sklearn.neighbors import KNeighborsClassifier
    log.info("fitting KNN k=%d on %d train rows", k, X_train.shape[0])
    clf = KNeighborsClassifier(n_neighbors=k, weights=weights,
                               algorithm="auto", n_jobs=-1)
    clf.fit(X_train, y_train)
    log.info("predicting on %d eval rows", X_eval.shape[0])
    return clf.predict(X_eval).astype(np.int64)


# ─────────────────────────────────────────────────────────────────────
# Subcommands
# ─────────────────────────────────────────────────────────────────────


async def _produce(args) -> int:
    X, y, eids, hosts, profs = _load_features(args.window, args.schema,
                                              max_rows=args.max_rows)
    log.info("loaded %d windows × %d features", X.shape[0], X.shape[1])
    Xz = _standardize(X)
    XYZ, proj_info = _project3(
        Xz, method=args.projector, y=y, seed=args.seed,
    )
    log.info("projector=%s diagnostics=%s", args.projector, proj_info)
    cluster_ids = _kmeans(Xz, k=args.k_clusters, seed=args.seed)

    # KNN classifier. Training rows are assigned their ground-truth label
    # as the "prediction" (see below); only held-out rows get real KNN
    # predictions. For a real held-out story, use the held-out-host split.
    if args.split_recipe == "host":
        # Build a host-based split aligned with the supervised pipeline
        val = pq.read_table(args.validation).to_pylist()
        rows = [r for r in val if r["status"] in ("accepted", "degraded")]
        s = held_out_host(
            profiles=[r["profile"] for r in rows],
            sample_names=[r["sample_name"] for r in rows],
            host_ids=[r["host_id"] for r in rows],
            episode_ids=[r["episode_id"] for r in rows],
            train_hosts=args.train_hosts,
            seed=args.seed,
        )
        train_eps = {rows[i]["episode_id"] for i in range(len(rows))
                     if s.train[i] or s.val[i]}  # train ∪ val for KNN fit
        train_mask = np.array([e in train_eps for e in eids], dtype=bool)
    else:
        # Random 80/20 split
        rng = np.random.default_rng(args.seed)
        train_mask = rng.random(len(eids)) < 0.8

    n_train = int(train_mask.sum())
    n_eval = int((~train_mask).sum())
    log.info("KNN train=%d eval=%d", n_train, n_eval)
    y_pred = np.zeros(len(y), dtype=np.int64)
    y_pred[train_mask] = y[train_mask]  # train rows: predict their own label
    if n_eval > 0:
        y_pred[~train_mask] = _knn_predict(
            X_train=Xz[train_mask], y_train=y[train_mask],
            X_eval=Xz[~train_mask], k=args.k_neighbors,
        )

    # Optionally write the fit output to a parquet for inspection
    if args.fit_out is not None:
        import pyarrow as pa
        out = pa.table({
            "episode_id": eids,
            "host_id": hosts,
            "profile": profs,
            "x": pa.array(XYZ[:, 0], type=pa.float32()),
            "y": pa.array(XYZ[:, 1], type=pa.float32()),
            "z": pa.array(XYZ[:, 2], type=pa.float32()),
            "phase_int": pa.array(y, type=pa.int8()),
            "predicted_int": pa.array(y_pred, type=pa.int8()),
            "cluster": pa.array(cluster_ids, type=pa.int32()),
            "is_train": pa.array(train_mask, type=pa.bool_()),
        })
        args.fit_out.parent.mkdir(parents=True, exist_ok=True)
        pq.write_table(out, args.fit_out, compression="zstd")
        log.info("wrote %s", args.fit_out)

    if args.no_publish:
        return 0

    # Publish — rate-limited so we don't drown the dashboard
    publisher = (null_publisher() if args.dry_run
                 else http_publisher(args.publish_url))

    # Subsample the publish stream so the scatter doesn't get
    # millions of points. This is a uniform random draw over all
    # windows; it is not stratified by phase.
    rng = np.random.default_rng(args.seed)
    if args.max_publish > 0 and len(eids) > args.max_publish:
        sel = rng.choice(len(eids), size=args.max_publish, replace=False)
    else:
        sel = np.arange(len(eids))

    n_emit = 0
    started = time.monotonic()
    for i in sel:
        msg = {
            "type": "embedding",
            "x": float(XYZ[i, 0]),
            "y": float(XYZ[i, 1]),
            "z": float(XYZ[i, 2]),
            "phase": _safe_phase(y[i]),
            "predicted": _safe_phase(y_pred[i]),
            "cluster": int(cluster_ids[i]),
        }
        await publisher(msg)
        n_emit += 1
        if args.rate_hz > 0:
            await asyncio.sleep(1.0 / args.rate_hz)

    elapsed = time.monotonic() - started
    log.info("published %d embedding events in %.1fs", n_emit, elapsed)
    return 0


async def _stream(args) -> int:
    """Load a previously-saved knn_v1.parquet and publish embedding
    events from it in a loop. The fit is already on disk so this
    process is small and stateless — runs as a long-lived service or
    one-shot.

    Why a separate streamer (not just `produce --loop`):
      - `produce` re-fits the projector/KMeans/KNN every invocation,
        which is expensive and unnecessary if we already have a saved fit.
      - The stream loop re-publishes the same points so a browser
        that reconnects 30 s after the last cycle still sees a
        populated scatter (the dashboard doesn't replay live events
        on reconnect — see PRODUCERS.md §reconnect-gotcha).
    """
    from pyarrow import parquet as _pq

    if not args.load_fit.exists():
        log.error("fit parquet not found: %s — run 'knn produce --fit-out' first",
                  args.load_fit)
        return 1
    log.info("streaming from %s", args.load_fit)
    tbl = _pq.read_table(args.load_fit)
    n = tbl.num_rows
    log.info("loaded %d points", n)

    # Subsample once at startup so each cycle pushes the same points
    # (deterministic given seed) and the scatter stays coherent.
    rng = np.random.default_rng(args.seed)
    if args.max_points > 0 and n > args.max_points:
        sel = rng.choice(n, size=args.max_points, replace=False)
        sel.sort()  # ordered emission feels less random
    else:
        sel = np.arange(n)

    cols = {c: tbl.column(c).to_numpy(zero_copy_only=False) for c in tbl.column_names}
    publisher = (null_publisher() if args.dry_run
                 else http_publisher(args.publish_url))

    def _build_msg(i: int) -> dict:
        return {
            "type": "embedding",
            "x": float(cols["x"][i]),
            "y": float(cols["y"][i]),
            "z": float(cols["z"][i]),
            "phase": _safe_phase(int(cols["phase_int"][i])),
            "predicted": _safe_phase(int(cols["predicted_int"][i])),
            "cluster": int(cols["cluster"][i]),
        }

    cycle = 0
    while True:
        started = time.monotonic()
        n_emit = 0

        # Optional sticky model_metric / model_perf for scenes 8 & 12.
        # Republished each cycle so reconnecting browsers see them
        # without waiting for a separate metric tick.
        if args.also_metric is not None:
            await publisher({"type": "model_metric", "model": "knn",
                             "accuracy": float(args.also_metric)})
        if args.also_perf_latency_us is not None:
            await publisher({"type": "model_perf", "model": "knn",
                             "latency_us": float(args.also_perf_latency_us),
                             "accuracy": float(args.also_metric or 0.0)})

        # Fan out the publishes in batches via asyncio.gather. Each
        # publish is its own loopback HTTP POST, but gather lets ~burst_size
        # of them be in flight concurrently — turns sequential ~5 ms/event
        # into parallel ~5 ms / batch. 2000 points at burst_size=50 →
        # ~40 batches × ~5 ms ≈ 200 ms cycle vs 13 s sequential.
        burst_size = max(1, int(args.burst_size))
        for chunk_start in range(0, len(sel), burst_size):
            chunk = sel[chunk_start:chunk_start + burst_size]
            await asyncio.gather(*(publisher(_build_msg(int(i))) for i in chunk))
            n_emit += len(chunk)
            if args.rate_hz > 0:
                # Burst, then sleep burst_size / rate_hz seconds, so the
                # average throughput stays at or below rate_hz events/sec
                # (each batch finishes near-simultaneously). Keep this
                # conservative so we don't peg the dashboard process.
                await asyncio.sleep(burst_size / args.rate_hz)
        cycle += 1
        elapsed = time.monotonic() - started
        log.info("cycle %d: published %d embeddings in %.1fs (burst=%d)",
                 cycle, n_emit, elapsed, burst_size)
        if not args.loop:
            return 0
        # Pause between cycles so we're not pegging the dashboard at 100%
        await asyncio.sleep(args.cycle_pause_s)


async def _metric(args) -> int:
    """One-shot publish of ModelMetric{model: 'knn', accuracy: macro_f1}."""
    from training.eval_._metrics import _macro_f1

    X, y, eids, hosts, profs = _load_features(args.window, args.schema)
    Xz = _standardize(X)
    val = pq.read_table(args.validation).to_pylist()
    rows = [r for r in val if r["status"] in ("accepted", "degraded")]
    s = held_out_host(
        profiles=[r["profile"] for r in rows],
        sample_names=[r["sample_name"] for r in rows],
        host_ids=[r["host_id"] for r in rows],
        episode_ids=[r["episode_id"] for r in rows],
        train_hosts=args.train_hosts,
        seed=args.seed,
    )
    train_eps = {rows[i]["episode_id"] for i in range(len(rows)) if s.train[i]}
    test_eps = {rows[i]["episode_id"] for i in range(len(rows)) if s.test[i]}
    train_mask = np.array([e in train_eps for e in eids], dtype=bool)
    test_mask = np.array([e in test_eps for e in eids], dtype=bool)
    log.info("KNN metric: train=%d test=%d",
             int(train_mask.sum()), int(test_mask.sum()))
    y_pred = _knn_predict(
        X_train=Xz[train_mask], y_train=y[train_mask],
        X_eval=Xz[test_mask], k=args.k_neighbors,
    )
    f1 = _macro_f1(y[test_mask], y_pred, n_classes=max(int(y.max()) + 1, 5))
    log.info("KNN test macro_f1 = %.4f", f1)

    publisher = (null_publisher() if args.dry_run
                 else http_publisher(args.publish_url))
    # Re-publish on a tick if interval > 0 (dashboard reconnect-warmth)
    while True:
        await publisher({"type": "model_metric", "model": "knn",
                         "accuracy": float(f1)})
        if args.interval <= 0:
            return 0
        await asyncio.sleep(args.interval)


# ─────────────────────────────────────────────────────────────────────
# CLI
# ─────────────────────────────────────────────────────────────────────


def main() -> int:
    p = argparse.ArgumentParser()
    sub = p.add_subparsers(dest="cmd", required=True)

    common = argparse.ArgumentParser(add_help=False)
    common.add_argument("--window", required=True, type=Path,
                        help="features_window_v1.parquet")
    common.add_argument("--schema", required=True, type=Path,
                        help="feature_schema_v1.json")
    common.add_argument("--validation", type=Path,
                        default=Path("data/processed/validation_v1.parquet"))
    common.add_argument("--split-recipe", choices=["host", "random"],
                        default="host")
    common.add_argument("--train-hosts", nargs="+", default=["elliott-thinkpad"])
    common.add_argument("--seed", type=int, default=0)
    common.add_argument("--k-neighbors", type=int, default=10)
    common.add_argument("--publish-url", default="http://127.0.0.1:8447/publish")
    common.add_argument("--dry-run", action="store_true")
    common.add_argument("--log-level", default="INFO")

    pp = sub.add_parser("produce", parents=[common],
                        help="emit Embedding events for scene-11 scatter")
    pp.add_argument("--projector", choices=["pca", "lda", "umap"], default="lda",
                    help="3-D projection method. 'pca' = unsupervised "
                         "max-variance. 'lda' (default) = supervised "
                         "Fisher discriminant; linear, fast, 96%+ "
                         "between-class variance in first 3 axes. "
                         "'umap' = supervised nonlinear manifold "
                         "embedding; tighter visual clusters but "
                         "slower (~few minutes for 150k points).")
    pp.add_argument("--k-clusters", type=int, default=8)
    pp.add_argument("--max-rows", type=int, default=None,
                    help="cap windows loaded (testing)")
    pp.add_argument("--max-publish", type=int, default=2000,
                    help="cap published events (the scatter struggles "
                         "above ~5k points)")
    pp.add_argument("--rate-hz", type=float, default=200.0,
                    help="max publish rate (events/sec); 0 = unlimited")
    pp.add_argument("--no-publish", action="store_true",
                    help="fit + dump but skip the publish step")
    pp.add_argument("--fit-out", type=Path, default=None,
                    help="optional parquet for the per-window fit")
    pp.set_defaults(func=_produce)

    pm = sub.add_parser("metric", parents=[common],
                        help="publish ModelMetric{knn} on a tick")
    pm.add_argument("--interval", type=float, default=20.0,
                    help="re-publish period (s); 0 = one-shot")
    pm.set_defaults(func=_metric)

    # ── stream subcommand: replay a saved fit parquet on a loop ──────
    ps = sub.add_parser("stream",
                        help="publish Embedding events from a saved "
                             "fit parquet (no re-fit)")
    ps.add_argument("--load-fit", required=True, type=Path,
                    help="path to a parquet from `knn produce --fit-out`")
    ps.add_argument("--publish-url", default="http://127.0.0.1:8447/publish")
    ps.add_argument("--max-points", type=int, default=2000,
                    help="cap published points per cycle (default 2000); "
                         "0 = all rows")
    ps.add_argument("--burst-size", type=int, default=50,
                    help="number of publish calls fired concurrently per "
                         "asyncio.gather batch. Higher = more parallelism, "
                         "more pressure on the dashboard. 50 turns a "
                         "13 s sequential cycle into ~0.3 s.")
    ps.add_argument("--rate-hz", type=float, default=2000.0,
                    help="upper bound on events/sec; 0 = unlimited. With "
                         "burst-size > 1 the streamer sleeps "
                         "burst_size / rate_hz seconds after each batch, "
                         "so average throughput stays at or below rate_hz "
                         "(ignoring HTTP overhead).")
    ps.add_argument("--loop", action="store_true",
                    help="cycle indefinitely so reconnecting browsers "
                         "stay populated")
    ps.add_argument("--cycle-pause-s", type=float, default=15.0,
                    help="pause between cycles when --loop is set")
    ps.add_argument("--also-metric", type=float, default=None,
                    help="if set, also publish a model_metric event "
                         "for scene-8 (model bars) with this accuracy "
                         "value tagged model=knn, every cycle")
    ps.add_argument("--also-perf-latency-us", type=float, default=None,
                    help="if set, also publish a model_perf event "
                         "for scene-12 (perf scatter) with this "
                         "latency_us tagged model=knn, every cycle. "
                         "Pairs with --also-metric for the accuracy.")
    ps.add_argument("--seed", type=int, default=0)
    ps.add_argument("--dry-run", action="store_true")
    ps.add_argument("--log-level", default="INFO")
    ps.set_defaults(func=_stream)

    args = p.parse_args()
    logging.basicConfig(level=args.log_level,
                        format="%(asctime)s %(levelname)s %(name)s %(message)s")
    return asyncio.run(args.func(args))


if __name__ == "__main__":
    raise SystemExit(main())