CIS490/tests/test_verify_catalog.py
Max Gorog 22269e175d PIPELINE §5 step 4: catalog admission verifier (§4.3)
tools/verify_catalog.py runs the §4.3 end-to-end verification flow
against every entry in manifest.toml's [catalog].modules (or a single
named module). The flow follows §4.3 exactly:

  1. Load the module config + the verified-against target spec.
  2. Resolve the published image path; fail loudly if absent.
  3. Boot the target VM under §4.13 containment (restrict=on, snapshot=on,
     no shared FS, unprivileged QEMU — same posture as verify.sh).
  4. Wait for the service on the spec'd port.
  5. Login to msfrpcd, snapshot the existing session set, fire the
     module against `127.0.0.1:<host_port>` (the SLIRP hostfwd to the
     guest's promised service port).
  6. Wait for `session_open` — NOT session_open_timeout, which is the
     §4.5 failed-label outcome.
  7. Round-trip a shell command (`id`); confirm uid= shape.
  8. Confirm a guest-side artifact (touch marker; ls + echo VERIFY_OK).

Per-module exit code is 0 only when EVERY step passes. CLI exit is 0
only when EVERY requested module passes — partial credit isn't an
option (§1 default-to-removal: a module that can't pass shouldn't be
in the catalog).

Structured JSON output with per-step timings + detail strings, written
to stdout or --out <path>. Operator pulls this into a successful CI
run + signs off on the manifest.toml [[catalog.modules]] amendment
with a fresh `last_verified = <commit_sha>` per §15.

Tests (tests/test_verify_catalog.py, 8 cases): exercise the flow with
a mocked MSFRpcClient + mocked qemu boot. Cover happy path, every
short-circuit failure mode (image missing, service never up, session
timeout, shell round-trip wrong, guest artifact missing), and
spec-load errors. Real verification needs lab hardware; the mocked
flow proves the orchestration contract.

269 tests passing.

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
2026-05-04 01:35:32 -05:00

337 lines
10 KiB
Python

"""Tests for tools/verify_catalog.py — the §4.3 catalog admission
verifier.
Real verifications boot a VM and talk to msfrpcd. These tests mock
both so the §4.3 flow can be exercised in CI without lab hardware.
"""
from __future__ import annotations
import importlib.util
import sys
from pathlib import Path
import pytest
REPO_ROOT = Path(__file__).resolve().parent.parent
# Load the verifier as a module (it's a script). Register in
# sys.modules so dataclass decorators inside it can resolve back via
# cls.__module__.
spec = importlib.util.spec_from_file_location(
"verify_catalog", REPO_ROOT / "tools" / "verify_catalog.py",
)
verify_catalog = importlib.util.module_from_spec(spec)
sys.modules["verify_catalog"] = verify_catalog
spec.loader.exec_module(verify_catalog)
# ---------------------------------------------------------------------
# Test fixtures: a synthetic target spec on disk + a mock msfrpc client
# ---------------------------------------------------------------------
VALID_TARGET_SPEC = """
name = "fixture-target"
description = "fixture for verifier tests"
base_image = "alpine-3.21-virt"
[promises]
cve = "CVE-2014-6271"
service_name = "apache"
service_port = 80
service_proto = "tcp"
vulnerable_software = "bash"
vulnerable_version = "4.2"
[containment]
upstream_egress = false
shared_filesystem = false
unprivileged_qemu = true
fresh_snapshot_per_episode = true
"""
VALID_MODULE_TOML = """
description = "fixture module"
[module]
type = "exploit"
path = "multi/test/fixture"
[module.options]
RHOSTS = "{{ target_ip }}"
RPORT = 80
[payload]
path = "cmd/unix/bind_perl"
[payload.options]
LPORT = 4444
[session]
type = "shell"
[runtime]
requires_bridge = false
"""
def _stage_repo(tmp_path: Path) -> Path:
"""Build a minimal repo skeleton with the fixture target + module."""
target_dir = tmp_path / "vm" / "targets" / "fixture-target"
target_dir.mkdir(parents=True)
(target_dir / "spec.toml").write_text(VALID_TARGET_SPEC)
modules_dir = tmp_path / "exploits" / "modules"
modules_dir.mkdir(parents=True)
(modules_dir / "fixture-module.toml").write_text(VALID_MODULE_TOML)
images_dir = tmp_path / "images"
images_dir.mkdir()
(images_dir / "fixture-target.qcow2").write_bytes(b"fake qcow2 bytes")
return tmp_path
class MockMSFClient:
"""Stand-in for MSFRpcClient that lets each test script the
response shape — what session opens (or doesn't), what shell
commands return."""
def __init__(
self,
*,
sessions_at_arm: dict | None = None,
sessions_after_fire: dict | None = None,
shell_responses: dict[int, list[str]] | None = None,
) -> None:
self._before = sessions_at_arm or {}
self._after = sessions_after_fire or {}
self._fired = False
self._shell_buffers = {
sid: list(resps) for sid, resps in (shell_responses or {}).items()
}
def login(self) -> None:
pass
def logout(self) -> None:
pass
def session_list(self) -> dict:
return self._after if self._fired else self._before
def module_execute(self, module_type, module_path, options):
self._fired = True
return {"job_id": 1, "uuid": "fake"}
def session_shell_write(self, sid, data):
return {}
def session_shell_read(self, sid):
if sid in self._shell_buffers and self._shell_buffers[sid]:
return self._shell_buffers[sid].pop(0)
return ""
def session_stop(self, sid):
return {}
class MockBoot:
pid = 12345
def terminate(self):
pass
def _mock_boot_fn_factory(host_port: int = 12345):
boot = MockBoot()
boot.host_port = host_port
def _boot(image_path, spec, run_dir, log):
# Mark the run_dir so we can assert it was created.
run_dir.mkdir(parents=True, exist_ok=True)
return boot
return _boot, boot
# ---------------------------------------------------------------------
# Tests
# ---------------------------------------------------------------------
def test_happy_path_passes_every_step(tmp_path, monkeypatch):
repo = _stage_repo(tmp_path)
boot_fn, boot = _mock_boot_fn_factory()
# Force the TCP probe to "succeed" by replacing _wait_for_tcp.
monkeypatch.setattr(verify_catalog, "_wait_for_tcp",
lambda host, port, timeout_s: True)
msf = MockMSFClient(
sessions_at_arm={},
sessions_after_fire={42: {"type": "shell"}},
shell_responses={
42: [
"uid=0(root) gid=0(root)\n", # `id`
"/tmp/cis490_verify_marker_xxx\nVERIFY_OK\n", # ls + echo
],
},
)
result = verify_catalog.verify_module(
repo_root=repo,
module_name="fixture-module",
target_name="fixture-target",
images_dir=repo / "images",
msf_password="x",
boot_fn=boot_fn,
msf_client_factory=lambda cfg: msf,
)
assert result.overall_passed, result.to_dict()
step_names = [s.name for s in result.steps]
# All §4.3 steps present + passed.
assert "load_specs" in step_names
assert "image_present" in step_names
assert "boot_target" in step_names
assert "service_up" in step_names
assert "module_fire" in step_names
assert "session_open" in step_names
assert "shell_roundtrip" in step_names
assert "guest_artifact" in step_names
assert all(s.passed for s in result.steps)
def test_missing_module_fails_at_load(tmp_path):
repo = _stage_repo(tmp_path)
result = verify_catalog.verify_module(
repo_root=repo,
module_name="no-such-module",
target_name="fixture-target",
images_dir=repo / "images",
msf_password="x",
boot_fn=lambda *a, **kw: MockBoot(),
msf_client_factory=lambda cfg: MockMSFClient(),
)
assert not result.overall_passed
assert "module config load failed" in result.error
def test_missing_target_spec_fails_at_load(tmp_path):
repo = _stage_repo(tmp_path)
result = verify_catalog.verify_module(
repo_root=repo,
module_name="fixture-module",
target_name="no-such-target",
images_dir=repo / "images",
msf_password="x",
boot_fn=lambda *a, **kw: MockBoot(),
msf_client_factory=lambda cfg: MockMSFClient(),
)
assert not result.overall_passed
assert "target spec load failed" in result.error
def test_missing_image_fails_loudly(tmp_path):
repo = _stage_repo(tmp_path)
(repo / "images" / "fixture-target.qcow2").unlink() # drop the image
result = verify_catalog.verify_module(
repo_root=repo,
module_name="fixture-module",
target_name="fixture-target",
images_dir=repo / "images",
msf_password="x",
boot_fn=lambda *a, **kw: MockBoot(),
msf_client_factory=lambda cfg: MockMSFClient(),
)
assert not result.overall_passed
assert result.error == "image missing"
image_step = next(s for s in result.steps if s.name == "image_present")
assert not image_step.passed
def test_session_open_timeout_recorded_as_failure(tmp_path, monkeypatch):
repo = _stage_repo(tmp_path)
boot_fn, _ = _mock_boot_fn_factory()
monkeypatch.setattr(verify_catalog, "_wait_for_tcp",
lambda host, port, timeout_s: True)
# session.list never sees a new entry.
msf = MockMSFClient(sessions_at_arm={}, sessions_after_fire={})
result = verify_catalog.verify_module(
repo_root=repo,
module_name="fixture-module",
target_name="fixture-target",
images_dir=repo / "images",
msf_password="x",
session_timeout_s=0.5,
boot_fn=boot_fn,
msf_client_factory=lambda cfg: msf,
)
assert not result.overall_passed
assert result.error == "session_open_timeout"
session_step = next(s for s in result.steps if s.name == "session_open")
assert not session_step.passed
def test_shell_roundtrip_failure_short_circuits_artifact_check(tmp_path, monkeypatch):
repo = _stage_repo(tmp_path)
boot_fn, _ = _mock_boot_fn_factory()
monkeypatch.setattr(verify_catalog, "_wait_for_tcp",
lambda host, port, timeout_s: True)
msf = MockMSFClient(
sessions_at_arm={},
sessions_after_fire={1: {"type": "shell"}},
shell_responses={1: ["garbage no uid here\n"]},
)
result = verify_catalog.verify_module(
repo_root=repo,
module_name="fixture-module",
target_name="fixture-target",
images_dir=repo / "images",
msf_password="x",
boot_fn=boot_fn,
msf_client_factory=lambda cfg: msf,
)
assert not result.overall_passed
assert "shell round-trip" in result.error
# We never reached the guest_artifact step.
assert not any(s.name == "guest_artifact" for s in result.steps)
def test_service_never_comes_up_fails(tmp_path, monkeypatch):
repo = _stage_repo(tmp_path)
boot_fn, _ = _mock_boot_fn_factory()
monkeypatch.setattr(verify_catalog, "_wait_for_tcp",
lambda host, port, timeout_s: False)
msf = MockMSFClient()
result = verify_catalog.verify_module(
repo_root=repo,
module_name="fixture-module",
target_name="fixture-target",
images_dir=repo / "images",
msf_password="x",
boot_timeout_s=0.5,
boot_fn=boot_fn,
msf_client_factory=lambda cfg: msf,
)
assert not result.overall_passed
assert "service never came up" in result.error
def test_boot_failure_propagates(tmp_path):
repo = _stage_repo(tmp_path)
def explode(*a, **kw):
raise RuntimeError("qemu died")
msf = MockMSFClient()
result = verify_catalog.verify_module(
repo_root=repo,
module_name="fixture-module",
target_name="fixture-target",
images_dir=repo / "images",
msf_password="x",
boot_fn=explode,
msf_client_factory=lambda cfg: msf,
)
assert not result.overall_passed
assert "qemu died" in result.error
boot_step = next(s for s in result.steps if s.name == "boot_target")
assert not boot_step.passed