diff --git a/etc/training_manifest.toml.example b/etc/training_manifest.toml.example index db999ee..2ae67d9 100644 --- a/etc/training_manifest.toml.example +++ b/etc/training_manifest.toml.example @@ -78,6 +78,25 @@ priority = 100 require_cuda = false min_ram_gib = 4 +[[jobs]] +name = "knn-realistic" +model = "knn" +mode = "realistic" +priority = 95 # right after GBT — fastest non-parametric baseline +require_cuda = false +min_ram_gib = 4 +# KNN's k=10 / weights=distance live in the model class. To override, +# add --k / --weights to training/trainer/run.py first; otherwise these +# hyper.* keys would fail with the unknown-arg exit-2 issue. + +[[jobs]] +name = "knn-oracle" +model = "knn" +mode = "oracle" +priority = 95 +require_cuda = false +min_ram_gib = 4 + [[jobs]] name = "mlp-realistic" model = "mlp" diff --git a/training/models/__init__.py b/training/models/__init__.py index e483dda..a855f18 100644 --- a/training/models/__init__.py +++ b/training/models/__init__.py @@ -36,6 +36,7 @@ def get_model(name: str): # Eager-import the implementations so the registry is populated. # Order matters only for which "kind" gets imported first — all are listed. 
from training.models import gbt # noqa: F401,E402 +from training.models import knn # noqa: F401,E402 from training.models import mlp # noqa: F401,E402 from training.models import cnn # noqa: F401,E402 from training.models import gru # noqa: F401,E402 diff --git a/training/models/_checkpoint.py b/training/models/_checkpoint.py index c2a9edb..466a7ea 100644 --- a/training/models/_checkpoint.py +++ b/training/models/_checkpoint.py @@ -151,6 +151,8 @@ def _write_sidecar(model: BaseModel, *, base: Path) -> str: """ if model.__model_name__ == "gbt": path = base.with_suffix(".xgb.json") + elif model.__model_name__ == "knn": + path = base.with_suffix(".knn.pkl") else: path = base.with_suffix(".pt") model.save_sidecar(path) @@ -186,8 +188,9 @@ def load_checkpoint(path: Path, *, device: str = "auto") -> BaseModel: cls = get_model(header["name"]) sidecar = json_path.with_name(header["sidecar"]) payload: dict[str, Any] - if header["name"] == "gbt": - # GBT loader reads the .xgb.json directly; pass the path in payload + if header["name"] in ("gbt", "knn"): + # File-path loaders (XGBoost JSON, sklearn pickle); they open + # the sidecar themselves rather than receiving torch tensors. payload = {"sidecar_path": str(sidecar)} else: import torch diff --git a/training/models/knn.py b/training/models/knn.py new file mode 100644 index 0000000..d5f59d8 --- /dev/null +++ b/training/models/knn.py @@ -0,0 +1,142 @@ +"""KNN classifier on per-window summary features. + +Non-parametric baseline. Like GBT it uses the summary-stat input +(mean / std / p50 / p95 / slope per channel), but where GBT learns +axis-aligned splits, KNN reads off the local neighborhood structure +in feature space. That makes it a useful complement: where the two +agree, decisions are well-supported; where they disagree, the local +density of the feature manifold is contradicting the global +boosted-tree partitioning. + +We use distance-weighted KNN with k=10 by default. 
@register("knn")
class KNN(BaseModel):
    """Distance-weighted k-nearest-neighbours classifier on summary features.

    Non-parametric baseline: the fitted sklearn ``KNeighborsClassifier``
    (which holds the training rows) is the model's only "weights".  The
    hyper-parameters live in ``self.config`` and are written to the
    checkpoint header; the classifier itself is pickled to a sidecar file.

    Attributes:
        n_classes: Number of target classes the model is expected to emit.
        keep_mask: Boolean feature mask, stored as a ``bool`` ndarray.
        standardize: Per-feature standardization statistics.
        config: ``{"k", "weights", "algorithm"}`` passed to sklearn at fit.
    """

    input_kind = "summary"

    def __init__(
        self,
        *,
        n_classes: int,
        keep_mask: np.ndarray,
        standardize: StandardizeStats,
        k: int = 10,
        weights: str = "distance",
        algorithm: str = "auto",
        clf: Any = None,
    ) -> None:
        """Store hyper-parameters and, optionally, an already-fitted classifier.

        Args:
            n_classes: Number of target classes.
            keep_mask: Feature mask; any array-like is coerced to a bool array.
            standardize: Standardization statistics for the input features.
            k: Number of neighbours (``n_neighbors`` in sklearn).
            weights: sklearn neighbour weighting scheme.
            algorithm: sklearn neighbour-search algorithm.
            clf: Fitted classifier to adopt (used when loading a checkpoint);
                ``None`` means the model still has to be fitted.
        """
        self.n_classes = n_classes
        # np.asarray (rather than .astype) also accepts plain lists/tuples.
        self.keep_mask = np.asarray(keep_mask, dtype=bool)
        self.standardize = standardize
        self.config = {"k": k, "weights": weights, "algorithm": algorithm}
        self._clf = clf

    @property
    def clf(self):
        """Return the fitted classifier.

        Raises:
            RuntimeError: If the model has not been fitted or loaded yet.
        """
        if self._clf is None:
            raise RuntimeError("model not fitted; call .fit(...) first")
        return self._clf

    def fit(
        self,
        *,
        X_train: np.ndarray,
        y_train: np.ndarray,
        X_val: np.ndarray | None = None,
        y_val: np.ndarray | None = None,
        sample_weight: np.ndarray | None = None,
    ) -> dict:
        """Memorize the training slice and optionally score a validation slice.

        KNN does not "train": the sklearn classifier is fitted on
        ``self.select(X_train)`` (inherited helper; per the module notes it
        applies the keep mask and standardization).

        Args:
            X_train: Training features.
            y_train: Training labels.
            X_val: Optional validation features.
            y_val: Optional validation labels.
            sample_weight: Accepted for interface parity with the other
                models but NOT used; a ``RuntimeWarning`` is emitted when it
                is supplied so the omission is never silent.

        Returns:
            A history dict; contains ``"val_macro_f1"`` only when a
            non-empty validation slice was provided.
        """
        from sklearn.neighbors import KNeighborsClassifier

        if sample_weight is not None:
            # KNeighborsClassifier.fit(X, y) has no sample_weight parameter,
            # so the weights cannot be honoured; say so instead of dropping
            # them silently.
            import warnings

            warnings.warn(
                "KNN.fit ignores sample_weight: KNeighborsClassifier does "
                "not support per-sample weights",
                RuntimeWarning,
                stacklevel=2,
            )

        Xk = self.select(X_train)
        clf = KNeighborsClassifier(
            n_neighbors=int(self.config["k"]),
            weights=str(self.config["weights"]),
            algorithm=str(self.config["algorithm"]),
            n_jobs=-1,
        )
        clf.fit(Xk, y_train)
        self._clf = clf

        history: dict = {}
        if X_val is not None and y_val is not None and len(X_val) > 0:
            from training.eval_._metrics import _macro_f1

            y_pred_val = self.predict(X_val)
            history["val_macro_f1"] = _macro_f1(
                y_val, y_pred_val, n_classes=self.n_classes
            )
        return history

    def predict_proba(self, X: np.ndarray) -> np.ndarray:
        """Return class probabilities as a ``float32`` array.

        sklearn only emits one column per class seen during ``fit``.  When
        the training slice lacked some classes, the columns are scattered
        into an ``n_classes``-wide matrix so that column ``i`` always means
        class ``i``.

        Raises:
            RuntimeError: If the model has not been fitted or loaded yet.
        """
        Xk = self.select(X)
        clf = self.clf
        proba = clf.predict_proba(Xk)
        if proba.shape[1] == self.n_classes:
            return proba.astype(np.float32)
        # NOTE(review): assumes labels are integer class indices in
        # [0, n_classes) — confirm against the label encoding upstream.
        aligned = np.zeros((proba.shape[0], self.n_classes), dtype=np.float32)
        aligned[:, np.asarray(clf.classes_).astype(int)] = proba
        return aligned

    # --- Checkpoint API -----------------------------------------------

    def state_for_checkpoint(self) -> dict[str, Any]:
        """Return the JSON-able part of the checkpoint: the hyper-parameters.

        The fitted classifier (i.e. the memorized training rows) is NOT part
        of this dict; it is persisted separately by ``save_sidecar``.
        """
        return {"config": self.config}

    def save_sidecar(self, path: Path) -> None:
        """Pickle the fitted sklearn classifier to ``path``.

        KNN's storage cost is roughly n_train_rows x n_features x 4 bytes.
        For our scale (~660k windows x ~145 kept features x 4 = ~380 MB)
        this is heavy; cap the number of training rows in the trainer if
        memory is tight on the Pi.

        Raises:
            RuntimeError: If the model has not been fitted, so an unusable
                checkpoint (a pickle of ``None``) is never written.
        """
        clf = self.clf  # resolve before opening so no empty file is left behind
        with path.open("wb") as f:
            pickle.dump(clf, f, protocol=pickle.HIGHEST_PROTOCOL)

    @classmethod
    def from_checkpoint(cls, header: dict, payload: dict, *,
                        device: str = "cpu") -> "KNN":
        """Rebuild a fitted model from a checkpoint header plus sidecar path.

        Args:
            header: Checkpoint header; ``n_classes``, ``keep_mask`` and
                ``standardize`` are required, ``config`` is optional.
            payload: Must contain ``"sidecar_path"`` pointing at the pickle.
            device: Unused by this model; kept for the common loader
                signature.

        Raises:
            RuntimeError: If ``sidecar_path`` is missing, or the sidecar
                does not contain a usable classifier.
        """
        # The framework points us at the sidecar pickle path
        sidecar_path = payload.get("sidecar_path")
        if sidecar_path is None:
            # Loaded via torch.load (NN path) by mistake — tell the
            # checkpoint loader we want the file path instead.
            raise RuntimeError(
                "KNN checkpoint requires sidecar_path; ensure the "
                "loader treats KNN like GBT (passes the file path "
                "rather than torch.load'ing the bytes)."
            )
        # SECURITY: pickle.load executes arbitrary code from the file.
        # Only load sidecars produced by our own trainer; never point this
        # at a checkpoint from an untrusted source.
        with Path(sidecar_path).open("rb") as f:
            clf = pickle.load(f)
        if not hasattr(clf, "predict_proba"):
            # Fail loud at load time rather than on the first prediction.
            raise RuntimeError(
                f"KNN sidecar {sidecar_path} does not contain a fitted "
                f"classifier (got {type(clf).__name__})"
            )
        cfg = header.get("config", {}) or {}
        return cls(
            n_classes=int(header["n_classes"]),
            keep_mask=np.asarray(header["keep_mask"], dtype=bool),
            standardize=StandardizeStats.from_dict(header["standardize"]),
            k=int(cfg.get("k", 10)),
            weights=str(cfg.get("weights", "distance")),
            algorithm=str(cfg.get("algorithm", "auto")),
            clf=clf,
        )