CIS490/training/eval_/breakdown.py
Max 1fabd4a246 training: validator, feature/tensor extractors, 6 supervised models, schema-hashed checkpoints, eval suite, dashboard producers
The model layer of the project, built honestly:

  - tools/dataset_validate.py — full-sweep validator over the receiver
    store (sha256, schema, monotonic labels, telemetry-row gate). On the
    current corpus: 64,798 accepted + 8,154 degraded + 3,701 rejected +
    7 errored across 76,660 shipped episodes. data/processed/validation_v1.parquet
    is committed as the per-episode acceptance index.

  - training/_features.py — channel registry (46 channels across
    proc/guest/qmp/netflow), summary-stat windowing AND channel×time
    tensor extraction at 10s/5s windowing. Time alignment uses t_wall_ns
    (Unix ns) — tested fix for a real netflow-vs-host clock-base
    inconsistency that was silently dropping every netflow channel.

  - training/_split.py — three held-out recipes (host / sample / time)
    with profile-stratification assertions. held_out_host carries
    untested_profiles for cases like scan-and-dial absent from the test
    host (5 of 6 profiles tested cross-device, never silently averaged).

  - training/models/ — 6 architectures behind a common BaseModel
    interface: gbt (XGBoost), mlp, cnn, gru, lstm, transformer. Each
    trained twice (realistic / oracle) per the deployment threat model.
    Schema-hashed checkpoints refuse to load if _features.py changed
    since training (silent-input-drift protection, tested).

  - training/trainer/ — unified training loop: class-weighted CE, LR
    warmup + cosine, gradient clipping, mixed precision when CUDA,
    early stopping on val macro F1, best-on-val checkpoint. Same loop
    runs MLP/CNN/GRU/LSTM/Transformer; GBT uses XGBoost
    early_stopping_rounds on val mlogloss.

  - training/eval_/ — bootstrap 95% CIs on macro F1, per-class F1,
    per-profile and per-host breakdown, paired-bootstrap significance
    for model-vs-model gap. Confusion matrix uses union of seen labels.

  - training/dashboard/producers/ — replay/metrics/perf/profiles
    emitting the six event types the dashboard's awaiting scenes
    consume; on-demand tensor extraction so the Pi can run live
    inference without 65 GB of shards.

  - 17 unit tests (split coverage, features round-trip, schema mismatch,
    determinism, time-base alignment regression).
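
The schema-hashed checkpoint idea from the training/models/ item above can be sketched roughly as follows. This is an illustration under assumptions, not the project's code: `schema_hash`, `save_checkpoint`, `load_checkpoint`, `SchemaMismatchError`, the channel names, the JSON file format, and the reading of "10s/5s" as window/stride are all hypothetical.

```python
from __future__ import annotations

import hashlib
import json
import tempfile
from pathlib import Path


class SchemaMismatchError(RuntimeError):
    """Raised when a checkpoint was trained against a different feature schema."""


def schema_hash(channels: list[str], window_s: float, stride_s: float) -> str:
    # Canonical JSON (sorted keys) so the same schema always hashes the same way.
    payload = json.dumps(
        {"channels": channels, "window_s": window_s, "stride_s": stride_s},
        sort_keys=True,
    )
    return hashlib.sha256(payload.encode("utf-8")).hexdigest()


def save_checkpoint(path: Path, weights: list[float], schema: str) -> None:
    # Store the schema hash next to the weights.
    path.write_text(json.dumps({"schema_hash": schema, "weights": weights}))


def load_checkpoint(path: Path, expected_schema: str) -> list[float]:
    blob = json.loads(path.read_text())
    if blob["schema_hash"] != expected_schema:
        # Refuse to load rather than feed the model inputs it was not trained on.
        raise SchemaMismatchError("feature schema changed since training")
    return blob["weights"]


with tempfile.TemporaryDirectory() as tmp:
    ckpt = Path(tmp) / "model.json"
    # Hypothetical channel names, for illustration only.
    old = schema_hash(["proc.cpu", "netflow.bytes"], 10.0, 5.0)
    save_checkpoint(ckpt, [0.1, 0.2], old)
    loaded = load_checkpoint(ckpt, old)

    # Adding a channel changes the hash, so the old checkpoint is refused.
    new = schema_hash(["proc.cpu", "netflow.bytes", "qmp.events"], 10.0, 5.0)
    try:
        load_checkpoint(ckpt, new)
        refused = False
    except SchemaMismatchError:
        refused = True
```

Hashing a canonical serialization rather than the source file itself is one possible design choice; it avoids invalidating checkpoints on comment-only edits.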

End-to-end smoke-trained all six on a 567-episode subset; held-out
test macro F1 reported with paired-bootstrap significance. The
methodology now reports honest cross-device generalization, not
in-distribution validation.
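
A minimal sketch of a paired bootstrap for a model-vs-model gap, assuming a percentile interval and using accuracy as a stand-in metric to keep the example short. The names `paired_bootstrap_gap` and `accuracy` are hypothetical and are not taken from training/eval_/.

```python
import numpy as np


def accuracy(y_true: np.ndarray, y_pred: np.ndarray) -> float:
    return float(np.mean(y_true == y_pred))


def paired_bootstrap_gap(y_true, pred_a, pred_b, metric=accuracy,
                         n_resamples=1000, seed=0):
    """Return (gap, lo, hi) for metric(A) - metric(B) with a 95% percentile CI.

    "Paired" means both models are scored on the same resampled test
    indices, so per-example difficulty cancels out of the difference.
    """
    rng = np.random.default_rng(seed)
    n = len(y_true)
    gaps = np.empty(n_resamples)
    for i in range(n_resamples):
        idx = rng.integers(0, n, size=n)  # sample test rows with replacement
        gaps[i] = metric(y_true[idx], pred_a[idx]) - metric(y_true[idx], pred_b[idx])
    lo, hi = np.percentile(gaps, [2.5, 97.5])
    point = metric(y_true, pred_a) - metric(y_true, pred_b)
    return point, float(lo), float(hi)


# Toy data: model A is always right, model B always predicts class 0.
y_true = np.array([0, 1] * 50)
pred_a = y_true.copy()
pred_b = np.zeros_like(y_true)

point, lo, hi = paired_bootstrap_gap(y_true, pred_a, pred_b)
# One common reading: the gap is significant if the interval excludes zero.
significant = lo > 0.0
```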

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
2026-05-08 01:19:00 -05:00

"""Per-profile and per-host metric breakdown.

A model with macro F1 = 0.55 might be 0.85 on five profiles and 0.10
on the sixth. The single number hides exactly the kind of failure mode
this project cares about (one malware family the model can't see).
This module produces the breakdown table.
"""
from __future__ import annotations

from dataclasses import asdict, dataclass

import numpy as np

from training.eval_._metrics import _f1, _macro_f1, bootstrap_macro_f1


@dataclass
class CellMetrics:
    n: int
    macro_f1: float
    macro_f1_lo: float
    macro_f1_hi: float
    per_class_f1: dict[int, float]


def by_profile(
    *,
    y_true: np.ndarray, y_pred: np.ndarray,
    profiles: list[str], n_classes: int,
    n_resamples: int = 500,
) -> dict[str, CellMetrics]:
    """One row per profile observed in test."""
    out: dict[str, CellMetrics] = {}
    profs = np.asarray(profiles)
    for prof in sorted({p for p in profs if p}):
        m = profs == prof
        if not m.any():
            continue
        ci = bootstrap_macro_f1(y_true[m], y_pred[m], n_classes,
                                n_resamples=n_resamples)
        per_class = {k: _f1(y_true[m], y_pred[m], k) for k in range(n_classes)}
        out[prof] = CellMetrics(
            n=int(m.sum()), macro_f1=ci.point,
            macro_f1_lo=ci.low, macro_f1_hi=ci.high,
            per_class_f1=per_class,
        )
    return out


def by_host(
    *,
    y_true: np.ndarray, y_pred: np.ndarray,
    hosts: list[str], n_classes: int,
    n_resamples: int = 500,
) -> dict[str, CellMetrics]:
    """One row per host observed in test."""
    out: dict[str, CellMetrics] = {}
    hs = np.asarray(hosts)
    for h in sorted({x for x in hs if x}):
        m = hs == h
        if not m.any():
            continue
        ci = bootstrap_macro_f1(y_true[m], y_pred[m], n_classes,
                                n_resamples=n_resamples)
        per_class = {k: _f1(y_true[m], y_pred[m], k) for k in range(n_classes)}
        out[h] = CellMetrics(
            n=int(m.sum()), macro_f1=ci.point,
            macro_f1_lo=ci.low, macro_f1_hi=ci.high,
            per_class_f1=per_class,
        )
    return out
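
A self-contained sketch of how a per-group breakdown like the one above behaves on toy data. `training.eval_._metrics` is not shown here, so `f1_for_class`, `macro_f1`, `bootstrap_ci`, and `breakdown` below are hypothetical stand-ins (a percentile bootstrap and a "0.0 when the class is absent" F1 convention are assumed), not the project's actual helpers.

```python
from __future__ import annotations

import numpy as np


def f1_for_class(y_true: np.ndarray, y_pred: np.ndarray, k: int) -> float:
    """One-vs-rest F1 for class k; 0.0 if k is neither present nor predicted."""
    tp = int(np.sum((y_true == k) & (y_pred == k)))
    fp = int(np.sum((y_true != k) & (y_pred == k)))
    fn = int(np.sum((y_true == k) & (y_pred != k)))
    denom = 2 * tp + fp + fn
    return 2 * tp / denom if denom else 0.0


def macro_f1(y_true: np.ndarray, y_pred: np.ndarray, n_classes: int) -> float:
    return float(np.mean([f1_for_class(y_true, y_pred, k) for k in range(n_classes)]))


def bootstrap_ci(y_true, y_pred, n_classes, n_resamples=200, seed=0):
    """Point estimate plus a 95% percentile-bootstrap interval for macro F1."""
    rng = np.random.default_rng(seed)
    n = len(y_true)
    stats = np.empty(n_resamples)
    for i in range(n_resamples):
        idx = rng.integers(0, n, size=n)  # resample rows with replacement
        stats[i] = macro_f1(y_true[idx], y_pred[idx], n_classes)
    lo, hi = np.percentile(stats, [2.5, 97.5])
    return macro_f1(y_true, y_pred, n_classes), float(lo), float(hi)


def breakdown(y_true, y_pred, groups, n_classes):
    """One row per non-empty group label (profile or host)."""
    g = np.asarray(groups)
    out = {}
    for name in sorted({x for x in g if x}):
        m = g == name  # boolean mask selecting this group's rows
        point, lo, hi = bootstrap_ci(y_true[m], y_pred[m], n_classes)
        out[str(name)] = {"n": int(m.sum()), "macro_f1": point, "lo": lo, "hi": hi}
    return out


# Toy data: profile "a" is classified perfectly, profile "b" is always wrong.
y_true = np.array([0, 1, 0, 1, 0, 1, 0, 1])
y_pred = np.array([0, 1, 0, 1, 1, 0, 1, 0])
profiles = ["a", "a", "a", "a", "b", "b", "b", "b"]

overall = macro_f1(y_true, y_pred, n_classes=2)          # 0.5
table = breakdown(y_true, y_pred, profiles, n_classes=2)
# table["a"]["macro_f1"] is 1.0 and table["b"]["macro_f1"] is 0.0:
# the overall 0.5 hides a group the model never gets right.
```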