CIS490/training/build_tensors.py
Max 1fabd4a246 training: validator, feature/tensor extractors, 6 supervised models, schema-hashed checkpoints, eval suite, dashboard producers
The model layer of the project, built honestly:

  - tools/dataset_validate.py — full-sweep validator over the receiver
    store (sha256, schema, monotonic labels, telemetry-row gate). On the
    current corpus: 64,798 accepted + 8,154 degraded + 3,701 rejected +
    7 errored across 76,660 shipped episodes. data/processed/validation_v1.parquet
    is committed as the per-episode acceptance index.

  - training/_features.py — channel registry (46 channels across
    proc/guest/qmp/netflow), summary-stat windowing AND channel×time
    tensor extraction at 10s/5s windowing. Time alignment uses t_wall_ns
    (Unix ns) — tested fix for a real netflow-vs-host clock-base
    inconsistency that was silently dropping every netflow channel.

  - training/_split.py — three held-out recipes (host / sample / time)
    with profile-stratification assertions. held_out_host carries
    untested_profiles for cases like scan-and-dial absent from the test
    host (5 of 6 profiles tested cross-device, never silently averaged).

  - training/models/ — 6 architectures behind a common BaseModel
    interface: gbt (XGBoost), mlp, cnn, gru, lstm, transformer. Each
    trained twice (realistic / oracle) per the deployment threat model.
    Schema-hashed checkpoints refuse to load if _features.py changed
    since training (silent-input-drift protection, tested).

  - training/trainer/ — unified training loop: class-weighted CE, LR
    warmup + cosine, gradient clipping, mixed precision when CUDA,
    early stopping on val macro F1, best-on-val checkpoint. Same loop
    runs MLP/CNN/GRU/LSTM/Transformer; GBT uses XGBoost
    early_stopping_rounds on val mlogloss.

  - training/eval_/ — bootstrap 95% CIs on macro F1, per-class F1,
    per-profile and per-host breakdown, paired-bootstrap significance
    for model-vs-model gap. Confusion matrix uses union of seen labels.

  - training/dashboard/producers/ — replay/metrics/perf/profiles
    emitting the six event types the dashboard's awaiting scenes
    consume; on-demand tensor extraction so the Pi can run live
    inference without 65 GB of shards.

  - 17 unit tests (split coverage, features round-trip, schema mismatch,
    determinism, time-base alignment regression).
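
The t_wall_ns alignment and 10s/5s channel×time windowing described above can be sketched as follows. This is a minimal illustration under stated assumptions, not the project's `tensor_windows`: `to_grid` and `sliding_windows` are hypothetical names, each channel is assumed to be binned onto one shared Unix-ns grid at a fixed rate, and NaN marks a missing sample, which becomes `mask=False`.

```python
import numpy as np


def to_grid(t_wall_ns, values, t0_ns: int, hz: float, n_samples: int) -> np.ndarray:
    """Place irregular (t_wall_ns, value) samples on a uniform grid.

    Each sample goes to its nearest bin; bins with no sample stay NaN.
    Samples that round to a bin outside the grid are dropped, so a channel
    stamped on a different clock base comes out entirely NaN.
    """
    t = np.asarray(t_wall_ns, dtype=np.int64)
    v = np.asarray(values, dtype=np.float64)
    idx = np.round((t - t0_ns) * hz / 1e9).astype(np.int64)
    ok = (idx >= 0) & (idx < n_samples)
    grid = np.full(n_samples, np.nan)
    grid[idx[ok]] = v[ok]  # one value is kept per bin if samples collide
    return grid


def sliding_windows(grid: np.ndarray, t0_ns: int, hz: float,
                    window_s: float = 10.0, stride_s: float = 5.0):
    """Cut a (n_channels, n_samples) grid into overlapping windows.

    Returns X (n_windows, n_channels, n_timesteps) float32 with NaN -> 0,
    mask of the same shape (True where a real sample exists), and
    t_center (n_windows,) float64 window centres in Unix seconds.
    """
    n_channels, n_samples = grid.shape
    win = int(round(window_s * hz))    # samples per window
    step = int(round(stride_s * hz))   # samples between window starts
    if n_samples < win:
        empty = np.zeros((0, n_channels, win), dtype=np.float32)
        return empty, empty.astype(bool), np.zeros(0, dtype=np.float64)
    starts = np.arange(0, n_samples - win + 1, step)
    idx = starts[:, None] + np.arange(win)[None, :]   # (n_windows, win)
    W = grid[:, idx].transpose(1, 0, 2)               # (n_windows, n_channels, win)
    mask = ~np.isnan(W)
    X = np.where(mask, W, 0.0).astype(np.float32)
    t_center = t0_ns / 1e9 + (starts + win / 2) / hz
    return X, mask, t_center.astype(np.float64)
```

Zero-filling plus an explicit mask keeps the tensor dense for batching while letting a model tell "missing" apart from a true zero reading.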

End-to-end smoke-trained all six on a 567-episode subset; held-out
test macro F1 reported with paired-bootstrap significance. The
methodology now reports honest cross-device generalization, not
in-distribution validation.
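
A paired bootstrap on a macro-F1 gap can be sketched as below: test items are resampled with replacement, and the same indices are applied to both models' predictions, which is what makes the comparison paired. This is a minimal sketch, not the project's `eval_` code; `macro_f1`, `paired_bootstrap`, the 2,000-resample default and the percentile interval are assumptions. Per-class F1 is taken over the union of labels seen in each call.

```python
import numpy as np


def macro_f1(y_true: np.ndarray, y_pred: np.ndarray) -> float:
    """Unweighted mean of per-class F1 over the union of seen labels."""
    f1s = []
    for c in np.union1d(y_true, y_pred):
        tp = np.sum((y_pred == c) & (y_true == c))
        fp = np.sum((y_pred == c) & (y_true != c))
        fn = np.sum((y_pred != c) & (y_true == c))
        # c was seen in y_true or y_pred, so the denominator is never zero.
        f1s.append(2 * tp / (2 * tp + fp + fn))
    return float(np.mean(f1s))


def paired_bootstrap(y_true, pred_a, pred_b, n_boot: int = 2000, seed: int = 0):
    """Macro-F1 gap (A - B) with a 95% percentile CI from a paired bootstrap."""
    y_true, pred_a, pred_b = (np.asarray(a) for a in (y_true, pred_a, pred_b))
    n = len(y_true)
    observed = macro_f1(y_true, pred_a) - macro_f1(y_true, pred_b)
    rng = np.random.default_rng(seed)
    diffs = np.empty(n_boot)
    for i in range(n_boot):
        idx = rng.integers(0, n, size=n)  # one resample, shared by both models
        yt = y_true[idx]
        diffs[i] = macro_f1(yt, pred_a[idx]) - macro_f1(yt, pred_b[idx])
    lo, hi = np.percentile(diffs, [2.5, 97.5])
    # Share of resamples where A fails to beat B: a simple one-sided p estimate.
    p = float(np.mean(diffs <= 0))
    return observed, (float(lo), float(hi)), p
```

When windows from the same episode are correlated, resampling episode ids instead of individual windows is a common, more conservative variant.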

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
2026-05-08 01:19:00 -05:00
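
The schema-hashed checkpoints in the commit message (refuse to load if `_features.py` changed since training) can be sketched as follows, assuming the hash is taken over the feature module's source text and stored alongside the weights. JSON storage and the names `schema_hash`, `save_checkpoint`, `load_checkpoint` and `SchemaMismatch` are hypothetical; a real checkpoint would hold framework tensors.

```python
import hashlib
import json
from pathlib import Path


class SchemaMismatch(RuntimeError):
    """The checkpoint was trained against a different feature schema."""


def schema_hash(feature_source: str) -> str:
    # SHA-256 of the feature-module text: any edit changes the hash.
    return hashlib.sha256(feature_source.encode("utf-8")).hexdigest()


def save_checkpoint(path: Path, weights: dict, feature_source: str) -> None:
    payload = {"schema_hash": schema_hash(feature_source), "weights": weights}
    path.write_text(json.dumps(payload))


def load_checkpoint(path: Path, feature_source: str) -> dict:
    payload = json.loads(path.read_text())
    current = schema_hash(feature_source)
    if payload["schema_hash"] != current:
        # Refuse to load rather than silently feed drifted inputs to the model.
        raise SchemaMismatch(
            f"{path}: trained with schema {payload['schema_hash'][:12]}, "
            f"current features are {current[:12]}"
        )
    return payload["weights"]
```

Hashing the raw source also trips on comment-only edits; hashing a canonical description such as `channel_names()` plus the window constants is a looser alternative.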

143 lines
5.2 KiB
Python

"""Build channel × time tensor shards for sequence-model training.

Writes one .npz per accepted/degraded episode under

    data/processed/tensor_window_v1/host=<host>/<episode_id>.npz

Episodes that yield no windows get no shard. Each shard contains:

    X              (n_windows, n_channels, n_timesteps)  float32
    mask           (n_windows, n_channels, n_timesteps)  bool
    y              (n_windows,)                          int64
    t_center       (n_windows,)                          float64
    episode_id     ()                                    str (NumPy 0-d)
    host_id        ()                                    str (NumPy 0-d)
    profile        ()                                    str (NumPy 0-d)
    sample_name    ()                                    str (NumPy 0-d)
    sample_kind    ()                                    str (NumPy 0-d)
    channel_names  (n_channels,)                         str array

Compression: np.savez_compressed (zlib). Each episode is ~700 KB
compressed, so the full corpus takes ~50 GB on the GPU box.

The Pi does NOT need shards: it can call ``tensor_windows(epi)`` on
demand from a tarball during inference. This script is for the
training box only.
"""
from __future__ import annotations

import argparse
import logging
import multiprocessing as mp
import os
import sys
import time
from pathlib import Path

import numpy as np
import pyarrow.parquet as pq

# Make the repository root importable when this file is run as a script.
sys.path.insert(0, str(Path(__file__).resolve().parent.parent))

from training._episode_io import open_episode
from training._features import (
    DEFAULT_STRIDE_S, DEFAULT_WINDOW_S, TENSOR_HZ,
    channel_names, tensor_windows,
)

log = logging.getLogger("cis490.build_tensors")


def _process_one(args) -> dict:
    """Build one episode's shard and return a status dict (never raises on bad input)."""
    epi_id, host_id, profile, sample_name, sample_kind, store_root, out_root = args
    out_path = Path(out_root) / f"host={host_id}" / f"{epi_id}.npz"
    if out_path.exists():
        return {"episode_id": epi_id, "skipped": True}
    out_path.parent.mkdir(parents=True, exist_ok=True)

    src = Path(store_root) / host_id / f"{epi_id}.tar.zst"
    try:
        epi = open_episode(src, host_id=host_id)
        X, y, t, M, _info = tensor_windows(
            epi, window_s=DEFAULT_WINDOW_S, stride_s=DEFAULT_STRIDE_S, hz=TENSOR_HZ,
        )
    except Exception as e:
        # Report the failure instead of raising: an exception escaping a pool
        # worker would abort the whole imap_unordered loop in main().
        return {"episode_id": epi_id, "error": f"{type(e).__name__}: {e}"}
    if X.shape[0] == 0:
        return {"episode_id": epi_id, "empty": True}

    # Write to a temp file, then rename: an interrupted run must not leave a
    # truncated .npz that the exists() check above would skip on every rerun.
    tmp_path = out_path.with_name(out_path.name + ".tmp")
    with open(tmp_path, "wb") as f:
        np.savez_compressed(
            f,
            X=X.astype(np.float32, copy=False),
            mask=M.astype(np.bool_),
            y=y.astype(np.int64),
            t_center=t.astype(np.float64),
            episode_id=np.asarray(epi_id),
            host_id=np.asarray(host_id),
            profile=np.asarray(profile or ""),
            sample_name=np.asarray(sample_name or ""),
            sample_kind=np.asarray(sample_kind or ""),
            channel_names=np.asarray(channel_names()),
        )
    os.replace(tmp_path, out_path)
    return {"episode_id": epi_id, "n_windows": X.shape[0],
            "size_bytes": out_path.stat().st_size}


def main() -> int:
    ap = argparse.ArgumentParser()
    ap.add_argument("--validation", required=True, type=Path,
                    help="per-episode acceptance index (parquet)")
    ap.add_argument("--store", required=True, type=Path,
                    help="receiver store root holding <host>/<episode_id>.tar.zst")
    ap.add_argument("--out-dir", required=True, type=Path)
    ap.add_argument("--workers", type=int,
                    default=max(1, (os.cpu_count() or 2) - 1))
    ap.add_argument("--limit", type=int, default=0,
                    help="process at most N episodes (0 = no limit)")
    # Included by default; pass --no-include-degraded to keep only accepted
    # episodes (BooleanOptionalAction needs Python 3.9+).
    ap.add_argument("--include-degraded", action=argparse.BooleanOptionalAction,
                    default=True)
    ap.add_argument("--log-level", default="INFO")
    args = ap.parse_args()

    logging.basicConfig(level=args.log_level,
                        format="%(asctime)s %(levelname)s %(name)s %(message)s")
    args.out_dir.mkdir(parents=True, exist_ok=True)

    val = pq.read_table(args.validation).to_pylist()
    statuses = ("accepted", "degraded") if args.include_degraded else ("accepted",)
    work = [r for r in val if r["status"] in statuses]
    if args.limit:
        work = work[: args.limit]
    log.info("building tensor shards for %d episodes with %d workers",
             len(work), args.workers)

    job_args = [
        (r["episode_id"], r["host_id"], r["profile"],
         r["sample_name"], r["sample_kind"],
         str(args.store), str(args.out_dir))
        for r in work
    ]

    started = time.monotonic()
    results: list[dict] = []
    if args.workers <= 1:
        for a in job_args:
            results.append(_process_one(a))
    else:
        with mp.Pool(args.workers) as pool:
            for i, res in enumerate(pool.imap_unordered(
                    _process_one, job_args, chunksize=8), 1):
                results.append(res)
                if i % 500 == 0:
                    rate = i / max(1e-3, time.monotonic() - started)
                    log.info("  %d/%d (%.1f/s)", i, len(work), rate)

    skipped = sum(1 for r in results if r.get("skipped"))
    empty = sum(1 for r in results if r.get("empty"))
    errs = sum(1 for r in results if "error" in r)
    ok = len(results) - skipped - empty - errs
    total_bytes = sum(r.get("size_bytes", 0) for r in results)
    total_windows = sum(r.get("n_windows", 0) for r in results)
    log.info("done: ok=%d skipped=%d empty=%d errors=%d "
             "windows=%d total=%.1f MB",
             ok, skipped, empty, errs, total_windows, total_bytes / 1e6)
    if errs:
        # List at most 20 failures so a systematic error cannot flood the log.
        failed = [r for r in results if "error" in r]
        for r in failed[:20]:
            log.warning("  %s: %s", r["episode_id"], r["error"])
    return 0


if __name__ == "__main__":
    raise SystemExit(main())
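
A minimal loader for the shard layout written by `_process_one`, assuming the keys listed in the module docstring. Every string field is saved as a NumPy unicode array rather than an object array, so `np.load(..., allow_pickle=False)` can read the whole shard. `load_shard` is a hypothetical helper, not part of the project.

```python
from pathlib import Path

import numpy as np

ARRAY_KEYS = ("X", "mask", "y", "t_center")
STR_KEYS = ("episode_id", "host_id", "profile", "sample_name", "sample_kind")


def load_shard(path: Path) -> dict:
    """Read one tensor shard into NumPy arrays and plain Python strings."""
    # String fields are fixed-width unicode arrays, so no pickling is involved.
    with np.load(path, allow_pickle=False) as z:
        out = {k: z[k] for k in ARRAY_KEYS}
        out.update({k: z[k].item() for k in STR_KEYS})  # 0-d array -> str
        out["channel_names"] = z["channel_names"].tolist()
    n_windows, n_channels, _ = out["X"].shape
    # Check the shapes documented for the shard format.
    if out["mask"].shape != out["X"].shape:
        raise ValueError(f"{path}: mask shape {out['mask'].shape} != X shape")
    if out["y"].shape != (n_windows,) or out["t_center"].shape != (n_windows,):
        raise ValueError(f"{path}: y/t_center length != n_windows={n_windows}")
    if len(out["channel_names"]) != n_channels:
        raise ValueError(f"{path}: {len(out['channel_names'])} names "
                         f"for {n_channels} channels")
    return out
```

Validating shapes at load time catches shards written under a different channel registry before they reach a training batch.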