External-GPU path for the time-pressured first round, before the
Windows desktop joins the WG fleet. Lambda is treated as an "external
worker" whose output lands in the same /var/lib/cis490/models/ tree
the receiver-coordinated fleet uses, so cis490-jobs status reflects
Lambda runs identically to fleet runs.
Three scripts + one ingest tool:
scripts/build-lambda-bundle.sh
Tarball at /tmp/cis490-lambda/lambda-bundle-<short>.tar.zst with:
- the repo (sans .git, sans data/, sans artifacts*)
- data/processed/{validation_v1,features_window_v1}.parquet
- data/processed/feature_schema_v1.json
- data/processed/tensor_window_v1/ (npz shards)
- bootstrap.sh (entrypoint)
- training_manifest.toml (the canonical job list)
- BUNDLE_MANIFEST.json (commit hash + counts + build stamp)
Verifies all four data inputs exist BEFORE compressing 5+ GB.
scripts/run-on-lambda.sh ubuntu@<ip>
rsync bundle up → ssh + run bootstrap → rsync artifacts +
reports/eval back to artifacts-lambda/ + reports/lambda/.
Resumable rsync; sha256-verified.
scripts/lambda-bootstrap.sh (runs ON the Lambda instance)
Creates .venv with cu121 torch + xgboost + the [training] deps,
iterates the manifest's job list in priority order (highest first),
runs trainer/run.py (or run_ssl.py for transformer_ssl) per job,
skips jobs whose .ckpt.json already exists (idempotent on re-run),
writes per-job logs/<model>_<mode>.log, runs eval suite at the end,
stamps artifacts/RUN_SUMMARY.json with counts + failed-job list.
tools/ingest_lambda_artifacts.py
Bundles each (ckpt.json + sidecar + train.json) trio into a
.tar.zst, sha256, PUTs to the local trainer-receiver's
/v1/model/{job_id}, marks the job complete. Maps (model, mode) →
job_id by re-reading the canonical manifest. Handles the queue
state churn (requeue if completed, claim if pending, fail-back
on race losses).
End-to-end smoke test passed on the freshly provisioned A100 instance:
- SSH from Pi via ed25519 keypair (cis490-trainer-pi)
- GPU: A100-SXM4-40GB, driver 580.105.08
- venv warmed: torch 2.5.1+cu121, xgboost 3.2.0
- 464 GB ephemeral disk available
Pi-side feature build (build_features.py + build_tensors.py against
all 72,952 accepted+degraded episodes) is in progress; bundle build
gates on its completion. Estimated wall-clock for the full Lambda
training run on A100: ~2.5 hours for 12 supervised + 2 SSL models +
eval suite.
Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
241 lines
9.3 KiB
Python
241 lines
9.3 KiB
Python
"""Ingest Lambda-trained artifacts into the local trainer-receiver.
|
|
|
|
Lambda finishes a run; run-on-lambda.sh rsyncs artifacts/ + reports/eval/
|
|
back to the Pi at $REPO/artifacts-lambda/ + $REPO/reports/lambda/.
|
|
This script bundles each (ckpt.json + sidecar + train.json) trio into a
|
|
.tar.zst, computes its sha256, and uploads it to the trainer-receiver via
|
|
PUT /v1/model/{job_id} so the model store on the Pi looks identical to
|
|
what a real fleet worker would have produced.
|
|
|
|
This preserves the fleet abstraction: Lambda is just an "external worker"
|
|
whose output lands in the same /var/lib/cis490/models/ tree. The
|
|
operator's `cis490-jobs status` shows them as completed jobs with
|
|
proper artifact_ids.
|
|
|
|
Usage:
|
|
python -m tools.ingest_lambda_artifacts \\
|
|
--artifacts ./artifacts-lambda \\
|
|
--reports ./reports/lambda/eval \\
|
|
--receiver http://127.0.0.1:8445 \\
|
|
--manifest etc/training_manifest.toml.example \\
|
|
--as-host lambda-a100
|
|
"""
|
|
from __future__ import annotations
|
|
|
|
import argparse
|
|
import hashlib
|
|
import io
|
|
import json
|
|
import logging
|
|
import re
|
|
import sys
|
|
import tarfile
|
|
from pathlib import Path
|
|
|
|
import zstandard as zstd
|
|
|
|
sys.path.insert(0, str(Path(__file__).resolve().parent.parent))
|
|
from training.fleet.client import FleetClient
|
|
from training.fleet.manifest import load as load_manifest
|
|
|
|
|
|
# Module-level logger; its name appears in the %(name)s field of log lines.
log = logging.getLogger("cis490.ingest_lambda")

# Checkpoint file names have the shape "<model>_<mode>.ckpt.json":
#   <model> -- lowercase letters and underscores (greedy, e.g. "transformer_ssl")
#   <mode>  -- exactly "realistic" or "oracle"
# Example: "gbt_realistic.ckpt.json" -> model="gbt", mode="realistic".
_CKPT_RE = re.compile(
    r"^(?P<model>[a-z_]+)_"
    r"(?P<mode>realistic|oracle)\.ckpt\.json$"
)
|
|
|
|
|
|
def _pair_files(artifacts_dir: Path, reports_dir: Path
                ) -> list[tuple[str, str, Path, Path, Path | None]]:
    """Collect the files needed to ingest each trained checkpoint.

    Scans ``artifacts_dir`` for ``*.ckpt.json`` files in sorted order. Names
    that do not match ``_CKPT_RE`` are logged and ignored. For each match:

    * the weights sidecar is ``<model>_<mode>.pt`` if it exists, otherwise
      ``<model>_<mode>.xgb.json``; a checkpoint with neither is logged and
      skipped;
    * the training report is ``reports_dir/<model>_<mode>_train.json`` if it
      exists, otherwise ``reports_dir/<model>_<mode>_pretrain.json``,
      otherwise ``None``.

    Returns:
        A list of ``(model, mode, ckpt, sidecar, train_json_or_None)`` tuples.
    """
    found: list[tuple[str, str, Path, Path, Path | None]] = []
    for ckpt_path in sorted(artifacts_dir.glob("*.ckpt.json")):
        match = _CKPT_RE.match(ckpt_path.name)
        if match is None:
            log.warning("skipping unexpected file: %s", ckpt_path.name)
            continue
        model = match.group("model")
        mode = match.group("mode")

        # "x.ckpt.json" -> "x.ckpt"; with_suffix then swaps ".ckpt" for the
        # sidecar suffix. ".pt" (NN weights) is preferred over ".xgb.json" (GBT).
        stem_path = ckpt_path.with_suffix("")
        sidecar = next(
            (cand for cand in (stem_path.with_suffix(".pt"),
                               stem_path.with_suffix(".xgb.json"))
             if cand.exists()),
            None,
        )
        if sidecar is None:
            log.warning("no sidecar (.pt or .xgb.json) found for %s; skipping",
                        ckpt_path.name)
            continue

        # The training report is optional; supervised runs write *_train.json,
        # SSL pre-training writes *_pretrain.json.
        train_json = next(
            (cand for cand in (reports_dir / f"{model}_{mode}_train.json",
                               reports_dir / f"{model}_{mode}_pretrain.json")
             if cand.exists()),
            None,
        )

        found.append((model, mode, ckpt_path, sidecar, train_json))
    return found
|
|
|
|
|
|
def _bundle(model: str, mode: str, ckpt: Path, sidecar: Path,
            train_json: Path | None) -> tuple[bytes, str]:
    """Pack one trained model into an in-memory ``.tar.zst`` payload.

    The tar contains ``ckpt``, ``sidecar`` and, when given, ``train_json``,
    each stored under its bare file name (no directory components). The tar
    is zstd-compressed at level 10.

    ``model`` and ``mode`` are not used here; they are kept so the call
    signature stays stable for existing callers.

    Returns:
        ``(payload, sha256_hex)``. The digest covers the compressed bytes
        exactly as they will be uploaded.

    Raises:
        OSError: if any of the input files cannot be read.
    """
    members = [ckpt, sidecar]
    if train_json is not None:
        members.append(train_json)

    # Build the whole tar first, then compress it in a single call.
    # Wrapping a BytesIO in ZstdCompressor.stream_writer() inside a `with`
    # block closes that BytesIO when the writer exits (closefd defaults to
    # True in python-zstandard), after which getvalue() raises ValueError.
    tar_buf = io.BytesIO()
    with tarfile.open(fileobj=tar_buf, mode="w") as tar:
        for path in members:
            tar.add(path, arcname=path.name)

    payload = zstd.ZstdCompressor(level=10).compress(tar_buf.getvalue())
    return payload, hashlib.sha256(payload).hexdigest()
|
|
|
|
|
|
def _lambda_capability() -> dict:
    """Return the worker capability sent when claiming on Lambda's behalf.

    The values are hard-coded to describe an A100-40GB box; nothing is
    probed. A fresh dict is built per call.
    """
    return {
        "cuda_available": True,
        "cpu_cores": 8,
        "ram_available_gib": 32,
        "cuda_devices": [
            {"name": "Lambda A100", "vram_total_gib": 40, "vram_free_gib": 35},
        ],
    }


def _parse_args(argv: list[str] | None) -> argparse.Namespace:
    """Parse the CLI options; ``argv=None`` means ``sys.argv[1:]``."""
    ap = argparse.ArgumentParser()
    ap.add_argument("--artifacts", required=True, type=Path,
                    help="dir holding *.ckpt.json + sidecar files (e.g. "
                         "./artifacts-lambda)")
    ap.add_argument("--reports", required=True, type=Path,
                    help="dir holding *_train.json + *_pretrain.json")
    ap.add_argument("--receiver", default="http://127.0.0.1:8445",
                    help="trainer-receiver base URL")
    ap.add_argument("--manifest", type=Path,
                    default=Path("etc/training_manifest.toml.example"),
                    help="canonical manifest — used to map (model, mode) → job_id")
    ap.add_argument("--as-host", default="lambda-a100",
                    help="X-Lab-Host value for the upload (the worker name "
                         "shown in cis490-jobs)")
    ap.add_argument("--operator-token-env", default="CIS490_OPERATOR_TOKEN")
    ap.add_argument("--dry-run", action="store_true")
    ap.add_argument("--log-level", default="INFO")
    return ap.parse_args(argv)


def _build_job_index(jobs) -> dict[tuple[str, str], str]:
    """Map ``(model, mode)`` to ``job_id`` for manifest jobs.

    ``jobs`` are the manifest's job entries; each is expected to expose
    ``.model``, ``.mode`` and ``.job_id``. Artifact file names carry only
    model and mode, so several manifest jobs sharing a pair (for example,
    different hyper-parameters) cannot be told apart. In that case the
    later entry wins and a warning is logged.
    """
    index: dict[tuple[str, str], str] = {}
    for job in jobs:
        key = (job.model, job.mode)
        previous = index.get(key)
        if previous is not None and previous != job.job_id:
            log.warning("manifest has several jobs for (%s, %s): %s and %s; "
                        "artifacts will be ingested as %s",
                        job.model, job.mode, previous, job.job_id, job.job_id)
        index[key] = job.job_id
    return index


def _claim_job(client: FleetClient, job_id: str) -> bool:
    """Try to claim ``job_id`` for this host; return True on success.

    ``client.claim`` hands out whichever job the receiver picks (by priority,
    not by id), so it may return a different job. Any such unrelated claim
    is released with ``client.fail`` — assumed to put it back in pending. It
    is released on the retry as well, so no stray claim is left in flight.
    The retry first requeues ``job_id`` and then claims once more.
    """
    for attempt in range(2):
        if attempt:
            client.requeue(job_id)
        claim = client.claim(_lambda_capability())
        claimed_id = (claim or {}).get("job_id")
        if claimed_id == job_id:
            return True
        if claimed_id:
            client.fail(claimed_id, error="lambda-ingest race; releasing")
    return False


def _upload(client: FleetClient, job_id: str, payload: bytes,
            sha: str) -> str | None:
    """PUT ``payload`` to ``/v1/model/{job_id}`` and return its artifact id.

    The artifact id is the response body's ``artifact_id`` when the body is
    a dict carrying a non-empty one; otherwise it is ``sha``. Returns
    ``None`` on a status other than 200/201 or if the request raises.
    """
    try:
        code, body = client._request(
            "PUT", f"/v1/model/{job_id}",
            body=payload,
            extra_headers={
                "x-content-sha256": sha,
                "content-length": str(len(payload)),
                "content-type": "application/octet-stream",
            },
            expect_status=(200, 201),
        )
    except Exception:  # per-job boundary: caller must release the claim
        log.exception("upload of job_id=%s raised", job_id)
        return None
    if code not in (200, 201):
        # %s rather than %d: tolerate a missing/None status code.
        log.error("upload failed: code=%s body=%r", code, body)
        return None
    artifact_id = body.get("artifact_id") if isinstance(body, dict) else None
    return artifact_id or sha


def _ingest_one(client: FleetClient, job_index: dict[tuple[str, str], str],
                model: str, mode: str, ckpt: Path, sidecar: Path,
                train_json: Path | None, *, dry_run: bool) -> str:
    """Ingest one ``(model, mode)`` result; return "ok", "skipped" or "failed".

    The steps are:

    1. Resolve the job_id from the manifest index.
    2. Check that the queue has a row for it, requeueing it first if it is
       already ``completed``.
    3. Claim it for this host.
    4. Bundle and PUT the artifacts.
    5. Mark the job complete.

    With ``dry_run``, requeue, claim, upload and complete are all skipped.
    The bundle is still built so its sha256 and size are logged.
    """
    job_id = job_index.get((model, mode))
    if job_id is None:
        log.warning("no manifest entry for (%s, %s); skipping", model, mode)
        return "skipped"

    # The queue must already know this job. A completed job is requeued
    # first so it can be re-uploaded.
    existing = next((r for r in client.list_jobs()
                     if r.get("job_id") == job_id), None)
    if existing is None:
        log.warning("queue has no row for job_id=%s — did you "
                    "`cis490-jobs reload` after editing the manifest?", job_id)
        return "skipped"
    if existing.get("status") == "completed":
        log.info("job %s already completed; requeueing for re-upload", job_id)
        if not dry_run and not client.requeue(job_id):
            log.warning("requeue failed for %s", job_id)
            return "failed"

    if not dry_run and not _claim_job(client, job_id):
        log.warning("could not claim job_id=%s after requeue; "
                    "skipping", job_id)
        return "skipped"

    # Outside dry-run, the job is now claimed by this host. Every failure
    # below releases it instead of leaving it in flight.
    try:
        payload, sha = _bundle(model, mode, ckpt, sidecar, train_json)
    except OSError as e:
        log.error("could not bundle %s_%s: %s", model, mode, e)
        if not dry_run:
            client.fail(job_id, error=f"lambda-ingest bundle failed: {e}")
        return "failed"
    log.info("uploading %s_%s job_id=%s sha=%s size=%dKiB",
             model, mode, job_id, sha[:12], len(payload) // 1024)
    if dry_run:
        log.info(" (dry-run; skipping HTTP)")
        return "ok"

    artifact_id = _upload(client, job_id, payload, sha)
    if artifact_id is None:
        client.fail(job_id, error="lambda-ingest upload failed; releasing")
        return "failed"

    if not client.complete(job_id, artifact_id=artifact_id):
        log.warning("complete() returned false for %s", job_id)
        return "failed"
    return "ok"


def main(argv: list[str] | None = None) -> int:
    """CLI entry point: ingest every Lambda-trained model into the receiver.

    Args:
        argv: Argument list to parse. ``None`` (the default) uses
            ``sys.argv[1:]``.

    Returns:
        Exit status:

        * 0 if no model failed.
        * 1 on a setup error: no artifact pairs were found, or the receiver
          was unreachable.
        * 2 if at least one model failed to ingest.

        Skipped models do not affect the exit status.
    """
    import os  # only needed for the operator-token lookup

    args = _parse_args(argv)
    logging.basicConfig(
        # basicConfig only accepts upper-case level names; normalise so that
        # `--log-level debug` works as well as `--log-level DEBUG`.
        level=args.log_level.upper(),
        format="%(asctime)s %(levelname)s %(name)s %(message)s",
    )

    pairs = _pair_files(args.artifacts, args.reports)
    if not pairs:
        log.error("no (ckpt.json, sidecar) pairs under %s", args.artifacts)
        return 1
    log.info("found %d trained (model, mode) pairs", len(pairs))

    job_index = _build_job_index(load_manifest(args.manifest).jobs)

    client = FleetClient(args.receiver, host_id=args.as_host,
                         operator_token=os.environ.get(args.operator_token_env))

    # Pre-flight: bail out early if the receiver cannot be reached at all.
    try:
        client._request("GET", "/v1/health")
    except Exception as e:
        log.error("trainer-receiver not reachable at %s: %s",
                  args.receiver, e)
        return 1

    counts = {"ok": 0, "skipped": 0, "failed": 0}
    for model, mode, ckpt, sidecar, train_json in pairs:
        outcome = _ingest_one(client, job_index, model, mode, ckpt, sidecar,
                              train_json, dry_run=args.dry_run)
        counts[outcome] += 1

    print()
    print(f" uploaded: {counts['ok']}")
    print(f" skipped: {counts['skipped']}")
    print(f" failed: {counts['failed']}")
    print()
    print("status: cis490-jobs status")
    print("models: ls /var/lib/cis490/models/")
    return 0 if counts["failed"] == 0 else 2
|
|
|
|
|
|
if __name__ == "__main__":
    # Propagate main()'s return value as the process exit status.
    sys.exit(main())
|