CIS490/tools/verify_catalog.py
Max Gorog 22269e175d PIPELINE §5 step 4: catalog admission verifier (§4.3)
tools/verify_catalog.py runs the §4.3 end-to-end verification flow
against every entry in manifest.toml's [catalog].modules (or a single
named module). The flow follows §4.3 exactly:

  1. Load the module config + the verified-against target spec.
  2. Resolve the published image path; fail loudly if absent.
  3. Boot the target VM under §4.13 containment (restrict=on, snapshot=on,
     no shared FS, unprivileged QEMU — same posture as verify.sh).
  4. Wait for the service on the spec'd port.
  5. Login to msfrpcd, snapshot the existing session set, fire the
     module against `127.0.0.1:<host_port>` (the SLIRP hostfwd to the
     guest's promised service port).
  6. Wait for `session_open` — NOT session_open_timeout, which is the
     §4.5 failed-label outcome.
  7. Round-trip a shell command (`id`); confirm uid= shape.
  8. Confirm a guest-side artifact (touch marker; ls + echo VERIFY_OK).

Per-module exit code is 0 only when EVERY step passes. CLI exit is 0
only when EVERY requested module passes — partial credit isn't an
option (§1 default-to-removal: a module that can't pass shouldn't be
in the catalog).

Structured JSON output with per-step timings + detail strings, written
to stdout or --out <path>. Operator pulls this into a successful CI
run + signs off on the manifest.toml [[catalog.modules]] amendment
with a fresh `last_verified = <commit_sha>` per §15.

Tests (tests/test_verify_catalog.py, 8 cases): exercise the flow with
a mocked MSFRpcClient + mocked qemu boot. Cover happy path, every
short-circuit failure mode (image missing, service never up, session
timeout, shell round-trip wrong, guest artifact missing), and
spec-load errors. Real verification needs lab hardware; the mocked
flow proves the orchestration contract.

269 tests passing.

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
2026-05-04 01:35:32 -05:00

476 lines
16 KiB
Python
Executable file

"""Catalog admission verifier (PIPELINE.md §4.3).
Re-runs the full end-to-end verification flow for every catalog entry
in manifest.toml's `[catalog].modules`, or for a single module when
named on the CLI:
1. Boot the verified-against target VM under §4.13 containment.
2. Wait for the target's promised service to come up.
3. Connect to msfrpcd and fire the module.
4. Observe `session_open` event within timeout (NOT
`session_open_timeout` — that's §4.5's failed label).
5. Round-trip a shell command (`id`); confirm response shape.
6. Confirm a guest-side artifact (touch marker; ls).
7. Tear the target down (SIGTERM, escalating to SIGKILL; the drive is opened with `snapshot=on`, so guest writes are discarded).
Failures:
* Module-config or target-spec load → exit 78 (sysadmin error)
* Image missing → exit 1, module FAILS verification (sha256 is left to a separate check pass; this script only requires the file to exist)
* Service didn't come up → exit 1, module FAILS verification
* No session_open within timeout → exit 1, module FAILS verification
* Round-trip / artifact failure → exit 1, module FAILS verification
Exit 0 ONLY when every requested module passes every step. Any module
that fails should be REMOVED from the manifest catalog (§4.3 +
§1 default-to-removal) — there's no "partial credit" admission.
This script is the gate: a CI run that goes green produces an artifact
the operator can then sign off on for amending manifest.toml's
[[catalog.modules]] entry with a fresh `last_verified = <sha>` per §15.
"""
from __future__ import annotations
import argparse
import json
import logging
import os
import socket
import subprocess
import sys
import time
from dataclasses import asdict, dataclass, field
from pathlib import Path
# Allow running as a script.
sys.path.insert(0, str(Path(__file__).resolve().parent.parent))
from exploits.modules import ModuleConfig, load_module_config # noqa: E402
from exploits.msfrpc import ( # noqa: E402
MSFRpcClient, MSFRpcConfig, wait_for_new_session,
)
from orchestrator.manifest import ( # noqa: E402
ManifestError, load_canonical,
)
from orchestrator.target_spec import ( # noqa: E402
TargetSpec, TargetSpecError, load_target_spec,
)
# Exit status main() returns when the run cannot start for configuration
# reasons: the canonical manifest fails to load, MSFRPC_PASSWORD is unset,
# or a named module is absent from the catalog and no --target was given.
EXIT_SYSADMIN_ERROR = 78
# Default for --images-dir; verify_module looks for `<target_name>.qcow2`
# inside this directory when no image_path_resolver hook is supplied.
DEFAULT_IMAGES_DIR = Path("/var/lib/cis490/vm/images")
@dataclass
class VerificationStep:
    """Outcome of a single named step of the verification flow."""

    # Step identifier, e.g. "load_specs", "boot_target", "session_open".
    name: str
    # True when the step succeeded.
    passed: bool
    # Free-form context for the operator (paths, ports, captured output).
    detail: str = ""
    # Seconds spent in the step; stays 0.0 when the caller does not time it.
    elapsed_s: float = 0.0
@dataclass
class VerificationResult:
    """Aggregate outcome of verifying one module against one target."""

    module_name: str
    target_name: str
    overall_passed: bool
    # Steps in the order they were recorded; each instance gets its own list.
    steps: list[VerificationStep] = field(default_factory=list)
    # Short failure reason; None when no failure has been reported.
    error: str | None = None

    def to_dict(self) -> dict:
        """Return a plain-dict form of the result, expanding each step via ``asdict``."""
        step_dicts = list(map(asdict, self.steps))
        return dict(
            module_name=self.module_name,
            target_name=self.target_name,
            overall_passed=self.overall_passed,
            steps=step_dicts,
            error=self.error,
        )
# ---------------------------------------------------------------------
# Containment-correct QEMU boot for the target. Mirrors verify.sh's
# posture (§4.13) but driven from Python so the verifier can pull the
# QMP socket for a clean teardown.
# ---------------------------------------------------------------------
@dataclass
class _TargetBoot:
    """Handle on a booted QEMU target, as returned by ``_boot_target``."""

    # PID read from QEMU's pidfile; values <= 0 mean "nothing to terminate".
    pid: int
    # Unix socket paths handed to QEMU's -qmp and -serial options.
    qmp_sock: Path
    serial_sock: Path
    # Host-side port forwarded to the guest's promised service port.
    host_port: int
    # Disk image the VM was booted from.
    image_path: Path
    # Directory holding the pidfile and the sockets above.
    run_dir: Path

    def terminate(self) -> None:
        """Stop the QEMU process: SIGTERM, then SIGKILL if it is still alive.

        Polls for up to ~5 seconds (50 x 0.1 s) after SIGTERM. A process
        that is already gone at any point is treated as success.
        """
        import signal  # local import: only this method needs it

        if self.pid <= 0:
            # Guard against signalling a process group (negative pid) or
            # the caller's own group (pid 0).
            return
        try:
            os.kill(self.pid, signal.SIGTERM)
            for _ in range(50):
                try:
                    os.kill(self.pid, 0)  # signal 0 = existence probe only
                except ProcessLookupError:
                    break
                time.sleep(0.1)
            else:
                # Loop ran to completion without a break: still alive.
                os.kill(self.pid, signal.SIGKILL)
        except ProcessLookupError:
            pass
def _pick_host_port(preferred: int, proto: str, attempts: int = 64) -> int:
    """Return the first port at or above *preferred* that binds on 127.0.0.1."""
    sock_type = (
        socket.SOCK_DGRAM if str(proto).lower() == "udp" else socket.SOCK_STREAM
    )
    for candidate in range(preferred, preferred + attempts):
        with socket.socket(socket.AF_INET, sock_type) as probe:
            try:
                probe.bind(("127.0.0.1", candidate))
            except OSError:
                continue
        return candidate
    raise RuntimeError(
        f"no free host port in {preferred}-{preferred + attempts - 1} "
        f"for hostfwd"
    )


def _boot_target(
    image_path: Path,
    spec: TargetSpec,
    run_dir: Path,
    log: logging.Logger,
) -> _TargetBoot:
    """Launch a daemonized QEMU for *spec* and return a handle on it.

    Creates *run_dir* if needed and places the pidfile, QMP socket and
    serial socket inside it. The guest's promised service port is
    forwarded from a port on 127.0.0.1.

    The preferred host port is ``30000 + pid % 5000`` as before, but it
    is now probed first: if it cannot be bound, the next free port above
    it is used. A busy port would otherwise make QEMU exit non-zero and
    fail the module for a reason unrelated to the module itself.

    Raises:
        RuntimeError: QEMU exited non-zero, no pidfile appeared within
            5 seconds, or no free host port was found.
    """
    run_dir.mkdir(parents=True, exist_ok=True)
    pidfile = run_dir / "qemu.pid"
    qmp_sock = run_dir / "qmp.sock"
    serial_sock = run_dir / "serial.sock"

    # A pidfile left in a reused run_dir must not satisfy the wait loop
    # below with another process's pid.
    pidfile.unlink(missing_ok=True)

    proto = spec.promises.service_proto
    host_port = _pick_host_port(30000 + (os.getpid() % 5000), proto)

    # NOTE(review): -nographic and -display none overlap, and QEMU has
    # historically been strict about -nographic combined with -daemonize.
    # Confirm this exact option set boots on the lab's QEMU version.
    cmd = [
        "qemu-system-x86_64",
        "-name", f"cis490-verify-{spec.name}",
        "-machine", "q35,accel=kvm",
        "-cpu", "host",
        "-smp", "1",
        "-m", "512",
        "-drive", f"file={image_path},format=qcow2,if=virtio,snapshot=on",
        "-netdev", (f"user,id=n0,restrict=on,"
                    f"hostfwd={proto}:127.0.0.1:"
                    f"{host_port}-:{spec.promises.service_port}"),
        "-device", "virtio-net-pci,netdev=n0",
        "-nographic",
        "-display", "none",
        "-serial", f"unix:{serial_sock},server=on,wait=off",
        "-qmp", f"unix:{qmp_sock},server=on,wait=off",
        "-pidfile", str(pidfile),
        "-daemonize",
    ]
    log.info("boot: %s", " ".join(cmd))
    rc = subprocess.run(cmd, check=False).returncode
    if rc != 0:
        raise RuntimeError(f"qemu-system-x86_64 returned {rc}; refusing to verify")

    # The pidfile shows up on QEMU's daemonize; allow it up to 5 seconds.
    deadline = time.monotonic() + 5.0
    while not pidfile.exists() and time.monotonic() < deadline:
        time.sleep(0.1)
    if not pidfile.exists():
        raise RuntimeError("qemu daemonized but no pidfile appeared")

    pid = int(pidfile.read_text().strip())
    return _TargetBoot(
        pid=pid, qmp_sock=qmp_sock, serial_sock=serial_sock,
        host_port=host_port, image_path=image_path, run_dir=run_dir,
    )
def _wait_for_tcp(host: str, port: int, timeout_s: float) -> bool:
    """Poll until a TCP connection to (host, port) succeeds or time runs out.

    Args:
        host: Address to connect to.
        port: TCP port to connect to.
        timeout_s: Total time budget in seconds. A value <= 0 makes no
            connection attempt and returns False.

    Returns:
        True as soon as one connect succeeds, False once the budget is spent.

    The per-attempt connect timeout (at most 2 s) and the pause between
    attempts (at most 1 s) are both clamped to the time remaining, so the
    call does not overrun ``timeout_s`` by a full sleep or connect cycle.

    NOTE(review): a successful connect only shows that something accepted
    on host:port. If the port is a forwarder (such as a QEMU user-mode
    hostfwd), the listener may accept before the backend service is
    ready. Confirm whether a stronger readiness probe is needed.
    """
    deadline = time.monotonic() + timeout_s
    while True:
        remaining = deadline - time.monotonic()
        if remaining <= 0:
            return False
        try:
            with socket.create_connection(
                (host, port), timeout=min(2.0, remaining)
            ):
                return True
        except OSError:  # socket.timeout is an OSError subclass
            pause = min(1.0, deadline - time.monotonic())
            if pause > 0:
                time.sleep(pause)
# ---------------------------------------------------------------------
# Verification flow
# ---------------------------------------------------------------------
def verify_module(
    repo_root: Path,
    module_name: str,
    target_name: str,
    images_dir: Path,
    *,
    msf_password: str,
    msf_host: str = "127.0.0.1",
    msf_port: int = 55553,
    boot_timeout_s: float = 180.0,
    session_timeout_s: float = 30.0,
    log: logging.Logger | None = None,
    # Hooks for tests — replace with mocks when real qemu / msfrpcd
    # aren't available. None means use the real implementations.
    boot_fn=None,
    msf_client_factory=None,
    image_path_resolver=None,
) -> VerificationResult:
    """Run the §4.3 verification flow against (module, target).

    Steps, in order: load_specs, image_present, boot_target, service_up,
    module_fire, session_open, shell_roundtrip, guest_artifact. The flow
    stops at the first failing step.

    Args:
        repo_root: Repository root; module configs are read from
            ``exploits/modules/<module_name>.toml`` beneath it.
        module_name: Catalog module to verify.
        target_name: Target spec to boot and attack.
        images_dir: Directory expected to hold ``<target_name>.qcow2``.
        msf_password: Password for the msfrpcd login (user ``msf``).
        msf_host: msfrpcd host.
        msf_port: msfrpcd port.
        boot_timeout_s: Budget for the forwarded service port to accept.
        session_timeout_s: Budget for a new session to appear.
        log: Logger; defaults to ``cis490.verify-catalog``.
        boot_fn: Replacement for ``_boot_target`` (same call signature).
        msf_client_factory: Callable taking an ``MSFRpcConfig`` and
            returning a client object.
        image_path_resolver: Callable mapping *target_name* to the image
            path, bypassing *images_dir*.

    Returns:
        A VerificationResult with per-step outcomes. ``overall_passed``
        is True only when every step passed. A config or spec load
        failure returns with ``error`` set and no steps recorded. The
        caller is responsible for translating the result into an exit
        code.

    Any exception raised while talking to msfrpcd is recorded as a failed
    step (named after the step in progress) rather than propagated. This
    mirrors how boot failures are handled and keeps one broken module
    from aborting the verification of the remaining ones.
    """
    if log is None:
        log = logging.getLogger("cis490.verify-catalog")
    boot_fn = boot_fn or _boot_target
    msf_client_factory = msf_client_factory or (
        lambda cfg: MSFRpcClient(cfg)
    )
    result = VerificationResult(
        module_name=module_name, target_name=target_name,
        overall_passed=False,
    )

    # Step 1: load module + target spec
    t0 = time.monotonic()
    try:
        module = load_module_config(
            repo_root / "exploits" / "modules" / f"{module_name}.toml"
        )
    except (FileNotFoundError, ValueError) as e:
        result.error = f"module config load failed: {e}"
        return result
    try:
        target = load_target_spec(repo_root, target_name)
    except TargetSpecError as e:
        result.error = f"target spec load failed: {e}"
        return result
    result.steps.append(VerificationStep(
        name="load_specs", passed=True,
        detail=f"module={module.module_path} target={target.name}",
        elapsed_s=time.monotonic() - t0,
    ))

    # Step 2: resolve image path (with sha256 check left to a different
    # check pass — for verification we just need the file to exist).
    if image_path_resolver is not None:
        image_path = image_path_resolver(target_name)
    else:
        image_path = images_dir / f"{target_name}.qcow2"
    if not image_path.exists():
        result.steps.append(VerificationStep(
            name="image_present", passed=False,
            detail=f"no image at {image_path}; build it first",
        ))
        result.error = "image missing"
        return result
    result.steps.append(VerificationStep(
        name="image_present", passed=True, detail=str(image_path),
    ))

    # Step 3: boot target under §4.13 containment
    t0 = time.monotonic()
    run_dir = Path(f"/tmp/cis490-verify-{module_name}-{os.getpid()}")
    try:
        boot = boot_fn(image_path, target, run_dir, log)
    except Exception as e:
        result.steps.append(VerificationStep(
            name="boot_target", passed=False, detail=str(e),
            elapsed_s=time.monotonic() - t0,
        ))
        result.error = f"boot failed: {e}"
        return result
    result.steps.append(VerificationStep(
        name="boot_target", passed=True,
        detail=f"pid={boot.pid} host_port={boot.host_port}",
        elapsed_s=time.monotonic() - t0,
    ))

    # From here on the VM is running; the outer finally always stops it.
    try:
        # Step 4: wait for service
        t0 = time.monotonic()
        up = _wait_for_tcp("127.0.0.1", boot.host_port, boot_timeout_s)
        result.steps.append(VerificationStep(
            name="service_up", passed=up,
            detail=f"port {boot.host_port} (= guest {target.promises.service_port})",
            elapsed_s=time.monotonic() - t0,
        ))
        if not up:
            result.error = "service never came up within boot_timeout_s"
            return result

        # Step 5: msfrpcd login + module fire
        # `stage` names the step in progress so an unexpected exception
        # can be attributed to it in the handler below.
        stage = "module_fire"
        t0 = time.monotonic()
        client = None
        try:
            client = msf_client_factory(MSFRpcConfig(
                host=msf_host, port=msf_port, user="msf", password=msf_password,
            ))
            client.login()
            # Snapshot existing sessions so only a NEW one counts.
            seen = set(client.session_list().keys())
            opts = module.render_options(target_ip="127.0.0.1")
            # Aim at the host-side forward, not the guest's own port.
            opts["RPORT"] = boot.host_port
            client.module_execute(module.module_type, module.module_path, opts)
            result.steps.append(VerificationStep(
                name="module_fire", passed=True,
                detail=f"module={module.module_path}",
                elapsed_s=time.monotonic() - t0,
            ))

            # Step 6: wait for session_open
            stage = "session_open"
            t0 = time.monotonic()
            opened = wait_for_new_session(
                client, seen=seen, timeout_s=session_timeout_s,
            )
            if opened is None:
                result.steps.append(VerificationStep(
                    name="session_open", passed=False,
                    detail=f"timed out after {session_timeout_s}s",
                    elapsed_s=time.monotonic() - t0,
                ))
                result.error = "session_open_timeout"
                return result
            sid, info = opened
            result.steps.append(VerificationStep(
                name="session_open", passed=True,
                detail=f"sid={sid} type={info.get('type')}",
                elapsed_s=time.monotonic() - t0,
            ))

            # Step 7: shell round-trip
            stage = "shell_roundtrip"
            t0 = time.monotonic()
            client.session_shell_write(sid, "id\n")
            time.sleep(1.0)  # give the guest shell time to answer
            shell_out = client.session_shell_read(sid)
            rt_ok = "uid=" in shell_out
            result.steps.append(VerificationStep(
                name="shell_roundtrip", passed=rt_ok,
                detail=shell_out.strip()[:160],
                elapsed_s=time.monotonic() - t0,
            ))
            if not rt_ok:
                result.error = "shell round-trip didn't return id-shaped output"
                return result

            # Step 8: guest-side artifact
            stage = "guest_artifact"
            t0 = time.monotonic()
            marker = f"/tmp/cis490_verify_marker_{os.getpid()}"
            client.session_shell_write(sid, f"touch {marker}\n")
            time.sleep(0.5)
            client.session_shell_write(
                sid, f"ls {marker} && echo VERIFY_OK\n"
            )
            time.sleep(0.5)
            artifact_out = client.session_shell_read(sid)
            artifact_ok = "VERIFY_OK" in artifact_out
            result.steps.append(VerificationStep(
                name="guest_artifact", passed=artifact_ok,
                detail=artifact_out.strip()[:160],
                elapsed_s=time.monotonic() - t0,
            ))
            if not artifact_ok:
                result.error = "guest-side artifact check failed"
                return result

            # Cleanup the session (optional — VM teardown does it too).
            try:
                client.session_stop(sid)
            except Exception:
                pass
        except Exception as e:
            # Boundary handler: turn an msfrpc/transport error into a
            # structured failed step instead of a traceback.
            log.exception("%s raised while verifying %s", stage, module_name)
            result.steps.append(VerificationStep(
                name=stage, passed=False,
                detail=f"{type(e).__name__}: {e}"[:160],
                elapsed_s=time.monotonic() - t0,
            ))
            result.error = f"{stage} raised {type(e).__name__}: {e}"
            return result
        finally:
            # Best-effort logout; client is None if the factory itself failed.
            if client is not None:
                try:
                    client.logout()
                except Exception:
                    pass
    finally:
        boot.terminate()

    result.overall_passed = all(s.passed for s in result.steps)
    return result
# ---------------------------------------------------------------------
# CLI driver
# ---------------------------------------------------------------------
def main(argv: list[str] | None = None) -> int:
    """CLI entry point: verify one named module or the whole catalog.

    Args:
        argv: Argument list; None lets argparse read ``sys.argv``.

    Returns:
        Process exit status:

        * 0 — every requested module passed every step, or the catalog
          is empty.
        * 1 — at least one module ran and failed verification.
        * EXIT_SYSADMIN_ERROR (78) — the manifest failed to load,
          MSFRPC_PASSWORD is unset, the named module is not in the
          catalog and no --target was given, or the only failures were
          module-config / target-spec load errors.

    The JSON payload is written to --out when given (its parent
    directory is created if missing), otherwise printed to stdout.
    """
    p = argparse.ArgumentParser(prog="cis490-verify-catalog")
    p.add_argument("module_name", nargs="?",
                   help="Single module to verify; omit to verify all")
    p.add_argument("--target",
                   help="Target spec name; defaults to verified_against "
                        "from the manifest catalog entry")
    p.add_argument("--images-dir", type=Path, default=DEFAULT_IMAGES_DIR)
    p.add_argument("--out", type=Path, default=None,
                   help="Write per-module JSON results to this path")
    p.add_argument("--log-level", default="INFO")
    args = p.parse_args(argv)

    logging.basicConfig(
        level=getattr(logging, args.log_level.upper(), logging.INFO),
        format="%(asctime)s %(levelname)s %(name)s %(message)s",
    )
    log = logging.getLogger("cis490.verify-catalog")
    repo_root = Path(__file__).resolve().parent.parent

    try:
        manifest = load_canonical(repo_root)
    except ManifestError as e:
        log.error("canonical manifest failed to load: %s", e)
        return EXIT_SYSADMIN_ERROR

    msf_password = os.environ.get("MSFRPC_PASSWORD")
    if not msf_password:
        log.error("MSFRPC_PASSWORD env var must be set")
        return EXIT_SYSADMIN_ERROR

    # Decide which (module, target) pairs to verify.
    pairs: list[tuple[str, str]] = []
    if args.module_name:
        target = args.target
        if target is None:
            # Fall back to the catalog's verified_against for this module.
            for entry in manifest.catalog:
                if entry.name == args.module_name:
                    target = entry.verified_against
                    break
        if target is None:
            log.error(
                "module %s not in manifest.catalog; pass --target "
                "explicitly to verify out-of-catalog",
                args.module_name,
            )
            return EXIT_SYSADMIN_ERROR
        pairs.append((args.module_name, target))
    else:
        if not manifest.catalog:
            log.warning("manifest.catalog is empty; nothing to verify (§4.3)")
            return 0
        pairs = [(e.name, e.verified_against) for e in manifest.catalog]

    results: list[VerificationResult] = []
    verification_failed = False  # a module ran and failed a step
    load_failed = False          # a module/target config could not be loaded
    for module_name, target_name in pairs:
        log.info("---- verifying %s against %s ----", module_name, target_name)
        result = verify_module(
            repo_root=repo_root,
            module_name=module_name,
            target_name=target_name,
            images_dir=args.images_dir,
            msf_password=msf_password,
            log=log,
        )
        results.append(result)
        status = "PASS" if result.overall_passed else "FAIL"
        log.info("---- %s: %s (%s)", module_name, status,
                 result.error or "all steps passed")
        if result.overall_passed:
            continue
        # verify_module returns with an error and NO recorded steps only
        # when the module config or target spec failed to load. That is
        # an operator problem, not a verdict on the module.
        if result.error is not None and not result.steps:
            load_failed = True
        else:
            verification_failed = True

    any_failed = verification_failed or load_failed

    # Structured output for CI ingestion / operator review.
    payload = {
        "experiment": manifest.name,
        "results": [r.to_dict() for r in results],
        "overall_passed": not any_failed,
    }
    if args.out:
        # Don't lose results from a long run to a missing directory.
        args.out.parent.mkdir(parents=True, exist_ok=True)
        args.out.write_text(json.dumps(payload, indent=2))
        log.info("wrote results to %s", args.out)
    else:
        print(json.dumps(payload, indent=2))

    if verification_failed:
        # A genuine verification failure is the stronger gate signal and
        # takes precedence over config errors in the same run.
        return 1
    if load_failed:
        return EXIT_SYSADMIN_ERROR
    return 0
if __name__ == "__main__":
    # Script entry: main()'s return value becomes the process exit status.
    raise SystemExit(main())