CIS490/training/eval_/run.py
Max 697e36a315 training/producers: move out of dashboard/ per ownership boundary
Producers are event *sources* — the renderer is everything inside
training/dashboard/. Sibling layout makes the dependency direction
one-way (producers import from training.dashboard.events; dashboard
never reaches into producers).

  training/dashboard/producers/   →   training/producers/

Internal imports rewritten via sed; eval_/run.py and training/README.md
cross-references updated. CLI entry stays via `python -m training.producers.<sub>`
(replay / metrics / perf / profiles).

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
2026-05-08 12:06:56 -05:00

249 lines
10 KiB
Python
Raw Permalink Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

"""End-to-end eval driver — load all checkpoints, score on test split,
emit per-model JSON + a comparison markdown.

Outputs to reports/eval/:
    <model>_<mode>_eval.json   full metrics: macro_f1 ± CI, per-phase F1 ± CI,
                               per-profile F1, per-host F1, confusion matrix
    comparison_v2.md           side-by-side table with paired-bootstrap
                               significance
"""
from __future__ import annotations
import argparse
import json
import logging
import sys
from dataclasses import asdict
from pathlib import Path
import numpy as np
import pyarrow.parquet as pq
sys.path.insert(0, str(Path(__file__).resolve().parents[2]))
from training._features import PHASES
from training._split import (
held_out_host, held_out_sample, held_out_time,
)
from training.producers._models import load_models
from training.eval_._metrics import (
bootstrap_macro_f1, bootstrap_per_class_f1,
confusion_matrix, paired_bootstrap_macro_f1_diff,
per_class_pr_f1,
)
from training.eval_.breakdown import by_host, by_profile
log = logging.getLogger("cis490.eval.run")
def _load_test(model, *, validation_path: Path,
summary_path: Path | None, tensors_root: Path | None,
split_recipe: str, train_hosts: list[str], seed: int = 0
) -> dict:
val = pq.read_table(validation_path).to_pylist()
rows = [r for r in val if r["status"] in ("accepted", "degraded")]
profs = [r["profile"] for r in rows]
samples = [r["sample_name"] for r in rows]
hosts = [r["host_id"] for r in rows]
epi_ids = [r["episode_id"] for r in rows]
recv = [r.get("received_at_wall", "") for r in rows]
if split_recipe == "host":
s = held_out_host(profiles=profs, sample_names=samples,
host_ids=hosts, episode_ids=epi_ids,
train_hosts=train_hosts, seed=seed)
elif split_recipe == "sample":
s = held_out_sample(profiles=profs, sample_names=samples,
host_ids=hosts, seed=seed)
else:
s = held_out_time(profiles=profs, sample_names=samples,
host_ids=hosts, received_at=recv, seed=seed)
test_eps = {epi_ids[i] for i in range(len(epi_ids)) if s.test[i]}
if model.input_kind == "summary":
from training.trainer._data import load_summary
schema = (summary_path.parent / "feature_schema_v1.json")
d = load_summary(summary_path, schema)
else:
from training.trainer._data import load_tensor
d = load_tensor(tensors_root)
m = np.array([e in test_eps for e in d.episode_id], dtype=bool)
X = d.X[m]
y = d.y[m]
profiles = [d.profile[i] for i in range(len(d.profile)) if m[i]]
hosts_w = [d.host_id[i] for i in range(len(d.host_id)) if m[i]]
return {"X": X, "y": y, "profiles": profiles, "hosts": hosts_w,
"splits": s}
def _eval_one(model, *, validation_path, summary_path, tensors_root,
              split_recipe, train_hosts, n_resamples=1000) -> dict:
    """Score a single model on its held-out test set.

    Loads the test split via ``_load_test``, runs ``model.predict`` on it,
    and gathers overall, per-class, per-profile and per-host F1 figures
    plus the confusion matrix.

    The per-profile and per-host breakdowns use
    ``max(200, n_resamples // 2)`` bootstrap resamples; the overall and
    per-class figures use ``n_resamples``.

    Returns:
        A dict of metrics. It also carries the raw ``"predictions"`` and
        ``"targets"`` lists so a later paired bootstrap can reuse them.
    """
    def _interval(ci) -> dict:
        # Flatten a bootstrap result into the point/low/high triple.
        return {"point": ci.point, "low": ci.low, "high": ci.high}

    held_out = _load_test(
        model,
        validation_path=validation_path,
        summary_path=summary_path,
        tensors_root=tensors_root,
        split_recipe=split_recipe,
        train_hosts=train_hosts,
    )
    targets = held_out["y"]
    predictions = model.predict(held_out["X"])
    n_classes = model.n_classes
    split = held_out["splits"]
    breakdown_resamples = max(200, n_resamples // 2)

    macro = bootstrap_macro_f1(targets, predictions, n_classes,
                               n_resamples=n_resamples)
    class_cis = bootstrap_per_class_f1(targets, predictions, n_classes,
                                       n_resamples=n_resamples)
    profile_scores = by_profile(y_true=targets, y_pred=predictions,
                                profiles=held_out["profiles"],
                                n_classes=n_classes,
                                n_resamples=breakdown_resamples)
    host_scores = by_host(y_true=targets, y_pred=predictions,
                          hosts=held_out["hosts"], n_classes=n_classes,
                          n_resamples=breakdown_resamples)
    matrix = confusion_matrix(targets, predictions, n_classes)

    # Class index -> phase name, in class order.
    per_class = {}
    for idx in range(n_classes):
        per_class[PHASES[idx]] = _interval(class_cis[idx])

    report = {}
    report["model"] = model.__model_name__
    report["n_test"] = int(len(targets))
    report["macro_f1"] = _interval(macro)
    report["per_class_f1"] = per_class
    report["by_profile"] = {
        name: asdict(score) for name, score in profile_scores.items()
    }
    report["by_host"] = {
        name: asdict(score) for name, score in host_scores.items()
    }
    report["confusion_matrix"] = matrix.tolist()
    report["split_recipe"] = split_recipe
    report["untested_profiles"] = list(split.untested_profiles)
    report["excluded_profiles"] = list(split.excluded_profiles)
    report["predictions"] = predictions.tolist()  # for paired bootstrap later
    report["targets"] = targets.tolist()
    return report
def _markdown_report(results: list[dict], out_path: Path,
*, n_classes: int, n_resamples: int = 1000) -> None:
"""Comparison table + paired-bootstrap significance for the top model."""
lines = ["# Model comparison\n"]
lines.append(f"Held-out recipe: **{results[0]['split_recipe']}**. "
f"All metrics are macro F1 with bootstrap 95 % CIs.\n")
if results[0]["untested_profiles"]:
lines.append(f"⚠ untested profiles (no test cell): "
f"{results[0]['untested_profiles']}\n")
if results[0]["excluded_profiles"]:
lines.append(f"⚠ excluded profiles (no train data): "
f"{results[0]['excluded_profiles']}\n")
lines.append("## Overall macro F1\n")
lines.append("| model | n_test | macro F1 (95 % CI) |")
lines.append("|---|---:|---|")
sorted_r = sorted(results, key=lambda r: -r["macro_f1"]["point"])
for r in sorted_r:
f = r["macro_f1"]
lines.append(f"| {r['model']} | {r['n_test']} | "
f"{f['point']:.3f} [{f['low']:.3f}, {f['high']:.3f}] |")
lines.append("\n## Per-phase F1\n")
# Use the intersection of phases each model reports; PHASES has
# "failed" which models trained on the smoke set may not have seen.
phases = sorted({
p for r in sorted_r for p in r["per_class_f1"].keys()
}, key=lambda p: PHASES.index(p) if p in PHASES else 99)
head = "| model | " + " | ".join(phases) + " |"
lines.append(head); lines.append("|---|" + "---:|" * len(phases))
for r in sorted_r:
cells = [
(f"{r['per_class_f1'][p]['point']:.3f}"
if p in r["per_class_f1"] else "")
for p in phases
]
lines.append(f"| {r['model']} | " + " | ".join(cells) + " |")
lines.append("\n## Per-profile macro F1 (top model only — full table in JSON)\n")
top = sorted_r[0]
lines.append(f"Top model: **{top['model']}**\n")
lines.append("| profile | n | macro F1 (95 % CI) |")
lines.append("|---|---:|---|")
for prof, m in sorted(top["by_profile"].items()):
lines.append(f"| {prof} | {m['n']} | "
f"{m['macro_f1']:.3f} [{m['macro_f1_lo']:.3f}, "
f"{m['macro_f1_hi']:.3f}] |")
lines.append("\n## Per-host macro F1 (top model)\n")
lines.append("| host | n | macro F1 (95 % CI) |")
lines.append("|---|---:|---|")
for h, m in sorted(top["by_host"].items()):
lines.append(f"| {h} | {m['n']} | "
f"{m['macro_f1']:.3f} [{m['macro_f1_lo']:.3f}, "
f"{m['macro_f1_hi']:.3f}] |")
# Paired-bootstrap significance: top vs each other
if len(sorted_r) > 1:
lines.append("\n## Paired-bootstrap significance vs top model\n")
lines.append(f"Comparison anchor: **{top['model']}**. "
f"95 % CI excludes 0 → significant difference.\n")
lines.append("| model | Δ macro F1 (anchor model) (95 % CI) |")
lines.append("|---|---|")
y_true = np.asarray(top["targets"])
y_anchor = np.asarray(top["predictions"])
for r in sorted_r[1:]:
y_other = np.asarray(r["predictions"])
if len(y_other) != len(y_true):
continue
d = paired_bootstrap_macro_f1_diff(
y_true, y_anchor, y_other, n_classes,
n_resamples=n_resamples,
)
sig = "*" if (d.low > 0 or d.high < 0) else ""
lines.append(f"| {r['model']} | "
f"{d.point:+.3f} [{d.low:+.3f}, {d.high:+.3f}] {sig} |")
out_path.write_text("\n".join(lines) + "\n")
def main() -> int:
    """CLI entry point: evaluate every model found and write the reports.

    For each model returned by ``load_models(--artifacts)`` this writes
    ``<reports-dir>/<model name>_eval.json`` (without the raw
    predictions/targets), then writes ``comparison_v2.md`` covering all
    models.

    Returns:
        1 if no models were found, otherwise 0.
    """
    ap = argparse.ArgumentParser()
    ap.add_argument("--validation", required=True, type=Path)
    ap.add_argument("--artifacts", type=Path, default=Path("artifacts"))
    ap.add_argument("--summary", type=Path, default=None)
    ap.add_argument("--tensors", type=Path, default=None)
    ap.add_argument("--reports-dir", type=Path, default=Path("reports/eval"))
    ap.add_argument("--split-recipe", choices=["host", "sample", "time"],
                    default="host")
    ap.add_argument("--train-hosts", nargs="+", default=["elliott-thinkpad"])
    ap.add_argument("--n-resamples", type=int, default=1000)
    args = ap.parse_args()

    logging.basicConfig(level="INFO",
                        format="%(asctime)s %(levelname)s %(name)s %(message)s")
    args.reports_dir.mkdir(parents=True, exist_ok=True)

    models = load_models(args.artifacts)
    if not models:
        log.warning("no models found under %s", args.artifacts)
        return 1

    results = []
    for m in models:
        log.info("evaluating %s", m.__model_name__)
        res = _eval_one(m, validation_path=args.validation,
                        summary_path=args.summary, tensors_root=args.tensors,
                        split_recipe=args.split_recipe,
                        train_hosts=args.train_hosts,
                        n_resamples=args.n_resamples)
        out = args.reports_dir / f"{m.__model_name__}_eval.json"
        # Raw predictions/targets are kept in memory for the paired
        # bootstrap but left out of the per-model JSON.
        out.write_text(json.dumps(
            {k: v for k, v in res.items()
             if k not in {"predictions", "targets"}},
            indent=2) + "\n", encoding="utf-8")
        results.append(res)

    if results:
        report_path = args.reports_dir / "comparison_v2.md"
        # The paired bootstrap always runs over the full phase list. The
        # earlier max(r.get("n_test_classes", ...)) computation was dead
        # code: no result carries that key and its value was overwritten
        # immediately.
        _markdown_report(
            results, report_path,
            n_classes=len(PHASES), n_resamples=args.n_resamples,
        )
        log.info("wrote %s", report_path)
    return 0


if __name__ == "__main__":
    raise SystemExit(main())