"""Build a target VM from its declarative spec (PIPELINE.md §4.2). Usage: python tools/build_target.py [--out DIR] python tools/build_target.py --list Each target lives at `vm/targets//` with three files: * spec.toml — what the target promises (orchestrator/target_spec.py) * build.sh — declarative build steps producing .qcow2 * verify.sh — boots the produced image, asserts every promise Build flow: 1. Load + validate the spec (containment posture pre-checked). 2. Run build.sh with OUT_PATH set to the staged artifact. 3. Run verify.sh against the staged artifact in a containment-correct QEMU configuration. Any verification failure is fatal — the image does NOT enter the published images dir. 4. Compute sha256 and rename to the published path. 5. Print the sha256 — operator copies it into manifest.toml's [[targets.images]] entry to admit the image (§4.2 acceptance). Failure modes: * Spec invalid → exit 78 * build.sh non-zero → exit 1, image not published * verify.sh non-zero → exit 1, image not published * sha256 doesn't match recorded → exit 1 """ from __future__ import annotations import argparse import hashlib import logging import os import shutil import subprocess import sys from pathlib import Path # Allow running as a script. sys.path.insert(0, str(Path(__file__).resolve().parent.parent)) from orchestrator.target_spec import ( # noqa: E402 TargetSpecError, list_target_specs, load_target_spec, ) EXIT_SYSADMIN_ERROR = 78 DEFAULT_OUT_DIR = Path("/var/lib/cis490/vm/images") def _sha256(path: Path) -> str: h = hashlib.sha256() with path.open("rb") as f: for chunk in iter(lambda: f.read(1024 * 1024), b""): h.update(chunk) return h.hexdigest() def build_one(repo_root: Path, name: str, out_dir: Path, log: logging.Logger) -> int: """Build + verify a single target. Returns exit code.""" try: spec = load_target_spec(repo_root, name) except TargetSpecError as e: log.error("%s: spec invalid: %s", name, e) return EXIT_SYSADMIN_ERROR target_dir = repo_root / "vm" / "targets" / name build_script = target_dir / "build.sh" verify_script = target_dir / "verify.sh" if not build_script.exists(): log.error("%s: build.sh missing at %s", name, build_script) return EXIT_SYSADMIN_ERROR if not verify_script.exists(): log.error("%s: verify.sh missing at %s", name, verify_script) return EXIT_SYSADMIN_ERROR out_dir.mkdir(parents=True, exist_ok=True) staging = out_dir / f"{name}.qcow2.staging" final = out_dir / f"{name}.qcow2" # Always start from a clean staging path; partial builds are not # quietly resumed — the build script is idempotent enough that # re-running is cheap, and resuming a partial qcow2 silently # corrupts artifacts (§7.1 compensating layer). if staging.exists(): staging.unlink() log.info("[%s] building → %s", name, staging) env = os.environ.copy() env["OUT_PATH"] = str(staging) env["BASE_IMAGE_NAME"] = spec.base_image rc = subprocess.run( [str(build_script)], cwd=str(target_dir), env=env, check=False, ).returncode if rc != 0: log.error("[%s] build.sh exited %d; not publishing", name, rc) if staging.exists(): staging.unlink() return 1 if not staging.exists(): log.error("[%s] build.sh succeeded but no artifact at %s", name, staging) return 1 log.info("[%s] verifying", name) env_v = os.environ.copy() env_v["IMAGE_PATH"] = str(staging) env_v["EXPECTED_SERVICE_NAME"] = spec.promises.service_name env_v["EXPECTED_SERVICE_PORT"] = str(spec.promises.service_port) env_v["EXPECTED_SERVICE_PROTO"] = spec.promises.service_proto env_v["EXPECTED_VULN_SOFTWARE"] = spec.promises.vulnerable_software env_v["EXPECTED_VULN_VERSION"] = spec.promises.vulnerable_version rc = subprocess.run( [str(verify_script)], cwd=str(target_dir), env=env_v, check=False, ).returncode if rc != 0: log.error( "[%s] verify.sh exited %d — image does NOT meet its spec; " "discarding %s", name, rc, staging, ) staging.unlink() return 1 digest = _sha256(staging) log.info("[%s] verified; sha256=%s", name, digest) if final.exists(): final.unlink() shutil.move(str(staging), str(final)) print(f"\n target: {name}") print(f" image: {final}") print(f" sha256: {digest}") print(f"\nAdmit by adding to manifest.toml [[targets.images]]:") print(f" image_name = \"{name}\"") print(f" sha256 = \"{digest}\"") print(f" build_script = \"vm/targets/{name}/build.sh\"") return 0 def main(argv: list[str] | None = None) -> int: p = argparse.ArgumentParser(prog="cis490-build-target") p.add_argument("name", nargs="?", help="Target name (matches vm/targets//)") p.add_argument("--list", action="store_true", help="List discoverable target specs and exit") p.add_argument( "--out", type=Path, default=DEFAULT_OUT_DIR, help=f"Where to publish verified images (default: {DEFAULT_OUT_DIR})", ) p.add_argument("--log-level", default="INFO") args = p.parse_args(argv) logging.basicConfig( level=getattr(logging, args.log_level.upper(), logging.INFO), format="%(asctime)s %(levelname)s %(name)s %(message)s", ) log = logging.getLogger("cis490.build-target") repo_root = Path(__file__).resolve().parent.parent if args.list: specs = list_target_specs(repo_root) if not specs: print("no target specs under vm/targets/") return 0 for s in specs: print(f" {s.name:30} {s.promises.cve:20} " f"{s.promises.service_name}:{s.promises.service_port} " f"({s.promises.vulnerable_software} " f"{s.promises.vulnerable_version})") return 0 if not args.name: p.error("name required (or pass --list)") return 2 return build_one(repo_root, args.name, args.out, log) if __name__ == "__main__": sys.exit(main())