"""Ingest Lambda-trained artifacts into the local trainer-receiver. Lambda finishes a run; run-on-lambda.sh rsyncs artifacts/ + reports/eval/ back to the Pi at $REPO/artifacts-lambda/ + $REPO/reports/lambda/. This script bundles each (ckpt.json + sidecar + train.json) trio into a .tar.zst, computes its sha256, and POSTs it to the trainer-receiver via PUT /v1/model/{job_id} so the model store on the Pi looks identical to what a real fleet worker would have produced. This preserves the fleet abstraction: Lambda is just an "external worker" whose output lands in the same /var/lib/cis490/models/ tree. The operator's `cis490-jobs status` shows them as completed jobs with proper artifact_ids. Usage: python -m tools.ingest_lambda_artifacts \\ --artifacts ./artifacts-lambda \\ --reports ./reports/lambda/eval \\ --receiver http://127.0.0.1:8445 \\ --validation data/processed/validation_v1.parquet \\ --as-host lambda-a100 """ from __future__ import annotations import argparse import hashlib import io import json import logging import re import sys import tarfile from pathlib import Path import zstandard as zstd sys.path.insert(0, str(Path(__file__).resolve().parent.parent)) from training.fleet.client import FleetClient from training.fleet.manifest import load as load_manifest log = logging.getLogger("cis490.ingest_lambda") # Map ckpt filename → (model, mode). e.g. "gbt_realistic.ckpt.json" → ("gbt", "realistic"). 
_CKPT_RE = re.compile(
    r"^(?P<model>[a-z_]+)_(?P<mode>realistic|oracle)\.ckpt\.json$"
)

# Sidecar suffixes, tried in order: .pt for NN, .xgb.json for GBT.
_SIDECAR_SUFFIXES = (".pt", ".xgb.json")


def _pair_files(artifacts_dir: Path, reports_dir: Path
                ) -> list[tuple[str, str, Path, Path, Path | None]]:
    """Find (model, mode, ckpt.json, sidecar, train.json or None) tuples.

    Scans ``artifacts_dir`` for ``*.ckpt.json`` files in sorted order.
    Files whose name does not match ``_CKPT_RE`` or that have no sidecar
    next to them are skipped with a warning. The training report is
    optional and is looked up in ``reports_dir`` as
    ``{model}_{mode}_train.json`` first, then ``{model}_{mode}_pretrain.json``.
    """
    out: list[tuple[str, str, Path, Path, Path | None]] = []
    for ckpt in sorted(artifacts_dir.glob("*.ckpt.json")):
        m = _CKPT_RE.match(ckpt.name)
        if not m:
            log.warning("skipping unexpected file: %s", ckpt.name)
            continue
        model, mode = m.group("model"), m.group("mode")

        # "x.ckpt.json" -> "x.ckpt"; with_suffix() below then swaps the
        # remaining ".ckpt" for the sidecar suffix ("x.pt", "x.xgb.json").
        base = ckpt.with_suffix("")
        sidecar = next(
            (p for p in (base.with_suffix(s) for s in _SIDECAR_SUFFIXES)
             if p.exists()),
            None,
        )
        if sidecar is None:
            log.warning("no sidecar (.pt or .xgb.json) found for %s; skipping",
                        ckpt.name)
            continue

        # Training report (optional)
        train_json = next(
            (p for p in (reports_dir / f"{model}_{mode}_train.json",
                         reports_dir / f"{model}_{mode}_pretrain.json")
             if p.exists()),
            None,
        )
        out.append((model, mode, ckpt, sidecar, train_json))
    return out


def _bundle(model: str, mode: str, ckpt: Path, sidecar: Path,
            train_json: Path | None) -> tuple[bytes, str]:
    """Produce the .tar.zst payload + its sha256.

    ``model`` and ``mode`` are accepted for call-site symmetry but are not
    used; archive members are stored under their bare file names.

    Returns:
        ``(payload, sha256_hex)`` where ``payload`` is the zstd-compressed
        tar stream and ``sha256_hex`` is the digest of exactly those bytes.
    """
    buf = io.BytesIO()
    cctx = zstd.ZstdCompressor(level=10)
    # closefd=False: closing the compression writer (which leaving the
    # `with` does) would otherwise close `buf` as well, making the
    # getvalue() call below raise "I/O operation on closed file".
    with cctx.stream_writer(buf, closefd=False) as zw:
        with tarfile.open(fileobj=zw, mode="w|") as tar:
            for member in (ckpt, sidecar, train_json):
                if member is not None:
                    tar.add(member, arcname=member.name)
    payload = buf.getvalue()
    return payload, hashlib.sha256(payload).hexdigest()


def _lambda_capability() -> dict:
    """Return a fresh capability dict describing the stand-in Lambda worker."""
    return {
        "cuda_available": True,
        "cpu_cores": 8,
        "ram_available_gib": 32,
        "cuda_devices": [
            {"name": "Lambda A100", "vram_total_gib": 40,
             "vram_free_gib": 35},
        ],
    }


def _claim_job(client: "FleetClient", job_id: str, attempts: int = 2) -> bool:
    """Claim ``job_id`` as if we were a worker; return True on success.

    The queue hands out claims by priority, not by id, so a claim may come
    back for a different job. Any such unrelated claim is released by
    failing it back (per the original author's note this is the cleanest
    way to return it to pending — NOTE(review): confirm against the
    receiver), the target job is requeued, and the claim is retried.
    Unrelated claims are released on *every* attempt, including the last,
    so a failed ingest never leaves a foreign job held by this fake worker.
    """
    for attempt in range(attempts):
        claim = client.claim(_lambda_capability())
        claimed_id = claim.get("job_id") if claim else None
        if claimed_id == job_id:
            return True
        if claimed_id:
            client.fail(claimed_id, error="lambda-ingest race; releasing")
        if attempt + 1 < attempts:
            # Try once more with a more targeted approach: requeue + claim
            client.requeue(job_id)
    return False


def main() -> int:
    """CLI entry point: pair artifacts, map them to jobs, upload each bundle.

    Returns:
        0 when nothing failed (skips do not count as failures), 1 when
        setup fails (no artifact pairs found, or the receiver is
        unreachable/unhealthy), 2 when at least one job failed to requeue,
        upload or complete.
    """
    # Local import: `os` is only needed here to read the operator token.
    import os

    ap = argparse.ArgumentParser()
    ap.add_argument("--artifacts", required=True, type=Path,
                    help="dir holding *.ckpt.json + sidecar files (e.g. "
                         "./artifacts-lambda)")
    ap.add_argument("--reports", required=True, type=Path,
                    help="dir holding *_train.json + *_pretrain.json")
    ap.add_argument("--receiver", default="http://127.0.0.1:8445",
                    help="trainer-receiver base URL")
    ap.add_argument("--manifest", type=Path,
                    default=Path("etc/training_manifest.toml.example"),
                    help="canonical manifest — used to map (model, mode) → job_id")
    ap.add_argument("--as-host", default="lambda-a100",
                    help="X-Lab-Host value for the upload (the worker name "
                         "shown in cis490-jobs)")
    ap.add_argument("--operator-token-env", default="CIS490_OPERATOR_TOKEN")
    ap.add_argument("--dry-run", action="store_true")
    ap.add_argument("--log-level", default="INFO")
    args = ap.parse_args()

    logging.basicConfig(
        # logging only knows upper-case level names; accept "info" too.
        level=args.log_level.upper(),
        format="%(asctime)s %(levelname)s %(name)s %(message)s",
    )

    pairs = _pair_files(args.artifacts, args.reports)
    if not pairs:
        log.error("no (ckpt.json, sidecar) pairs under %s", args.artifacts)
        return 1
    log.info("found %d trained (model, mode) pairs", len(pairs))

    # Map (model, mode) → job_id by reading the manifest. When several
    # manifest jobs share a (model, mode) the last one wins; say so rather
    # than overwriting silently.
    man = load_manifest(args.manifest)
    job_index: dict[tuple[str, str], str] = {}
    for j in man.jobs:
        key = (j.model, j.mode)
        previous = job_index.get(key)
        if previous is not None and previous != j.job_id:
            log.warning("manifest has several jobs for (%s, %s); "
                        "using %s instead of %s",
                        j.model, j.mode, j.job_id, previous)
        job_index[key] = j.job_id

    operator_token = os.environ.get(args.operator_token_env)
    client = FleetClient(args.receiver, host_id=args.as_host,
                         operator_token=operator_token)

    # Pre-flight: receiver alive?
    try:
        rc, _ = client._request("GET", "/v1/health")
    except Exception as e:
        log.error("trainer-receiver not reachable at %s: %s", args.receiver, e)
        return 1
    if rc not in range(200, 300):
        log.error("trainer-receiver at %s is unhealthy: /v1/health returned %r",
                  args.receiver, rc)
        return 1

    n_ok = 0
    n_skipped = 0
    n_failed = 0
    for model, mode, ckpt, sidecar, train_json in pairs:
        job_id = job_index.get((model, mode))
        if job_id is None:
            log.warning("no manifest entry for (%s, %s); skipping", model, mode)
            n_skipped += 1
            continue

        # Make sure the queue knows about this job. If it's currently
        # completed by an earlier run, requeue first so we can re-upload.
        # The listing is re-fetched per job because earlier iterations
        # change queue state.
        existing = next((r for r in client.list_jobs()
                         if r.get("job_id") == job_id), None)
        if existing is None:
            log.warning("queue has no row for job_id=%s — did you "
                        "`cis490-jobs reload` after editing the manifest?",
                        job_id)
            n_skipped += 1
            continue
        if existing.get("status") == "completed":
            log.info("job %s already completed; requeueing for re-upload",
                     job_id)
            if not args.dry_run and not client.requeue(job_id):
                log.warning("requeue failed for %s", job_id)
                n_failed += 1
                continue

        # Claim it as if we were a worker
        if not args.dry_run and not _claim_job(client, job_id):
            log.warning("could not claim job_id=%s after requeue; "
                        "skipping", job_id)
            n_skipped += 1
            continue

        # Bundle + upload + complete
        payload, sha = _bundle(model, mode, ckpt, sidecar, train_json)
        log.info("uploading %s_%s job_id=%s sha=%s size=%dKiB",
                 model, mode, job_id, sha[:12], len(payload) // 1024)
        if args.dry_run:
            log.info("  (dry-run; skipping HTTP)")
            n_ok += 1
            continue

        # PUT /v1/model/{job_id}
        code, body = client._request(
            "PUT", f"/v1/model/{job_id}",
            body=payload,
            extra_headers={
                "x-content-sha256": sha,
                "content-length": str(len(payload)),
                "content-type": "application/octet-stream",
            },
            expect_status=(200, 201),
        )
        if code not in (200, 201):
            log.error("upload failed: code=%d body=%r", code, body)
            # We hold the claim on this job; report the failure so it does
            # not stay in flight under a worker that will never finish it.
            client.fail(job_id,
                        error=f"lambda-ingest upload rejected (HTTP {code})")
            n_failed += 1
            continue

        # Fall back to the payload digest when the receiver's reply carries
        # no artifact_id (or is not a JSON object at all).
        artifact_id = body.get("artifact_id", sha) if isinstance(body, dict) else sha

        # Mark complete
        if not client.complete(job_id, artifact_id=artifact_id):
            log.warning("complete() returned false for %s", job_id)
            n_failed += 1
            continue
        n_ok += 1

    print()
    print(f"  uploaded: {n_ok}")
    print(f"  skipped:  {n_skipped}")
    print(f"  failed:   {n_failed}")
    print()
    print("status:  cis490-jobs status")
    print("models:  ls /var/lib/cis490/models/")
    return 0 if n_failed == 0 else 2


if __name__ == "__main__":
    raise SystemExit(main())