The validator's allowed-models frozenset was missing knn and knn_semi,
even though the manifest had gained those jobs and the model registry
had registered their classes. The Lambda bootstrap failed with:
TrainingManifestError: job 'knn-realistic': model 'knn' not in
['cnn', 'gbt', 'gru', 'lstm', 'mlp', 'transformer', 'transformer_ssl']
The allowed set is now {gbt, knn, knn_semi, mlp, cnn, gru, lstm, transformer, transformer_ssl}.
Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
233 lines
8 KiB
Python
233 lines
8 KiB
Python
"""Loader + validator for ``training_manifest.toml``.
|
|
|
|
Every job in the manifest is hashed into a stable ``job_id`` based on
|
|
``(model, mode, hyper-blob, schema_version)`` so the same manifest entry
|
|
always maps to the same queue row across reload/restart. This makes
|
|
``systemctl reload cis490-receiver`` idempotent: jobs already complete
|
|
stay complete; new jobs become claimable; deleted jobs are not removed
|
|
(operator marks them cancelled explicitly).
|
|
"""
|
|
from __future__ import annotations
|
|
|
|
import hashlib
|
|
import json
|
|
import tomllib
|
|
from dataclasses import dataclass, field
|
|
from pathlib import Path
|
|
from typing import Any
|
|
|
|
|
|
# Conventional manifest locations: an absolute system-wide path and a
# bare relative file name.
CANONICAL_FILENAMES = (
    "/etc/cis490/training_manifest.toml",  # system-wide install
    "training_manifest.toml",              # relative name
)
|
|
|
|
|
|
class TrainingManifestError(ValueError):
    """Raised when a training manifest is missing, unparsable, or invalid.

    Derives from ValueError, so code that already catches ValueError also
    catches manifest problems.
    """
|
|
|
|
|
|
@dataclass(frozen=True)
class HostSpec:
    """Scheduling policy for one host, built from a ``[hosts.<name>]`` table.

    ``allow_jobs`` and ``deny_jobs`` hold *model* names; they are consulted
    by :meth:`is_model_allowed`.
    """

    name: str
    description: str = ""
    priority: int = 0
    allow_jobs: tuple[str, ...] = ()  # non-empty => only these models
    deny_jobs: tuple[str, ...] = ()   # always takes precedence over allow_jobs

    def is_model_allowed(self, model: str) -> bool:
        """Return True if this host may run jobs whose model is ``model``.

        A model listed in ``deny_jobs`` is always refused. Otherwise an
        empty ``allow_jobs`` admits every model, while a non-empty one
        admits only the models it lists.
        """
        denied = model in self.deny_jobs
        whitelisted = not self.allow_jobs or model in self.allow_jobs
        return whitelisted and not denied
|
|
|
|
|
|
@dataclass(frozen=True)
class JobSpec:
    """One validated ``[[jobs]]`` entry from the manifest.

    Only model, mode, hyper, split_recipe, train_hosts and seed define the
    job's identity (see :attr:`job_id`); every other field can change
    without producing a new job.
    """

    name: str
    model: str
    mode: str
    priority: int = 0
    require_cuda: bool = False
    prefer_cuda: bool = False
    min_vram_gib: float = 0.0
    min_ram_gib: float = 4.0
    min_cores: int = 1
    allowed_hosts: tuple[str, ...] = ()  # if non-empty, only these hosts
    denied_hosts: tuple[str, ...] = ()
    hyper: dict[str, Any] = field(default_factory=dict)
    split_recipe: str = "host"
    train_hosts: tuple[str, ...] = ("elliott-thinkpad",)
    seed: int = 0
    n_resamples: int = 1000

    @property
    def job_id(self) -> str:
        """Return a 16-hex-char SHA-256 prefix of the job's identity fields.

        Hashed: model, mode, hyper, split_recipe, train_hosts, seed.
        Not hashed: name, priority, CUDA/VRAM/RAM/core requirements, host
        allow/deny lists and n_resamples, so changing those does not change
        the identity of a completed artifact. Keys are sorted before hashing,
        which makes the result independent of dict ordering in ``hyper``.
        """
        identity = dict(
            model=self.model,
            mode=self.mode,
            hyper=self.hyper,
            split_recipe=self.split_recipe,
            train_hosts=list(self.train_hosts),
            seed=self.seed,
        )
        canonical = json.dumps(identity, sort_keys=True)
        digest = hashlib.sha256(canonical.encode())
        return digest.hexdigest()[:16]

    def to_dict(self) -> dict:
        """Return every field plus the derived ``job_id`` as a plain dict.

        Tuples are converted to lists and ``hyper`` is shallow-copied, so
        the result uses only built-in list/dict containers.
        """
        snapshot: dict = {"name": self.name, "job_id": self.job_id}
        snapshot.update(
            model=self.model,
            mode=self.mode,
            priority=self.priority,
            require_cuda=self.require_cuda,
            prefer_cuda=self.prefer_cuda,
            min_vram_gib=self.min_vram_gib,
            min_ram_gib=self.min_ram_gib,
            min_cores=self.min_cores,
            allowed_hosts=list(self.allowed_hosts),
            denied_hosts=list(self.denied_hosts),
            hyper=dict(self.hyper),
            split_recipe=self.split_recipe,
            train_hosts=list(self.train_hosts),
            seed=self.seed,
            n_resamples=self.n_resamples,
        )
        return snapshot
|
|
|
|
|
|
@dataclass(frozen=True)
class TrainingManifest:
    """In-memory form of a whole, validated ``training_manifest.toml``."""

    schema_version: int            # load() only accepts 1
    name: str                      # top-level ``name`` key
    defaults: dict[str, Any]       # copy of the ``[defaults]`` table
    hosts: dict[str, HostSpec]     # keyed by host name
    jobs: tuple[JobSpec, ...]      # in manifest order
|
|
|
|
|
|
# Allowed model names — keep in sync with training/models/REGISTRY
|
|
_ALLOWED_MODELS = frozenset({
|
|
"gbt", "knn", "knn_semi",
|
|
"mlp", "cnn", "gru", "lstm", "transformer", "transformer_ssl",
|
|
})
|
|
_ALLOWED_MODES = frozenset({"realistic", "oracle"})
|
|
_ALLOWED_RECIPES = frozenset({"host", "sample", "time"})
|
|
|
|
|
|
def _convert(kind: type, value: Any, where: str) -> Any:
    """Apply ``kind`` (int/float) to ``value``, reporting failure as a manifest error.

    The historical lenient coercion (e.g. ``int("5")``) is kept; only the
    bare ValueError/TypeError is replaced by one naming the field.
    """
    try:
        return kind(value)
    except (TypeError, ValueError) as e:
        raise TrainingManifestError(
            f"{where} must be {kind.__name__}-compatible, got {value!r}"
        ) from e


def _as_bool(value: Any, where: str) -> bool:
    """Return ``value`` as a bool, rejecting strings and other non-integers.

    ``bool("false")`` is True, so a quoted value would silently flip the
    flag. Only TOML booleans (and plain integers) are accepted.
    """
    if isinstance(value, int):  # bool is an int subclass
        return bool(value)
    raise TrainingManifestError(f"{where} must be a boolean, got {value!r}")


def _as_str_tuple(value: Any, where: str) -> tuple[str, ...]:
    """Return an array of strings as a tuple, rejecting any other shape.

    A bare string is rejected rather than iterated: ``tuple("gbt")`` would
    silently become ``("g", "b", "t")``.
    """
    if isinstance(value, (list, tuple)) and all(isinstance(v, str) for v in value):
        return tuple(value)
    raise TrainingManifestError(
        f"{where} must be an array of strings, got {value!r}"
    )


def _read_manifest_toml(path: Path) -> dict[str, Any]:
    """Read ``path`` as UTF-8 TOML, mapping every failure to TrainingManifestError."""
    try:
        # TOML documents are UTF-8 by specification; don't rely on the
        # locale's default encoding.
        text = path.read_text(encoding="utf-8")
    except FileNotFoundError as e:
        raise TrainingManifestError(f"manifest not found at {path}") from e
    except UnicodeDecodeError as e:
        raise TrainingManifestError(
            f"manifest at {path} is not valid UTF-8: {e}"
        ) from e
    except OSError as e:
        # e.g. the path is a directory or is not readable.
        raise TrainingManifestError(f"cannot read manifest at {path}: {e}") from e
    try:
        return tomllib.loads(text)
    except tomllib.TOMLDecodeError as e:
        raise TrainingManifestError(f"invalid TOML at {path}: {e}") from e


def _parse_hosts(hosts_raw: dict[str, Any]) -> dict[str, HostSpec]:
    """Build a HostSpec for every ``[hosts.<name>]`` table, keyed by host name."""
    hosts: dict[str, HostSpec] = {}
    for hname, h in hosts_raw.items():
        if not isinstance(h, dict):
            raise TrainingManifestError(f"hosts.{hname} must be a table")
        hosts[hname] = HostSpec(
            name=hname,
            description=str(h.get("description", "")),
            priority=_convert(int, h.get("priority", 0), f"hosts.{hname}.priority"),
            allow_jobs=_as_str_tuple(h.get("allow_jobs", []), f"hosts.{hname}.allow_jobs"),
            deny_jobs=_as_str_tuple(h.get("deny_jobs", []), f"hosts.{hname}.deny_jobs"),
        )
    return hosts


def _parse_job(j: Any, defaults: dict[str, Any]) -> JobSpec:
    """Validate one ``[[jobs]]`` table and build its JobSpec.

    Some fields (split_recipe, min_ram_gib, min_cores, train_hosts, seed,
    n_resamples) fall back to ``defaults``. The others use only the job's
    own value or the built-in default.
    """
    if not isinstance(j, dict):
        raise TrainingManifestError(f"each [[jobs]] entry must be a table, got {j!r}")
    if "name" not in j:
        raise TrainingManifestError(f"job missing 'name': {j}")
    if "model" not in j:
        raise TrainingManifestError(f"job '{j['name']}' missing 'model'")
    where = f"job '{j['name']}'"

    def pick(key: str, fallback: Any) -> Any:
        # Per-job value, else the [defaults] table, else the built-in fallback.
        return j.get(key, defaults.get(key, fallback))

    model = str(j["model"])
    if model not in _ALLOWED_MODELS:
        raise TrainingManifestError(
            f"{where}: model {model!r} not in {sorted(_ALLOWED_MODELS)}"
        )
    mode = str(j.get("mode", "realistic"))
    if mode not in _ALLOWED_MODES:
        raise TrainingManifestError(
            f"{where}: mode {mode!r} not in {sorted(_ALLOWED_MODES)}"
        )
    recipe = str(pick("split_recipe", "host"))
    if recipe not in _ALLOWED_RECIPES:
        raise TrainingManifestError(
            f"{where}: split_recipe {recipe!r} not in {sorted(_ALLOWED_RECIPES)}"
        )
    hyper = j.get("hyper", {})
    if not isinstance(hyper, dict):
        raise TrainingManifestError(f"{where}: hyper must be a table, got {hyper!r}")

    return JobSpec(
        name=str(j["name"]),
        model=model,
        mode=mode,
        priority=_convert(int, j.get("priority", 0), f"{where}: priority"),
        require_cuda=_as_bool(j.get("require_cuda", False), f"{where}: require_cuda"),
        prefer_cuda=_as_bool(j.get("prefer_cuda", False), f"{where}: prefer_cuda"),
        min_vram_gib=_convert(float, j.get("min_vram_gib", 0.0), f"{where}: min_vram_gib"),
        min_ram_gib=_convert(float, pick("min_ram_gib", 4.0), f"{where}: min_ram_gib"),
        min_cores=_convert(int, pick("min_cores", 1), f"{where}: min_cores"),
        allowed_hosts=_as_str_tuple(j.get("allowed_hosts", []), f"{where}: allowed_hosts"),
        denied_hosts=_as_str_tuple(j.get("denied_hosts", []), f"{where}: denied_hosts"),
        hyper=dict(hyper),
        split_recipe=recipe,
        train_hosts=_as_str_tuple(
            pick("train_hosts", ["elliott-thinkpad"]), f"{where}: train_hosts"
        ),
        seed=_convert(int, pick("seed", 0), f"{where}: seed"),
        n_resamples=_convert(int, pick("n_resamples", 1000), f"{where}: n_resamples"),
    )


def load(path: Path) -> TrainingManifest:
    """Parse and validate the training manifest at ``path``.

    Args:
        path: Location of a ``training_manifest.toml`` file (a ``str`` is
            also accepted).

    Returns:
        The validated TrainingManifest, with jobs in file order.

    Raises:
        TrainingManifestError: if the file is missing, unreadable or not
            UTF-8, or is not valid TOML. Also raised if ``schema_version`` is
            not the integer 1, there are no ``[[jobs]]``, a table or field has
            the wrong type, a model/mode/split_recipe is unknown, a ``hyper``
            value cannot be hashed, or two jobs share a ``job_id``.
    """
    path = Path(path)
    raw = _read_manifest_toml(path)

    sv = raw.get("schema_version")
    # ``True == 1`` and ``1.0 == 1`` in Python, so check the type as well.
    if isinstance(sv, bool) or not isinstance(sv, int) or sv != 1:
        raise TrainingManifestError(f"schema_version must be 1, got {sv}")

    defaults = raw.get("defaults", {}) or {}
    hosts_raw = raw.get("hosts", {}) or {}
    jobs_raw = raw.get("jobs", []) or []
    if not jobs_raw:
        raise TrainingManifestError("manifest has no [[jobs]] entries")
    if not isinstance(defaults, dict):
        raise TrainingManifestError("[defaults] must be a table")
    if not isinstance(hosts_raw, dict):
        raise TrainingManifestError("[hosts] must be a table")
    if not isinstance(jobs_raw, list):
        # ``[jobs]`` (single brackets) parses as one table, not an array of
        # tables; iterating it would walk its keys.
        raise TrainingManifestError("jobs must be an array of tables ([[jobs]])")

    hosts = _parse_hosts(hosts_raw)

    first_name_by_id: dict[str, str] = {}
    jobs: list[JobSpec] = []
    for j in jobs_raw:
        spec = _parse_job(j, defaults)
        try:
            job_id = spec.job_id
        except TypeError as e:
            # job_id JSON-encodes hyper; TOML date/time values can't be encoded.
            raise TrainingManifestError(
                f"job '{spec.name}': hyper must contain only JSON-serializable "
                f"values: {e}"
            ) from e
        earlier = first_name_by_id.get(job_id)
        if earlier is not None:
            # Identical identity fields hash to the same job_id and would
            # collide in the queue. Operator error.
            raise TrainingManifestError(
                f"job '{spec.name}' duplicates earlier job '{earlier}' by content "
                f"(same model+mode+hyper+split_recipe+train_hosts+seed). "
                f"Disambiguate via hyper or seed."
            )
        first_name_by_id[job_id] = spec.name
        jobs.append(spec)

    return TrainingManifest(
        schema_version=1,
        name=str(raw.get("name", "training-fleet")),
        defaults=dict(defaults),
        hosts=hosts,
        jobs=tuple(jobs),
    )
|
|
|
|
|
|
def load_canonical(repo_root: Path | str | None = None) -> TrainingManifest:
    """Load the manifest from the first standard location that exists.

    Candidates are derived from ``CANONICAL_FILENAMES`` and tried in order.
    Absolute entries (``/etc/cis490/training_manifest.toml``) are used
    as-is. Each relative entry is tried first under ``repo_root`` (when
    given) and then relative to the current working directory.

    Args:
        repo_root: Optional repository root to search (Path or str).

    Returns:
        The manifest returned by ``load`` for the first existing candidate.

    Raises:
        TrainingManifestError: if no candidate exists, or if the one found
            fails validation in ``load``.
    """
    candidates: list[Path] = []
    for entry in map(Path, CANONICAL_FILENAMES):
        if not entry.is_absolute() and repo_root is not None:
            candidates.append(Path(repo_root) / entry)
        candidates.append(entry)
    # Drop duplicates (e.g. repo_root == Path(".")) while keeping order.
    candidates = list(dict.fromkeys(candidates))

    for p in candidates:
        if p.exists():
            return load(p)
    raise TrainingManifestError(
        f"no training_manifest.toml found at any of: "
        f"{[str(p) for p in candidates]}"
    )
|