# ── patch: scripts/build-lambda-bundle.sh (new file, mode 100755, index 0000000..e99502e, hunk @@ -0,0 +1,120 @@) ──
#!/usr/bin/env bash
# Build a self-contained tarball ready for rsync to a Lambda GPU instance.
#
# Inputs:
#   - The repo at /home/max/.env/CIS490 (or $REPO_ROOT)
#   - data/processed/validation_v1.parquet
#   - data/processed/features_window_v1.parquet
#   - data/processed/feature_schema_v1.json
#   - data/processed/tensor_window_v1/   (npz shards, one per episode)
#
# Output:
#   $OUT_DIR/lambda-bundle-<short-sha>.tar.zst
#
# What's IN the bundle:
#   - repo/                   (sans .git, sans data/, sans artifacts*, sans .venv*)
#   - data/processed/         (the four artifacts above)
#   - bootstrap.sh            (entrypoint that runs ON Lambda)
#   - training_manifest.toml  (the operator's canonical plan; bootstrap loops over jobs)
#   - BUNDLE_MANIFEST.json    (commit + sanity stamps for this bundle)
#
# What's NOT in the bundle:
#   - raw .tar.zst episodes   (not needed once tensors are pre-built)
#   - .git directory          (we ship a code snapshot, not history)
#   - prior artifacts/        (Lambda generates fresh)
#
# Run on the Pi:
#   bash scripts/build-lambda-bundle.sh
set -euo pipefail

REPO_ROOT="${REPO_ROOT:-/home/max/.env/CIS490}"
OUT_DIR="${OUT_DIR:-/tmp/cis490-lambda}"

# Fail on a missing tool now rather than after staging several GB.
for tool in git rsync tar zstd; do
  if ! command -v "$tool" >/dev/null 2>&1; then
    echo "required tool not found on PATH: $tool" >&2
    exit 1
  fi
done

SHORT=$(cd "$REPO_ROOT" && git rev-parse --short HEAD)
BUNDLE="$OUT_DIR/lambda-bundle-$SHORT.tar.zst"

mkdir -p "$OUT_DIR"

# Check the four required inputs exist BEFORE we start tarring 5 GB.
required=(
  "$REPO_ROOT/data/processed/validation_v1.parquet"
  "$REPO_ROOT/data/processed/features_window_v1.parquet"
  "$REPO_ROOT/data/processed/feature_schema_v1.json"
  "$REPO_ROOT/data/processed/tensor_window_v1"
)
for r in "${required[@]}"; do
  if [[ ! -e "$r" ]]; then
    echo "missing required input: $r" >&2
    echo "did the Pi-side feature build finish? check data/logs/build_features_full.log" >&2
    exit 1
  fi
done

# Scratch directory that becomes the root of the tarball. Everything the
# Lambda side reads (data, code, bootstrap, manifests) is staged under it.
STAGE="$(mktemp -d)"
# The archive is written to a ".partial" name and renamed only on success, so
# an interrupted build never leaves a truncated file matching
# lambda-bundle-*.tar.zst for a later step to mistake for a finished bundle.
PARTIAL="$BUNDLE.partial"
cleanup() { rm -rf -- "$STAGE" "$PARTIAL"; }
trap cleanup EXIT

# Pre-built data the Lambda instance needs
mkdir -p "$STAGE/data/processed"
cp "$REPO_ROOT/data/processed/validation_v1.parquet" "$STAGE/data/processed/"
cp "$REPO_ROOT/data/processed/features_window_v1.parquet" "$STAGE/data/processed/"
cp "$REPO_ROOT/data/processed/feature_schema_v1.json" "$STAGE/data/processed/"
cp -r "$REPO_ROOT/data/processed/tensor_window_v1" "$STAGE/data/processed/"

# Code snapshot — exclude .git, runtime caches, and anything under data/.
# Patterns with a leading "/" are anchored at the transfer root, so '/tmp/*'
# means <repo>/tmp/*, not the system /tmp.
mkdir -p "$STAGE/repo"
rsync -a \
  --exclude='.git/' \
  --exclude='.venv*/' \
  --exclude='__pycache__/' \
  --exclude='*.pyc' \
  --exclude='data/' \
  --exclude='artifacts*/' \
  --exclude='reports/eval/' \
  --exclude='reports/pca/' \
  --exclude='reports/xai/' \
  --exclude='reports/fleet-*/' \
  --exclude='/tmp/*' \
  --exclude='vm/images/' \
  --exclude='vm/snapshots/' \
  "$REPO_ROOT/" "$STAGE/repo/"

# The bootstrap script Lambda runs after extracting the bundle.
cp "$REPO_ROOT/scripts/lambda-bootstrap.sh" "$STAGE/bootstrap.sh"
chmod +x "$STAGE/bootstrap.sh"

# Use the canonical training manifest as the job list. If the operator
# wants a different plan, they edit etc/training_manifest.toml.example
# and we ship the edited version.
cp "$REPO_ROOT/etc/training_manifest.toml.example" \
  "$STAGE/training_manifest.toml"

# Manifest pinning — Lambda gets a stamp of what code commit produced
# this bundle, so rerunning against the same data with the same code
# is reproducible.
# NOTE(review): the opening of this heredoc was lost from the reviewed text;
# only the closing "n_tensor_shards" field survived. The leading fields below
# are a reconstruction — confirm the field names against the real script.
commit=$(cd "$REPO_ROOT" && git rev-parse HEAD)
built_at=$(date -u +%Y-%m-%dT%H:%M:%SZ)
built_on=$(hostname 2>/dev/null || echo unknown)
n_shards=$(find "$STAGE/data/processed/tensor_window_v1" -name '*.npz' | wc -l | xargs)
cat > "$STAGE/BUNDLE_MANIFEST.json" <<EOF
{
  "short": "$SHORT",
  "commit": "$commit",
  "built_at": "$built_at",
  "built_on": "$built_on",
  "n_tensor_shards": "$n_shards"
}
EOF

# tar.zst (zstd > gzip for both speed and ratio on this kind of payload)
echo "compressing bundle to $BUNDLE..."
tar -C "$STAGE" --use-compress-program='zstd -T0 -3' -cf "$PARTIAL" .
mv -f -- "$PARTIAL" "$BUNDLE"

# Stamp the bundle's own sha256 so rsync resume + verify is stable.
# Record the digest against the bundle's basename rather than its absolute
# build-host path, so `sha256sum -c` works from whatever directory the two
# files are copied to together.
(cd "$(dirname "$BUNDLE")" && sha256sum "$(basename "$BUNDLE")") > "$BUNDLE.sha256"

# Report
size=$(du -sh "$BUNDLE" | awk '{print $1}')
echo
echo "✓ bundle ready"
echo " $BUNDLE ($size)"
echo " $BUNDLE.sha256"
echo
echo "next: bash scripts/run-on-lambda.sh ubuntu@<lambda-ip>"

# ── patch: scripts/lambda-bootstrap.sh (new file, mode 100755, index 0000000..3b35fce, hunk @@ -0,0 +1,187 @@) ──
#!/usr/bin/env bash
# Runs ON the Lambda instance after the bundle is extracted to ~/cis490.
# Installs Python deps, iterates the training manifest, runs each job, and
# leaves the resulting artifacts in place for run-on-lambda.sh to rsync back.
#
# Inputs (cwd = ~/cis490):
#   bootstrap.sh            ← THIS FILE
#   training_manifest.toml  ← canonical job list
#   BUNDLE_MANIFEST.json    ← code commit + sanity stamps
#   repo/                   ← code snapshot
#   data/processed/         ← pre-built parquet + tensor shards
#
# Outputs (cwd = ~/cis490):
#   artifacts/                ← <model>_<mode>.{ckpt.json,pt,xgb.json}
#   reports/eval/             ← per-model train.json + comparison_v2.md
#   logs/<model>_<mode>.log   ← per-job training log (full stdout/stderr)
#
# Idempotency: each iteration checks for an existing
# artifacts/<model>_<mode>.ckpt.json before training. Re-running picks
# up where it left off.
set -euo pipefail

cd "$HOME/cis490"

echo "=== bundle manifest ==="
cat BUNDLE_MANIFEST.json
echo

# The GPU inventory is informational only: a missing or failing nvidia-smi is
# reported but must not abort the run (the torch check below reports whether
# CUDA is actually usable).
echo "=== gpu inventory ==="
if command -v nvidia-smi >/dev/null 2>&1; then
  nvidia-smi -L \
    || echo "nvidia-smi -L failed — driver problem?" >&2
  nvidia-smi --query-gpu=name,memory.total,memory.free,driver_version --format=csv \
    || echo "nvidia-smi query failed — driver problem?" >&2
else
  echo "nvidia-smi not found — running without CUDA?" >&2
fi
echo

# ─────────────────────────────────────────────────────────────────────
# 1. Python venv with training deps
# ─────────────────────────────────────────────────────────────────────

if [[ ! -x .venv/bin/python ]]; then
  echo "=== creating .venv ==="
  python3 -m venv .venv
fi
# shellcheck disable=SC1091 — the venv is created at runtime
. .venv/bin/activate
# Always go through `python -m pip` so installs target the venv interpreter
# even if a different `pip` is earlier on PATH.
python -m pip install -q --upgrade pip
echo "=== installing training deps ==="
# CUDA-enabled torch from PyTorch's index. Lambda's A100 supports cu121/cu124;
# cu121 is pinned here.
python -m pip install -q torch --index-url https://download.pytorch.org/whl/cu121
python -m pip install -q xgboost numpy scipy pyarrow polars scikit-learn matplotlib zstandard
python -m pip install -q -e ./repo

# Smoke-test the two heavyweight imports and report what CUDA sees.
python - <<'PY'
import torch, xgboost
print(f"torch {torch.__version__} cuda? {torch.cuda.is_available()} "
      f"device count={torch.cuda.device_count()}")
if torch.cuda.is_available():
    print(f" device 0: {torch.cuda.get_device_name(0)}")
print(f"xgboost {xgboost.__version__}")
PY

# ─────────────────────────────────────────────────────────────────────
# 2. Iterate the manifest, run trainer per job
# ─────────────────────────────────────────────────────────────────────

mkdir -p artifacts reports/eval logs
export PYTHONPATH="$PWD/repo"

# Render manifest jobs to a list of tab-separated `model mode hyper` lines
# (one per job).
# NOTE(review): the body of this heredoc was lost from the reviewed text. It is
# reconstructed from what the rest of the patch shows: training.fleet.manifest
# exposes load(), whose result has .jobs with .model and .mode, and the loop
# below reads three tab-separated fields. The attribute holding the
# hyperparameters and their flag spelling are assumptions — confirm.
mapfile -t JOBS < <(python - <<'PY'
from pathlib import Path

from training.fleet.manifest import load

for job in load(Path("training_manifest.toml")).jobs:
    hyper = getattr(job, "hyper", None) or {}
    if isinstance(hyper, str):
        flags = hyper
    else:
        flags = " ".join(f"--{k.replace('_', '-')} {v}" for k, v in hyper.items())
    print(f"{job.model}\t{job.mode}\t{flags}")
PY
)
if [[ ${#JOBS[@]} -eq 0 ]]; then
  # NOTE(review): original message text was lost; exit code 3 is original.
  echo "no jobs rendered from training_manifest.toml" >&2; exit 3
fi

# Absolute locations. Trainers are launched from inside repo/, so every path
# handed to them must not depend on the working directory: cwd-relative
# "data/processed/..." would resolve to repo/data (excluded from the bundle)
# and "artifacts" to repo/artifacts (invisible to the resume check below and
# to the rsync that pulls results back).
ROOT="$PWD"
DATA="$ROOT/data/processed"

echo "=== running ${#JOBS[@]} training jobs ==="
declare -i n_done=0 n_skipped=0 n_failed=0
declare -a FAILED=()

for entry in "${JOBS[@]}"; do
  IFS=$'\t' read -r model mode hyper <<<"$entry"
  job_label="${model}_${mode}"
  ckpt="artifacts/${job_label}.ckpt.json"
  log="logs/${job_label}.log"

  # Resume support: a checkpoint from an earlier run means this job is done.
  if [[ -f "$ckpt" ]]; then
    echo " skip $job_label (already present)"
    n_skipped+=1
    continue
  fi

  echo
  echo "── $job_label ────────────────────────────────────"
  started=$(date +%s)

  if [[ "$model" == "transformer_ssl" ]]; then
    cmd=(python -m training.trainer.run_ssl
      --mode "$mode"
      --validation "$DATA/validation_v1.parquet"
      --tensors "$DATA/tensor_window_v1"
      --out-dir "$ROOT/artifacts"
      --reports-dir "$ROOT/reports/eval")
  else
    cmd=(python -m training.trainer.run
      --model "$model" --mode "$mode"
      --validation "$DATA/validation_v1.parquet"
      --summary "$DATA/features_window_v1.parquet"
      --tensors "$DATA/tensor_window_v1"
      --schema "$DATA/feature_schema_v1.json"
      --out-dir "$ROOT/artifacts"
      --reports-dir "$ROOT/reports/eval"
      --train-hosts elliott-thinkpad)
  fi
  # Tack on hyperparameters from the manifest. `read -a` splits on whitespace
  # like an unquoted expansion would, but without pathname expansion.
  if [[ -n "$hyper" ]]; then
    read -r -a extra_args <<<"$hyper"
    cmd+=("${extra_args[@]}")
  fi

  # One failed job must not stop the remaining ones: capture the status
  # instead of letting `set -e` abort.
  rc=0
  (cd repo && "${cmd[@]}") > "$log" 2>&1 || rc=$?
  elapsed=$(( $(date +%s) - started ))
  if (( rc == 0 )); then
    echo " ✓ $job_label done in ${elapsed}s"
    n_done+=1
  else
    echo " ✗ $job_label FAILED (rc=$rc, ${elapsed}s) — last 20 lines of log:"
    tail -20 "$log"
    FAILED+=("$job_label")
    n_failed+=1
  fi
done

echo
echo "=== training done ==="
echo " done: $n_done"
echo " skipped: $n_skipped"
echo " failed: $n_failed"
if [[ $n_failed -gt 0 ]]; then
  echo " failed jobs: ${FAILED[*]}"
fi

# ─────────────────────────────────────────────────────────────────────
# 3. Eval suite (writes reports/eval/comparison_v2.md + per-model JSON)
# ─────────────────────────────────────────────────────────────────────

echo
echo "=== eval suite ==="
# Output is teed to logs/eval.log so the failure hint below points at a file
# that actually exists; pipefail makes the pipeline reflect the eval status.
if ! (cd repo && python -m training.eval_.run \
      --validation "$DATA/validation_v1.parquet" \
      --artifacts "$ROOT/artifacts" \
      --summary "$DATA/features_window_v1.parquet" \
      --tensors "$DATA/tensor_window_v1" \
      --reports-dir "$ROOT/reports/eval") 2>&1 | tee logs/eval.log; then
  echo "eval reported errors — see logs/eval.log"
fi

# ─────────────────────────────────────────────────────────────────────
# 4. Stamp + summarize
# ─────────────────────────────────────────────────────────────────────

# NOTE(review): everything from the body of this heredoc to the end of
# lambda-bootstrap.sh was lost from the reviewed text. The summary fields are
# a reconstruction from the counters above, and any trailing steps of the
# original script are NOT reproduced here — confirm against the real script.
cat > artifacts/RUN_SUMMARY.json <<EOF
{
  "finished_at": "$(date -u +%Y-%m-%dT%H:%M:%SZ)",
  "n_done": $n_done,
  "n_skipped": $n_skipped,
  "n_failed": $n_failed,
  "failed_jobs": "${FAILED[*]-}"
}
EOF

# ── patch: scripts/run-on-lambda.sh (patch header and the first lines of the file header were lost from the reviewed text) ──
#!/usr/bin/env bash
# Ship the latest bundle to a Lambda instance, run bootstrap.sh there, and
# pull the results back.
#
# Usage:
#   bash scripts/run-on-lambda.sh ubuntu@<lambda-ip>
#
# What this does:
#   1. Verifies the bundle exists (if not: build it first)
#   2. rsync the bundle to ~/cis490-bundle.tar.zst on the Lambda instance
#   3. SSH in, extract, run bootstrap.sh, stream logs back to the Pi
#   4. rsync the resulting artifacts/ + reports/ back to the Pi
#   5. Print a summary; you decide whether to ingest into the local
#      trainer-receiver via tools/ingest_lambda_artifacts.py
#
# The script is idempotent on the Lambda side: if you re-run with the
# same bundle, rsync finds the remote copy up to date and transfers nothing,
# and training resumes from where bootstrap left off (each model checks for
# an existing .ckpt.json before retraining).
set -euo pipefail

REMOTE="${1:-}"
if [[ -z "$REMOTE" ]]; then
  echo "usage: $0 ubuntu@<lambda-ip>" >&2
  exit 1
fi

REPO_ROOT="${REPO_ROOT:-/home/max/.env/CIS490}"
OUT_DIR="${OUT_DIR:-/tmp/cis490-lambda}"
SSH_KEY="${SSH_KEY:-$HOME/.ssh/lambda_ed25519}"
SSH_OPTS=(-i "$SSH_KEY" -o StrictHostKeyChecking=accept-new -o ServerAliveInterval=30)
# NOTE: the rsync calls below hand these options over as one space-joined
# string, so $SSH_KEY must not contain whitespace.

# Find the latest bundle (most-recently-modified .tar.zst). Globbing plus -nt
# avoids parsing `ls` output.
BUNDLE=""
for candidate in "$OUT_DIR"/lambda-bundle-*.tar.zst; do
  [[ -e "$candidate" ]] || continue   # unmatched glob stays literal
  if [[ -z "$BUNDLE" || "$candidate" -nt "$BUNDLE" ]]; then
    BUNDLE="$candidate"
  fi
done
if [[ -z "$BUNDLE" ]]; then
  echo "no bundle found in $OUT_DIR. run scripts/build-lambda-bundle.sh first." >&2
  exit 1
fi
# The checksum sidecar is shipped and verified below; say so explicitly if it
# is missing instead of dying silently inside `ls` under `set -e`.
if [[ ! -f "$BUNDLE.sha256" ]]; then
  echo "missing $BUNDLE.sha256. re-run scripts/build-lambda-bundle.sh." >&2
  exit 1
fi

echo "=== bundle ==="
ls -lh "$BUNDLE" "$BUNDLE.sha256"
echo
echo "=== remote ==="
echo " $REMOTE (key=$SSH_KEY)"
echo

# Sanity: can we ssh?
if ! ssh "${SSH_OPTS[@]}" -o ConnectTimeout=10 "$REMOTE" 'echo connected' 2>&1; then
  echo "ssh to $REMOTE failed. Check the IP, key permissions, and that" >&2
  echo "the instance is fully booted." >&2
  exit 1
fi

# Rsync the bundle. -P resumes partial transfers + shows progress.
echo "=== rsync bundle → lambda ==="
rsync -P --partial -e "ssh ${SSH_OPTS[*]}" "$BUNDLE" "$REMOTE:cis490-bundle.tar.zst"
rsync -P --partial -e "ssh ${SSH_OPTS[*]}" "$BUNDLE.sha256" "$REMOTE:cis490-bundle.tar.zst.sha256"
echo

# Run bootstrap remotely. We pipe stdout/stderr back so the operator
# sees training progress live.
echo "=== running bootstrap.sh on lambda ==="
ssh "${SSH_OPTS[@]}" "$REMOTE" 'bash -s' <<'REMOTE_SCRIPT'
set -euo pipefail
cd "$HOME"

# Verify the bundle if we have the sha256 alongside it. Only the digest is
# compared: the sidecar names the file as it was called on the build host, and
# the bundle is renamed on upload, so `sha256sum -c` could never find it here.
if [[ -f cis490-bundle.tar.zst.sha256 ]]; then
  expected=$(awk '{print $1; exit}' cis490-bundle.tar.zst.sha256)
  actual=$(sha256sum cis490-bundle.tar.zst | awk '{print $1}')
  if [[ -z "$expected" || "$expected" != "$actual" ]]; then
    echo "bundle sha256 mismatch — corrupted rsync? aborting." >&2
    exit 2
  fi
fi

# Extract into ~/cis490, on top of any prior extraction. Existing artifacts/
# and logs/ are deliberately kept: bootstrap.sh uses the checkpoints found
# there to skip jobs that already finished.
mkdir -p cis490
cd cis490
tar --use-compress-program='zstd -T0' -xf ../cis490-bundle.tar.zst

# Hand off to the bundle's bootstrap.sh
exec bash bootstrap.sh
REMOTE_SCRIPT

echo
echo "=== bootstrap returned ok; rsync artifacts back ==="
mkdir -p "$REPO_ROOT/artifacts-lambda" "$REPO_ROOT/reports/lambda"

rsync -av --partial -e "ssh ${SSH_OPTS[*]}" \
  "$REMOTE:cis490/artifacts/" "$REPO_ROOT/artifacts-lambda/"
rsync -av --partial -e "ssh ${SSH_OPTS[*]}" \
  "$REMOTE:cis490/reports/" "$REPO_ROOT/reports/lambda/"

echo
echo "✓ artifacts pulled back"
echo " $REPO_ROOT/artifacts-lambda/ ($(du -sh "$REPO_ROOT/artifacts-lambda" | awk '{print $1}'))"
echo " $REPO_ROOT/reports/lambda/ ($(du -sh "$REPO_ROOT/reports/lambda" | awk '{print $1}'))"
echo
# The ingest tool added by this patch is a Python module under tools/, not a
# shell script under scripts/.
echo "next: python -m tools.ingest_lambda_artifacts \\"
echo "        --artifacts $REPO_ROOT/artifacts-lambda \\"
echo "        --reports $REPO_ROOT/reports/lambda/eval"
echo "      # uploads each artifact to the local trainer-receiver"
echo "      # so cis490-jobs status reflects them"

# ── patch: tools/ingest_lambda_artifacts.py (new file, mode 100644, index 0000000..966c28b, hunk @@ -0,0 +1,241 @@) ──
"""Ingest Lambda-trained artifacts into the local trainer-receiver.

Lambda finishes a run; run-on-lambda.sh rsyncs artifacts/ + reports/eval/
back to the Pi at $REPO/artifacts-lambda/ + $REPO/reports/lambda/.
This script bundles each (ckpt.json + sidecar + train.json) trio into a
.tar.zst, computes its sha256, and uploads it to the trainer-receiver via
PUT /v1/model/{job_id} so the model store on the Pi looks identical to
what a real fleet worker would have produced.

This preserves the fleet abstraction: Lambda is just an "external worker"
whose output lands in the same /var/lib/cis490/models/ tree. The
operator's `cis490-jobs status` shows them as completed jobs with
proper artifact_ids.
+ +Usage: + python -m tools.ingest_lambda_artifacts \\ + --artifacts ./artifacts-lambda \\ + --reports ./reports/lambda/eval \\ + --receiver http://127.0.0.1:8445 \\ + --validation data/processed/validation_v1.parquet \\ + --as-host lambda-a100 +""" +from __future__ import annotations + +import argparse +import hashlib +import io +import json +import logging +import re +import sys +import tarfile +from pathlib import Path + +import zstandard as zstd + +sys.path.insert(0, str(Path(__file__).resolve().parent.parent)) +from training.fleet.client import FleetClient +from training.fleet.manifest import load as load_manifest + + +log = logging.getLogger("cis490.ingest_lambda") + + +# Map ckpt filename → (model, mode). e.g. "gbt_realistic.ckpt.json" → ("gbt", "realistic"). +_CKPT_RE = re.compile(r"^(?P[a-z_]+)_(?Prealistic|oracle)\.ckpt\.json$") + + +def _pair_files(artifacts_dir: Path, reports_dir: Path + ) -> list[tuple[str, str, Path, Path, Path | None]]: + """Find (model, mode, ckpt.json, sidecar, train.json or None) tuples.""" + out: list[tuple[str, str, Path, Path, Path | None]] = [] + for ckpt in sorted(artifacts_dir.glob("*.ckpt.json")): + m = _CKPT_RE.match(ckpt.name) + if not m: + log.warning("skipping unexpected file: %s", ckpt.name) + continue + model, mode = m.group("model"), m.group("mode") + # Sidecar: .pt for NN, .xgb.json for GBT + for suf in (".pt", ".xgb.json"): + sidecar = ckpt.with_suffix("").with_suffix(suf) + if sidecar.exists(): + break + else: + log.warning("no sidecar (.pt or .xgb.json) found for %s; skipping", + ckpt.name) + continue + # Training report (optional) + train_json: Path | None = None + for cand in ( + reports_dir / f"{model}_{mode}_train.json", + reports_dir / f"{model}_{mode}_pretrain.json", + ): + if cand.exists(): + train_json = cand + break + out.append((model, mode, ckpt, sidecar, train_json)) + return out + + +def _bundle(model: str, mode: str, ckpt: Path, sidecar: Path, + train_json: Path | None) -> tuple[bytes, str]: + 
"""Produce the .tar.zst payload + its sha256.""" + buf = io.BytesIO() + cctx = zstd.ZstdCompressor(level=10) + with cctx.stream_writer(buf) as zw: + with tarfile.open(fileobj=zw, mode="w|") as tar: + tar.add(ckpt, arcname=ckpt.name) + tar.add(sidecar, arcname=sidecar.name) + if train_json is not None: + tar.add(train_json, arcname=train_json.name) + payload = buf.getvalue() + return payload, hashlib.sha256(payload).hexdigest() + + +def main() -> int: + ap = argparse.ArgumentParser() + ap.add_argument("--artifacts", required=True, type=Path, + help="dir holding *.ckpt.json + sidecar files (e.g. " + "./artifacts-lambda)") + ap.add_argument("--reports", required=True, type=Path, + help="dir holding *_train.json + *_pretrain.json") + ap.add_argument("--receiver", default="http://127.0.0.1:8445", + help="trainer-receiver base URL") + ap.add_argument("--manifest", type=Path, + default=Path("etc/training_manifest.toml.example"), + help="canonical manifest — used to map (model, mode) → job_id") + ap.add_argument("--as-host", default="lambda-a100", + help="X-Lab-Host value for the upload (the worker name " + "shown in cis490-jobs)") + ap.add_argument("--operator-token-env", default="CIS490_OPERATOR_TOKEN") + ap.add_argument("--dry-run", action="store_true") + ap.add_argument("--log-level", default="INFO") + args = ap.parse_args() + + logging.basicConfig( + level=args.log_level, + format="%(asctime)s %(levelname)s %(name)s %(message)s", + ) + + pairs = _pair_files(args.artifacts, args.reports) + if not pairs: + log.error("no (ckpt.json, sidecar) pairs under %s", args.artifacts) + return 1 + log.info("found %d trained (model, mode) pairs", len(pairs)) + + # Map (model, mode, hyper, …) → job_id by reading the manifest. 
+ man = load_manifest(args.manifest) + job_index: dict[tuple[str, str], str] = {} + for j in man.jobs: + job_index[(j.model, j.mode)] = j.job_id + + import os + operator_token = os.environ.get(args.operator_token_env) + client = FleetClient(args.receiver, host_id=args.as_host, + operator_token=operator_token) + + # Pre-flight: receiver alive? + try: + rc, _ = client._request("GET", "/v1/health") + except Exception as e: + log.error("trainer-receiver not reachable at %s: %s", + args.receiver, e) + return 1 + + n_ok = 0 + n_skipped = 0 + n_failed = 0 + for model, mode, ckpt, sidecar, train_json in pairs: + key = (model, mode) + job_id = job_index.get(key) + if job_id is None: + log.warning("no manifest entry for (%s, %s); skipping", model, mode) + n_skipped += 1 + continue + + # Make sure the queue knows about this job. If it's currently + # completed by an earlier run, requeue first so we can re-upload. + existing = next((r for r in client.list_jobs() + if r.get("job_id") == job_id), None) + if existing is None: + log.warning("queue has no row for job_id=%s — did you " + "`cis490-jobs reload` after editing the manifest?", job_id) + n_skipped += 1 + continue + if existing.get("status") == "completed": + log.info("job %s already completed; requeueing for re-upload", job_id) + if not args.dry_run: + if not client.requeue(job_id): + log.warning("requeue failed for %s", job_id) + n_failed += 1 + continue + # Claim it as if we were a worker + capability = {"cuda_available": True, "cpu_cores": 8, + "ram_available_gib": 32, "cuda_devices": [ + {"name": "Lambda A100", "vram_total_gib": 40, + "vram_free_gib": 35} + ]} + if not args.dry_run: + claim = client.claim(capability) + if not claim or claim.get("job_id") != job_id: + # claim_next picks by priority, not by id, so we may have + # claimed a different job. Manually mark this one in flight. + if claim and claim.get("job_id"): + # Release the unrelated claim by failing it back — + # cleanest way to put it back in pending. 
+ client.fail(claim["job_id"], error="lambda-ingest race; releasing") + # Try once more with a more targeted approach: requeue + claim + client.requeue(job_id) + claim = client.claim(capability) + if not claim or claim.get("job_id") != job_id: + log.warning("could not claim job_id=%s after requeue; " + "skipping", job_id) + n_skipped += 1 + continue + + # Bundle + upload + complete + payload, sha = _bundle(model, mode, ckpt, sidecar, train_json) + log.info("uploading %s_%s job_id=%s sha=%s size=%dKiB", + model, mode, job_id, sha[:12], len(payload) // 1024) + if args.dry_run: + log.info(" (dry-run; skipping HTTP)") + n_ok += 1 + continue + + # PUT /v1/model/{job_id} + code, body = client._request( + "PUT", f"/v1/model/{job_id}", + body=payload, + extra_headers={ + "x-content-sha256": sha, + "content-length": str(len(payload)), + "content-type": "application/octet-stream", + }, + expect_status=(200, 201), + ) + if code not in (200, 201): + log.error("upload failed: code=%d body=%r", code, body) + n_failed += 1 + continue + artifact_id = (body or {}).get("artifact_id", sha) + + # Mark complete + if not client.complete(job_id, artifact_id=artifact_id): + log.warning("complete() returned false for %s", job_id) + n_failed += 1 + continue + n_ok += 1 + + print() + print(f" uploaded: {n_ok}") + print(f" skipped: {n_skipped}") + print(f" failed: {n_failed}") + print() + print("status: cis490-jobs status") + print("models: ls /var/lib/cis490/models/") + return 0 if n_failed == 0 else 2 + + +if __name__ == "__main__": + raise SystemExit(main())