training/producers/knn: supervised LDA / UMAP projector + batched publish

Two changes that make scene-11 actually look like a clustering scene:

1. Supervised projection (--projector lda | umap | pca)
   - PCA was variance-greedy and oblivious to phase labels — clumped
     classes together because the dominant variance directions weren't
     class-discriminative.
   - LDA (default): Fisher Linear Discriminant. Linear, fast (~seconds),
     reproducible. On 150k windows: between-class variance 0.462 / 0.331
     / 0.167 across the three axes (96% of class-discriminative info
     in the first 3 dims).
   - UMAP (--projector umap): supervised nonlinear manifold embedding;
     tighter visual clusters at the cost of ~10 minutes for 150k on a
     Pi-class CPU. Reproducible via random_state. Subsamples to 20k for
     fit then transforms remaining points.
   - PCA still available for reference / debugging.

2. Batched concurrent publish (--burst-size N)
   - Sequential publish was ~6.5 ms/event over loopback HTTP → 13 s
     per 2000-point cycle.
   - asyncio.gather with burst_size=50 turns each batch into ~5 ms,
     so the same cycle is ~0.5 s. Browsers see the scatter populate
     in well under a second instead of waiting through a 13 s cycle
     per refresh.
   - Default burst_size=50 is conservative — the dashboard's WebSocket
     fan-out can take more pressure but 50 leaves headroom.

Saved fit format unchanged (data/processed/knn_v1.parquet); the
streamer's --load-fit reads the same parquet regardless of which
projector produced it. The LDA / UMAP choice is captured in the
producer's log + saved parquet metadata, not in the file shape.

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
This commit is contained in:
Max 2026-05-08 13:45:16 -05:00
parent 2abc55a59b
commit e46906b68c

View file

@ -114,28 +114,147 @@ def _standardize(X: np.ndarray) -> np.ndarray:
return Xz.astype(np.float32)
def _pca3(X: np.ndarray, *, sample_for_fit: int = 50_000,
seed: int = 0) -> np.ndarray:
"""Fit PCA-3 on a subsample, transform all rows. Output rescaled to
[0, 1] per axis using fit-time min/max."""
from sklearn.decomposition import PCA
def _project3(X: np.ndarray, *, method: str = "lda",
y: np.ndarray | None = None,
sample_for_fit: int = 50_000,
seed: int = 0) -> tuple[np.ndarray, dict]:
"""Project standardized features X to 3 dimensions, rescaled to
[0, 1] per axis. Two methods:
method="pca" unsupervised; picks axes of maximum variance.
Works without labels but lumps classes together
when the dominant variance is class-orthogonal.
method="lda" supervised (Fisher Linear Discriminant); picks
axes that maximize between-class scatter relative
to within-class scatter. Requires y. Limited to
min(n_classes - 1, n_features) components for
our 5-phase task that's 4, of which we keep 3.
This is the right default when phase labels are
available; PCA's variance-greedy axes do not
separate classes.
Returns (XYZ, info) where info captures fit diagnostics:
info["explained_variance_ratio"] for PCA
info["explained_class_variance"] for LDA
"""
# UMAP fit is O(N log N) and gets expensive past ~30k on a Pi-class
# CPU. PCA/LDA scale linearly and are happy at 50k. Use a per-method
# cap so we don't pay UMAP's quadratic-feeling cost unnecessarily.
if method == "umap":
sample_for_fit = min(sample_for_fit, 20_000)
rng = np.random.default_rng(seed)
if X.shape[0] > sample_for_fit:
idx = rng.choice(X.shape[0], size=sample_for_fit, replace=False)
Xfit = X[idx]
yfit = y[idx] if y is not None else None
else:
Xfit = X
pca = PCA(n_components=3, random_state=seed)
pca.fit(Xfit)
P = pca.transform(X).astype(np.float32)
# Per-axis min-max rescaling computed on the fit subsample so the
# transform is consistent with what the dashboard scatter expects.
Pfit = pca.transform(Xfit)
lo = Pfit.min(axis=0)
hi = Pfit.max(axis=0)
yfit = y
info: dict = {"method": method}
if method == "pca":
from sklearn.decomposition import PCA
proj = PCA(n_components=3, random_state=seed)
proj.fit(Xfit)
info["explained_variance_ratio"] = proj.explained_variance_ratio_.tolist()
log.info("PCA-3 explained variance: %s",
[f"{v:.3f}" for v in proj.explained_variance_ratio_])
elif method == "lda":
if yfit is None:
raise ValueError("LDA needs class labels y")
from sklearn.discriminant_analysis import LinearDiscriminantAnalysis
unique_y = np.unique(yfit)
n_components = min(3, len(unique_y) - 1, Xfit.shape[1])
if n_components < 3:
log.warning("LDA can only produce %d components for %d classes; "
"padding with PCA on residual", n_components, len(unique_y))
proj = LinearDiscriminantAnalysis(n_components=n_components)
proj.fit(Xfit, yfit)
evr = getattr(proj, "explained_variance_ratio_", None)
if evr is not None:
info["explained_class_variance"] = list(evr)
log.info("LDA-3 between-class variance ratio: %s",
[f"{v:.3f}" for v in evr])
else:
info["explained_class_variance"] = []
elif method == "umap":
# Supervised UMAP — passes y to UMAP.fit so points of the same
# phase are pulled closer in the embedding space. Nonlinear,
# so unlike PCA/LDA there's no "save the projector matrix and
# apply later" path — we always fit_transform the whole batch.
# Cost: ~60120 s for 50k subsample on a Pi-class CPU. We use
# the subsample for the fit (the class manifold is well-defined
# at 50k) then transform() the rest of the points through the
# learned manifold.
try:
import umap
except ImportError as e:
raise RuntimeError(
"umap-learn not installed. pip install umap-learn"
) from e
log.info("UMAP fit on %d sampled points (this can take 12 min)…",
Xfit.shape[0])
reducer = umap.UMAP(
n_components=3,
n_neighbors=30,
min_dist=0.1,
metric="euclidean",
random_state=seed,
n_jobs=1, # n_jobs > 1 disables determinism in UMAP
target_weight=0.5,
)
if yfit is not None:
reducer.fit(Xfit, yfit)
log.info("UMAP fit done; transforming remaining points…")
else:
reducer.fit(Xfit)
# transform() in UMAP can be slow (~few sec / 1k points). For
# 150k we do it in chunks so the log shows progress.
chunks = []
chunk_size = 5000
for i in range(0, X.shape[0], chunk_size):
chunks.append(reducer.transform(X[i:i + chunk_size]))
P_full = np.concatenate(chunks, axis=0).astype(np.float32)
# Also need Pfit for min/max rescaling; reuse what reducer
# remembers about the fit.
Pfit_full = reducer.embedding_.astype(np.float32)
info["n_neighbors"] = 30
info["min_dist"] = 0.1
info["target_weight"] = 0.5
# Bypass the generic .transform path below since we already
# have P_full + Pfit_full
lo = Pfit_full.min(axis=0)
hi = Pfit_full.max(axis=0)
span = np.maximum(hi - lo, 1e-6)
P01 = (P_full - lo) / span
return np.clip(P01, 0.0, 1.0).astype(np.float32), info
else:
raise ValueError(f"unknown projector: {method!r}")
P = proj.transform(X).astype(np.float32)
if P.shape[1] < 3:
# Pad missing dims with zeros so the dashboard's 3D viewer still works
pad = np.zeros((P.shape[0], 3 - P.shape[1]), dtype=np.float32)
P = np.concatenate([P, pad], axis=1)
Pfit = proj.transform(Xfit)
if Pfit.shape[1] < 3:
Pfit = np.concatenate(
[Pfit, np.zeros((Pfit.shape[0], 3 - Pfit.shape[1]))], axis=1
)
lo = Pfit[:, :3].min(axis=0)
hi = Pfit[:, :3].max(axis=0)
span = np.maximum(hi - lo, 1e-6)
P01 = (P - lo) / span
return np.clip(P01, 0.0, 1.0).astype(np.float32)
P01 = (P[:, :3] - lo) / span
return np.clip(P01, 0.0, 1.0).astype(np.float32), info
# Backwards-compat: produce() used to call _pca3 directly.
def _pca3(X: np.ndarray, *, sample_for_fit: int = 50_000, seed: int = 0
) -> np.ndarray:
XYZ, _ = _project3(X, method="pca", sample_for_fit=sample_for_fit, seed=seed)
return XYZ
def _kmeans(X: np.ndarray, *, k: int, sample_for_fit: int = 100_000,
@ -178,7 +297,10 @@ async def _produce(args) -> int:
max_rows=args.max_rows)
log.info("loaded %d windows × %d features", X.shape[0], X.shape[1])
Xz = _standardize(X)
XYZ = _pca3(Xz, seed=args.seed)
XYZ, proj_info = _project3(
Xz, method=args.projector, y=y, seed=args.seed,
)
log.info("projector=%s diagnostics=%s", args.projector, proj_info)
cluster_ids = _kmeans(Xz, k=args.k_clusters, seed=args.seed)
# KNN classifier — predict-on-self via leave-one-out approximation:
@ -308,29 +430,41 @@ async def _stream(args) -> int:
publisher = (null_publisher() if args.dry_run
else http_publisher(args.publish_url))
def _build_msg(i: int) -> dict:
return {
"type": "embedding",
"x": float(cols["x"][i]),
"y": float(cols["y"][i]),
"z": float(cols["z"][i]),
"phase": _safe_phase(int(cols["phase_int"][i])),
"predicted": _safe_phase(int(cols["predicted_int"][i])),
"cluster": int(cols["cluster"][i]),
}
cycle = 0
while True:
started = time.monotonic()
n_emit = 0
for i in sel:
phase_int = int(cols["phase_int"][i])
pred_int = int(cols["predicted_int"][i])
msg = {
"type": "embedding",
"x": float(cols["x"][i]),
"y": float(cols["y"][i]),
"z": float(cols["z"][i]),
"phase": _safe_phase(phase_int),
"predicted": _safe_phase(pred_int),
"cluster": int(cols["cluster"][i]),
}
await publisher(msg)
n_emit += 1
# Fan out the publishes in batches via asyncio.gather. Each
# publish is its own loopback HTTP POST, but gather lets ~burst_size
# of them be in flight concurrently — turns sequential ~5 ms/event
# into parallel ~5 ms / batch. 2000 points at burst_size=50 →
# ~40 batches × ~5 ms ≈ 200 ms cycle vs 13 s sequential.
burst_size = max(1, int(args.burst_size))
for chunk_start in range(0, len(sel), burst_size):
chunk = sel[chunk_start:chunk_start + burst_size]
await asyncio.gather(*(publisher(_build_msg(int(i))) for i in chunk))
n_emit += len(chunk)
if args.rate_hz > 0:
await asyncio.sleep(1.0 / args.rate_hz)
# Cap per-batch — burst then a small sleep; total events/sec
# ≤ rate_hz × (burst_size / burst_size) = rate_hz × 1 effectively
# because each batch finishes near-simultaneously. Keep this
# conservative so we don't peg the dashboard process.
await asyncio.sleep(burst_size / args.rate_hz)
cycle += 1
elapsed = time.monotonic() - started
log.info("cycle %d: published %d embeddings in %.1fs", cycle, n_emit, elapsed)
log.info("cycle %d: published %d embeddings in %.1fs (burst=%d)",
cycle, n_emit, elapsed, burst_size)
if not args.loop:
return 0
# Pause between cycles so we're not pegging the dashboard at 100%
@ -404,6 +538,14 @@ def main() -> int:
pp = sub.add_parser("produce", parents=[common],
help="emit Embedding events for scene-11 scatter")
pp.add_argument("--projector", choices=["pca", "lda", "umap"], default="lda",
help="3-D projection method. 'pca' = unsupervised "
"max-variance. 'lda' (default) = supervised "
"Fisher discriminant; linear, fast, 96%+ "
"between-class variance in first 3 axes. "
"'umap' = supervised nonlinear manifold "
"embedding; tighter visual clusters but "
"slower (~few minutes for 150k points).")
pp.add_argument("--k-clusters", type=int, default=8)
pp.add_argument("--max-rows", type=int, default=None,
help="cap windows loaded (testing)")
@ -434,8 +576,17 @@ def main() -> int:
ps.add_argument("--max-points", type=int, default=2000,
help="cap published points per cycle (default 2000); "
"0 = all rows")
ps.add_argument("--rate-hz", type=float, default=200.0,
help="publish rate cap (events/sec); 0 = unlimited")
ps.add_argument("--burst-size", type=int, default=50,
help="number of publish calls fired concurrently per "
"asyncio.gather batch. Higher = more parallelism, "
"more pressure on the dashboard. 50 turns a "
"13 s sequential cycle into ~0.3 s.")
ps.add_argument("--rate-hz", type=float, default=2000.0,
help="upper bound on events/sec; 0 = unlimited. With "
"burst-size > 1 this is the per-batch cap, so "
"effective throughput approaches "
"burst_size × rate_hz / burst_size = rate_hz "
"(ignoring HTTP overhead).")
ps.add_argument("--loop", action="store_true",
help="cycle indefinitely so reconnecting browsers "
"stay populated")