The model layer of the project, built honestly:
- tools/dataset_validate.py — full-sweep validator over the receiver
store (sha256, schema, monotonic labels, telemetry-row gate). On the
current corpus: 64,798 accepted + 8,154 degraded + 3,701 rejected +
7 errored across 76,660 shipped episodes. data/processed/validation_v1.parquet
is committed as the per-episode acceptance index.
- training/_features.py — channel registry (46 channels across
proc/guest/qmp/netflow), summary-stat windowing AND channel×time
tensor extraction at 10s/5s windowing. Time alignment uses t_wall_ns
(Unix ns) — tested fix for a real netflow-vs-host clock-base
inconsistency that was silently dropping every netflow channel.
- training/_split.py — three held-out recipes (host / sample / time)
with profile-stratification assertions. held_out_host records
untested_profiles for profiles that are absent from the test host (e.g.
scan-and-dial), so they are reported explicitly rather than silently
averaged in; 5 of 6 profiles are tested cross-device.
- training/models/ — 6 architectures behind a common BaseModel
interface: gbt (XGBoost), mlp, cnn, gru, lstm, transformer. Each
trained twice (realistic / oracle) per the deployment threat model.
Schema-hashed checkpoints refuse to load if _features.py changed
since training (silent-input-drift protection, tested).
- training/trainer/ — unified training loop: class-weighted CE, LR
warmup + cosine, gradient clipping, mixed precision when CUDA,
early stopping on val macro F1, best-on-val checkpoint. Same loop
runs MLP/CNN/GRU/LSTM/Transformer; GBT uses XGBoost
early_stopping_rounds on val mlogloss.
- training/eval_/ — bootstrap 95% CIs on macro F1, per-class F1,
per-profile and per-host breakdown, paired-bootstrap significance
for model-vs-model gap. Confusion matrix uses union of seen labels.
- training/dashboard/producers/ — replay/metrics/perf/profiles
emitting the six event types the dashboard's awaiting scenes
consume; on-demand tensor extraction so the Pi can run live
inference without 65 GB of shards.
- 17 unit tests (split coverage, features round-trip, schema mismatch,
determinism, time-base alignment regression).
End-to-end smoke-trained all six on a 567-episode subset; held-out
test macro F1 reported with paired-bootstrap significance. The
methodology now reports honest cross-device generalization, not
in-distribution validation.
Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
271 lines
9.5 KiB
Python
271 lines
9.5 KiB
Python
"""Build per-episode and per-window feature parquet from validated episodes.
|
||
|
||
Inputs:
|
||
--validation data/processed/validation_v1.parquet (from dataset_validate.py)
|
||
--store /var/lib/cis490/episodes
|
||
--out-dir data/processed/
|
||
|
||
Outputs:
|
||
features_episode_v1.parquet one row per accepted+degraded episode
|
||
features_window_v1.parquet one row per (episode, window)
|
||
feature_schema_v1.json column names + in_deployment mask + phase enum
|
||
|
||
Run:
|
||
uv run --group training python training/build_features.py \\
|
||
--validation data/processed/validation_v1.parquet \\
|
||
--store /var/lib/cis490/episodes \\
|
||
--out-dir data/processed
|
||
"""
|
||
from __future__ import annotations
|
||
|
||
import argparse
|
||
import json
|
||
import multiprocessing as mp
|
||
import os
|
||
import sys
|
||
import time
|
||
from pathlib import Path
|
||
|
||
import numpy as np
|
||
import pyarrow as pa
|
||
import pyarrow.parquet as pq
|
||
|
||
sys.path.insert(0, str(Path(__file__).resolve().parent.parent))
|
||
from training._episode_io import open_episode
|
||
from training._features import (
|
||
ALL_CHANNELS,
|
||
DEFAULT_STRIDE_S,
|
||
DEFAULT_WINDOW_S,
|
||
PHASES,
|
||
episode_features,
|
||
feature_names_episode,
|
||
feature_names_window,
|
||
in_deployment_mask,
|
||
summary_windows,
|
||
)
|
||
|
||
|
||
def _process_one(args):
    """Featurize a single episode; intended to run inside a pool worker.

    ``args`` is the tuple ``(epi_id, host_id, profile, sample_name,
    sample_kind, store_root)`` built by ``main``. The episode archive is read
    from ``<store_root>/<host_id>/<epi_id>.tar.zst``.

    Returns a dict with the five metadata fields plus:

    * ``episode_features`` -- float32 vector from ``episode_features``;
    * ``window_features`` -- float32 matrix, one row per window, from
      ``summary_windows``;
    * ``window_phase`` / ``window_t_center`` -- per-window arrays from
      ``summary_windows``, passed through unchanged.

    Never raises for a per-episode failure. Instead it returns
    ``{"episode_id": ..., "error": ...}``: ``"<ExcType>:<msg>"`` when the
    archive cannot be opened, ``"features:<ExcType>:<msg>"`` when feature
    extraction fails. One bad episode is therefore counted by the caller
    rather than aborting the whole pool run.
    """
    epi_id, host_id, profile, sample_name, sample_kind, store_root = args
    path = Path(store_root) / host_id / f"{epi_id}.tar.zst"
    try:
        epi = open_episode(path, host_id=host_id)
    except Exception as e:
        return {"episode_id": epi_id, "error": f"{type(e).__name__}:{e}"}

    try:
        epi_vec, _ = episode_features(epi)
        Xw, yw, tw, _ = summary_windows(
            epi, window_s=DEFAULT_WINDOW_S, stride_s=DEFAULT_STRIDE_S,
        )
    except Exception as e:
        # Caught here too: an exception escaping a worker is re-raised by the
        # pool iterator in the parent and would end the entire run.
        return {"episode_id": epi_id, "error": f"features:{type(e).__name__}:{e}"}

    return {
        "episode_id": epi_id,
        "host_id": host_id,
        "profile": profile,
        "sample_name": sample_name,
        "sample_kind": sample_kind,
        "episode_features": np.asarray(epi_vec, dtype=np.float32),
        # float32 to match the parquet float32 feature columns; a no-op when
        # summary_windows already returns float32, and it halves the bytes
        # pickled back to the parent otherwise.
        "window_features": np.asarray(Xw, dtype=np.float32),  # (n_windows, n_feat)
        "window_phase": yw,  # (n_windows,)
        "window_t_center": tw,  # (n_windows,)
    }
|
||
|
||
|
||
# Metadata columns shared by the episode and window tables, in schema order.
_META_COLUMNS = ("episode_id", "host_id", "profile", "sample_name", "sample_kind")

# Window rows buffered in memory before a row group is written to parquet.
_WINDOW_CHUNK_ROWS = 100_000

# Only the first few per-episode errors are printed; the rest are counted.
_MAX_ERRORS_PRINTED = 5


def _parse_args() -> argparse.Namespace:
    """Parse command-line options (see the module docstring for an example run)."""
    ap = argparse.ArgumentParser(
        description="Build per-episode and per-window feature parquet from validated episodes.",
    )
    ap.add_argument("--validation", required=True, type=Path)
    ap.add_argument("--store", required=True, type=Path)
    ap.add_argument("--out-dir", required=True, type=Path)
    ap.add_argument("--workers", type=int, default=max(1, (os.cpu_count() or 2) - 1))
    ap.add_argument("--limit", type=int, default=0,
                    help="featurize only the first N selected episodes (0 = all)")
    # BooleanOptionalAction keeps --include-degraded valid and adds
    # --no-include-degraded. A store_true flag with default=True could never
    # be switched off, so the accepted-only mode was unreachable.
    ap.add_argument("--include-degraded", action=argparse.BooleanOptionalAction,
                    default=True,
                    help="also featurize episodes whose status is 'degraded' (default: on)")
    args = ap.parse_args()
    if args.limit < 0:
        # A negative slice bound would silently drop episodes from the end.
        ap.error("--limit must be >= 0")
    return args


def _select_work(
    validation: Path, include_degraded: bool, limit: int,
) -> tuple[list[dict], tuple[str, ...]]:
    """Return the validation rows to featurize and the statuses that were kept.

    Rows keep the validation file's order. A positive ``limit`` keeps only
    the first ``limit`` matching rows.
    """
    statuses = ("accepted", "degraded") if include_degraded else ("accepted",)
    # Read only the columns used here and by the job tuples, not the whole index.
    rows = pq.read_table(validation, columns=[*_META_COLUMNS, "status"]).to_pylist()
    work = [r for r in rows if r["status"] in statuses]
    if limit:
        work = work[:limit]
    return work, statuses


class _WindowSink:
    """Stream (episode, window) rows to parquet with bounded memory.

    Rows are buffered as per-episode row chunks (a float32 feature matrix plus
    per-row metadata) and written as one row group every ``chunk_rows`` rows.
    Output goes to ``<path>.partial`` and is moved onto ``path`` only by
    ``close(commit=True)``, so an aborted run never leaves a truncated
    parquet (missing footer) at the final location.
    """

    def __init__(self, path: Path, feature_names, chunk_rows: int = _WINDOW_CHUNK_ROWS) -> None:
        self.path = path
        self._tmp_path = path.with_name(path.name + ".partial")
        self._feature_names = list(feature_names)
        self._chunk_rows = chunk_rows
        self.schema = pa.schema(
            [(c, pa.string()) for c in _META_COLUMNS]
            + [("t_center_s", pa.float32()), ("phase", pa.int8())]
            + [(n, pa.float32()) for n in self._feature_names]
        )
        self._writer: pq.ParquetWriter | None = None
        self._meta: dict[str, list[str]] = {c: [] for c in _META_COLUMNS}
        self._t_center: list[np.ndarray] = []
        self._phase: list[np.ndarray] = []
        self._features: list[np.ndarray] = []  # each (n_rows, n_feat) float32
        self._buffered = 0
        self.rows_written = 0

    def add(self, res: dict) -> None:
        """Buffer the windows of one successful ``_process_one`` result.

        Raises ValueError when the feature matrix or per-window arrays do not
        line up with ``feature_names_window()``. That signals drift between the
        name registry and the extractor, which would otherwise drop columns
        silently or fail later without naming the episode.
        """
        X = res["window_features"]
        n = X.shape[0]
        if n == 0:
            return
        phase = np.asarray(res["window_phase"])
        t_center = np.asarray(res["window_t_center"])
        if X.ndim != 2 or X.shape[1] != len(self._feature_names) \
                or len(phase) != n or len(t_center) != n:
            raise ValueError(
                f"episode {res['episode_id']}: window arrays misaligned "
                f"(features {X.shape}, phase {phase.shape}, t_center {t_center.shape}; "
                f"expected {len(self._feature_names)} feature columns)"
            )
        for c in _META_COLUMNS:
            self._meta[c].extend([res[c]] * n)
        self._features.append(np.asarray(X, dtype=np.float32))
        self._t_center.append(t_center)
        self._phase.append(phase)
        self._buffered += n
        if self._buffered >= self._chunk_rows:
            self.flush()

    def flush(self) -> None:
        """Write all buffered rows as one parquet row group and reset the buffers."""
        if self._buffered == 0:
            return
        cols: dict[str, pa.Array] = {
            c: pa.array(self._meta[c], type=pa.string()) for c in _META_COLUMNS
        }
        # NOTE(review): float32 keeps ~7 significant digits. That is fine if
        # t_center is seconds from episode start, but absolute Unix-epoch
        # seconds would be quantized to ~2 minutes. Confirm what
        # summary_windows returns.
        cols["t_center_s"] = pa.array(
            np.concatenate(self._t_center).astype(np.float32), type=pa.float32(),
        )
        cols["phase"] = pa.array(
            np.concatenate(self._phase).astype(np.int8), type=pa.int8(),
        )
        # Transpose once so every feature column is a contiguous 1-D slice.
        by_column = np.ascontiguousarray(np.concatenate(self._features, axis=0).T)
        for name, column in zip(self._feature_names, by_column):
            cols[name] = pa.array(column, type=pa.float32())
        table = pa.table(cols, schema=self.schema)

        if self._writer is None:
            self._writer = pq.ParquetWriter(self._tmp_path, self.schema, compression="zstd")
        self._writer.write_table(table)
        self.rows_written += table.num_rows

        for values in self._meta.values():
            values.clear()
        self._t_center.clear()
        self._phase.clear()
        self._features.clear()
        self._buffered = 0

    def close(self, *, commit: bool) -> None:
        """Finish the output file.

        ``commit=True`` flushes the remaining rows and moves the temporary file
        onto ``path`` with ``os.replace``. If no rows were ever written, a
        schema-only file is published, so a stale table from an earlier run is
        never left in place. ``commit=False`` deletes the temporary file and
        leaves ``path`` untouched.
        """
        try:
            if commit:
                self.flush()
        finally:
            if self._writer is not None:
                self._writer.close()
                self._writer = None
        if not commit:
            self._tmp_path.unlink(missing_ok=True)
            return
        if self.rows_written == 0:
            pq.write_table(self.schema.empty_table(), self._tmp_path, compression="zstd")
        os.replace(self._tmp_path, self.path)


def _consume_results(results, total: int, sink: _WindowSink,
                     epi_meta: dict[str, list[str]],
                     epi_feats: list[np.ndarray]) -> int:
    """Drain ``_process_one`` results into the accumulators.

    ``results`` is any iterable of result dicts (serial ``map`` or a pool
    iterator). Successful results go into ``epi_meta``/``epi_feats`` and
    ``sink``. Error results are counted, and the first few are printed.
    Progress is printed every 500 results or every 30 s. Returns the number
    of error results.
    """
    started = time.monotonic()
    last_print = started
    n_errors = 0
    for i, res in enumerate(results, 1):
        if "error" in res:
            n_errors += 1
            if n_errors <= _MAX_ERRORS_PRINTED:
                print(f" ERROR {res['episode_id']}: {res['error']}", flush=True)
            elif n_errors == _MAX_ERRORS_PRINTED + 1:
                print(" (further errors are counted but not printed)", flush=True)
        else:
            for c in _META_COLUMNS:
                epi_meta[c].append(res[c])
            epi_feats.append(res["episode_features"])
            sink.add(res)

        now = time.monotonic()
        if i % 500 == 0 or now - last_print > 30:
            rate = i / max(1e-3, now - started)
            print(f" {i}/{total} ({rate:.1f}/s, errors={n_errors})", flush=True)
            last_print = now
    return n_errors


def _write_episode_table(path: Path, feature_names,
                         meta: dict[str, list[str]],
                         feats: list[np.ndarray]) -> int:
    """Write one row per featurized episode to ``path``; return the row count.

    Raises ValueError if the stacked feature matrix does not have one column
    per entry of ``feature_names``.
    """
    names = list(feature_names)
    schema = pa.schema(
        [(c, pa.string()) for c in _META_COLUMNS]
        + [(n, pa.float32()) for n in names]
    )
    if feats:
        E = np.stack(feats, axis=0).astype(np.float32, copy=False)  # (N, F)
    else:
        E = np.zeros((0, len(names)), dtype=np.float32)
    if E.ndim != 2 or E.shape[1] != len(names):
        raise ValueError(
            f"episode feature matrix has shape {E.shape}, "
            f"expected {len(names)} columns per feature_names_episode()"
        )
    cols: dict[str, pa.Array] = {c: pa.array(meta[c], type=pa.string()) for c in _META_COLUMNS}
    for name, column in zip(names, np.ascontiguousarray(E.T)):
        cols[name] = pa.array(column, type=pa.float32())
    pq.write_table(pa.table(cols, schema=schema), path, compression="zstd")
    return E.shape[0]


def _write_schema_doc(path: Path, feat_names_e, feat_names_w) -> None:
    """Write the JSON description of the feature tables to ``path``."""
    schema_doc = {
        "version": 1,
        "phases": PHASES,
        "feature_names": feat_names_e,
        # The window table's feature columns come from feature_names_window(),
        # so they are recorded explicitly rather than assumed to match.
        "window_feature_names": list(feat_names_w),
        "in_deployment_mask": [bool(b) for b in in_deployment_mask().tolist()],
        "channels": [
            {"name": c.name, "source": c.source, "kind": c.kind,
             "in_deployment": c.in_deployment}
            for c in ALL_CHANNELS
        ],
        "stat_suffixes": ["mean", "std", "p50", "p95", "slope"],
        "window_seconds": DEFAULT_WINDOW_S,
        "stride_seconds": DEFAULT_STRIDE_S,
    }
    path.write_text(json.dumps(schema_doc, indent=2) + "\n")


def main() -> int:
    """Featurize validated episodes into parquet tables plus a JSON schema.

    Selects accepted (and, unless ``--no-include-degraded``, degraded)
    episodes from the validation index. Each is featurized with
    ``_process_one``, serially when ``--workers`` <= 1 and on a process pool
    otherwise. The following files are written to ``--out-dir``:

    * ``features_window_v1.parquet`` -- streamed in bounded-memory chunks
      and published only after a complete pass;
    * ``features_episode_v1.parquet``;
    * ``feature_schema_v1.json``.

    Returns 0, or 1 when episodes were selected but every one of them failed.
    """
    args = _parse_args()
    args.out_dir.mkdir(parents=True, exist_ok=True)

    work, statuses = _select_work(args.validation, args.include_degraded, args.limit)
    print(f"feature extraction over {len(work)} episodes "
          f"(statuses={statuses}) with {args.workers} workers", flush=True)

    feat_names_e = feature_names_episode()
    feat_names_w = feature_names_window()

    store_root = str(args.store)
    job_args = [
        (r["episode_id"], r["host_id"], r["profile"],
         r["sample_name"], r["sample_kind"], store_root)
        for r in work
    ]

    epi_meta: dict[str, list[str]] = {c: [] for c in _META_COLUMNS}
    epi_feats: list[np.ndarray] = []  # each (n_feat,) float32
    win_path = args.out_dir / "features_window_v1.parquet"
    sink = _WindowSink(win_path, feat_names_w)

    completed = False
    try:
        if args.workers <= 1:
            n_errors = _consume_results(
                map(_process_one, job_args), len(work), sink, epi_meta, epi_feats,
            )
        else:
            # Leaving the `with` block terminates the workers, including when
            # consumption raises (e.g. KeyboardInterrupt). On success they are
            # drained cleanly first.
            with mp.Pool(args.workers) as pool:
                n_errors = _consume_results(
                    pool.imap_unordered(_process_one, job_args, chunksize=8),
                    len(work), sink, epi_meta, epi_feats,
                )
                pool.close()
                pool.join()
        completed = True
    finally:
        # Publish the window file only after a complete pass; otherwise
        # discard the partial file so no truncated parquet is left behind.
        sink.close(commit=completed)

    epi_path = args.out_dir / "features_episode_v1.parquet"
    n_episodes = _write_episode_table(epi_path, feat_names_e, epi_meta, epi_feats)
    _write_schema_doc(args.out_dir / "feature_schema_v1.json", feat_names_e, feat_names_w)

    print(f"\nepisode features: {epi_path} ({n_episodes} rows)")
    print(f"window features: {win_path} ({sink.rows_written} rows)")
    print(f"errors: {n_errors}")
    # A run in which every selected episode failed should not look like success.
    return 1 if work and n_errors == len(work) else 0
|
||
|
||
|
||
if __name__ == "__main__":
    # Script entry point: main()'s integer return becomes the process exit status.
    sys.exit(main())
|