"""Catalog admission verifier (PIPELINE.md §4.3). Re-runs the full end-to-end verification flow for every catalog entry in manifest.toml's `[catalog].modules`, or for a single module when named on the CLI: 1. Boot the verified-against target VM under §4.13 containment. 2. Wait for the target's promised service to come up. 3. Connect to msfrpcd and fire the module. 4. Observe `session_open` event within timeout (NOT `session_open_timeout` — that's §4.5's failed label). 5. Round-trip a shell command (`id`); confirm response shape. 6. Confirm a guest-side artifact (touch marker; ls). 7. Tear the target down (snapshot revert via QMP). Failures: * Module-config or target-spec load → exit 78 (sysadmin error) * Image missing or sha256 mismatch → exit 1, module FAILS verification * Service didn't come up → exit 1, module FAILS verification * No session_open within timeout → exit 1, module FAILS verification * Round-trip / artifact failure → exit 1, module FAILS verification Exit 0 ONLY when every requested module passes every step. Any module that fails should be REMOVED from the manifest catalog (§4.3 + §1 default-to-removal) — there's no "partial credit" admission. This script is the gate: a CI run that goes green produces an artifact the operator can then sign off on for amending manifest.toml's [[catalog.modules]] entry with a fresh `last_verified = ` per §15. """ from __future__ import annotations import argparse import json import logging import os import socket import subprocess import sys import time from dataclasses import asdict, dataclass, field from pathlib import Path # Allow running as a script. 
sys.path.insert(0, str(Path(__file__).resolve().parent.parent)) from exploits.modules import ModuleConfig, load_module_config # noqa: E402 from exploits.msfrpc import ( # noqa: E402 MSFRpcClient, MSFRpcConfig, wait_for_new_session, ) from orchestrator.manifest import ( # noqa: E402 ManifestError, load_canonical, ) from orchestrator.target_spec import ( # noqa: E402 TargetSpec, TargetSpecError, load_target_spec, ) EXIT_SYSADMIN_ERROR = 78 DEFAULT_IMAGES_DIR = Path("/var/lib/cis490/vm/images") @dataclass class VerificationStep: name: str passed: bool detail: str = "" elapsed_s: float = 0.0 @dataclass class VerificationResult: module_name: str target_name: str overall_passed: bool steps: list[VerificationStep] = field(default_factory=list) error: str | None = None def to_dict(self) -> dict: return { "module_name": self.module_name, "target_name": self.target_name, "overall_passed": self.overall_passed, "steps": [asdict(s) for s in self.steps], "error": self.error, } # --------------------------------------------------------------------- # Containment-correct QEMU boot for the target. Mirrors verify.sh's # posture (§4.13) but driven from Python so the verifier can pull the # QMP socket for a clean teardown. 
# ---------------------------------------------------------------------


@dataclass
class _TargetBoot:
    """Handle on a daemonized QEMU target started by `_boot_target`."""

    # PID read from QEMU's pidfile.
    pid: int
    # Unix-socket paths passed to QEMU's -qmp / -serial options.
    qmp_sock: Path
    serial_sock: Path
    # Host-side (127.0.0.1) port forwarded to the guest's service port.
    host_port: int
    image_path: Path
    run_dir: Path

    def terminate(self) -> None:
        """Stop the QEMU process: SIGTERM, then SIGKILL after ~5 s.

        A non-positive pid is ignored so that a bogus value can never be
        turned into a process-group signal. A process that is already
        gone is treated as success.

        NOTE(review): teardown here is purely signal based; the QMP
        socket recorded on this object is not used by this method.
        """
        if self.pid <= 0:
            return
        try:
            os.kill(self.pid, 15)  # SIGTERM
            # Poll with signal 0 (existence probe) for up to 50 * 0.1 s.
            for _ in range(50):
                try:
                    os.kill(self.pid, 0)
                except ProcessLookupError:
                    break
                time.sleep(0.1)
            else:
                # Still alive after the grace period.
                os.kill(self.pid, 9)  # SIGKILL
        except ProcessLookupError:
            pass


def _await_pidfile(pidfile: Path, timeout_s: float = 5.0) -> int:
    """Poll *pidfile* until it holds an integer pid and return it.

    The file is always read at least once, even when *timeout_s* is 0.

    Raises:
        RuntimeError: the file never appeared, or never contained a
            parseable integer, within *timeout_s* seconds.
    """
    deadline = time.monotonic() + timeout_s
    while True:
        try:
            return int(pidfile.read_text().strip())
        except (FileNotFoundError, ValueError):
            # Missing, empty or partially written: keep polling.
            pass
        if time.monotonic() >= deadline:
            break
        time.sleep(0.1)
    if not pidfile.exists():
        raise RuntimeError("qemu daemonized but no pidfile appeared")
    raise RuntimeError(f"qemu pidfile {pidfile} never contained a pid")


def _boot_target(
    image_path: Path,
    spec: TargetSpec,
    run_dir: Path,
    log: logging.Logger,
) -> _TargetBoot:
    """Launch a daemonized QEMU guest for *spec* and return its handle.

    The disk is attached with ``snapshot=on`` and networking is QEMU
    user-mode with ``restrict=on`` plus a single host forward from
    127.0.0.1:<host_port> to the guest's promised service port.

    Raises:
        RuntimeError: QEMU exited non-zero, or its pidfile never became
            readable.
    """
    run_dir.mkdir(parents=True, exist_ok=True)
    # Derived from our own pid, so it is stable for the whole run.
    host_port = 30000 + (os.getpid() % 5000)
    pidfile = run_dir / "qemu.pid"
    qmp_sock = run_dir / "qmp.sock"
    serial_sock = run_dir / "serial.sock"

    # A pidfile left behind by an earlier run in the same directory must
    # not be mistaken for the pid of the instance we are about to start.
    try:
        pidfile.unlink()
    except FileNotFoundError:
        pass

    cmd = [
        "qemu-system-x86_64",
        "-name", f"cis490-verify-{spec.name}",
        "-machine", "q35,accel=kvm",
        "-cpu", "host",
        "-smp", "1",
        "-m", "512",
        "-drive", f"file={image_path},format=qcow2,if=virtio,snapshot=on",
        "-netdev", (f"user,id=n0,restrict=on,"
                    f"hostfwd={spec.promises.service_proto}:127.0.0.1:"
                    f"{host_port}-:{spec.promises.service_port}"),
        "-device", "virtio-net-pci,netdev=n0",
        "-nographic",
        "-display", "none",
        "-serial", f"unix:{serial_sock},server=on,wait=off",
        "-qmp", f"unix:{qmp_sock},server=on,wait=off",
        "-pidfile", str(pidfile),
        "-daemonize",
    ]
    log.info("boot: %s", " ".join(cmd))
    rc = subprocess.run(cmd, check=False).returncode
    if rc != 0:
        raise RuntimeError(f"qemu-system-x86_64 returned {rc}; refusing to verify")

    # The pidfile shows up on QEMU's daemonize; wait until it is readable.
    pid = _await_pidfile(pidfile, 5.0)
    return _TargetBoot(
        pid=pid,
        qmp_sock=qmp_sock,
        serial_sock=serial_sock,
        host_port=host_port,
        image_path=image_path,
        run_dir=run_dir,
    )


def _wait_for_tcp(host: str, port: int, timeout_s: float) -> bool:
    """Return True once a TCP connect to (host, port) succeeds.

    Retries roughly once per second until *timeout_s* has elapsed, then
    returns False. A non-positive timeout returns False without trying.

    NOTE(review): this only proves that something accepted the
    connection. With a QEMU user-mode host forward the host-side
    listener may accept before the guest service is ready — confirm the
    probe is strong enough for the "service_up" step.
    """
    deadline = time.monotonic() + timeout_s
    while time.monotonic() < deadline:
        try:
            # Each attempt is capped at 2 s; the socket closes on exit.
            with socket.create_connection((host, port), timeout=2.0):
                return True
        except OSError:
            # Covers refusals and timeouts alike. Never sleep past the
            # caller's deadline.
            remaining = deadline - time.monotonic()
            time.sleep(max(0.0, min(1.0, remaining)))
    return False


# ---------------------------------------------------------------------
# Verification flow
# ---------------------------------------------------------------------


def _record_exception(
    result: VerificationResult,
    stage: str,
    exc: BaseException,
    started: float,
) -> VerificationResult:
    """Append a failed step for *stage* describing *exc*; return *result*."""
    result.steps.append(VerificationStep(
        name=stage,
        passed=False,
        detail=f"{type(exc).__name__}: {exc}",
        elapsed_s=time.monotonic() - started,
    ))
    result.error = f"{stage} raised: {exc}"
    return result


def verify_module(
    repo_root: Path,
    module_name: str,
    target_name: str,
    images_dir: Path,
    *,
    msf_password: str,
    msf_host: str = "127.0.0.1",
    msf_port: int = 55553,
    boot_timeout_s: float = 180.0,
    session_timeout_s: float = 30.0,
    log: logging.Logger | None = None,
    # Hooks for tests — replace with mocks when real qemu / msfrpcd
    # aren't available. None means use the real implementations.
    boot_fn=None,
    msf_client_factory=None,
    image_path_resolver=None,
) -> VerificationResult:
    """Run the §4.3 verification flow against (module, target).

    Steps, in order: load module config and target spec, check the image
    exists, boot the target, wait for the forwarded service port, log in
    to msfrpcd and fire the module, wait for a new session, round-trip
    ``id`` through the session shell, then create and list a marker
    file in the guest. The flow stops at the first failing step.

    An exception raised while talking to msfrpcd is recorded as a failed
    step instead of propagating, so one broken module cannot abort a
    multi-module run. The target is always terminated once it has been
    booted.

    Args:
        repo_root: Repository root; module configs are read from
            ``exploits/modules/<module_name>.toml`` beneath it.
        module_name: Catalog module to verify.
        target_name: Target spec name, also the image's base name.
        images_dir: Directory holding ``<target_name>.qcow2``.
        msf_password: Password for the msfrpcd login (user "msf").
        msf_host: msfrpcd host.
        msf_port: msfrpcd port.
        boot_timeout_s: How long to wait for the service port.
        session_timeout_s: How long to wait for a new session.
        log: Logger to use; defaults to "cis490.verify-catalog".
        boot_fn: Replacement for `_boot_target` (same call signature).
        msf_client_factory: Callable taking an MSFRpcConfig and
            returning a client object.
        image_path_resolver: Callable mapping target_name to an image
            path, overriding *images_dir*.

    Returns:
        A VerificationResult with per-step outcomes. The caller is
        responsible for translating the result into an exit code.
    """
    if log is None:
        log = logging.getLogger("cis490.verify-catalog")
    boot_fn = boot_fn or _boot_target
    msf_client_factory = msf_client_factory or (
        lambda cfg: MSFRpcClient(cfg)
    )

    result = VerificationResult(
        module_name=module_name,
        target_name=target_name,
        overall_passed=False,
    )

    # Step 1: load module + target spec
    t0 = time.monotonic()
    try:
        module = load_module_config(
            repo_root / "exploits" / "modules" / f"{module_name}.toml"
        )
    except (FileNotFoundError, ValueError) as e:
        result.error = f"module config load failed: {e}"
        return result
    try:
        target = load_target_spec(repo_root, target_name)
    except TargetSpecError as e:
        result.error = f"target spec load failed: {e}"
        return result
    result.steps.append(VerificationStep(
        name="load_specs",
        passed=True,
        detail=f"module={module.module_path} target={target.name}",
        elapsed_s=time.monotonic() - t0,
    ))

    # Step 2: resolve image path (with sha256 check left to a different
    # check pass — for verification we just need the file to exist).
    if image_path_resolver is not None:
        image_path = image_path_resolver(target_name)
    else:
        image_path = images_dir / f"{target_name}.qcow2"
    if not image_path.exists():
        result.steps.append(VerificationStep(
            name="image_present",
            passed=False,
            detail=f"no image at {image_path}; build it first",
        ))
        result.error = "image missing"
        return result
    result.steps.append(VerificationStep(
        name="image_present",
        passed=True,
        detail=str(image_path),
    ))

    # Step 3: boot target under §4.13 containment
    t0 = time.monotonic()
    run_dir = Path(f"/tmp/cis490-verify-{module_name}-{os.getpid()}")
    try:
        boot = boot_fn(image_path, target, run_dir, log)
    except Exception as e:
        result.steps.append(VerificationStep(
            name="boot_target",
            passed=False,
            detail=str(e),
            elapsed_s=time.monotonic() - t0,
        ))
        result.error = f"boot failed: {e}"
        return result
    result.steps.append(VerificationStep(
        name="boot_target",
        passed=True,
        detail=f"pid={boot.pid} host_port={boot.host_port}",
        elapsed_s=time.monotonic() - t0,
    ))

    # From here on the VM is running: every exit path goes through the
    # outer `finally` so the target is always torn down.
    try:
        # Step 4: wait for service
        t0 = time.monotonic()
        up = _wait_for_tcp("127.0.0.1", boot.host_port, boot_timeout_s)
        result.steps.append(VerificationStep(
            name="service_up",
            passed=up,
            detail=f"port {boot.host_port} (= guest {target.promises.service_port})",
            elapsed_s=time.monotonic() - t0,
        ))
        if not up:
            result.error = "service never came up within boot_timeout_s"
            return result

        # Step 5: msfrpcd login + module fire
        t0 = time.monotonic()
        # `stage` names the step in flight so that an unexpected
        # exception is attributed to the right step.
        stage = "module_fire"
        try:
            client = msf_client_factory(MSFRpcConfig(
                host=msf_host,
                port=msf_port,
                user="msf",
                password=msf_password,
            ))
        except Exception as e:
            log.exception("msfrpc client construction failed for %s", module_name)
            return _record_exception(result, stage, e, t0)

        try:
            client.login()
            # Sessions that already exist must not count as ours.
            seen = set(client.session_list().keys())
            opts = module.render_options(target_ip="127.0.0.1")
            # The guest is only reachable through the host forward.
            opts["RPORT"] = boot.host_port
            client.module_execute(module.module_type, module.module_path, opts)
            result.steps.append(VerificationStep(
                name="module_fire",
                passed=True,
                detail=f"module={module.module_path}",
                elapsed_s=time.monotonic() - t0,
            ))

            # Step 6: wait for session_open
            stage = "session_open"
            t0 = time.monotonic()
            opened = wait_for_new_session(
                client,
                seen=seen,
                timeout_s=session_timeout_s,
            )
            if opened is None:
                result.steps.append(VerificationStep(
                    name="session_open",
                    passed=False,
                    detail=f"timed out after {session_timeout_s}s",
                    elapsed_s=time.monotonic() - t0,
                ))
                result.error = "session_open_timeout"
                return result
            sid, info = opened
            result.steps.append(VerificationStep(
                name="session_open",
                passed=True,
                detail=f"sid={sid} type={info.get('type')}",
                elapsed_s=time.monotonic() - t0,
            ))

            # Step 7: shell round-trip
            stage = "shell_roundtrip"
            t0 = time.monotonic()
            client.session_shell_write(sid, "id\n")
            time.sleep(1.0)  # give the guest time to answer
            shell_out = client.session_shell_read(sid)
            rt_ok = "uid=" in shell_out
            result.steps.append(VerificationStep(
                name="shell_roundtrip",
                passed=rt_ok,
                detail=shell_out.strip()[:160],
                elapsed_s=time.monotonic() - t0,
            ))
            if not rt_ok:
                result.error = "shell round-trip didn't return id-shaped output"
                return result

            # Step 8: guest-side artifact
            stage = "guest_artifact"
            t0 = time.monotonic()
            marker = f"/tmp/cis490_verify_marker_{os.getpid()}"
            client.session_shell_write(sid, f"touch {marker}\n")
            time.sleep(0.5)
            client.session_shell_write(
                sid, f"ls {marker} && echo VERIFY_OK\n"
            )
            time.sleep(0.5)
            artifact_out = client.session_shell_read(sid)
            artifact_ok = "VERIFY_OK" in artifact_out
            result.steps.append(VerificationStep(
                name="guest_artifact",
                passed=artifact_ok,
                detail=artifact_out.strip()[:160],
                elapsed_s=time.monotonic() - t0,
            ))
            if not artifact_ok:
                result.error = "guest-side artifact check failed"
                return result

            # Cleanup the session (optional — VM teardown does it too).
            try:
                client.session_stop(sid)
            except Exception:
                pass
        except Exception as e:
            # Boundary handler: an RPC/transport failure fails this
            # module's verification rather than crashing the whole run.
            log.exception("%s raised while verifying %s", stage, module_name)
            return _record_exception(result, stage, e, t0)
        finally:
            # Best-effort logout; a failure here must not mask the result.
            try:
                client.logout()
            except Exception:
                pass
    finally:
        boot.terminate()

    result.overall_passed = all(s.passed for s in result.steps)
    return result


# ---------------------------------------------------------------------
# CLI driver
# ---------------------------------------------------------------------


def main(argv: list[str] | None = None) -> int:
    """CLI entry point; returns the process exit status.

    Returns EXIT_SYSADMIN_ERROR when the manifest cannot be loaded,
    MSFRPC_PASSWORD is unset, or a named module is not in the catalog
    and no --target was given. Otherwise returns 0 when every verified
    module passed (or the catalog is empty) and 1 when any failed.
    """
    p = argparse.ArgumentParser(prog="cis490-verify-catalog")
    p.add_argument("module_name", nargs="?",
                   help="Single module to verify; omit to verify all")
    p.add_argument("--target",
                   help="Target spec name; defaults to verified_against "
                        "from the manifest catalog entry")
    p.add_argument("--images-dir", type=Path, default=DEFAULT_IMAGES_DIR)
    p.add_argument("--out", type=Path, default=None,
                   help="Write per-module JSON results to this path")
    p.add_argument("--log-level", default="INFO")
    args = p.parse_args(argv)

    # getattr() on the logging module can return a non-level attribute
    # (e.g. "--log-level basicConfig"); fall back to INFO in that case.
    level = getattr(logging, args.log_level.upper(), None)
    if not isinstance(level, int):
        level = logging.INFO
    logging.basicConfig(
        level=level,
        format="%(asctime)s %(levelname)s %(name)s %(message)s",
    )
    log = logging.getLogger("cis490.verify-catalog")

    repo_root = Path(__file__).resolve().parent.parent
    try:
        manifest = load_canonical(repo_root)
    except ManifestError as e:
        log.error("canonical manifest failed to load: %s", e)
        return EXIT_SYSADMIN_ERROR

    msf_password = os.environ.get("MSFRPC_PASSWORD")
    if not msf_password:
        log.error("MSFRPC_PASSWORD env var must be set")
        return EXIT_SYSADMIN_ERROR

    # Decide which (module, target) pairs to verify.
    pairs: list[tuple[str, str]] = []
    if args.module_name:
        target = args.target
        if target is None:
            for entry in manifest.catalog:
                if entry.name == args.module_name:
                    target = entry.verified_against
                    break
        if target is None:
            log.error(
                "module %s not in manifest.catalog; pass --target "
                "explicitly to verify out-of-catalog",
                args.module_name,
            )
            return EXIT_SYSADMIN_ERROR
        pairs.append((args.module_name, target))
    else:
        if args.target is not None:
            # Each catalog entry carries its own verified_against target.
            log.warning("--target is ignored when no module is named")
        if not manifest.catalog:
            log.warning("manifest.catalog is empty; nothing to verify (§4.3)")
            return 0
        pairs = [(e.name, e.verified_against) for e in manifest.catalog]

    results: list[VerificationResult] = []
    any_failed = False
    for module_name, target_name in pairs:
        log.info("---- verifying %s against %s ----", module_name, target_name)
        result = verify_module(
            repo_root=repo_root,
            module_name=module_name,
            target_name=target_name,
            images_dir=args.images_dir,
            msf_password=msf_password,
            log=log,
        )
        results.append(result)
        status = "PASS" if result.overall_passed else "FAIL"
        log.info("---- %s: %s (%s)", module_name, status,
                 result.error or "all steps passed")
        if not result.overall_passed:
            any_failed = True

    # Structured output for CI ingestion / operator review.
    payload = {
        "experiment": manifest.name,
        "results": [r.to_dict() for r in results],
        "overall_passed": not any_failed,
    }
    if args.out:
        args.out.write_text(json.dumps(payload, indent=2))
        log.info("wrote results to %s", args.out)
    else:
        print(json.dumps(payload, indent=2))

    # NOTE(review): the module docstring lists module-config /
    # target-spec load failures as exit 78, but they reach this point as
    # an ordinary failed result and therefore exit 1 — confirm which
    # behaviour is intended.
    return 0 if not any_failed else 1


if __name__ == "__main__":
    sys.exit(main())