CIS490/training/_split.py
Max 1fabd4a246 training: validator, feature/tensor extractors, 6 supervised models, schema-hashed checkpoints, eval suite, dashboard producers
The model layer of the project, built honestly:

  - tools/dataset_validate.py — full-sweep validator over the receiver
    store (sha256, schema, monotonic labels, telemetry-row gate). On the
    current corpus: 64,798 accepted + 8,154 degraded + 3,701 rejected +
    7 errored across 76,660 shipped episodes. data/processed/validation_v1.parquet
    is committed as the per-episode acceptance index.

  - training/_features.py — channel registry (46 channels across
    proc/guest/qmp/netflow), summary-stat windowing AND channel×time
    tensor extraction at 10s/5s windowing. Time alignment uses t_wall_ns
    (Unix ns) — tested fix for a real netflow-vs-host clock-base
    inconsistency that was silently dropping every netflow channel.

  - training/_split.py — three held-out recipes (host / sample / time)
    with profile-stratification assertions. held_out_host carries
    untested_profiles for cases like scan-and-dial absent from the test
    host (5 of 6 profiles tested cross-device, never silently averaged).

  - training/models/ — 6 architectures behind a common BaseModel
    interface: gbt (XGBoost), mlp, cnn, gru, lstm, transformer. Each
    trained twice (realistic / oracle) per the deployment threat model.
    Schema-hashed checkpoints refuse to load if _features.py changed
    since training (silent-input-drift protection, tested).

  - training/trainer/ — unified training loop: class-weighted CE, LR
    warmup + cosine, gradient clipping, mixed precision when CUDA,
    early stopping on val macro F1, best-on-val checkpoint. Same loop
    runs MLP/CNN/GRU/LSTM/Transformer; GBT uses XGBoost
    early_stopping_rounds on val mlogloss.

  - training/eval_/ — bootstrap 95% CIs on macro F1, per-class F1,
    per-profile and per-host breakdown, paired-bootstrap significance
    for model-vs-model gap. Confusion matrix uses union of seen labels.

  - training/dashboard/producers/ — replay/metrics/perf/profiles
    emitting the six event types the dashboard's awaiting scenes
    consume; on-demand tensor extraction so the Pi can run live
    inference without 65 GB of shards.

  - 17 unit tests (split coverage, features round-trip, schema mismatch,
    determinism, time-base alignment regression).
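
The schema-hashed checkpoint guard from the training/models/ bullet can be sketched as below. This is a minimal illustration, not the project's implementation: the channel names, the reading of "10s/5s" as a 10 s window with a 5 s stride, and the helpers `schema_hash`, `save_checkpoint`, and `load_checkpoint` are all assumptions made for the example.

```python
import hashlib
import json

# Hypothetical channel registry; the real one lives in training/_features.py.
CHANNELS_V1 = [("proc", "cpu_pct"), ("netflow", "bytes_out")]


def schema_hash(channels, window_s, stride_s):
    """Hash everything that determines the model's input layout."""
    payload = json.dumps(
        {
            "channels": [list(c) for c in channels],
            "window_s": window_s,
            "stride_s": stride_s,
        },
        sort_keys=True,
    )
    return hashlib.sha256(payload.encode()).hexdigest()


def save_checkpoint(weights, channels, window_s, stride_s):
    """Bundle the weights with the schema hash they were trained under."""
    return {
        "weights": weights,
        "schema_hash": schema_hash(channels, window_s, stride_s),
    }


def load_checkpoint(ckpt, channels, window_s, stride_s):
    """Refuse to load when the current feature schema differs."""
    if ckpt["schema_hash"] != schema_hash(channels, window_s, stride_s):
        raise ValueError("feature schema changed since training; refusing to load")
    return ckpt["weights"]
```

Hashing a canonical JSON dump (sorted keys) keeps the digest stable across runs, so any change to the channel list or windowing produces a different hash and a load-time failure rather than silently shifted inputs.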

End-to-end smoke-trained all six on a 567-episode subset; held-out
test macro F1 reported with paired-bootstrap significance. The
methodology now reports honest cross-device generalization, not
in-distribution validation.
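
For readers unfamiliar with paired-bootstrap significance, the sketch below shows one common way to compute it for a macro-F1 gap between two models. It is an illustration under assumptions, not the code in training/eval_/: `macro_f1` and `paired_bootstrap_gap` are hypothetical names, and the percentile interval is one of several reasonable choices.

```python
import numpy as np


def macro_f1(y_true, y_pred, labels):
    """Unweighted mean of per-class F1 over a fixed label set."""
    scores = []
    for c in labels:
        tp = np.sum((y_true == c) & (y_pred == c))
        fp = np.sum((y_true != c) & (y_pred == c))
        fn = np.sum((y_true == c) & (y_pred != c))
        denom = 2 * tp + fp + fn
        scores.append(2 * tp / denom if denom else 0.0)
    return float(np.mean(scores))


def paired_bootstrap_gap(y_true, pred_a, pred_b, n_boot=1000, seed=0):
    """Mean and 95% percentile interval of macro-F1(A) - macro-F1(B)."""
    y_true, pred_a, pred_b = map(np.asarray, (y_true, pred_a, pred_b))
    # Union of seen labels, so a class missing from one model still counts.
    labels = np.unique(np.concatenate([y_true, pred_a, pred_b]))
    rng = np.random.default_rng(seed)
    n = len(y_true)
    gaps = np.empty(n_boot)
    for b in range(n_boot):
        # "Paired": both models are scored on the same resampled episodes.
        idx = rng.integers(0, n, size=n)
        gaps[b] = (macro_f1(y_true[idx], pred_a[idx], labels)
                   - macro_f1(y_true[idx], pred_b[idx], labels))
    lo, hi = np.percentile(gaps, [2.5, 97.5])
    return float(gaps.mean()), float(lo), float(hi)
```

If the resulting interval excludes zero, the gap between the two models is unlikely to be an artifact of which test episodes happened to be drawn.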

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
2026-05-08 01:19:00 -05:00


"""Held-out splits for CIS490.

The dataset has only 12 unique sample_names across 6 profiles, with
two profiles (cpu-saturate, low-and-slow) having a single sample each.
That makes a naive held-out-by-sample split *mathematically impossible*
for those profiles: a sample can only land in one cell, so the cell
on the other side is empty.

This module exposes multiple split recipes so the project can pick the
honesty bar that matches the claim being made:

  held_out_host(...)
      Train + val from a designated training host; test = held-out
      host(s). The val slice is carved *inside* the training host
      (in-distribution val for hyperparameter selection). Test is
      out-of-distribution → the cross-device generalization claim.
      Covers every profile present on both the training and the
      held-out host(s); a profile missing from the held-out side is
      still trained on and is reported in untested_profiles.
      RECOMMENDED PRIMARY EVAL.

  held_out_sample(...)
      Train + val + test all by sample_name, profile-stratified. ONLY
      valid for profiles with ≥3 unique sample_names; profiles with
      fewer are listed in excluded_profiles and dropped from this split.
      Use as a *secondary* eval to claim novel-malware generalization
      on the profiles that support it.

  held_out_time(...)
      Within-sample time-block split. Latter X% of each (host, sample)
      group's episodes (by received_at) → test. Tests within-sample
      stability. Weakest claim; included for completeness.

All recipes return a Splits dataclass with identical shape so trainer
and eval are split-agnostic.
"""
from __future__ import annotations
import csv
import hashlib
from dataclasses import dataclass, field
from pathlib import Path
from typing import Iterable, Sequence
import numpy as np
# ─────────────────────────────────────────────────────────────────────
# Splits dataclass
# ─────────────────────────────────────────────────────────────────────
@dataclass(frozen=True)
class Splits:
    """Per-episode boolean masks for train/val/test plus the recipe
    that produced them and the per-sample assignment if any."""

    train: np.ndarray  # shape (N,) bool
    val: np.ndarray  # shape (N,) bool
    test: np.ndarray  # shape (N,) bool
    profiles: tuple[str, ...]
    sample_names: tuple[str, ...]
    host_ids: tuple[str, ...]
    recipe: str  # "host" | "sample" | "time"
    config: dict  # recipe-specific config (for repro)
    sample_to_split: dict[str, str] = field(default_factory=dict)
    # Profiles fully dropped from all three splits (no train, no test):
    # used by held_out_sample for profiles with too few unique sample_names
    # to honestly evaluate, and by held_out_host for profiles that have NO
    # episodes in any train host.
    excluded_profiles: tuple[str, ...] = ()
    # Profiles that are in train (and possibly val) but have no test cell,
    # so the cross-device generalization claim doesn't apply to them. They
    # are still trained on (so the model recognizes them in-distribution)
    # but they're absent from test metrics.
    untested_profiles: tuple[str, ...] = ()

    def __post_init__(self):
        N = len(self.train)
        assert len(self.val) == N and len(self.test) == N
        sums = (self.train.astype(np.int8)
                + self.val.astype(np.int8)
                + self.test.astype(np.int8))
        # Episodes from excluded profiles are False in all three masks
        # (they're filtered out, not assigned). All others are exactly 1.
        assert ((sums == 1) | (sums == 0)).all(), \
            "split partitioning bug: an episode landed in >1 split"
    def cell_counts(self) -> dict[tuple[str, str], int]:
        counts: dict[tuple[str, str], int] = {}
        for prof, m_train, m_val, m_test in zip(
            self.profiles, self.train, self.val, self.test
        ):
            if not (m_train or m_val or m_test):
                continue  # excluded
            split = "train" if m_train else "val" if m_val else "test"
            counts[(prof, split)] = counts.get((prof, split), 0) + 1
        return counts

    def coverage_violations(self, *, splits: tuple[str, ...] = ("train", "val", "test")
                            ) -> list[str]:
        """Return list of '<profile>/<split>' cells that are empty
        among non-excluded, non-untested profiles for the given splits."""
        cells = self.cell_counts()
        skip = set(self.excluded_profiles) | set(self.untested_profiles)
        unique_profiles = sorted({p for p in self.profiles if p and p not in skip})
        out: list[str] = []
        for prof in unique_profiles:
            for split in splits:
                if cells.get((prof, split), 0) == 0:
                    out.append(f"{prof}/{split}")
        return out
    def assert_coverage(self) -> None:
        """Hard check: every profile that is *eligible* for cross-device
        eval must have train+val+test cells filled. Profiles in
        ``untested_profiles`` are exempt from the test check; profiles
        in ``excluded_profiles`` are exempt from all three."""
        bad = self.coverage_violations()
        # Allow untested profiles to lack a test cell, but they still must
        # have train (and we aim for val too).
        cells = self.cell_counts()
        for prof in self.untested_profiles:
            if cells.get((prof, "train"), 0) == 0:
                bad.append(f"{prof}/train (untested-but-also-untrained)")
        if bad:
            raise AssertionError(
                "split coverage violated; empty cells: " + ", ".join(bad)
            )

    def n_episodes_in_use(self) -> int:
        """Episodes counted somewhere (excluding profiles dropped)."""
        return int(self.train.sum() + self.val.sum() + self.test.sum())
    def summary(self) -> str:
        cells = self.cell_counts()
        unique_profiles = sorted({
            p for p in self.profiles
            if p and p not in self.excluded_profiles
        })
        out = [
            f"recipe: {self.recipe}",
            f"config: {self.config}",
            f"split sizes: train={int(self.train.sum())} "
            f"val={int(self.val.sum())} test={int(self.test.sum())} "
            f"(of {len(self.profiles)} candidate episodes; "
            f"{len(self.profiles) - self.n_episodes_in_use()} excluded)",
        ]
        if self.excluded_profiles:
            out.append(f"excluded profiles (no train data): "
                       f"{sorted(self.excluded_profiles)}")
        if self.untested_profiles:
            out.append(f"untested profiles (no cross-device test): "
                       f"{sorted(self.untested_profiles)}")
        out.append("per-profile, per-split counts:")
        for prof in unique_profiles:
            tr = cells.get((prof, "train"), 0)
            va = cells.get((prof, "val"), 0)
            te = cells.get((prof, "test"), 0)
            out.append(f" {prof:>16} train={tr:>6} val={va:>5} test={te:>5}")
        return "\n".join(out)
    def save(self, path: Path) -> None:
        """Persist as CSV per (sample_name → split) plus a header row
        with the recipe + config. Re-applying via apply_saved gives the
        same partitioning across machines."""
        path.parent.mkdir(parents=True, exist_ok=True)
        with path.open("w", newline="") as f:
            w = csv.writer(f)
            w.writerow(["# recipe", self.recipe])
            w.writerow(["# config", repr(self.config)])
            w.writerow(["# excluded_profiles", ",".join(self.excluded_profiles)])
            w.writerow(["sample_name", "split"])
            for name, split in sorted(self.sample_to_split.items()):
                w.writerow([name, split])
# ─────────────────────────────────────────────────────────────────────
# Hash helper
# ─────────────────────────────────────────────────────────────────────
def _hash_to_unit(s: str, salt: str) -> float:
    # First 64 bits of SHA-256(salt::s), scaled to the unit interval.
    h = hashlib.sha256(f"{salt}::{s}".encode()).hexdigest()
    return int(h[:16], 16) / float(1 << 64)
# ─────────────────────────────────────────────────────────────────────
# Recipe 1: held-out-by-host (PRIMARY)
# ─────────────────────────────────────────────────────────────────────
def held_out_host(
    *,
    profiles: Sequence[str],
    sample_names: Sequence[str],
    host_ids: Sequence[str],
    episode_ids: Sequence[str],
    train_hosts: Sequence[str],
    val_frac: float = 0.2,
    seed: int = 0,
) -> Splits:
    """Train + val from train_hosts; test = everything else.

    Val is carved from inside the training host(s) by deterministic
    episode-id hash so val is *in-distribution* (used only for
    hyperparameter selection). Test is *out-of-distribution* (the
    cross-device generalization claim).

    Why episode-id hash for val (not sample-name hash)? The training
    host's episodes share sample_names heavily; we want a random
    in-distribution val. Hashing by episode_id gives that. Sample-name
    hashing would just leave val empty for samples with one episode in
    train.
    """
    n = len(profiles)
    assert len(sample_names) == len(host_ids) == len(episode_ids) == n
    train_set = set(train_hosts)

    # Decide profile eligibility: a profile that NEVER appears in any
    # train host can't be learned and is useless to test. Drop entirely.
    # A profile that appears in train hosts but not in any other (test)
    # host can be trained on but not tested cross-device; flag it as
    # untested so the eval reports partial coverage cleanly.
    train_profiles: set[str] = set()
    test_profiles: set[str] = set()
    for prof, host in zip(profiles, host_ids):
        if not prof:
            continue
        if host in train_set:
            train_profiles.add(prof)
        else:
            test_profiles.add(prof)
    excluded = tuple(sorted(test_profiles - train_profiles))  # test-only → drop
    untested = tuple(sorted(train_profiles - test_profiles))  # train-only → train, no test

    train_m = np.zeros(n, dtype=bool)
    val_m = np.zeros(n, dtype=bool)
    test_m = np.zeros(n, dtype=bool)
    salt = f"v1::held_out_host::{seed}"
    excluded_set = set(excluded)
    for i, (host, prof, epi_id) in enumerate(
        zip(host_ids, profiles, episode_ids)
    ):
        if prof in excluded_set:
            continue  # all masks remain False → episode dropped
        if host in train_set:
            u = _hash_to_unit(epi_id, salt=salt)
            if u < val_frac:
                val_m[i] = True
            else:
                train_m[i] = True
        else:
            test_m[i] = True

    return Splits(
        train=train_m, val=val_m, test=test_m,
        profiles=tuple(profiles), sample_names=tuple(sample_names),
        host_ids=tuple(host_ids),
        recipe="host",
        config={"train_hosts": list(train_hosts), "val_frac": val_frac,
                "seed": seed},
        sample_to_split={},
        excluded_profiles=excluded,
        untested_profiles=untested,
    )
# ─────────────────────────────────────────────────────────────────────
# Recipe 2: held-out-by-sample (SECONDARY)
# ─────────────────────────────────────────────────────────────────────
def held_out_sample(
    *,
    profiles: Sequence[str],
    sample_names: Sequence[str],
    host_ids: Sequence[str],
    fractions: tuple[float, float, float] = (0.6, 0.2, 0.2),
    min_samples_per_profile: int = 3,
    seed: int = 0,
) -> Splits:
    """Profile-stratified split on sample_name.

    Profiles with fewer than ``min_samples_per_profile`` unique
    sample_names are *excluded* from this split (their episodes have
    all three masks False) because we cannot honestly measure
    cross-sample generalization with so few samples. They should be
    evaluated under a different recipe (host or time).

    ``min_samples_per_profile=3`` is the minimum that lets a 60/20/20
    split fill all three cells for a profile. Lowering it to 2 leaves
    the train cell empty for 2-sample profiles (the fallback below
    assigns 0 train / 1 val / 1 test), so keep it at 3 or higher unless
    that is acceptable.
    """
    if abs(sum(fractions) - 1.0) > 1e-9:
        raise ValueError(f"fractions must sum to 1.0, got {fractions}")
    f_train, f_val, _f_test = fractions
    n = len(profiles)
    assert len(sample_names) == len(host_ids) == n

    # Per-profile sample_name set
    by_profile: dict[str, set[str]] = {}
    for prof, name in zip(profiles, sample_names):
        if not prof:
            continue
        by_profile.setdefault(prof, set()).add(name or "")
    excluded = tuple(sorted(
        p for p, s in by_profile.items()
        if len(s) < min_samples_per_profile
    ))

    # Rank-based assignment: with N unique samples in a profile, sort
    # them by deterministic hash, then take the first round(N*f_train)
    # (at least 1) for train, the next round(N*f_val) (at least 1) for
    # val, and the rest for test. If that would leave test empty, fall
    # back to N-2 train / 1 val / 1 test. With N>=3 every cell gets >=1,
    # which is what min_samples=3 means.
    sample_to_split: dict[str, str] = {}
    for prof, names in by_profile.items():
        if prof in excluded:
            continue
        salt = f"v1::held_out_sample::{seed}::{prof}"
        ranked = sorted(names, key=lambda n: _hash_to_unit(n, salt=salt))
        N = len(ranked)
        n_train = max(1, int(round(N * f_train)))
        n_val = max(1, int(round(N * f_val)))
        # Ensure n_train + n_val + n_test = N and each >= 1
        # (the ">= 1" part only holds for N >= 3).
        if n_train + n_val >= N:
            n_train = N - 2
            n_val = 1
        for k, name in enumerate(ranked):
            if k < n_train:
                sample_to_split[name] = "train"
            elif k < n_train + n_val:
                sample_to_split[name] = "val"
            else:
                sample_to_split[name] = "test"

    train_m = np.zeros(n, dtype=bool)
    val_m = np.zeros(n, dtype=bool)
    test_m = np.zeros(n, dtype=bool)
    for i, (prof, name) in enumerate(zip(profiles, sample_names)):
        if not prof or prof in excluded:
            continue
        s = sample_to_split.get(name or "", None)
        if s == "train":
            train_m[i] = True
        elif s == "val":
            val_m[i] = True
        elif s == "test":
            test_m[i] = True

    return Splits(
        train=train_m, val=val_m, test=test_m,
        profiles=tuple(profiles), sample_names=tuple(sample_names),
        host_ids=tuple(host_ids),
        recipe="sample",
        config={"fractions": list(fractions), "seed": seed,
                "min_samples_per_profile": min_samples_per_profile},
        sample_to_split=sample_to_split,
        excluded_profiles=excluded,
    )
# ─────────────────────────────────────────────────────────────────────
# Recipe 3: held-out-by-time (within-sample, weakest)
# ─────────────────────────────────────────────────────────────────────
def held_out_time(
    *,
    profiles: Sequence[str],
    sample_names: Sequence[str],
    host_ids: Sequence[str],
    received_at: Sequence[str],  # ISO timestamps
    fractions: tuple[float, float, float] = (0.6, 0.2, 0.2),
    seed: int = 0,
) -> Splits:
    """Within each (host, sample) group, sort episodes by received_at
    and assign earliest fractions[0] to train, next fractions[1] to
    val, last fractions[2] to test. Tests within-sample stability.

    ``seed`` does not influence the partition (the ordering comes from
    received_at); it is only recorded in ``config``."""
    if abs(sum(fractions) - 1.0) > 1e-9:
        raise ValueError(f"fractions must sum to 1.0, got {fractions}")
    f_train, f_val, _ = fractions
    n = len(profiles)
    assert len(sample_names) == len(host_ids) == len(received_at) == n

    # Group indices by (host, sample), sort by received_at, partition
    groups: dict[tuple[str, str], list[int]] = {}
    for i in range(n):
        key = (host_ids[i] or "", sample_names[i] or "")
        groups.setdefault(key, []).append(i)

    train_m = np.zeros(n, dtype=bool)
    val_m = np.zeros(n, dtype=bool)
    test_m = np.zeros(n, dtype=bool)
    for key, idxs in groups.items():
        idxs.sort(key=lambda i: received_at[i] or "")
        m = len(idxs)
        n_train = int(m * f_train)
        # n_val is the cumulative end index of the val block, not its size.
        n_val = int(m * (f_train + f_val))
        for i in idxs[:n_train]:
            train_m[i] = True
        for i in idxs[n_train:n_val]:
            val_m[i] = True
        for i in idxs[n_val:]:
            test_m[i] = True

    return Splits(
        train=train_m, val=val_m, test=test_m,
        profiles=tuple(profiles), sample_names=tuple(sample_names),
        host_ids=tuple(host_ids),
        recipe="time",
        config={"fractions": list(fractions), "seed": seed},
        sample_to_split={},
        excluded_profiles=(),
    )
# ─────────────────────────────────────────────────────────────────────
# Convenience: load a saved split CSV and apply to a new candidate set
# ─────────────────────────────────────────────────────────────────────
def load_sample_to_split(path: Path) -> dict[str, str]:
    out: dict[str, str] = {}
    with path.open() as f:
        rdr = csv.reader(f)
        header_seen = False
        for row in rdr:
            if not row:
                continue
            if row[0].startswith("#"):
                continue  # "# recipe", "# config", ... metadata rows
            if not header_seen and row[:2] == ["sample_name", "split"]:
                header_seen = True
                continue
            if len(row) >= 2:
                out[row[0]] = row[1]
    return out
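
The hash-based val carve used by held_out_host can be exercised in isolation. The sketch below restates the hashing helper so that it runs standalone; `hash_to_unit` mirrors `_hash_to_unit`, while `carve_val` and the episode ids are hypothetical and exist only for this illustration.

```python
import hashlib


def hash_to_unit(s: str, salt: str) -> float:
    """First 64 bits of SHA-256(salt::s), scaled to the unit interval."""
    h = hashlib.sha256(f"{salt}::{s}".encode()).hexdigest()
    return int(h[:16], 16) / float(1 << 64)


def carve_val(episode_ids, val_frac=0.2, seed=0):
    """True where an episode would land in val rather than train."""
    salt = f"v1::held_out_host::{seed}"
    return [hash_to_unit(e, salt) < val_frac for e in episode_ids]


# Hypothetical episode ids, for illustration only.
episode_ids = [f"ep-{i:05d}" for i in range(10_000)]

mask_a = carve_val(episode_ids)
# Assignment depends only on the id and the salt, not on input order.
mask_b = carve_val(list(reversed(episode_ids)))[::-1]
# A different seed changes the salt and therefore the assignment.
mask_c = carve_val(episode_ids, seed=1)

val_share = sum(mask_a) / len(mask_a)
print(f"val share: {val_share:.3f}")
```

Because each episode is assigned from its own id, adding or removing other episodes never moves an existing episode between train and val, which is what makes the split reproducible across machines.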