diff --git a/training/producers/knn.py b/training/producers/knn.py index 7447f18..c7aa54c 100644 --- a/training/producers/knn.py +++ b/training/producers/knn.py @@ -270,6 +270,73 @@ async def _produce(args) -> int: return 0 +async def _stream(args) -> int: + """Load a previously-saved knn_v1.parquet and publish embedding + events from it in a loop. The fit is already on disk so this + process is small and stateless — runs as a long-lived service or + one-shot. + + Why a separate streamer (not just `produce --loop`): + - `produce` re-fits PCA/KMeans/KNN every invocation, which is + expensive and unnecessary if we already have a saved fit. + - The stream loop re-publishes the same points so a browser + that reconnects 30 s after the last cycle still sees a + populated scatter (the dashboard doesn't replay live events + on reconnect — see PRODUCERS.md §reconnect-gotcha). + """ + from pyarrow import parquet as _pq + + if not args.load_fit.exists(): + log.error("fit parquet not found: %s — run 'knn produce --fit-out' first", + args.load_fit) + return 1 + log.info("streaming from %s", args.load_fit) + tbl = _pq.read_table(args.load_fit) + n = tbl.num_rows + log.info("loaded %d points", n) + + # Subsample once at startup so each cycle pushes the same points + # (deterministic given seed) and the scatter stays coherent. + rng = np.random.default_rng(args.seed) + if args.max_points > 0 and n > args.max_points: + sel = rng.choice(n, size=args.max_points, replace=False) + sel.sort() # ordered emission feels less random + else: + sel = np.arange(n) + + cols = {c: tbl.column(c).to_numpy(zero_copy_only=False) for c in tbl.column_names} + publisher = (null_publisher() if args.dry_run + else http_publisher(args.publish_url)) + + cycle = 0 + while True: + started = time.monotonic() + n_emit = 0 + for i in sel: + phase_int = int(cols["phase_int"][i]) + pred_int = int(cols["predicted_int"][i]) + msg = { + "type": "embedding", + "x": float(cols["x"][i]), + "y": float(cols["y"][i]), + "z": float(cols["z"][i]), + "phase": _safe_phase(phase_int), + "predicted": _safe_phase(pred_int), + "cluster": int(cols["cluster"][i]), + } + await publisher(msg) + n_emit += 1 + if args.rate_hz > 0: + await asyncio.sleep(1.0 / args.rate_hz) + cycle += 1 + elapsed = time.monotonic() - started + log.info("cycle %d: published %d embeddings in %.1fs", cycle, n_emit, elapsed) + if not args.loop: + return 0 + # Pause between cycles so we're not pegging the dashboard at 100% + await asyncio.sleep(args.cycle_pause_s) + + async def _metric(args) -> int: """One-shot publish of ModelMetric{model: 'knn', accuracy: macro_f1}.""" from training.eval_._metrics import _macro_f1 @@ -357,6 +424,28 @@ def main() -> int: help="re-publish period (s); 0 = one-shot") pm.set_defaults(func=_metric) + # ── stream subcommand: replay a saved fit parquet on a loop ────── + ps = sub.add_parser("stream", + help="publish Embedding events from a saved " + "fit parquet (no re-fit)") + ps.add_argument("--load-fit", required=True, type=Path, + help="path to a parquet from `knn produce --fit-out`") + ps.add_argument("--publish-url", default="http://127.0.0.1:8447/publish") + ps.add_argument("--max-points", type=int, default=2000, + help="cap published points per cycle (default 2000); " + "0 = all rows") + ps.add_argument("--rate-hz", type=float, default=200.0, + help="publish rate cap (events/sec); 0 = unlimited") + ps.add_argument("--loop", action="store_true", + help="cycle indefinitely so reconnecting browsers " + "stay populated") + ps.add_argument("--cycle-pause-s", type=float, default=15.0, + help="pause between cycles when --loop is set") + ps.add_argument("--seed", type=int, default=0) + ps.add_argument("--dry-run", action="store_true") + ps.add_argument("--log-level", default="INFO") + ps.set_defaults(func=_stream) + args = p.parse_args() logging.basicConfig(level=args.log_level, format="%(asctime)s %(levelname)s %(name)s %(message)s")