"""Build channel × time tensor shards for sequence-model training. One .npz per accepted/degraded episode under data/processed/tensor_window_v1/host=/.npz Each shard contains: X (n_windows, n_channels, n_timesteps) float32 mask (n_windows, n_channels, n_timesteps) bool y (n_windows,) int64 t_center (n_windows,) float64 episode_id () str (numpy 0-d) host_id () str profile () str sample_name () str channel_names (n_channels,) str array Compression: np.savez_compressed (zlib). Each episode is ~700KB compressed → ~50GB for the full corpus on the GPU box. The Pi does NOT need shards — it can call ``tensor_windows(epi)`` on demand from a tarball during inference. This script is for the training box only. """ from __future__ import annotations import argparse import logging import multiprocessing as mp import os import sys import time from pathlib import Path import numpy as np import pyarrow.parquet as pq sys.path.insert(0, str(Path(__file__).resolve().parent.parent)) from training._episode_io import open_episode from training._features import ( DEFAULT_STRIDE_S, DEFAULT_WINDOW_S, TENSOR_HZ, channel_names, tensor_windows, ) log = logging.getLogger("cis490.build_tensors") def _process_one(args) -> dict: epi_id, host_id, profile, sample_name, sample_kind, store_root, out_root = args out_path = Path(out_root) / f"host={host_id}" / f"{epi_id}.npz" if out_path.exists(): return {"episode_id": epi_id, "skipped": True} out_path.parent.mkdir(parents=True, exist_ok=True) src = Path(store_root) / host_id / f"{epi_id}.tar.zst" try: epi = open_episode(src, host_id=host_id) except Exception as e: return {"episode_id": epi_id, "error": f"{type(e).__name__}:{e}"} X, y, t, M, _info = tensor_windows( epi, window_s=DEFAULT_WINDOW_S, stride_s=DEFAULT_STRIDE_S, hz=TENSOR_HZ, ) if X.shape[0] == 0: return {"episode_id": epi_id, "empty": True} np.savez_compressed( out_path, X=X.astype(np.float32, copy=False), mask=M.astype(np.bool_), y=y.astype(np.int64), t_center=t.astype(np.float64), episode_id=np.asarray(epi_id), host_id=np.asarray(host_id), profile=np.asarray(profile or ""), sample_name=np.asarray(sample_name or ""), sample_kind=np.asarray(sample_kind or ""), channel_names=np.asarray(channel_names()), ) return {"episode_id": epi_id, "n_windows": X.shape[0], "size_bytes": out_path.stat().st_size} def main() -> int: ap = argparse.ArgumentParser() ap.add_argument("--validation", required=True, type=Path) ap.add_argument("--store", required=True, type=Path) ap.add_argument("--out-dir", required=True, type=Path) ap.add_argument("--workers", type=int, default=max(1, (os.cpu_count() or 2) - 1)) ap.add_argument("--limit", type=int, default=0) ap.add_argument("--include-degraded", action="store_true", default=True) ap.add_argument("--log-level", default="INFO") args = ap.parse_args() logging.basicConfig(level=args.log_level, format="%(asctime)s %(levelname)s %(name)s %(message)s") args.out_dir.mkdir(parents=True, exist_ok=True) val = pq.read_table(args.validation).to_pylist() statuses = ("accepted", "degraded") if args.include_degraded else ("accepted",) work = [r for r in val if r["status"] in statuses] if args.limit: work = work[: args.limit] log.info("building tensor shards for %d episodes with %d workers", len(work), args.workers) job_args = [ (r["episode_id"], r["host_id"], r["profile"], r["sample_name"], r["sample_kind"], str(args.store), str(args.out_dir)) for r in work ] started = time.monotonic() results: list[dict] = [] if args.workers <= 1: for a in job_args: results.append(_process_one(a)) else: with mp.Pool(args.workers) as pool: for i, res in enumerate(pool.imap_unordered( _process_one, job_args, chunksize=8), 1): results.append(res) if i % 500 == 0: rate = i / max(1e-3, time.monotonic() - started) log.info(" %d/%d (%.1f/s)", i, len(work), rate) skipped = sum(1 for r in results if r.get("skipped")) empty = sum(1 for r in results if r.get("empty")) errs = sum(1 for r in results if "error" in r) ok = len(results) - skipped - empty - errs total_bytes = sum(r.get("size_bytes", 0) for r in results) total_windows = sum(r.get("n_windows", 0) for r in results) log.info("done: ok=%d skipped=%d empty=%d errors=%d " "windows=%d total=%.1f MB", ok, skipped, empty, errs, total_windows, total_bytes / 1e6) if errs and errs <= 20: for r in results: if "error" in r: log.warning(" %s: %s", r["episode_id"], r["error"]) return 0 if __name__ == "__main__": raise SystemExit(main())