§4.2 calls for target VMs we BUILD, not VMs we fetch. §4.13 demands
every target ship the same isolation posture (no upstream egress, no
host-shared FS, unprivileged QEMU, fresh snapshot per episode). This
commit lands the infrastructure for both.
New surface:
* orchestrator/target_spec.py
  Loads and validates `vm/targets/<name>/spec.toml`. Containment
  fields are not knobs: each has exactly ONE safe value, and a
  spec asserting the unsafe value is rejected at load time. There is
  no `--containment-override`; weakening §4.13 requires amending
  PIPELINE.md and operator sign-off.
* tools/build_target.py
  Orchestrates build → verify → publish for a single target. An
  invalid spec exits 78 (EX_CONFIG, configuration error). If build.sh
  fails, no image is published. If verify.sh fails, the image is
  discarded; that is the §4.2 acceptance gate. On success it
  publishes the image sha256 plus the manifest.toml stanza the
  operator copies in to admit the image (a §16 substantive amendment
  with sign-off per §15).
* vm/targets/<name>/{spec.toml,build.sh,verify.sh}
  Template structure. spec.toml is the contract; build.sh produces
  $OUT_PATH; verify.sh boots the produced image under the §4.13
  containment posture and asserts every promise.
* vm/targets/shellshock/
  First real working target. CVE-2014-6271 (Apache mod_cgi + bash
  4.2 mis-parsing function-export environment values). Replaces
  the SourceForge Metasploitable2 path that §3 evidence proved
  unverifiable. Bash 4.2 is built from sha256-pinned GNU source
  inside an Alpine 3.21 cloudinit guest; the build script asserts
  that the produced bash is actually vulnerable to Shellshock; the
  verifier re-asserts it under restrict=on with a real
  CVE-2014-6271 probe.
* vm/targets/README.md
  How operators add a target. Walks the spec → build → verify →
  manifest amendment loop.
Containment regression tests (tests/test_containment.py): 20 new
assertions, parameterized over every target with a
spec.toml/build.sh/verify.sh trio:
* verify.sh MUST contain `restrict=on` on its netdev (§4.13)
* verify.sh MUST contain `snapshot=on` on the boot drive (§4.13)
* verify.sh and build.sh MUST NOT contain -virtfs / -fsdev / 9pfs
* verify.sh and build.sh MUST NOT wrap qemu-system in `sudo`
* Every target MUST ship the complete spec.toml + build.sh + verify.sh
  trio; no half-built targets (§1 default-to-removal)
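The script-level checks listed above can be sketched as a pure function over script text. This is an assumed shape, not the code in tests/test_containment.py: the function name `containment_violations` and the exact patterns are hypothetical, and plain substring matching does not prove that a flag sits on the right netdev or drive.

```python
import re


def containment_violations(script_name: str, text: str) -> list[str]:
    """Return human-readable containment problems found in a target script."""
    problems: list[str] = []
    if script_name == "verify.sh":
        # The verifier must boot with restricted user networking and a
        # throwaway snapshot of the boot drive.
        if "restrict=on" not in text:
            problems.append("missing restrict=on")
        if "snapshot=on" not in text:
            problems.append("missing snapshot=on")
    # Host-shared filesystems are forbidden in both build.sh and verify.sh.
    if re.search(r"-virtfs|-fsdev|9pfs", text):
        problems.append("host-shared filesystem flag")
    # QEMU must not be launched through sudo on the same command line.
    if re.search(r"\bsudo\b[^\n]*\bqemu-system", text):
        problems.append("qemu-system run under sudo")
    return problems
```

In a pytest suite this function would typically be called once per (target, script) pair via `pytest.mark.parametrize`, asserting that the returned list is empty.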
Spec validation tests (tests/test_target_spec.py): 13 new tests over
spec parse, name/dir mismatch, missing fields, out-of-range port, and
the §4.13 containment field validators (each unsafe value rejected
with a clear error).
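One case the port checks have to handle deserves a short illustration: in Python `bool` is a subclass of `int`, so a spec with `service_port = true` would pass a naive `isinstance(v, int)` test. The sketch below uses a hypothetical helper, `parse_port`, and plain `ValueError`; the real loader raises its own error type.

```python
def parse_port(value: object) -> int:
    """Return value if it is a valid TCP/UDP port number, else raise ValueError."""
    # bool is a subclass of int, so reject it before the general int check.
    if isinstance(value, bool) or not isinstance(value, int):
        raise ValueError(f"port must be int, got {type(value).__name__}")
    if not 1 <= value <= 65535:
        raise ValueError(f"port out of range: {value}")
    return value
```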
The shellshock target's image is NOT yet published to manifest.toml's
[[targets.images]] — that's the §15 sign-off amendment that lands
after a successful operator-driven build_target.py run on a lab host
with KVM. Building takes ~10 min on x86_64; cannot run on the Pi
under TCG. Operator drives the first build, verifies the sha256, then
amends manifest.toml in a follow-up commit.
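The operator's "verify the sha256" step could be done with a few lines of Python. This is a sketch under assumptions: `sha256_of` and `image_matches` are hypothetical helper names, and the expected digest is whatever build_target.py printed for the image; nothing here reflects the actual manifest.toml stanza format.

```python
import hashlib
from pathlib import Path


def sha256_of(path: Path, chunk_size: int = 1 << 20) -> str:
    """Hash a file in fixed-size chunks so large images never sit in memory."""
    digest = hashlib.sha256()
    with path.open("rb") as f:
        while chunk := f.read(chunk_size):
            digest.update(chunk)
    return digest.hexdigest()


def image_matches(path: Path, expected_hex: str) -> bool:
    """Compare the file's digest with a published hex digest, ignoring case."""
    return sha256_of(path) == expected_hex.strip().lower()
```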
261 tests passing.
Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
263 lines
10 KiB
Python
"""Target VM spec loader + validator (PIPELINE.md §4.2 / §4.13).

Every target VM image in `[targets]` of the canonical manifest is
described by a `vm/targets/<name>/spec.toml` file. The spec captures:

  * What the target promises — vulnerable service, port, version, CVE
    that the build script must produce a working instance of.
  * Containment posture (§4.13) — every target must declare itself
    isolated to the same standard, and a regression in any of these
    fields is a containment regression that the verifier rejects
    regardless of any "experimental realism" the change claims to add.

Build flow:
  1. tools/build_target.py <name> — runs vm/targets/<name>/build.sh,
     produces <name>.qcow2 with sha256.
  2. tools/verify_target.py <name> — boots the freshly-built image in
     a containment-correct QEMU configuration, asserts every
     promise in spec.toml.

A spec is INVALID if any §4.13 containment field is absent or set to
the unsafe value. There is no "I know what I'm doing" override —
weakening containment requires amending PIPELINE.md §4.13 and getting
operator sign-off (§15, §16), not toggling a TOML key.
"""

from __future__ import annotations

import tomllib
from dataclasses import dataclass
from pathlib import Path


class TargetSpecError(ValueError):
    """Raised when a target spec is missing, unreadable, or fails
    validation. Build/verify scripts translate this into exit 78."""


@dataclass(frozen=True)
class Promises:
    """What the build script must produce in the target VM. The
    verifier asserts every field is observably true after a clean
    boot of the produced image."""
    cve: str
    service_name: str
    service_port: int
    service_proto: str  # "tcp" | "udp"
    vulnerable_software: str
    vulnerable_version: str


@dataclass(frozen=True)
class Containment:
    """§4.13 isolation posture. Every field is required and every
    field has a single safe value — there's no "production vs dev"
    knob. A target spec asserting unsafe containment is rejected
    at load time."""
    upstream_egress: bool  # MUST be False
    shared_filesystem: bool  # MUST be False
    unprivileged_qemu: bool  # MUST be True
    fresh_snapshot_per_episode: bool  # MUST be True


@dataclass(frozen=True)
class TargetSpec:
    name: str
    description: str
    base_image: str  # e.g. "alpine-3.21-virt"; build.sh handles fetch
    promises: Promises
    containment: Containment
    spec_path: Path

    def to_meta(self) -> dict:
        """Serialize for embedding in `meta.json` so episodes carry
        target provenance (§4.2 acceptance + §10 ground truth)."""
        return {
            "name": self.name,
            "description": self.description,
            "base_image": self.base_image,
            "promises": {
                "cve": self.promises.cve,
                "service_name": self.promises.service_name,
                "service_port": self.promises.service_port,
                "service_proto": self.promises.service_proto,
                "vulnerable_software": self.promises.vulnerable_software,
                "vulnerable_version": self.promises.vulnerable_version,
            },
            "containment": {
                "upstream_egress": self.containment.upstream_egress,
                "shared_filesystem": self.containment.shared_filesystem,
                "unprivileged_qemu": self.containment.unprivileged_qemu,
                "fresh_snapshot_per_episode":
                    self.containment.fresh_snapshot_per_episode,
            },
        }


def load_target_spec(repo_root: Path | str, name: str) -> TargetSpec:
    """Load + validate `<repo_root>/vm/targets/<name>/spec.toml`.
    Raises TargetSpecError on any failure."""
    repo_root = Path(repo_root).resolve()
    spec_path = repo_root / "vm" / "targets" / name / "spec.toml"
    if not spec_path.exists():
        raise TargetSpecError(
            f"target spec not found at {spec_path}. "
            f"Every target referenced from manifest.targets must have a "
            f"spec.toml under vm/targets/<name>/ per §4.2."
        )
    try:
        # TOML files are UTF-8 by definition; decode explicitly so the
        # result does not depend on the host locale.
        raw = tomllib.loads(spec_path.read_text(encoding="utf-8"))
    except (OSError, UnicodeDecodeError, tomllib.TOMLDecodeError) as e:
        raise TargetSpecError(f"cannot parse {spec_path}: {e}") from e

    return _validate(raw, spec_path, expected_name=name)


def list_target_specs(repo_root: Path | str) -> list[TargetSpec]:
    """Discover every target spec under vm/targets/. Used by
    build_target.py when invoked without a name to enumerate options,
    and by tests to assert every spec on disk validates cleanly."""
    repo_root = Path(repo_root).resolve()
    targets_dir = repo_root / "vm" / "targets"
    if not targets_dir.exists():
        return []
    specs: list[TargetSpec] = []
    for child in sorted(targets_dir.iterdir()):
        if not child.is_dir():
            continue
        spec_file = child / "spec.toml"
        if not spec_file.exists():
            continue
        specs.append(load_target_spec(repo_root, child.name))
    return specs


# ---------- validation -----------------------------------------------


def _validate(raw: dict, spec_path: Path, *, expected_name: str) -> TargetSpec:
    name = _require_str(raw, "name")
    if name != expected_name:
        raise TargetSpecError(
            f"{spec_path}: spec.name={name!r} doesn't match directory name "
            f"{expected_name!r} — keep them in sync"
        )
    description = _require_str(raw, "description")
    base_image = _require_str(raw, "base_image")

    promises_block = _require_dict(raw, "promises")
    promises = Promises(
        cve=_require_str(promises_block, "cve", ctx="promises"),
        service_name=_require_str(promises_block, "service_name", ctx="promises"),
        service_port=_require_int(promises_block, "service_port", ctx="promises"),
        service_proto=_require_str(promises_block, "service_proto", ctx="promises"),
        vulnerable_software=_require_str(
            promises_block, "vulnerable_software", ctx="promises"),
        vulnerable_version=_require_str(
            promises_block, "vulnerable_version", ctx="promises"),
    )
    if promises.service_proto not in ("tcp", "udp"):
        raise TargetSpecError(
            f"{spec_path}: promises.service_proto must be 'tcp' or 'udp', "
            f"got {promises.service_proto!r}"
        )
    if not 1 <= promises.service_port <= 65535:
        raise TargetSpecError(
            f"{spec_path}: promises.service_port out of range: "
            f"{promises.service_port}"
        )

    containment_block = _require_dict(raw, "containment")
    containment = Containment(
        upstream_egress=_require_bool(
            containment_block, "upstream_egress", ctx="containment"),
        shared_filesystem=_require_bool(
            containment_block, "shared_filesystem", ctx="containment"),
        unprivileged_qemu=_require_bool(
            containment_block, "unprivileged_qemu", ctx="containment"),
        fresh_snapshot_per_episode=_require_bool(
            containment_block, "fresh_snapshot_per_episode", ctx="containment"),
    )
    # Hard-enforce the §4.13 stance. Each field has exactly one safe
    # value; the spec is a declaration that the target satisfies it,
    # not a knob. A spec asserting an unsafe value is rejected here so
    # it never reaches the build pipeline.
    if containment.upstream_egress is not False:
        raise TargetSpecError(
            f"{spec_path}: containment.upstream_egress must be false (§4.13). "
            f"Targets with internet routing are containment regressions."
        )
    if containment.shared_filesystem is not False:
        raise TargetSpecError(
            f"{spec_path}: containment.shared_filesystem must be false (§4.13). "
            f"Targets with host-shared mounts are containment regressions."
        )
    if containment.unprivileged_qemu is not True:
        raise TargetSpecError(
            f"{spec_path}: containment.unprivileged_qemu must be true (§4.13). "
            f"Privileged QEMU is a containment regression."
        )
    if containment.fresh_snapshot_per_episode is not True:
        raise TargetSpecError(
            f"{spec_path}: containment.fresh_snapshot_per_episode must be "
            f"true (§4.13). State carrying across episodes poisons the dataset."
        )

    return TargetSpec(
        name=name,
        description=description,
        base_image=base_image,
        promises=promises,
        containment=containment,
        spec_path=spec_path,
    )


# ---------- helpers --------------------------------------------------


def _require(d: dict, key: str, kind: type, *, ctx: str = "") -> object:
    where = f"{ctx}." if ctx else ""
    if key not in d:
        raise TargetSpecError(f"missing required field {where}{key}")
    v = d[key]
    if not isinstance(v, kind):
        raise TargetSpecError(
            f"field {where}{key} must be {kind.__name__}, got {type(v).__name__}"
        )
    return v


def _require_str(d: dict, key: str, *, ctx: str = "") -> str:
    return _require(d, key, str, ctx=ctx)  # type: ignore[return-value]


def _require_int(d: dict, key: str, *, ctx: str = "") -> int:
    where = f"{ctx}." if ctx else ""
    if key not in d:
        raise TargetSpecError(f"missing required field {where}{key}")
    v = d[key]
    # bool is a subclass of int, so reject it before the int check.
    if isinstance(v, bool):
        raise TargetSpecError(f"field {where}{key} must be int, got bool")
    if isinstance(v, int):
        return v
    raise TargetSpecError(
        f"field {where}{key} must be int, got {type(v).__name__}"
    )


def _require_bool(d: dict, key: str, *, ctx: str = "") -> bool:
    where = f"{ctx}." if ctx else ""
    if key not in d:
        raise TargetSpecError(f"missing required field {where}{key}")
    v = d[key]
    if not isinstance(v, bool):
        raise TargetSpecError(
            f"field {where}{key} must be bool, got {type(v).__name__}"
        )
    return v


def _require_dict(d: dict, key: str, *, ctx: str = "") -> dict:
    return _require(d, key, dict, ctx=ctx)  # type: ignore[return-value]