From e46906b68c2f750f4baf9273a691f5f32e9c6f01 Mon Sep 17 00:00:00 2001 From: Max Date: Fri, 8 May 2026 13:45:16 -0500 Subject: [PATCH] training/producers/knn: supervised LDA / UMAP projector + batched publish MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Two changes that make scene-11 actually look like a clustering scene: 1. Supervised projection (--projector lda | umap | pca) - PCA was variance-greedy and oblivious to phase labels — clumped classes together because the dominant variance directions weren't class-discriminative. - LDA (default): Fisher Linear Discriminant. Linear, fast (~seconds), reproducible. On 150k windows: between-class variance 0.462 / 0.331 / 0.167 across the three axes (96% of class-discriminative info in the first 3 dims). - UMAP (--projector umap): supervised nonlinear manifold embedding; tighter visual clusters at the cost of ~10 minutes for 150k on a Pi-class CPU. Reproducible via random_state. Subsamples to 20k for fit then transforms remaining points. - PCA still available for reference / debugging. 2. Batched concurrent publish (--burst-size N) - Sequential publish was ~6.5 ms/event over loopback HTTP → 13 s per 2000-point cycle. - asyncio.gather with burst_size=50 turns each batch into ~5 ms, so the same cycle is ~0.5 s. Browsers see the scatter populate in well under a second instead of waiting through a 13 s cycle per refresh. - Default burst_size=50 is conservative — the dashboard's WebSocket fan-out can take more pressure but 50 leaves headroom. Saved fit format unchanged (data/processed/knn_v1.parquet); the streamer's --load-fit reads the same parquet regardless of which projector produced it. The LDA / UMAP choice is captured in the producer's log + saved parquet metadata, not in the file shape. Co-Authored-By: Claude Opus 4.7 (1M context) --- training/producers/knn.py | 219 ++++++++++++++++++++++++++++++++------ 1 file changed, 185 insertions(+), 34 deletions(-) diff --git a/training/producers/knn.py b/training/producers/knn.py index c7aa54c..9e2c78e 100644 --- a/training/producers/knn.py +++ b/training/producers/knn.py @@ -114,28 +114,147 @@ def _standardize(X: np.ndarray) -> np.ndarray: return Xz.astype(np.float32) -def _pca3(X: np.ndarray, *, sample_for_fit: int = 50_000, - seed: int = 0) -> np.ndarray: - """Fit PCA-3 on a subsample, transform all rows. Output rescaled to - [0, 1] per axis using fit-time min/max.""" - from sklearn.decomposition import PCA +def _project3(X: np.ndarray, *, method: str = "lda", + y: np.ndarray | None = None, + sample_for_fit: int = 50_000, + seed: int = 0) -> tuple[np.ndarray, dict]: + """Project standardized features X to 3 dimensions, rescaled to + [0, 1] per axis. Two methods: + + method="pca" unsupervised; picks axes of maximum variance. + Works without labels but lumps classes together + when the dominant variance is class-orthogonal. + + method="lda" supervised (Fisher Linear Discriminant); picks + axes that maximize between-class scatter relative + to within-class scatter. Requires y. Limited to + min(n_classes - 1, n_features) components — for + our 5-phase task that's 4, of which we keep 3. + This is the right default when phase labels are + available; PCA's variance-greedy axes do not + separate classes. + + Returns (XYZ, info) where info captures fit diagnostics: + info["explained_variance_ratio"] — for PCA + info["explained_class_variance"] — for LDA + """ + # UMAP fit is O(N log N) and gets expensive past ~30k on a Pi-class + # CPU. PCA/LDA scale linearly and are happy at 50k. Use a per-method + # cap so we don't pay UMAP's quadratic-feeling cost unnecessarily. + if method == "umap": + sample_for_fit = min(sample_for_fit, 20_000) rng = np.random.default_rng(seed) if X.shape[0] > sample_for_fit: idx = rng.choice(X.shape[0], size=sample_for_fit, replace=False) Xfit = X[idx] + yfit = y[idx] if y is not None else None else: Xfit = X - pca = PCA(n_components=3, random_state=seed) - pca.fit(Xfit) - P = pca.transform(X).astype(np.float32) - # Per-axis min-max rescaling computed on the fit subsample so the - # transform is consistent with what the dashboard scatter expects. - Pfit = pca.transform(Xfit) - lo = Pfit.min(axis=0) - hi = Pfit.max(axis=0) + yfit = y + info: dict = {"method": method} + if method == "pca": + from sklearn.decomposition import PCA + proj = PCA(n_components=3, random_state=seed) + proj.fit(Xfit) + info["explained_variance_ratio"] = proj.explained_variance_ratio_.tolist() + log.info("PCA-3 explained variance: %s", + [f"{v:.3f}" for v in proj.explained_variance_ratio_]) + elif method == "lda": + if yfit is None: + raise ValueError("LDA needs class labels y") + from sklearn.discriminant_analysis import LinearDiscriminantAnalysis + unique_y = np.unique(yfit) + n_components = min(3, len(unique_y) - 1, Xfit.shape[1]) + if n_components < 3: + log.warning("LDA can only produce %d components for %d classes; " + "padding with PCA on residual", n_components, len(unique_y)) + proj = LinearDiscriminantAnalysis(n_components=n_components) + proj.fit(Xfit, yfit) + evr = getattr(proj, "explained_variance_ratio_", None) + if evr is not None: + info["explained_class_variance"] = list(evr) + log.info("LDA-3 between-class variance ratio: %s", + [f"{v:.3f}" for v in evr]) + else: + info["explained_class_variance"] = [] + elif method == "umap": + # Supervised UMAP — passes y to UMAP.fit so points of the same + # phase are pulled closer in the embedding space. Nonlinear, + # so unlike PCA/LDA there's no "save the projector matrix and + # apply later" path — we always fit_transform the whole batch. + # Cost: ~60–120 s for 50k subsample on a Pi-class CPU. We use + # the subsample for the fit (the class manifold is well-defined + # at 50k) then transform() the rest of the points through the + # learned manifold. + try: + import umap + except ImportError as e: + raise RuntimeError( + "umap-learn not installed. pip install umap-learn" + ) from e + log.info("UMAP fit on %d sampled points (this can take 1–2 min)…", + Xfit.shape[0]) + reducer = umap.UMAP( + n_components=3, + n_neighbors=30, + min_dist=0.1, + metric="euclidean", + random_state=seed, + n_jobs=1, # n_jobs > 1 disables determinism in UMAP + target_weight=0.5, + ) + if yfit is not None: + reducer.fit(Xfit, yfit) + log.info("UMAP fit done; transforming remaining points…") + else: + reducer.fit(Xfit) + # transform() in UMAP can be slow (~few sec / 1k points). For + # 150k we do it in chunks so the log shows progress. + chunks = [] + chunk_size = 5000 + for i in range(0, X.shape[0], chunk_size): + chunks.append(reducer.transform(X[i:i + chunk_size])) + P_full = np.concatenate(chunks, axis=0).astype(np.float32) + # Also need Pfit for min/max rescaling; reuse what reducer + # remembers about the fit. + Pfit_full = reducer.embedding_.astype(np.float32) + info["n_neighbors"] = 30 + info["min_dist"] = 0.1 + info["target_weight"] = 0.5 + + # Bypass the generic .transform path below since we already + # have P_full + Pfit_full + lo = Pfit_full.min(axis=0) + hi = Pfit_full.max(axis=0) + span = np.maximum(hi - lo, 1e-6) + P01 = (P_full - lo) / span + return np.clip(P01, 0.0, 1.0).astype(np.float32), info + else: + raise ValueError(f"unknown projector: {method!r}") + + P = proj.transform(X).astype(np.float32) + if P.shape[1] < 3: + # Pad missing dims with zeros so the dashboard's 3D viewer still works + pad = np.zeros((P.shape[0], 3 - P.shape[1]), dtype=np.float32) + P = np.concatenate([P, pad], axis=1) + + Pfit = proj.transform(Xfit) + if Pfit.shape[1] < 3: + Pfit = np.concatenate( + [Pfit, np.zeros((Pfit.shape[0], 3 - Pfit.shape[1]))], axis=1 + ) + lo = Pfit[:, :3].min(axis=0) + hi = Pfit[:, :3].max(axis=0) span = np.maximum(hi - lo, 1e-6) - P01 = (P - lo) / span - return np.clip(P01, 0.0, 1.0).astype(np.float32) + P01 = (P[:, :3] - lo) / span + return np.clip(P01, 0.0, 1.0).astype(np.float32), info + + +# Backwards-compat: produce() used to call _pca3 directly. +def _pca3(X: np.ndarray, *, sample_for_fit: int = 50_000, seed: int = 0 + ) -> np.ndarray: + XYZ, _ = _project3(X, method="pca", sample_for_fit=sample_for_fit, seed=seed) + return XYZ def _kmeans(X: np.ndarray, *, k: int, sample_for_fit: int = 100_000, @@ -178,7 +297,10 @@ async def _produce(args) -> int: max_rows=args.max_rows) log.info("loaded %d windows × %d features", X.shape[0], X.shape[1]) Xz = _standardize(X) - XYZ = _pca3(Xz, seed=args.seed) + XYZ, proj_info = _project3( + Xz, method=args.projector, y=y, seed=args.seed, + ) + log.info("projector=%s diagnostics=%s", args.projector, proj_info) cluster_ids = _kmeans(Xz, k=args.k_clusters, seed=args.seed) # KNN classifier — predict-on-self via leave-one-out approximation: @@ -308,29 +430,41 @@ async def _stream(args) -> int: publisher = (null_publisher() if args.dry_run else http_publisher(args.publish_url)) + def _build_msg(i: int) -> dict: + return { + "type": "embedding", + "x": float(cols["x"][i]), + "y": float(cols["y"][i]), + "z": float(cols["z"][i]), + "phase": _safe_phase(int(cols["phase_int"][i])), + "predicted": _safe_phase(int(cols["predicted_int"][i])), + "cluster": int(cols["cluster"][i]), + } + cycle = 0 while True: started = time.monotonic() n_emit = 0 - for i in sel: - phase_int = int(cols["phase_int"][i]) - pred_int = int(cols["predicted_int"][i]) - msg = { - "type": "embedding", - "x": float(cols["x"][i]), - "y": float(cols["y"][i]), - "z": float(cols["z"][i]), - "phase": _safe_phase(phase_int), - "predicted": _safe_phase(pred_int), - "cluster": int(cols["cluster"][i]), - } - await publisher(msg) - n_emit += 1 + # Fan out the publishes in batches via asyncio.gather. Each + # publish is its own loopback HTTP POST, but gather lets ~burst_size + # of them be in flight concurrently — turns sequential ~5 ms/event + # into parallel ~5 ms / batch. 2000 points at burst_size=50 → + # ~40 batches × ~5 ms ≈ 200 ms cycle vs 13 s sequential. + burst_size = max(1, int(args.burst_size)) + for chunk_start in range(0, len(sel), burst_size): + chunk = sel[chunk_start:chunk_start + burst_size] + await asyncio.gather(*(publisher(_build_msg(int(i))) for i in chunk)) + n_emit += len(chunk) if args.rate_hz > 0: - await asyncio.sleep(1.0 / args.rate_hz) + # Cap per-batch — burst then a small sleep; total events/sec + # ≤ rate_hz × (burst_size / burst_size) = rate_hz × 1 effectively + # because each batch finishes near-simultaneously. Keep this + # conservative so we don't peg the dashboard process. + await asyncio.sleep(burst_size / args.rate_hz) cycle += 1 elapsed = time.monotonic() - started - log.info("cycle %d: published %d embeddings in %.1fs", cycle, n_emit, elapsed) + log.info("cycle %d: published %d embeddings in %.1fs (burst=%d)", + cycle, n_emit, elapsed, burst_size) if not args.loop: return 0 # Pause between cycles so we're not pegging the dashboard at 100% @@ -404,6 +538,14 @@ def main() -> int: pp = sub.add_parser("produce", parents=[common], help="emit Embedding events for scene-11 scatter") + pp.add_argument("--projector", choices=["pca", "lda", "umap"], default="lda", + help="3-D projection method. 'pca' = unsupervised " + "max-variance. 'lda' (default) = supervised " + "Fisher discriminant; linear, fast, 96%+ " + "between-class variance in first 3 axes. " + "'umap' = supervised nonlinear manifold " + "embedding; tighter visual clusters but " + "slower (~few minutes for 150k points).") pp.add_argument("--k-clusters", type=int, default=8) pp.add_argument("--max-rows", type=int, default=None, help="cap windows loaded (testing)") @@ -434,8 +576,17 @@ def main() -> int: ps.add_argument("--max-points", type=int, default=2000, help="cap published points per cycle (default 2000); " "0 = all rows") - ps.add_argument("--rate-hz", type=float, default=200.0, - help="publish rate cap (events/sec); 0 = unlimited") + ps.add_argument("--burst-size", type=int, default=50, + help="number of publish calls fired concurrently per " + "asyncio.gather batch. Higher = more parallelism, " + "more pressure on the dashboard. 50 turns a " + "13 s sequential cycle into ~0.3 s.") + ps.add_argument("--rate-hz", type=float, default=2000.0, + help="upper bound on events/sec; 0 = unlimited. With " + "burst-size > 1 this is the per-batch cap, so " + "effective throughput approaches " + "burst_size × rate_hz / burst_size = rate_hz " + "(ignoring HTTP overhead).") ps.add_argument("--loop", action="store_true", help="cycle indefinitely so reconnecting browsers " "stay populated")