Non-parametric baseline alongside GBT/MLP/CNN/GRU/LSTM/Transformer. Same BaseModel + schema-hashed checkpoint contract; sidecar is a pickled sklearn KNeighborsClassifier (.knn.pkl) handled by the existing checkpoint machinery alongside .xgb.json / .pt. KNN's storage cost = n_train_rows × n_kept_features × 4 bytes. At 660k windows × 145 kept (realistic mode) features = ~380 MB sidecar; at 230 features (oracle) = ~600 MB. Heavy but ships through the same artifact-upload path. trainer/run.py learns a third fit branch: - GBT — XGBoost early stopping on val mlogloss - KNN — fit() memorizes; "training time" is val/test predict cost - NN — train_nn loop (the rest) Manifest gains knn-realistic + knn-oracle at priority 95 (just below GBT). KNN's k=10 default lives in the model class — overriding via hyper.k requires adding --k to run.py first to avoid the unknown-arg exit-2 issue. Smoke verified on the 567-episode subset: knn oracle val=0.7365 test=0.1333 (held-out k-gamingcom) That val/test gap (0.74 → 0.13) is the cross-device generalization story: KNN memorizes elliott-thinkpad's local feature space and falls apart on the other host. Honest baseline for the comparison report. Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
327 lines
14 KiB
Python
327 lines
14 KiB
Python
"""End-to-end training driver.
|
|
|
|
Trains one ``(model, mode)`` combination — running all 12 is a bash loop
|
|
over this script (see scripts/train-all.sh). Single-process so each run
|
|
is isolatable, restartable, and produces its own log.
|
|
|
|
Steps:
|
|
1. Load features (summary or tensor depending on model.input_kind).
|
|
2. Apply held-out-by-host split (default) or held-out-by-time.
|
|
3. Filter to (train, val, test) episodes; collect (X, y) per slice.
|
|
4. Fit StandardizeStats on train *only*.
|
|
5. Build model with the right keep_mask + standardize.
|
|
6. Train (GBT: model.fit; KNN: model.fit; NN: trainer._loop.train_nn).
|
|
7. Fit PCA-2 on standardized train features (for dashboard scatter).
|
|
8. Save checkpoint; emit metrics JSON.
|
|
|
|
Output:
|
|
artifacts/<model>_<mode>.ckpt.json (header)
|
|
artifacts/<model>_<mode>.{pt,xgb.json,knn.pkl}  (sidecar)
|
|
reports/eval/<model>_<mode>_train.json (history + final metrics)
|
|
"""
|
|
from __future__ import annotations
|
|
|
|
import argparse
|
|
import json
|
|
import logging
|
|
import sys
|
|
import time
|
|
from pathlib import Path
|
|
|
|
import numpy as np
|
|
|
|
# Prepend the directory two levels above this file's folder to sys.path so
# the absolute ``training.*`` imports below resolve when this file is run
# directly as a script (presumably that directory is the repository root).
sys.path.insert(0, str(Path(__file__).resolve().parents[2]))
|
|
from training._features import (
|
|
ALL_CHANNELS, PHASES, channel_names, channel_in_deployment_mask,
|
|
in_deployment_mask, feature_names_episode,
|
|
)
|
|
from training._split import (
|
|
held_out_host, held_out_sample, held_out_time, Splits,
|
|
)
|
|
from training.models import get_model
|
|
from training.models._base import StandardizeStats
|
|
from training.models._checkpoint import make_keep_mask, save_checkpoint
|
|
from training.trainer._data import load_summary, load_tensor
|
|
from training.trainer._loop import train_nn, _macro_f1
|
|
|
|
|
|
# Module-level logger for this driver; its level and format are set by the
# logging.basicConfig call in main().
log = logging.getLogger("cis490.trainer.run")
|
|
|
|
|
|
def _build_split(recipe: str, *, profiles, sample_names, host_ids,
|
|
episode_ids, received_at, train_hosts, seed: int) -> Splits:
|
|
if recipe == "host":
|
|
return held_out_host(
|
|
profiles=profiles, sample_names=sample_names, host_ids=host_ids,
|
|
episode_ids=episode_ids, train_hosts=train_hosts, seed=seed,
|
|
)
|
|
if recipe == "sample":
|
|
return held_out_sample(
|
|
profiles=profiles, sample_names=sample_names, host_ids=host_ids,
|
|
seed=seed,
|
|
)
|
|
if recipe == "time":
|
|
return held_out_time(
|
|
profiles=profiles, sample_names=sample_names, host_ids=host_ids,
|
|
received_at=received_at, seed=seed,
|
|
)
|
|
raise ValueError(f"unknown split recipe: {recipe}")
|
|
|
|
|
|
def main() -> int:
|
|
ap = argparse.ArgumentParser()
|
|
ap.add_argument("--model", required=True,
|
|
help="one of gbt|mlp|cnn|gru|lstm|transformer")
|
|
ap.add_argument("--mode", required=True, choices=["realistic", "oracle"])
|
|
ap.add_argument("--summary", type=Path,
|
|
default=Path("data/processed/features_window_v1.parquet"))
|
|
ap.add_argument("--tensors", type=Path,
|
|
default=Path("data/processed/tensor_window_v1"))
|
|
ap.add_argument("--schema", type=Path,
|
|
default=Path("data/processed/feature_schema_v1.json"))
|
|
ap.add_argument("--validation", type=Path,
|
|
default=Path("data/processed/validation_v1.parquet"))
|
|
ap.add_argument("--split-recipe", choices=["host", "sample", "time"],
|
|
default="host")
|
|
ap.add_argument("--train-hosts", nargs="+", default=["elliott-thinkpad"])
|
|
ap.add_argument("--seed", type=int, default=0)
|
|
ap.add_argument("--out-dir", type=Path, default=Path("artifacts"))
|
|
ap.add_argument("--reports-dir", type=Path, default=Path("reports/eval"))
|
|
ap.add_argument("--epochs", type=int, default=60)
|
|
ap.add_argument("--batch-size", type=int, default=512)
|
|
ap.add_argument("--lr", type=float, default=1e-3)
|
|
ap.add_argument("--patience", type=int, default=8)
|
|
ap.add_argument("--max-episodes", type=int, default=None,
|
|
help="smoke-test cap on tensor episodes")
|
|
ap.add_argument("--device", default="auto")
|
|
ap.add_argument("--log-level", default="INFO")
|
|
args = ap.parse_args()
|
|
|
|
logging.basicConfig(level=args.log_level,
|
|
format="%(asctime)s %(levelname)s %(name)s %(message)s")
|
|
|
|
args.out_dir.mkdir(parents=True, exist_ok=True)
|
|
args.reports_dir.mkdir(parents=True, exist_ok=True)
|
|
|
|
cls = get_model(args.model)
|
|
# Probe input_kind without instantiating
|
|
input_kind = cls.input_kind
|
|
|
|
# Build the split from the validator output (one row per episode).
|
|
import pyarrow.parquet as pq
|
|
val_tbl = pq.read_table(args.validation).to_pylist()
|
|
rows = [r for r in val_tbl if r["status"] in ("accepted", "degraded")]
|
|
profs = [r["profile"] for r in rows]
|
|
samples = [r["sample_name"] for r in rows]
|
|
hosts = [r["host_id"] for r in rows]
|
|
epi_ids = [r["episode_id"] for r in rows]
|
|
recv = [r.get("received_at_wall", "") for r in rows]
|
|
|
|
splits = _build_split(
|
|
args.split_recipe, profiles=profs, sample_names=samples,
|
|
host_ids=hosts, episode_ids=epi_ids, received_at=recv,
|
|
train_hosts=args.train_hosts, seed=args.seed,
|
|
)
|
|
splits.assert_coverage()
|
|
log.info("split:\n%s", splits.summary())
|
|
train_eps = {epi_ids[i] for i in range(len(epi_ids)) if splits.train[i]}
|
|
val_eps = {epi_ids[i] for i in range(len(epi_ids)) if splits.val[i]}
|
|
test_eps = {epi_ids[i] for i in range(len(epi_ids)) if splits.test[i]}
|
|
|
|
# ─── Load data ───────────────────────────────────────────────────
|
|
if input_kind == "summary":
|
|
log.info("loading summary features from %s", args.summary)
|
|
data = load_summary(args.summary, args.schema)
|
|
epi_col = data.episode_id
|
|
X = data.X
|
|
y = data.y
|
|
else:
|
|
log.info("loading tensor shards from %s", args.tensors)
|
|
data = load_tensor(args.tensors, max_episodes=args.max_episodes)
|
|
epi_col = data.episode_id
|
|
X = data.X
|
|
y = data.y
|
|
|
|
# Per-window masks
|
|
train_mask = np.array([e in train_eps for e in epi_col], dtype=bool)
|
|
val_mask = np.array([e in val_eps for e in epi_col], dtype=bool)
|
|
test_mask = np.array([e in test_eps for e in epi_col], dtype=bool)
|
|
log.info("windows: train=%d val=%d test=%d (of %d)",
|
|
int(train_mask.sum()), int(val_mask.sum()),
|
|
int(test_mask.sum()), len(epi_col))
|
|
|
|
# ─── Build keep mask + standardize on train ──────────────────────
|
|
keep_mask = make_keep_mask(input_kind, args.mode)
|
|
log.info("keep_mask: %d / %d active", int(keep_mask.sum()), len(keep_mask))
|
|
|
|
if input_kind == "summary":
|
|
X_keep_train = X[train_mask][:, keep_mask]
|
|
std = StandardizeStats.fit(X_keep_train, axis=0)
|
|
else:
|
|
X_keep_train = X[train_mask][:, keep_mask, :]
|
|
std = StandardizeStats.fit(X_keep_train, axis=(0, 2))
|
|
|
|
# ─── Build model ─────────────────────────────────────────────────
|
|
n_classes = max(int(y.max()) + 1, 5) # at least 5 phases known
|
|
if input_kind == "summary":
|
|
if args.model == "gbt":
|
|
model = cls(n_classes=n_classes, keep_mask=keep_mask, standardize=std)
|
|
elif args.model == "knn":
|
|
model = cls(n_classes=n_classes, keep_mask=keep_mask,
|
|
standardize=std)
|
|
else:
|
|
model = cls(n_features_in=int(keep_mask.sum()), n_classes=n_classes,
|
|
keep_mask=keep_mask, standardize=std,
|
|
device=("cuda" if args.device == "auto" and _cuda_ok()
|
|
else "cpu"))
|
|
else:
|
|
n_t = X.shape[2]
|
|
device = ("cuda" if args.device == "auto" and _cuda_ok()
|
|
else "cpu" if args.device == "auto" else args.device)
|
|
model = cls(n_channels_in=int(keep_mask.sum()), n_timesteps=n_t,
|
|
n_classes=n_classes, keep_mask=keep_mask,
|
|
standardize=std, device=device)
|
|
|
|
# ─── Train ───────────────────────────────────────────────────────
|
|
started = time.monotonic()
|
|
if args.model == "gbt":
|
|
# Sample-weighted (class-weighted) fit via XGBoost weights
|
|
from training.trainer._loop import _compute_class_weights
|
|
cw = _compute_class_weights(y[train_mask], n_classes)
|
|
sample_w = cw[y[train_mask]]
|
|
history = model.fit(
|
|
X_train=X[train_mask], y_train=y[train_mask],
|
|
X_val=X[val_mask], y_val=y[val_mask],
|
|
sample_weight=sample_w,
|
|
n_estimators=1000, early_stopping_rounds=30,
|
|
verbose_eval=50,
|
|
)
|
|
# Compute val macro-F1 at best iteration
|
|
y_val_pred = model.predict(X[val_mask])
|
|
best_f1 = _macro_f1(y[val_mask], y_val_pred, n_classes)
|
|
train_seconds = time.monotonic() - started
|
|
train_meta = {
|
|
"kind": "gbt", "history": history,
|
|
"best_iter": history["best_iter"], "best_val_macro_f1": best_f1,
|
|
"train_seconds": train_seconds,
|
|
}
|
|
config = {"params": history.get("history", {}) and model._params or {}}
|
|
elif args.model == "knn":
|
|
# Non-parametric: model.fit memorizes the train set; "training
|
|
# time" is dominated by the val/test predict calls (KD-tree build).
|
|
history = model.fit(
|
|
X_train=X[train_mask], y_train=y[train_mask],
|
|
X_val=X[val_mask], y_val=y[val_mask],
|
|
)
|
|
best_f1 = float(history.get("val_macro_f1", 0.0))
|
|
train_seconds = time.monotonic() - started
|
|
train_meta = {
|
|
"kind": "knn",
|
|
"best_val_macro_f1": best_f1,
|
|
"train_seconds": train_seconds,
|
|
"history": history,
|
|
}
|
|
config = {"k": model.config["k"], "weights": model.config["weights"]}
|
|
else:
|
|
result = train_nn(
|
|
model=model,
|
|
X_train=X[train_mask], y_train=y[train_mask],
|
|
X_val=X[val_mask], y_val=y[val_mask],
|
|
n_classes=n_classes,
|
|
epochs=args.epochs, batch_size=args.batch_size,
|
|
base_lr=args.lr, patience=args.patience,
|
|
device=("cuda" if args.device == "auto" and _cuda_ok()
|
|
else "cpu" if args.device == "auto" else args.device),
|
|
)
|
|
train_meta = {
|
|
"kind": "nn",
|
|
"best_epoch": result.best_epoch,
|
|
"best_val_macro_f1": result.best_macro_f1,
|
|
"train_seconds": result.train_seconds,
|
|
"history": result.history,
|
|
}
|
|
config = dict(model.config)
|
|
|
|
# ─── PCA-2 for dashboard scatter ─────────────────────────────────
|
|
pca_proj = _fit_pca2(model, X[train_mask], val_mask, X)
|
|
|
|
# ─── Save checkpoint ─────────────────────────────────────────────
|
|
base = args.out_dir / f"{args.model}_{args.mode}"
|
|
json_path = save_checkpoint(
|
|
model, path=base, name=args.model, mode=args.mode,
|
|
config=config,
|
|
train_meta={
|
|
"split_recipe": args.split_recipe,
|
|
"split_config": splits.config,
|
|
"excluded_profiles": list(splits.excluded_profiles),
|
|
"untested_profiles": list(splits.untested_profiles),
|
|
"n_train_windows": int(train_mask.sum()),
|
|
"n_val_windows": int(val_mask.sum()),
|
|
"n_test_windows": int(test_mask.sum()),
|
|
**train_meta,
|
|
},
|
|
pca_proj=pca_proj,
|
|
)
|
|
log.info("saved checkpoint: %s", json_path)
|
|
|
|
# ─── Quick test metrics (full eval is in training/eval_/) ─────────
|
|
y_test_pred = model.predict(X[test_mask])
|
|
test_f1 = _macro_f1(y[test_mask], y_test_pred, n_classes)
|
|
log.info("TEST macro_f1 = %.4f", test_f1)
|
|
metrics = {
|
|
"model": args.model,
|
|
"mode": args.mode,
|
|
"split_recipe": args.split_recipe,
|
|
"val_macro_f1": train_meta.get("best_val_macro_f1"),
|
|
"test_macro_f1": test_f1,
|
|
"n_train_windows": int(train_mask.sum()),
|
|
"n_val_windows": int(val_mask.sum()),
|
|
"n_test_windows": int(test_mask.sum()),
|
|
"untested_profiles": list(splits.untested_profiles),
|
|
"checkpoint": str(json_path),
|
|
"train_seconds": train_meta.get("train_seconds"),
|
|
}
|
|
out_metrics = args.reports_dir / f"{args.model}_{args.mode}_train.json"
|
|
out_metrics.write_text(json.dumps(metrics, indent=2) + "\n")
|
|
print(json.dumps(metrics, indent=2))
|
|
return 0
|
|
|
|
|
|
def _cuda_ok() -> bool:
|
|
try:
|
|
import torch
|
|
return torch.cuda.is_available()
|
|
except Exception:
|
|
return False
|
|
|
|
|
|
def _fit_pca2(model, X_train_full: np.ndarray, val_mask: np.ndarray,
|
|
X_full: np.ndarray) -> np.ndarray | None:
|
|
"""Fit a 2-dim PCA on the model's *standardized, kept* train features.
|
|
|
|
For tensor models, flatten (C, T) → C*T per window before PCA. The
|
|
projection is saved with the checkpoint and used by the dashboard
|
|
scatter widget. Returns shape (D, 2) where D is the post-keep, post-
|
|
flatten dim.
|
|
"""
|
|
try:
|
|
from sklearn.decomposition import PCA
|
|
except Exception:
|
|
return None
|
|
Xk = model.select(X_train_full)
|
|
if Xk.ndim == 3:
|
|
Xk = Xk.reshape(Xk.shape[0], -1)
|
|
if Xk.shape[0] < 3 or Xk.shape[1] < 2:
|
|
return None
|
|
# Subsample for speed if large
|
|
rng = np.random.default_rng(0)
|
|
if Xk.shape[0] > 50_000:
|
|
sel = rng.choice(Xk.shape[0], size=50_000, replace=False)
|
|
Xk = Xk[sel]
|
|
pca = PCA(n_components=2, random_state=0)
|
|
pca.fit(Xk)
|
|
return pca.components_.T.astype(np.float32) # shape (D, 2)
|
|
|
|
|
|
# Script entry point: run the driver and use its return value as the
# process exit status.
if __name__ == "__main__":
    sys.exit(main())
|