"""Target VM spec loader + validator (PIPELINE.md §4.2 / §4.13). Every target VM image in `[targets]` of the canonical manifest is described by a `vm/targets//spec.toml` file. The spec captures: * What the target promises — vulnerable service, port, version, CVE that the build script must produce a working instance of. * Containment posture (§4.13) — every target must declare itself isolated to the same standard, and a regression in any of these fields is a containment regression that the verifier rejects regardless of any "experimental realism" the change claims to add. Build flow: 1. tools/build_target.py — runs vm/targets//build.sh, produces .qcow2 with sha256. 2. tools/verify_target.py — boots the freshly-built image in a containment-correct QEMU configuration, asserts every promise in spec.toml. A spec is INVALID if any §4.13 containment field is absent or set to the unsafe value. There is no "I know what I'm doing" override — weakening containment requires amending PIPELINE.md §4.13 and getting operator sign-off (§15, §16), not toggling a TOML key. """ from __future__ import annotations import tomllib from dataclasses import dataclass from pathlib import Path class TargetSpecError(ValueError): """Raised when a target spec is missing, unreadable, or fails validation. Build/verify scripts translate this into exit 78.""" @dataclass(frozen=True) class Promises: """What the build script must produce in the target VM. The verifier asserts every field is observably true after a clean boot of the produced image.""" cve: str service_name: str service_port: int service_proto: str # "tcp" | "udp" vulnerable_software: str vulnerable_version: str @dataclass(frozen=True) class Containment: """§4.13 isolation posture. Every field is required and every field has a single safe value — there's no "production vs dev" knob. A target spec asserting unsafe containment is rejected at load time.""" upstream_egress: bool # MUST be False shared_filesystem: bool # MUST be False unprivileged_qemu: bool # MUST be True fresh_snapshot_per_episode: bool # MUST be True @dataclass(frozen=True) class TargetSpec: name: str description: str base_image: str # e.g. "alpine-3.21-virt"; build.sh handles fetch promises: Promises containment: Containment spec_path: Path def to_meta(self) -> dict: """Serialize for embedding in `meta.json` so episodes carry target provenance (§4.2 acceptance + §10 ground truth).""" return { "name": self.name, "description": self.description, "base_image": self.base_image, "promises": { "cve": self.promises.cve, "service_name": self.promises.service_name, "service_port": self.promises.service_port, "service_proto": self.promises.service_proto, "vulnerable_software": self.promises.vulnerable_software, "vulnerable_version": self.promises.vulnerable_version, }, "containment": { "upstream_egress": self.containment.upstream_egress, "shared_filesystem": self.containment.shared_filesystem, "unprivileged_qemu": self.containment.unprivileged_qemu, "fresh_snapshot_per_episode": self.containment.fresh_snapshot_per_episode, }, } def load_target_spec(repo_root: Path | str, name: str) -> TargetSpec: """Load + validate `/vm/targets//spec.toml`. Raises TargetSpecError on any failure.""" repo_root = Path(repo_root).resolve() spec_path = repo_root / "vm" / "targets" / name / "spec.toml" if not spec_path.exists(): raise TargetSpecError( f"target spec not found at {spec_path}. " f"Every target referenced from manifest.targets must have a " f"spec.toml under vm/targets// per §4.2." ) try: raw = tomllib.loads(spec_path.read_text()) except (OSError, tomllib.TOMLDecodeError) as e: raise TargetSpecError(f"cannot parse {spec_path}: {e}") from e return _validate(raw, spec_path, expected_name=name) def list_target_specs(repo_root: Path | str) -> list[TargetSpec]: """Discover every target spec under vm/targets/. Used by build_target.py when invoked without a name to enumerate options, and by tests to assert every spec on disk validates cleanly.""" repo_root = Path(repo_root).resolve() targets_dir = repo_root / "vm" / "targets" if not targets_dir.exists(): return [] specs: list[TargetSpec] = [] for child in sorted(targets_dir.iterdir()): if not child.is_dir(): continue spec_file = child / "spec.toml" if not spec_file.exists(): continue specs.append(load_target_spec(repo_root, child.name)) return specs # ---------- validation ----------------------------------------------- def _validate(raw: dict, spec_path: Path, *, expected_name: str) -> TargetSpec: name = _require_str(raw, "name") if name != expected_name: raise TargetSpecError( f"{spec_path}: spec.name={name!r} doesn't match directory name " f"{expected_name!r} — keep them in sync" ) description = _require_str(raw, "description") base_image = _require_str(raw, "base_image") promises_block = _require_dict(raw, "promises") promises = Promises( cve=_require_str(promises_block, "cve", ctx="promises"), service_name=_require_str(promises_block, "service_name", ctx="promises"), service_port=_require_int(promises_block, "service_port", ctx="promises"), service_proto=_require_str(promises_block, "service_proto", ctx="promises"), vulnerable_software=_require_str( promises_block, "vulnerable_software", ctx="promises"), vulnerable_version=_require_str( promises_block, "vulnerable_version", ctx="promises"), ) if promises.service_proto not in ("tcp", "udp"): raise TargetSpecError( f"{spec_path}: promises.service_proto must be 'tcp' or 'udp', " f"got {promises.service_proto!r}" ) if not 1 <= promises.service_port <= 65535: raise TargetSpecError( f"{spec_path}: promises.service_port out of range: " f"{promises.service_port}" ) containment_block = _require_dict(raw, "containment") containment = Containment( upstream_egress=_require_bool( containment_block, "upstream_egress", ctx="containment"), shared_filesystem=_require_bool( containment_block, "shared_filesystem", ctx="containment"), unprivileged_qemu=_require_bool( containment_block, "unprivileged_qemu", ctx="containment"), fresh_snapshot_per_episode=_require_bool( containment_block, "fresh_snapshot_per_episode", ctx="containment"), ) # Hard-enforce the §4.13 stance. Each field has exactly one safe # value; the spec is a declaration that the target satisfies it, # not a knob. A spec asserting an unsafe value is rejected here so # it never reaches the build pipeline. if containment.upstream_egress is not False: raise TargetSpecError( f"{spec_path}: containment.upstream_egress must be false (§4.13). " f"Targets with internet routing are containment regressions." ) if containment.shared_filesystem is not False: raise TargetSpecError( f"{spec_path}: containment.shared_filesystem must be false (§4.13). " f"Targets with host-shared mounts are containment regressions." ) if containment.unprivileged_qemu is not True: raise TargetSpecError( f"{spec_path}: containment.unprivileged_qemu must be true (§4.13). " f"Privileged QEMU is a containment regression." ) if containment.fresh_snapshot_per_episode is not True: raise TargetSpecError( f"{spec_path}: containment.fresh_snapshot_per_episode must be " f"true (§4.13). State carrying across episodes poisons the dataset." ) return TargetSpec( name=name, description=description, base_image=base_image, promises=promises, containment=containment, spec_path=spec_path, ) # ---------- helpers -------------------------------------------------- def _require(d: dict, key: str, kind: type, *, ctx: str = "") -> object: where = f"{ctx}." if ctx else "" if key not in d: raise TargetSpecError(f"missing required field {where}{key}") v = d[key] if not isinstance(v, kind): raise TargetSpecError( f"field {where}{key} must be {kind.__name__}, got {type(v).__name__}" ) return v def _require_str(d: dict, key: str, *, ctx: str = "") -> str: return _require(d, key, str, ctx=ctx) # type: ignore[return-value] def _require_int(d: dict, key: str, *, ctx: str = "") -> int: where = f"{ctx}." if ctx else "" if key not in d: raise TargetSpecError(f"missing required field {where}{key}") v = d[key] if isinstance(v, bool): raise TargetSpecError(f"field {where}{key} must be int, got bool") if isinstance(v, int): return v raise TargetSpecError( f"field {where}{key} must be int, got {type(v).__name__}" ) def _require_bool(d: dict, key: str, *, ctx: str = "") -> bool: where = f"{ctx}." if ctx else "" if key not in d: raise TargetSpecError(f"missing required field {where}{key}") v = d[key] if not isinstance(v, bool): raise TargetSpecError( f"field {where}{key} must be bool, got {type(v).__name__}" ) return v def _require_dict(d: dict, key: str, *, ctx: str = "") -> dict: return _require(d, key, dict, ctx=ctx) # type: ignore[return-value]