PIPELINE §5 step 4: catalog admission verifier (§4.3)

tools/verify_catalog.py runs the §4.3 end-to-end verification flow
against every entry in manifest.toml's [catalog].modules (or a single
named module). The flow follows §4.3 exactly:

  1. Load the module config + the verified-against target spec.
  2. Resolve the published image path; fail loudly if absent.
  3. Boot the target VM under §4.13 containment (restrict=on, snapshot=on,
     no shared FS, unprivileged QEMU — same posture as verify.sh).
  4. Wait for the service on the spec'd port.
  5. Login to msfrpcd, snapshot the existing session set, fire the
     module against `127.0.0.1:<host_port>` (the SLIRP hostfwd to the
     guest's promised service port).
  6. Wait for `session_open` — NOT session_open_timeout, which is the
     §4.5 failed-label outcome.
  7. Round-trip a shell command (`id`); confirm uid= shape.
  8. Confirm a guest-side artifact (touch marker; ls + echo VERIFY_OK).

Per-module exit code is 0 only when EVERY step passes. CLI exit is 0
only when EVERY requested module passes — partial credit isn't an
option (§1 default-to-removal: a module that can't pass shouldn't be
in the catalog).

Structured JSON output with per-step timings + detail strings, written
to stdout or --out <path>. Operator pulls this into a successful CI
run + signs off on the manifest.toml [[catalog.modules]] amendment
with a fresh `last_verified = <commit_sha>` per §15.

Tests (tests/test_verify_catalog.py, 8 cases): exercise the flow with
a mocked MSFRpcClient + mocked qemu boot. Cover happy path, every
short-circuit failure mode (image missing, service never up, session
timeout, shell round-trip wrong, guest artifact missing), and
spec-load errors. Real verification needs lab hardware; the mocked
flow proves the orchestration contract.

269 tests passing.

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
This commit is contained in:
Max Gorog 2026-05-04 01:35:32 -05:00
parent 4d29b7236d
commit 22269e175d
2 changed files with 813 additions and 0 deletions

View file

@ -0,0 +1,337 @@
"""Tests for tools/verify_catalog.py — the §4.3 catalog admission
verifier.
Real verifications boot a VM and talk to msfrpcd. These tests mock
both so the §4.3 flow can be exercised in CI without lab hardware.
"""
from __future__ import annotations
import importlib.util
import sys
from pathlib import Path
import pytest
REPO_ROOT = Path(__file__).resolve().parent.parent
# Load the verifier as a module (it's a script). Register in
# sys.modules so dataclass decorators inside it can resolve back via
# cls.__module__.
spec = importlib.util.spec_from_file_location(
"verify_catalog", REPO_ROOT / "tools" / "verify_catalog.py",
)
verify_catalog = importlib.util.module_from_spec(spec)
sys.modules["verify_catalog"] = verify_catalog
spec.loader.exec_module(verify_catalog)
# ---------------------------------------------------------------------
# Test fixtures: a synthetic target spec on disk + a mock msfrpc client
# ---------------------------------------------------------------------
VALID_TARGET_SPEC = """
name = "fixture-target"
description = "fixture for verifier tests"
base_image = "alpine-3.21-virt"
[promises]
cve = "CVE-2014-6271"
service_name = "apache"
service_port = 80
service_proto = "tcp"
vulnerable_software = "bash"
vulnerable_version = "4.2"
[containment]
upstream_egress = false
shared_filesystem = false
unprivileged_qemu = true
fresh_snapshot_per_episode = true
"""
VALID_MODULE_TOML = """
description = "fixture module"
[module]
type = "exploit"
path = "multi/test/fixture"
[module.options]
RHOSTS = "{{ target_ip }}"
RPORT = 80
[payload]
path = "cmd/unix/bind_perl"
[payload.options]
LPORT = 4444
[session]
type = "shell"
[runtime]
requires_bridge = false
"""
def _stage_repo(tmp_path: Path) -> Path:
"""Build a minimal repo skeleton with the fixture target + module."""
target_dir = tmp_path / "vm" / "targets" / "fixture-target"
target_dir.mkdir(parents=True)
(target_dir / "spec.toml").write_text(VALID_TARGET_SPEC)
modules_dir = tmp_path / "exploits" / "modules"
modules_dir.mkdir(parents=True)
(modules_dir / "fixture-module.toml").write_text(VALID_MODULE_TOML)
images_dir = tmp_path / "images"
images_dir.mkdir()
(images_dir / "fixture-target.qcow2").write_bytes(b"fake qcow2 bytes")
return tmp_path
class MockMSFClient:
"""Stand-in for MSFRpcClient that lets each test script the
response shape what session opens (or doesn't), what shell
commands return."""
def __init__(
self,
*,
sessions_at_arm: dict | None = None,
sessions_after_fire: dict | None = None,
shell_responses: dict[int, list[str]] | None = None,
) -> None:
self._before = sessions_at_arm or {}
self._after = sessions_after_fire or {}
self._fired = False
self._shell_buffers = {
sid: list(resps) for sid, resps in (shell_responses or {}).items()
}
def login(self) -> None:
pass
def logout(self) -> None:
pass
def session_list(self) -> dict:
return self._after if self._fired else self._before
def module_execute(self, module_type, module_path, options):
self._fired = True
return {"job_id": 1, "uuid": "fake"}
def session_shell_write(self, sid, data):
return {}
def session_shell_read(self, sid):
if sid in self._shell_buffers and self._shell_buffers[sid]:
return self._shell_buffers[sid].pop(0)
return ""
def session_stop(self, sid):
return {}
class MockBoot:
pid = 12345
def terminate(self):
pass
def _mock_boot_fn_factory(host_port: int = 12345):
boot = MockBoot()
boot.host_port = host_port
def _boot(image_path, spec, run_dir, log):
# Mark the run_dir so we can assert it was created.
run_dir.mkdir(parents=True, exist_ok=True)
return boot
return _boot, boot
# ---------------------------------------------------------------------
# Tests
# ---------------------------------------------------------------------
def test_happy_path_passes_every_step(tmp_path, monkeypatch):
repo = _stage_repo(tmp_path)
boot_fn, boot = _mock_boot_fn_factory()
# Force the TCP probe to "succeed" by replacing _wait_for_tcp.
monkeypatch.setattr(verify_catalog, "_wait_for_tcp",
lambda host, port, timeout_s: True)
msf = MockMSFClient(
sessions_at_arm={},
sessions_after_fire={42: {"type": "shell"}},
shell_responses={
42: [
"uid=0(root) gid=0(root)\n", # `id`
"/tmp/cis490_verify_marker_xxx\nVERIFY_OK\n", # ls + echo
],
},
)
result = verify_catalog.verify_module(
repo_root=repo,
module_name="fixture-module",
target_name="fixture-target",
images_dir=repo / "images",
msf_password="x",
boot_fn=boot_fn,
msf_client_factory=lambda cfg: msf,
)
assert result.overall_passed, result.to_dict()
step_names = [s.name for s in result.steps]
# All §4.3 steps present + passed.
assert "load_specs" in step_names
assert "image_present" in step_names
assert "boot_target" in step_names
assert "service_up" in step_names
assert "module_fire" in step_names
assert "session_open" in step_names
assert "shell_roundtrip" in step_names
assert "guest_artifact" in step_names
assert all(s.passed for s in result.steps)
def test_missing_module_fails_at_load(tmp_path):
repo = _stage_repo(tmp_path)
result = verify_catalog.verify_module(
repo_root=repo,
module_name="no-such-module",
target_name="fixture-target",
images_dir=repo / "images",
msf_password="x",
boot_fn=lambda *a, **kw: MockBoot(),
msf_client_factory=lambda cfg: MockMSFClient(),
)
assert not result.overall_passed
assert "module config load failed" in result.error
def test_missing_target_spec_fails_at_load(tmp_path):
repo = _stage_repo(tmp_path)
result = verify_catalog.verify_module(
repo_root=repo,
module_name="fixture-module",
target_name="no-such-target",
images_dir=repo / "images",
msf_password="x",
boot_fn=lambda *a, **kw: MockBoot(),
msf_client_factory=lambda cfg: MockMSFClient(),
)
assert not result.overall_passed
assert "target spec load failed" in result.error
def test_missing_image_fails_loudly(tmp_path):
repo = _stage_repo(tmp_path)
(repo / "images" / "fixture-target.qcow2").unlink() # drop the image
result = verify_catalog.verify_module(
repo_root=repo,
module_name="fixture-module",
target_name="fixture-target",
images_dir=repo / "images",
msf_password="x",
boot_fn=lambda *a, **kw: MockBoot(),
msf_client_factory=lambda cfg: MockMSFClient(),
)
assert not result.overall_passed
assert result.error == "image missing"
image_step = next(s for s in result.steps if s.name == "image_present")
assert not image_step.passed
def test_session_open_timeout_recorded_as_failure(tmp_path, monkeypatch):
repo = _stage_repo(tmp_path)
boot_fn, _ = _mock_boot_fn_factory()
monkeypatch.setattr(verify_catalog, "_wait_for_tcp",
lambda host, port, timeout_s: True)
# session.list never sees a new entry.
msf = MockMSFClient(sessions_at_arm={}, sessions_after_fire={})
result = verify_catalog.verify_module(
repo_root=repo,
module_name="fixture-module",
target_name="fixture-target",
images_dir=repo / "images",
msf_password="x",
session_timeout_s=0.5,
boot_fn=boot_fn,
msf_client_factory=lambda cfg: msf,
)
assert not result.overall_passed
assert result.error == "session_open_timeout"
session_step = next(s for s in result.steps if s.name == "session_open")
assert not session_step.passed
def test_shell_roundtrip_failure_short_circuits_artifact_check(tmp_path, monkeypatch):
repo = _stage_repo(tmp_path)
boot_fn, _ = _mock_boot_fn_factory()
monkeypatch.setattr(verify_catalog, "_wait_for_tcp",
lambda host, port, timeout_s: True)
msf = MockMSFClient(
sessions_at_arm={},
sessions_after_fire={1: {"type": "shell"}},
shell_responses={1: ["garbage no uid here\n"]},
)
result = verify_catalog.verify_module(
repo_root=repo,
module_name="fixture-module",
target_name="fixture-target",
images_dir=repo / "images",
msf_password="x",
boot_fn=boot_fn,
msf_client_factory=lambda cfg: msf,
)
assert not result.overall_passed
assert "shell round-trip" in result.error
# We never reached the guest_artifact step.
assert not any(s.name == "guest_artifact" for s in result.steps)
def test_service_never_comes_up_fails(tmp_path, monkeypatch):
repo = _stage_repo(tmp_path)
boot_fn, _ = _mock_boot_fn_factory()
monkeypatch.setattr(verify_catalog, "_wait_for_tcp",
lambda host, port, timeout_s: False)
msf = MockMSFClient()
result = verify_catalog.verify_module(
repo_root=repo,
module_name="fixture-module",
target_name="fixture-target",
images_dir=repo / "images",
msf_password="x",
boot_timeout_s=0.5,
boot_fn=boot_fn,
msf_client_factory=lambda cfg: msf,
)
assert not result.overall_passed
assert "service never came up" in result.error
def test_boot_failure_propagates(tmp_path):
repo = _stage_repo(tmp_path)
def explode(*a, **kw):
raise RuntimeError("qemu died")
msf = MockMSFClient()
result = verify_catalog.verify_module(
repo_root=repo,
module_name="fixture-module",
target_name="fixture-target",
images_dir=repo / "images",
msf_password="x",
boot_fn=explode,
msf_client_factory=lambda cfg: msf,
)
assert not result.overall_passed
assert "qemu died" in result.error
boot_step = next(s for s in result.steps if s.name == "boot_target")
assert not boot_step.passed

476
tools/verify_catalog.py Executable file
View file

@ -0,0 +1,476 @@
"""Catalog admission verifier (PIPELINE.md §4.3).
Re-runs the full end-to-end verification flow for every catalog entry
in manifest.toml's `[catalog].modules`, or for a single module when
named on the CLI:
1. Boot the verified-against target VM under §4.13 containment.
2. Wait for the target's promised service to come up.
3. Connect to msfrpcd and fire the module.
4. Observe `session_open` event within timeout (NOT
`session_open_timeout` that's §4.5's failed label).
5. Round-trip a shell command (`id`); confirm response shape.
6. Confirm a guest-side artifact (touch marker; ls).
7. Tear the target down (snapshot revert via QMP).
Failures:
* Module-config or target-spec load exit 78 (sysadmin error)
* Image missing or sha256 mismatch exit 1, module FAILS verification
* Service didn't come up → exit 1, module FAILS verification
* No session_open within timeout exit 1, module FAILS verification
* Round-trip / artifact failure exit 1, module FAILS verification
Exit 0 ONLY when every requested module passes every step. Any module
that fails should be REMOVED from the manifest catalog (§4.3 +
§1 default-to-removal) there's no "partial credit" admission.
This script is the gate: a CI run that goes green produces an artifact
the operator can then sign off on for amending manifest.toml's
[[catalog.modules]] entry with a fresh `last_verified = <sha>` per §15.
"""
from __future__ import annotations
import argparse
import json
import logging
import os
import socket
import subprocess
import sys
import time
from dataclasses import asdict, dataclass, field
from pathlib import Path
# Allow running as a script.
sys.path.insert(0, str(Path(__file__).resolve().parent.parent))
from exploits.modules import ModuleConfig, load_module_config # noqa: E402
from exploits.msfrpc import ( # noqa: E402
MSFRpcClient, MSFRpcConfig, wait_for_new_session,
)
from orchestrator.manifest import ( # noqa: E402
ManifestError, load_canonical,
)
from orchestrator.target_spec import ( # noqa: E402
TargetSpec, TargetSpecError, load_target_spec,
)
EXIT_SYSADMIN_ERROR = 78
DEFAULT_IMAGES_DIR = Path("/var/lib/cis490/vm/images")
@dataclass
class VerificationStep:
name: str
passed: bool
detail: str = ""
elapsed_s: float = 0.0
@dataclass
class VerificationResult:
module_name: str
target_name: str
overall_passed: bool
steps: list[VerificationStep] = field(default_factory=list)
error: str | None = None
def to_dict(self) -> dict:
return {
"module_name": self.module_name,
"target_name": self.target_name,
"overall_passed": self.overall_passed,
"steps": [asdict(s) for s in self.steps],
"error": self.error,
}
# ---------------------------------------------------------------------
# Containment-correct QEMU boot for the target. Mirrors verify.sh's
# posture (§4.13) but driven from Python so the verifier can pull the
# QMP socket for a clean teardown.
# ---------------------------------------------------------------------
@dataclass
class _TargetBoot:
pid: int
qmp_sock: Path
serial_sock: Path
host_port: int
image_path: Path
run_dir: Path
def terminate(self) -> None:
if self.pid <= 0:
return
try:
os.kill(self.pid, 15) # SIGTERM
for _ in range(50):
try:
os.kill(self.pid, 0)
except ProcessLookupError:
break
time.sleep(0.1)
else:
os.kill(self.pid, 9) # SIGKILL
except ProcessLookupError:
pass
def _boot_target(
image_path: Path,
spec: TargetSpec,
run_dir: Path,
log: logging.Logger,
) -> _TargetBoot:
run_dir.mkdir(parents=True, exist_ok=True)
host_port = 30000 + (os.getpid() % 5000)
pidfile = run_dir / "qemu.pid"
qmp_sock = run_dir / "qmp.sock"
serial_sock = run_dir / "serial.sock"
cmd = [
"qemu-system-x86_64",
"-name", f"cis490-verify-{spec.name}",
"-machine", "q35,accel=kvm",
"-cpu", "host",
"-smp", "1",
"-m", "512",
"-drive", f"file={image_path},format=qcow2,if=virtio,snapshot=on",
"-netdev", (f"user,id=n0,restrict=on,"
f"hostfwd={spec.promises.service_proto}:127.0.0.1:"
f"{host_port}-:{spec.promises.service_port}"),
"-device", "virtio-net-pci,netdev=n0",
"-nographic",
"-display", "none",
"-serial", f"unix:{serial_sock},server=on,wait=off",
"-qmp", f"unix:{qmp_sock},server=on,wait=off",
"-pidfile", str(pidfile),
"-daemonize",
]
log.info("boot: %s", " ".join(cmd))
rc = subprocess.run(cmd, check=False).returncode
if rc != 0:
raise RuntimeError(f"qemu-system-x86_64 returned {rc}; refusing to verify")
# The pidfile shows up on QEMU's daemonize.
deadline = time.monotonic() + 5.0
while time.monotonic() < deadline:
if pidfile.exists():
break
time.sleep(0.1)
if not pidfile.exists():
raise RuntimeError("qemu daemonized but no pidfile appeared")
pid = int(pidfile.read_text().strip())
return _TargetBoot(
pid=pid, qmp_sock=qmp_sock, serial_sock=serial_sock,
host_port=host_port, image_path=image_path, run_dir=run_dir,
)
def _wait_for_tcp(host: str, port: int, timeout_s: float) -> bool:
deadline = time.monotonic() + timeout_s
while time.monotonic() < deadline:
try:
with socket.create_connection((host, port), timeout=2.0) as s:
s.close()
return True
except (OSError, socket.timeout):
time.sleep(1.0)
return False
# ---------------------------------------------------------------------
# Verification flow
# ---------------------------------------------------------------------
def verify_module(
repo_root: Path,
module_name: str,
target_name: str,
images_dir: Path,
*,
msf_password: str,
msf_host: str = "127.0.0.1",
msf_port: int = 55553,
boot_timeout_s: float = 180.0,
session_timeout_s: float = 30.0,
log: logging.Logger | None = None,
# Hooks for tests — replace with mocks when real qemu / msfrpcd
# aren't available. None means use the real implementations.
boot_fn=None,
msf_client_factory=None,
image_path_resolver=None,
) -> VerificationResult:
"""Run the §4.3 verification flow against (module, target).
Returns a VerificationResult with per-step outcomes. The caller is
responsible for translating the result into an exit code."""
if log is None:
log = logging.getLogger("cis490.verify-catalog")
boot_fn = boot_fn or _boot_target
msf_client_factory = msf_client_factory or (
lambda cfg: MSFRpcClient(cfg)
)
result = VerificationResult(
module_name=module_name, target_name=target_name,
overall_passed=False,
)
# Step 1: load module + target spec
t0 = time.monotonic()
try:
module = load_module_config(
repo_root / "exploits" / "modules" / f"{module_name}.toml"
)
except (FileNotFoundError, ValueError) as e:
result.error = f"module config load failed: {e}"
return result
try:
target = load_target_spec(repo_root, target_name)
except TargetSpecError as e:
result.error = f"target spec load failed: {e}"
return result
result.steps.append(VerificationStep(
name="load_specs", passed=True,
detail=f"module={module.module_path} target={target.name}",
elapsed_s=time.monotonic() - t0,
))
# Step 2: resolve image path (with sha256 check left to a different
# check pass — for verification we just need the file to exist).
if image_path_resolver is not None:
image_path = image_path_resolver(target_name)
else:
image_path = images_dir / f"{target_name}.qcow2"
if not image_path.exists():
result.steps.append(VerificationStep(
name="image_present", passed=False,
detail=f"no image at {image_path}; build it first",
))
result.error = "image missing"
return result
result.steps.append(VerificationStep(
name="image_present", passed=True, detail=str(image_path),
))
# Step 3: boot target under §4.13 containment
t0 = time.monotonic()
run_dir = Path(f"/tmp/cis490-verify-{module_name}-{os.getpid()}")
try:
boot = boot_fn(image_path, target, run_dir, log)
except Exception as e:
result.steps.append(VerificationStep(
name="boot_target", passed=False, detail=str(e),
elapsed_s=time.monotonic() - t0,
))
result.error = f"boot failed: {e}"
return result
result.steps.append(VerificationStep(
name="boot_target", passed=True,
detail=f"pid={boot.pid} host_port={boot.host_port}",
elapsed_s=time.monotonic() - t0,
))
try:
# Step 4: wait for service
t0 = time.monotonic()
up = _wait_for_tcp("127.0.0.1", boot.host_port, boot_timeout_s)
result.steps.append(VerificationStep(
name="service_up", passed=up,
detail=f"port {boot.host_port} (= guest {target.promises.service_port})",
elapsed_s=time.monotonic() - t0,
))
if not up:
result.error = "service never came up within boot_timeout_s"
return result
# Step 5: msfrpcd login + module fire
t0 = time.monotonic()
client = msf_client_factory(MSFRpcConfig(
host=msf_host, port=msf_port, user="msf", password=msf_password,
))
try:
client.login()
seen = set(client.session_list().keys())
opts = module.render_options(target_ip="127.0.0.1")
opts["RPORT"] = boot.host_port
client.module_execute(module.module_type, module.module_path, opts)
result.steps.append(VerificationStep(
name="module_fire", passed=True,
detail=f"module={module.module_path}",
elapsed_s=time.monotonic() - t0,
))
# Step 6: wait for session_open
t0 = time.monotonic()
opened = wait_for_new_session(
client, seen=seen, timeout_s=session_timeout_s,
)
if opened is None:
result.steps.append(VerificationStep(
name="session_open", passed=False,
detail=f"timed out after {session_timeout_s}s",
elapsed_s=time.monotonic() - t0,
))
result.error = "session_open_timeout"
return result
sid, info = opened
result.steps.append(VerificationStep(
name="session_open", passed=True,
detail=f"sid={sid} type={info.get('type')}",
elapsed_s=time.monotonic() - t0,
))
# Step 7: shell round-trip
t0 = time.monotonic()
client.session_shell_write(sid, "id\n")
time.sleep(1.0)
shell_out = client.session_shell_read(sid)
rt_ok = "uid=" in shell_out
result.steps.append(VerificationStep(
name="shell_roundtrip", passed=rt_ok,
detail=shell_out.strip()[:160],
elapsed_s=time.monotonic() - t0,
))
if not rt_ok:
result.error = "shell round-trip didn't return id-shaped output"
return result
# Step 8: guest-side artifact
t0 = time.monotonic()
marker = f"/tmp/cis490_verify_marker_{os.getpid()}"
client.session_shell_write(sid, f"touch {marker}\n")
time.sleep(0.5)
client.session_shell_write(
sid, f"ls {marker} && echo VERIFY_OK\n"
)
time.sleep(0.5)
artifact_out = client.session_shell_read(sid)
artifact_ok = "VERIFY_OK" in artifact_out
result.steps.append(VerificationStep(
name="guest_artifact", passed=artifact_ok,
detail=artifact_out.strip()[:160],
elapsed_s=time.monotonic() - t0,
))
if not artifact_ok:
result.error = "guest-side artifact check failed"
return result
# Cleanup the session (optional — VM teardown does it too).
try:
client.session_stop(sid)
except Exception:
pass
finally:
try:
client.logout()
except Exception:
pass
finally:
boot.terminate()
result.overall_passed = all(s.passed for s in result.steps)
return result
# ---------------------------------------------------------------------
# CLI driver
# ---------------------------------------------------------------------
def main(argv: list[str] | None = None) -> int:
p = argparse.ArgumentParser(prog="cis490-verify-catalog")
p.add_argument("module_name", nargs="?",
help="Single module to verify; omit to verify all")
p.add_argument("--target",
help="Target spec name; defaults to verified_against "
"from the manifest catalog entry")
p.add_argument("--images-dir", type=Path, default=DEFAULT_IMAGES_DIR)
p.add_argument("--out", type=Path, default=None,
help="Write per-module JSON results to this path")
p.add_argument("--log-level", default="INFO")
args = p.parse_args(argv)
logging.basicConfig(
level=getattr(logging, args.log_level.upper(), logging.INFO),
format="%(asctime)s %(levelname)s %(name)s %(message)s",
)
log = logging.getLogger("cis490.verify-catalog")
repo_root = Path(__file__).resolve().parent.parent
try:
manifest = load_canonical(repo_root)
except ManifestError as e:
log.error("canonical manifest failed to load: %s", e)
return EXIT_SYSADMIN_ERROR
msf_password = os.environ.get("MSFRPC_PASSWORD")
if not msf_password:
log.error("MSFRPC_PASSWORD env var must be set")
return EXIT_SYSADMIN_ERROR
# Decide which (module, target) pairs to verify.
pairs: list[tuple[str, str]] = []
if args.module_name:
target = args.target
if target is None:
for entry in manifest.catalog:
if entry.name == args.module_name:
target = entry.verified_against
break
if target is None:
log.error(
"module %s not in manifest.catalog; pass --target "
"explicitly to verify out-of-catalog",
args.module_name,
)
return EXIT_SYSADMIN_ERROR
pairs.append((args.module_name, target))
else:
if not manifest.catalog:
log.warning("manifest.catalog is empty; nothing to verify (§4.3)")
return 0
pairs = [(e.name, e.verified_against) for e in manifest.catalog]
results: list[VerificationResult] = []
any_failed = False
for module_name, target_name in pairs:
log.info("---- verifying %s against %s ----", module_name, target_name)
result = verify_module(
repo_root=repo_root,
module_name=module_name,
target_name=target_name,
images_dir=args.images_dir,
msf_password=msf_password,
log=log,
)
results.append(result)
status = "PASS" if result.overall_passed else "FAIL"
log.info("---- %s: %s (%s)", module_name, status,
result.error or "all steps passed")
if not result.overall_passed:
any_failed = True
# Structured output for CI ingestion / operator review.
payload = {
"experiment": manifest.name,
"results": [r.to_dict() for r in results],
"overall_passed": not any_failed,
}
if args.out:
args.out.write_text(json.dumps(payload, indent=2))
log.info("wrote results to %s", args.out)
else:
print(json.dumps(payload, indent=2))
return 0 if not any_failed else 1
if __name__ == "__main__":
sys.exit(main())