Symmetric companion to the collection fleet (orchestrator/fleet.py)
but for *training*. Collection is embarrassingly parallel; training
is not (a model is trained at most once across the fleet), so the
receiver coordinates which worker gets which job.
The operator-control surface is etc/training_manifest.toml.example,
a single canonical file declaring (a) per-host capability and per-model
allow/deny policy, and (b) one [[jobs]] entry per (model, mode, hyper)
with capability constraints (require_cuda, prefer_cuda, min_vram_gib,
min_ram_gib, allowed_hosts).
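To make the capability constraints concrete, here is a minimal sketch of how a worker might filter jobs against its detected capability. The `Capability` and `JobConstraints` classes and `is_eligible` are hypothetical names for illustration, not the actual types in capability.py or manifest.py; only the constraint keys come from the manifest description above. Treating `prefer_cuda` as a soft preference (so it is left out of the hard filter) and treating an empty `allowed_hosts` as "any host" are both assumptions.

```python
from __future__ import annotations

from dataclasses import dataclass


@dataclass(frozen=True)
class Capability:
    """What a worker detected about its own host (hypothetical shape)."""
    hostname: str
    has_cuda: bool
    vram_gib: float
    ram_gib: float


@dataclass(frozen=True)
class JobConstraints:
    """Hard constraints of one job, named after the manifest keys."""
    require_cuda: bool = False
    min_vram_gib: float = 0.0
    min_ram_gib: float = 0.0
    allowed_hosts: tuple = ()  # assumption: empty means "any host"


def is_eligible(job: JobConstraints, cap: Capability) -> bool:
    """Return True only if the host satisfies every hard constraint."""
    if job.require_cuda and not cap.has_cuda:
        return False
    if cap.vram_gib < job.min_vram_gib:
        return False
    if cap.ram_gib < job.min_ram_gib:
        return False
    if job.allowed_hosts and cap.hostname not in job.allowed_hosts:
        return False
    return True


# Example: a CPU-only host with 8 GiB of RAM.
pi = Capability(hostname="pi", has_cuda=False, vram_gib=0.0, ram_gib=8.0)
gpu_job = JobConstraints(require_cuda=True, min_vram_gib=8.0)
cpu_job = JobConstraints(min_ram_gib=4.0)
print(is_eligible(gpu_job, pi), is_eligible(cpu_job, pi))
```

A CPU-only host rejects the CUDA job and accepts the CPU job, so a worker could apply a filter like this before it ever sends a claim.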
Components:
capability.py — self-detection: hostname, cores, RAM, CUDA presence,
VRAM, torch version, git commit. Used by workers to filter
eligible jobs before claiming.
manifest.py — TOML loader + JobSpec/HostSpec. Job IDs are stable
sha256 of (model, mode, hyper, split_recipe, train_hosts, seed)
so manifest reload is idempotent: existing rows keep their status,
new jobs become claimable, removed jobs stay until cancelled.
queue.py — SQLite job queue (training_jobs.db) with statuses
pending|claimed|running|completed|failed|cancelled. Atomic
claim_next via single UPDATE WHERE status='pending'. Heartbeat,
complete, fail. Stale-claim sweep (stale_after_s=600s) with
max_attempts cutoff to failed.
store.py — model artifact store mirroring receiver/store.py.
Artifact ID is the sha256 of the uploaded tarball; bit-identical
re-runs deduplicate.
receiver.py — Starlette app exposing 11 endpoints:
    POST /v1/job/claim             (worker)
    POST /v1/job/{id}/heartbeat    (worker)
    POST /v1/job/{id}/complete     (worker)
    POST /v1/job/{id}/fail         (worker)
    PUT  /v1/model/{id}            (worker; uploads tarball)
    GET  /v1/jobs                  (anyone)
    GET  /v1/workers               (anyone)
    POST /v1/job/{id}/cancel       (operator: X-Operator-Token)
    POST /v1/job/{id}/requeue      (operator)
    POST /v1/manifest/reload       (operator)
    GET  /v1/health                (anyone)
Runs as cis490-trainer-receiver.service on the Pi alongside the
existing receiver, on a separate port.
client.py — stdlib HTTP client (urllib only, no new deps).
worker.py — long-running daemon. Loop: detect capability → claim →
spawn training/trainer/run.py subprocess → heartbeat every 30s →
tar artifact, sha256, PUT /v1/model → complete. SIGTERM-safe.
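The atomic claim that queue.py is described as doing (one UPDATE guarded by status='pending') could look roughly like the sketch below. The table layout, the `claim_token` column, the `init_db` helper, and the "higher priority first" ordering are all assumptions made for illustration; the real schema in training_jobs.db may differ.

```python
from __future__ import annotations

import sqlite3
import uuid


def init_db(conn: sqlite3.Connection) -> None:
    """Create a minimal, hypothetical jobs table."""
    conn.execute(
        """
        CREATE TABLE IF NOT EXISTS jobs (
            job_id      TEXT PRIMARY KEY,
            status      TEXT NOT NULL DEFAULT 'pending',
            priority    INTEGER NOT NULL DEFAULT 0,
            worker      TEXT,
            claim_token TEXT
        )
        """
    )
    conn.commit()


def claim_next(conn: sqlite3.Connection, worker: str) -> str | None:
    """Claim one pending job with a single UPDATE; return its id or None."""
    token = uuid.uuid4().hex  # lets us find the row we just claimed
    with conn:  # commits on success, rolls back on exception
        cur = conn.execute(
            """
            UPDATE jobs
               SET status = 'claimed', worker = ?, claim_token = ?
             WHERE status = 'pending'
               AND job_id = (
                   SELECT job_id FROM jobs
                    WHERE status = 'pending'
                    ORDER BY priority DESC, job_id
                    LIMIT 1)
            """,
            (worker, token),
        )
        if cur.rowcount == 0:
            return None  # nothing pending, or another worker won the race
        row = conn.execute(
            "SELECT job_id FROM jobs WHERE claim_token = ?", (token,)
        ).fetchone()
    return row[0]


conn = sqlite3.connect(":memory:")
init_db(conn)
conn.executemany(
    "INSERT INTO jobs (job_id, priority) VALUES (?, ?)", [("a", 0), ("b", 5)]
)
conn.commit()
print(claim_next(conn, "w1"), claim_next(conn, "w2"), claim_next(conn, "w3"))
```

Because the status check and the status change happen in the same statement, two workers cannot both move the same row out of pending; the loser sees a rowcount of 0 and simply tries again later.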
Operator CLI (tools/cis490_jobs.py): status / list / show / cancel /
requeue / reload / workers. Cancel and requeue require
$CIS490_OPERATOR_TOKEN matching the receiver's configured value.
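One way the receiver might compare the presented operator token with its configured value is sketched here. `operator_token_ok` is a hypothetical helper, not the function in receiver.py, and rejecting requests when no token is configured is an assumption about the desired policy.

```python
from __future__ import annotations

import hmac


def operator_token_ok(presented: str | None, expected: str | None) -> bool:
    """Compare the X-Operator-Token value against the configured token."""
    if not presented or not expected:
        # Assumption: with no configured token, operator endpoints stay closed.
        return False
    # compare_digest avoids leaking the match position through timing.
    return hmac.compare_digest(presented.encode("utf-8"), expected.encode("utf-8"))


print(operator_token_ok("s3cret", "s3cret"), operator_token_ok("guess", "s3cret"))
```

In this sketch the expected value would come from the receiver's configuration and the presented value from the request header that the CLI fills from $CIS490_OPERATOR_TOKEN.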
Bootstrap: scripts/install-training-worker.sh (Linux systemd) and
scripts/install-training-worker-windows.ps1 (Windows Scheduled Task)
let the operator enroll a new host with one command after cloning
the repo and setting up the venv. Worker self-tests capability
before registering.
End-to-end smoke verified on the Pi: receiver up, manifest synced,
14 jobs queued, worker registered, claimed 4 CPU-eligible jobs
(allow_jobs=["gbt","mlp"]), completed 3 (gbt-realistic, gbt-oracle,
mlp-oracle), 1 failed with the actual error visible via
cis490-jobs status, 3 artifacts uploaded to
/var/lib/cis490/models/<model>_<mode>/<sha256>/bundle.tar.zst with
proper index.jsonl row.
21 unit tests (manifest validation: 8; queue lifecycle + eligibility:
13). All pass alongside the prior 17 training tests = 38 green.
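Several of the manifest tests depend on job IDs being a stable hash of job content rather than of the job's name. A minimal sketch of such an ID follows, assuming the fields are serialized as canonical JSON before hashing; `stable_job_id`, its default values, and the sorting of `train_hosts` are hypothetical and may not match manifest.py.

```python
from __future__ import annotations

import hashlib
import json
from typing import Any, Iterable


def stable_job_id(
    model: str,
    mode: str,
    hyper: dict[str, Any] | None = None,
    split_recipe: str = "default",  # hypothetical default
    train_hosts: Iterable[str] = (),
    seed: int = 0,  # hypothetical default
) -> str:
    """Hash the job's content fields; the display name is deliberately excluded."""
    payload = {
        "model": model,
        "mode": mode,
        "hyper": hyper or {},
        "split_recipe": split_recipe,
        "train_hosts": sorted(train_hosts),  # assumption: host order is irrelevant
        "seed": seed,
    }
    # sort_keys plus fixed separators gives one byte string per logical payload.
    canonical = json.dumps(payload, sort_keys=True, separators=(",", ":"))
    return hashlib.sha256(canonical.encode("utf-8")).hexdigest()


a = stable_job_id("gbt", "realistic", {"lr": 0.1, "depth": 6})
b = stable_job_id("gbt", "realistic", {"depth": 6, "lr": 0.1})
c = stable_job_id("gbt", "realistic", {"lr": 0.05, "depth": 6})
print(a == b, a == c)
```

Key order in `hyper` does not change the ID, while a different hyperparameter value does. That is the property that lets two jobs with the same model and mode coexist only when their hyper blocks differ.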
Open limitations surfaced inline:
- Hyper-key drift between manifest and run.py fails at training
time, not at manifest reload (worth tightening to argparse
introspection later).
- mTLS not yet wired through Caddy for the trainer-receiver port —
listens loopback-only until that lands.
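As a rough idea of what the argparse introspection mentioned in the first limitation might look like, the sketch below checks manifest hyper keys against a parser at reload time. `build_parser` is a stand-in with made-up options, not the real parser in training/trainer/run.py, and the approach assumes every option has a default so that an empty argument list parses cleanly.

```python
from __future__ import annotations

import argparse
from typing import Any


def build_parser() -> argparse.ArgumentParser:
    """Stand-in for the run.py parser (hypothetical options)."""
    p = argparse.ArgumentParser(prog="run.py")
    p.add_argument("--lr", type=float, default=0.1)
    p.add_argument("--epochs", type=int, default=10)
    p.add_argument("--batch-size", type=int, default=256)
    return p


def unknown_hyper_keys(
    parser: argparse.ArgumentParser, hyper: dict[str, Any]
) -> list[str]:
    """Return hyper keys that the parser would not recognize."""
    # Parsing an empty argv yields a namespace holding every option's dest.
    accepted = set(vars(parser.parse_args([])))
    return sorted(key for key in hyper if key not in accepted)


parser = build_parser()
print(unknown_hyper_keys(parser, {"lr": 0.05, "epochs": 80, "batch_size": 256}))
print(unknown_hyper_keys(parser, {"learning_rate": 0.1, "epochs": 80}))
```

A manifest loader could raise its validation error when this list is non-empty, which would move the failure from training time to reload time.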
Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
146 lines
3 KiB
Python
"""Tests for training/fleet/manifest.py — TOML loader + schema."""
from __future__ import annotations

from pathlib import Path

import pytest

from training.fleet.manifest import (
    JobSpec, TrainingManifestError, load,
)


def _write(tmp_path: Path, body: str) -> Path:
    p = tmp_path / "training_manifest.toml"
    p.write_text(body)
    return p


def test_load_minimal(tmp_path):
    p = _write(tmp_path, """
        schema_version = 1
        name = "test"

        [[jobs]]
        name = "gbt-r"
        model = "gbt"
        mode = "realistic"
    """)
    m = load(p)
    assert m.name == "test"
    assert len(m.jobs) == 1
    assert m.jobs[0].model == "gbt"
    assert m.jobs[0].mode == "realistic"


def test_unknown_model_rejected(tmp_path):
    p = _write(tmp_path, """
        schema_version = 1
        name = "test"
        [[jobs]]
        name = "bogus"
        model = "transformer_xl"
        mode = "realistic"
    """)
    with pytest.raises(TrainingManifestError, match="not in"):
        load(p)


def test_unknown_mode_rejected(tmp_path):
    p = _write(tmp_path, """
        schema_version = 1
        [[jobs]]
        name = "x"
        model = "gbt"
        mode = "weirdo"
    """)
    with pytest.raises(TrainingManifestError, match="mode"):
        load(p)


def test_duplicate_job_id_rejected(tmp_path):
    """Same model+mode+hyper → same job_id → operator must disambiguate."""
    p = _write(tmp_path, """
        schema_version = 1
        [[jobs]]
        name = "first"
        model = "gbt"
        mode = "realistic"

        [[jobs]]
        name = "duplicate-by-content"
        model = "gbt"
        mode = "realistic"
    """)
    with pytest.raises(TrainingManifestError, match="duplicates"):
        load(p)


def test_disambiguation_via_hyper(tmp_path):
    """Same model+mode but different hyper → different job_ids → OK."""
    p = _write(tmp_path, """
        schema_version = 1
        [[jobs]]
        name = "lr1"
        model = "gbt"
        mode = "realistic"
        hyper.lr = 0.1

        [[jobs]]
        name = "lr2"
        model = "gbt"
        mode = "realistic"
        hyper.lr = 0.05
    """)
    m = load(p)
    assert m.jobs[0].job_id != m.jobs[1].job_id


def test_host_allow_deny(tmp_path):
    p = _write(tmp_path, """
        schema_version = 1
        [hosts.tiny]
        allow_jobs = ["gbt"]
        [hosts.huge]
        deny_jobs = ["transformer"]

        [[jobs]]
        name = "x"
        model = "gbt"
        mode = "realistic"
    """)
    m = load(p)
    assert m.hosts["tiny"].is_model_allowed("gbt")
    assert not m.hosts["tiny"].is_model_allowed("transformer")
    assert m.hosts["huge"].is_model_allowed("gbt")
    assert not m.hosts["huge"].is_model_allowed("transformer")


def test_job_id_stable_across_loads(tmp_path):
    src = """
        schema_version = 1
        [[jobs]]
        name = "stable"
        model = "transformer"
        mode = "oracle"
        hyper.epochs = 80
        hyper.batch_size = 256
    """
    # Load the same content from two different files.
    a = load(_write(tmp_path, src))
    p2 = tmp_path / "b.toml"
    p2.write_text(src)
    b = load(p2)
    # Same content → same job_id (it's the load-portable identity)
    assert a.jobs[0].job_id == b.jobs[0].job_id


def test_priority_default_zero(tmp_path):
    p = _write(tmp_path, """
        schema_version = 1
        [[jobs]]
        name = "x"
        model = "gbt"
        mode = "realistic"
    """)
    m = load(p)
    assert m.jobs[0].priority == 0