CIS490/tools/cis490_jobs.py
Max 8643192a71 training/fleet: distributed multi-host trainer with capability gating
Symmetric companion to the collection fleet (orchestrator/fleet.py)
but for *training*. Collection is embarrassingly parallel; training
is not (a model is trained at most once across the fleet), so the
receiver coordinates which worker gets which job.

Operator-control surface is etc/training_manifest.toml.example —
single canonical file declaring (a) per-host capability + per-model
allow/deny policy, (b) one [[jobs]] entry per (model, mode, hyper)
with capability constraints (require_cuda, prefer_cuda, min_vram_gib,
min_ram_gib, allowed_hosts).
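The capability constraints on a [[jobs]] entry amount to a predicate over a worker's self-detected capability. Below is a minimal sketch that assumes plain-dict capability reports. JobConstraints, eligible() and max_vram_gib() are hypothetical names. The capability keys are borrowed from the fields the operator CLI prints, and allow_jobs stands in for the per-host allow policy. Comparing min_ram_gib against available rather than total RAM is also an assumption, and prefer_cuda is treated as a ranking hint rather than a filter.

```python
from dataclasses import dataclass
from typing import Optional


@dataclass
class JobConstraints:
    # Hypothetical stand-in for JobSpec; field names follow the manifest constraints.
    model: str
    require_cuda: bool = False
    prefer_cuda: bool = False  # ranking hint only in this sketch, never a filter
    min_vram_gib: float = 0.0
    min_ram_gib: float = 0.0
    allowed_hosts: Optional[list[str]] = None


def max_vram_gib(cap: dict) -> float:
    """Largest total VRAM across detected CUDA devices (0.0 if none)."""
    devices = cap.get("cuda_devices") or []
    return max((d.get("vram_total_gib", 0.0) for d in devices), default=0.0)


def eligible(job: JobConstraints, cap: dict,
             allow_jobs: Optional[list[str]] = None) -> bool:
    """True if a worker reporting capability `cap` may claim `job`."""
    if job.allowed_hosts is not None and cap.get("hostname") not in job.allowed_hosts:
        return False
    # Hypothetical per-host allow policy, e.g. allow_jobs=["gbt", "mlp"].
    if allow_jobs is not None and job.model not in allow_jobs:
        return False
    if job.require_cuda and not cap.get("cuda_available"):
        return False
    if job.min_vram_gib > 0 and max_vram_gib(cap) < job.min_vram_gib:
        return False
    # Assumption: RAM floor is checked against currently available RAM.
    if cap.get("ram_available_gib", 0.0) < job.min_ram_gib:
        return False
    return True
```

With this predicate, a CPU-only host with allow_jobs=["gbt", "mlp"] passes a plain gbt job but rejects anything marked require_cuda=True.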

Components:

  capability.py — self-detection: hostname, cores, RAM, CUDA presence,
    VRAM, torch version, git commit. Used by workers to filter
    eligible jobs before claiming.

  manifest.py — TOML loader + JobSpec/HostSpec. Job IDs are stable
    sha256 of (model, mode, hyper, split_recipe, train_hosts, seed)
    so manifest reload is idempotent: existing rows keep their status,
    new jobs become claimable, removed jobs stay until cancelled.

  queue.py — SQLite job queue (training_jobs.db) with statuses
    pending|claimed|running|completed|failed|cancelled. Atomic
    claim_next via single UPDATE WHERE status='pending'. Heartbeat,
    complete, fail. Stale-claim sweep (stale_after_s=600s) with
    max_attempts cutoff to failed.

  store.py — model artifact store mirroring receiver/store.py.
    Artifact ID is the sha256 of the uploaded tarball; bit-identical
    re-runs deduplicate.

  receiver.py — Starlette app exposing 11 endpoints:
    POST /v1/job/claim          (worker)
    POST /v1/job/{id}/heartbeat (worker)
    POST /v1/job/{id}/complete  (worker)
    POST /v1/job/{id}/fail      (worker)
    PUT  /v1/model/{id}         (worker — uploads tarball)
    GET  /v1/jobs               (anyone)
    GET  /v1/workers            (anyone)
    POST /v1/job/{id}/cancel    (operator: X-Operator-Token)
    POST /v1/job/{id}/requeue   (operator)
    POST /v1/manifest/reload    (operator)
    GET  /v1/health             (anyone)
    Runs as cis490-trainer-receiver.service on the Pi alongside the
    existing receiver, on a separate port.

  client.py — stdlib HTTP client (urllib only, no new deps).

  worker.py — long-running daemon. Loop: detect capability → claim →
    spawn training/trainer/run.py subprocess → heartbeat every 30s →
    tar artifact, sha256, PUT /v1/model → complete. SIGTERM-safe.
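The manifest.py and queue.py entries fit together. A content-derived job ID makes reload idempotent through INSERT OR IGNORE, and a claim is a status-guarded UPDATE. Below is a minimal sqlite3 sketch of that pattern. The schema, the function names and the priority ordering are assumptions, not the code in queue.py. The rule that stale jobs under max_attempts go back to pending is also an assumption.

```python
import hashlib
import json
import sqlite3
import time
from typing import Optional

# Hypothetical schema; column names are illustrative, not queue.py's.
SCHEMA = """
CREATE TABLE IF NOT EXISTS jobs (
    job_id       TEXT PRIMARY KEY,
    name         TEXT NOT NULL,
    priority     INTEGER NOT NULL DEFAULT 0,
    status       TEXT NOT NULL DEFAULT 'pending',
    claimed_by   TEXT,
    heartbeat_at REAL,
    attempts     INTEGER NOT NULL DEFAULT 0
)
"""


def job_id(model, mode, hyper, split_recipe, train_hosts, seed) -> str:
    """Stable ID: sha256 over a canonical JSON encoding of the job's identity."""
    payload = json.dumps([model, mode, hyper, split_recipe, list(train_hosts), seed],
                         sort_keys=True, separators=(",", ":"))
    return hashlib.sha256(payload.encode("utf-8")).hexdigest()


def sync(db: sqlite3.Connection, specs: list[dict]) -> int:
    """Insert unseen jobs as pending; existing rows keep whatever status they have."""
    before = db.total_changes
    db.executemany("INSERT OR IGNORE INTO jobs (job_id, name, priority) VALUES (?, ?, ?)",
                   [(s["job_id"], s["name"], s.get("priority", 0)) for s in specs])
    db.commit()
    return db.total_changes - before  # number of newly added jobs


def claim_next(db: sqlite3.Connection, host: str) -> Optional[str]:
    """Claim the highest-priority pending job, or return None if there is none."""
    while True:
        row = db.execute("SELECT job_id FROM jobs WHERE status = 'pending' "
                         "ORDER BY priority DESC, job_id LIMIT 1").fetchone()
        if row is None:
            return None
        # The status guard makes this a compare-and-swap: if another worker
        # claimed the row first, rowcount is 0 and we try the next candidate.
        cur = db.execute("UPDATE jobs SET status = 'claimed', claimed_by = ?, "
                         "heartbeat_at = ?, attempts = attempts + 1 "
                         "WHERE job_id = ? AND status = 'pending'",
                         (host, time.time(), row[0]))
        db.commit()
        if cur.rowcount == 1:
            return row[0]


def sweep_stale(db: sqlite3.Connection, max_attempts: int,
                stale_after_s: float = 600) -> None:
    """Fail stale jobs that are out of attempts; return the rest to pending."""
    cutoff = time.time() - stale_after_s
    stale = "status IN ('claimed', 'running') AND heartbeat_at < ?"
    db.execute(f"UPDATE jobs SET status = 'failed' WHERE {stale} AND attempts >= ?",
               (cutoff, max_attempts))
    db.execute(f"UPDATE jobs SET status = 'pending', claimed_by = NULL WHERE {stale}",
               (cutoff,))
    db.commit()
```

The SELECT-then-guarded-UPDATE loop is one way to get the atomicity described. SQLite serializes writers, so at most one worker sees rowcount == 1 for a given row.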

Operator CLI (tools/cis490_jobs.py): status / list / show / cancel /
requeue / reload / workers. Cancel, requeue and reload hit operator
endpoints and require $CIS490_OPERATOR_TOKEN (or --token) matching
the receiver's configured value.
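Operator calls are ordinary HTTP POSTs carrying X-Operator-Token, which is why client.py can stay on urllib. Below is a minimal sketch that builds such a request without sending it. build_operator_request is a hypothetical helper, the empty JSON body is an assumption, and FleetClient's real implementation may differ. The X-Lab-Host header name comes from the --as-host help text.

```python
import json
import os
import urllib.request
from typing import Optional


def build_operator_request(base_url: str, job_id: str, action: str,
                           host_id: str, token: Optional[str] = None) -> urllib.request.Request:
    """Build (but do not send) POST /v1/job/{id}/{action} with operator auth."""
    if action not in ("cancel", "requeue"):
        raise ValueError(f"not an operator job action: {action!r}")
    # Explicit token wins; otherwise fall back to the environment.
    token = token or os.environ.get("CIS490_OPERATOR_TOKEN")
    if not token:
        raise RuntimeError("set --token or $CIS490_OPERATOR_TOKEN")
    url = f"{base_url.rstrip('/')}/v1/job/{job_id}/{action}"
    return urllib.request.Request(
        url,
        data=json.dumps({}).encode("utf-8"),  # assumption: empty JSON body
        method="POST",
        headers={"Content-Type": "application/json",
                 "X-Lab-Host": host_id,
                 "X-Operator-Token": token})
```

Sending the request would be `urllib.request.urlopen(req, timeout=10)`. An error status from the receiver surfaces as urllib.error.HTTPError.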

Bootstrap: scripts/install-training-worker.sh (Linux systemd) and
scripts/install-training-worker-windows.ps1 (Windows Scheduled Task)
let the operator enroll a new host with one command after cloning
the repo and setting up the venv. Worker self-tests capability
before registering.

End-to-end smoke verified on the Pi: receiver up, manifest synced,
14 jobs queued, worker registered, claimed 4 CPU-eligible jobs
(allow_jobs=["gbt","mlp"]), completed 3 (gbt-realistic, gbt-oracle,
mlp-oracle), 1 failed with the actual error visible via
cis490-jobs status, 3 artifacts uploaded to
/var/lib/cis490/models/<model>_<mode>/<sha256>/bundle.tar.zst with
proper index.jsonl row.
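The artifact path is content-addressed. The tarball's sha256 is both its ID and its directory name, so a bit-identical re-upload resolves to a path that already exists. Below is a minimal sketch of that layout on a local filesystem. store_artifact, sha256_file and the index row fields are hypothetical and do not reflect store.py's actual schema.

```python
import hashlib
import json
import shutil
from pathlib import Path


def sha256_file(path: Path, chunk: int = 1 << 20) -> str:
    """Hash in chunks so large tarballs are never read into memory at once."""
    h = hashlib.sha256()
    with path.open("rb") as f:
        for block in iter(lambda: f.read(chunk), b""):
            h.update(block)
    return h.hexdigest()


def store_artifact(root: Path, model: str, mode: str, tarball: Path) -> tuple[str, bool]:
    """Copy `tarball` to root/<model>_<mode>/<sha256>/bundle.tar.zst.

    Returns (artifact_id, created); identical bytes are stored only once.
    """
    artifact_id = sha256_file(tarball)
    dest_dir = root / f"{model}_{mode}" / artifact_id
    dest = dest_dir / "bundle.tar.zst"
    if dest.exists():
        return artifact_id, False  # deduplicated: same bytes, same path
    dest_dir.mkdir(parents=True, exist_ok=True)
    tmp = dest.with_suffix(".partial")
    shutil.copyfile(tarball, tmp)
    tmp.replace(dest)  # rename within one directory, so readers never see a partial file
    # Hypothetical index row; the real index.jsonl fields are not shown in this commit.
    row = {"artifact_id": artifact_id, "model": model, "mode": mode, "path": str(dest)}
    with (root / "index.jsonl").open("a") as idx:
        idx.write(json.dumps(row) + "\n")
    return artifact_id, True
```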

21 unit tests (manifest validation: 8; queue lifecycle + eligibility:
13). All pass alongside the prior 17 training tests = 38 green.

Open limitations surfaced inline:
  - Hyper-key drift between manifest and run.py fails at training
    time, not at manifest reload (worth tightening to argparse
    introspection later).
  - mTLS not yet wired through Caddy for the trainer-receiver port —
    listens loopback-only until that lands.
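The first limitation proposes catching hyper-key drift at reload time through argparse introspection. The sketch below assumes that each hyper key maps to a value-taking --flag of the same name (underscores as hyphens) and that run.py's parser can be constructed without running training. build_parser is a hypothetical stand-in for run.py's parser. unknown_hyper_keys renders the hyper dict as argv and lets parse_known_args report what the parser does not recognize.

```python
import argparse


def build_parser() -> argparse.ArgumentParser:
    # Hypothetical stand-in for run.py's parser.
    p = argparse.ArgumentParser(prog="run.py")
    p.add_argument("--lr", type=float, default=0.1)
    p.add_argument("--max-depth", type=int, default=6)
    return p


def unknown_hyper_keys(parser: argparse.ArgumentParser, hyper: dict) -> list[str]:
    """Return manifest hyper keys that the parser does not recognize."""
    # Assumed convention: key "max_depth" is passed as "--max-depth=<value>".
    flags = {f"--{key.replace('_', '-')}": key for key in hyper}
    argv = [f"{flag}={hyper[key]}" for flag, key in flags.items()]
    # parse_known_args returns unrecognized options instead of exiting.
    # Caveats: argparse accepts unambiguous prefixes unless the parser was
    # built with allow_abbrev=False, and a badly typed value still exits
    # via SystemExit.
    _, extras = parser.parse_known_args(argv)
    return sorted(flags[tok.split("=", 1)[0]] for tok in extras
                  if tok.split("=", 1)[0] in flags)
```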

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
2026-05-08 01:20:20 -05:00

198 lines
6.9 KiB
Python

"""cis490-jobs — operator control CLI for the training fleet.

Talks to the trainer-receiver over HTTP. Subcommands:

    cis490-jobs status                     pretty-print queue + worker status
    cis490-jobs list [--status pending]    list jobs, optionally by status
    cis490-jobs show <job_id>              full detail for one job (id or name)
    cis490-jobs cancel <job_id>            mark pending/failed → cancelled
    cis490-jobs requeue <job_id>           force-requeue from any state
    cis490-jobs reload                     re-read manifest, sync queue
    cis490-jobs workers                    last-seen capability per worker

Auth: the control endpoints behind cancel, requeue and reload require
X-Operator-Token. Pass it with --token or set $CIS490_OPERATOR_TOKEN.
Read-only subcommands (status, list, show, workers) work without a token.

Usage from outside the Pi: set --receiver-url (or
$CIS490_TRAINER_RECEIVER_URL) to the Pi's WG address
(e.g., http://10.100.0.1:8445).
"""
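
from __future__ import annotations

import argparse
import json
import os
import platform
import sys
import time
from pathlib import Path

# Make the repo root importable so `training.fleet` resolves when this
# script is run directly as tools/cis490_jobs.py.
sys.path.insert(0, str(Path(__file__).resolve().parent.parent))

from training.fleet.client import FleetClient  # noqa: E402


def _client_from_args(args) -> FleetClient:
    # --token wins over $CIS490_OPERATOR_TOKEN; read-only calls work without either.
    token = args.token or os.environ.get("CIS490_OPERATOR_TOKEN")
    # platform.node() also works on Windows hosts; os.uname() is POSIX-only.
    return FleetClient(args.receiver_url,
                       host_id=args.as_host or platform.node(),
                       operator_token=token)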


def cmd_status(args) -> int:
    c = _client_from_args(args)
    jobs = c.list_jobs()
    workers = c.workers()

    from collections import Counter
    counts = Counter(j["status"] for j in jobs)
    print("=== queue ===")
    for s in ("pending", "claimed", "running", "completed", "failed", "cancelled"):
        n = counts.get(s, 0)
        print(f" {s:>10} {n}")
    print()

    print(f"=== workers ({len(workers)}) ===")
    now = time.time()
    for w in workers:
        cap = w.get("capability", {})
        seen = now - float(w.get("last_seen", 0))
        cuda = "CUDA" if cap.get("cuda_available") else "CPU"
        # VRAM of the first CUDA device, or 0.0 on CPU-only hosts.
        devices = cap.get("cuda_devices") or []
        vram = devices[0].get("vram_total_gib", 0.0) if devices else 0.0
        print(f" {w['hostname']:>20} {cuda} cores={cap.get('cpu_cores')}"
              f" ram={cap.get('ram_available_gib', 0):.1f}/"
              f"{cap.get('ram_total_gib', 0):.1f}GiB"
              f" vram={vram:.1f}GiB last_seen={seen:.0f}s ago")
    print()

    print("=== running ===")
    for j in jobs:
        if j["status"] in ("claimed", "running"):
            print(f" {j['name']:>26} by={j['claimed_by']} status={j['status']}")
    print()

    print("=== failed ===")
    for j in jobs:
        if j["status"] == "failed":
            err = (j.get("last_error") or "")[:100]
            print(f" {j['name']:>26} attempts={j['attempts']} err={err}")
    return 0


def cmd_list(args) -> int:
    c = _client_from_args(args)
    jobs = c.list_jobs(status=args.status)
    if args.json:
        print(json.dumps(jobs, indent=2))
        return 0
    print(f" {'name':<26} {'model':<18} {'mode':<10} {'prio':>5} "
          f"{'status':<10} {'host':<16}")
    for j in jobs:
        print(f" {j['name']:<26} {j.get('model','?'):<18} "
              f"{j.get('mode','?'):<10} {j.get('priority','?'):>5} "
              f"{j['status']:<10} {(j.get('claimed_by') or '-'):<16}")
    return 0


def cmd_show(args) -> int:
    c = _client_from_args(args)
    jobs = c.list_jobs()
    # Accept either the full job_id or the human-readable job name.
    job = next((j for j in jobs if j["job_id"] == args.job_id
                or j["name"] == args.job_id), None)
    if job is None:
        print(f"no job matching {args.job_id!r}", file=sys.stderr)
        return 1
    print(json.dumps(job, indent=2))
    return 0


def cmd_cancel(args) -> int:
    c = _client_from_args(args)
    ok = c.cancel(args.job_id)
    print("cancelled" if ok else "cancel failed (wrong state? unknown id?)",
          file=sys.stderr)
    return 0 if ok else 1


def cmd_requeue(args) -> int:
    c = _client_from_args(args)
    ok = c.requeue(args.job_id)
    print("requeued" if ok else "requeue failed",
          file=sys.stderr)
    return 0 if ok else 1


def cmd_reload(args) -> int:
    c = _client_from_args(args)
    res = c.reload_manifest()
    print(json.dumps(res, indent=2))
    return 0


def cmd_workers(args) -> int:
    c = _client_from_args(args)
    workers = c.workers()
    if args.json:
        print(json.dumps(workers, indent=2))
    else:
        for w in workers:
            print(f"\n=== {w['hostname']} ===")
            cap = w.get("capability", {})
            print(f" os/arch: {cap.get('os')}/{cap.get('arch')}")
            print(f" python: {cap.get('python_version')} torch={cap.get('torch_version')}")
            print(f" cores: {cap.get('cpu_cores')}")
            print(f" ram: {cap.get('ram_available_gib', 0):.1f} / "
                  f"{cap.get('ram_total_gib', 0):.1f} GiB")
            print(f" cuda: {cap.get('cuda_available')}")
            for d in cap.get("cuda_devices") or []:
                print(f" {d.get('name')} "
                      f"vram={d.get('vram_free_gib',0):.1f}/{d.get('vram_total_gib',0):.1f} GiB")
            print(f" commit: {(cap.get('training_commit') or '-')[:12]}")
    return 0


def main() -> int:
    p = argparse.ArgumentParser(prog="cis490-jobs")
    p.add_argument("--receiver-url", default=os.environ.get(
        "CIS490_TRAINER_RECEIVER_URL", "http://10.100.0.1:8445"
    ))
    p.add_argument("--token",
                   help="operator token (or $CIS490_OPERATOR_TOKEN)")
    p.add_argument("--as-host", default=None,
                   help="X-Lab-Host header (default: this machine)")
    sub = p.add_subparsers(dest="cmd", required=True)

    s_status = sub.add_parser("status",
                              help="pretty-print queue + worker status")
    s_status.set_defaults(func=cmd_status)

    s_list = sub.add_parser("list", help="list jobs")
    s_list.add_argument("--status",
                        choices=["pending", "claimed", "running", "completed",
                                 "failed", "cancelled"])
    s_list.add_argument("--json", action="store_true")
    s_list.set_defaults(func=cmd_list)

    s_show = sub.add_parser("show", help="full detail for one job (id or name)")
    s_show.add_argument("job_id")
    s_show.set_defaults(func=cmd_show)

    s_cancel = sub.add_parser("cancel", help="mark pending/failed → cancelled")
    s_cancel.add_argument("job_id")
    s_cancel.set_defaults(func=cmd_cancel)

    s_requeue = sub.add_parser("requeue",
                               help="force any non-pending job back to pending")
    s_requeue.add_argument("job_id")
    s_requeue.set_defaults(func=cmd_requeue)

    s_reload = sub.add_parser("reload",
                              help="re-read manifest, sync queue")
    s_reload.set_defaults(func=cmd_reload)

    s_workers = sub.add_parser("workers", help="list workers + capabilities")
    s_workers.add_argument("--json", action="store_true")
    s_workers.set_defaults(func=cmd_workers)

    args = p.parse_args()
    return args.func(args)


if __name__ == "__main__":
    raise SystemExit(main())