"""Tests for tools/verify_catalog.py — the §4.3 catalog admission verifier. Real verifications boot a VM and talk to msfrpcd. These tests mock both so the §4.3 flow can be exercised in CI without lab hardware. """ from __future__ import annotations import importlib.util import sys from pathlib import Path import pytest REPO_ROOT = Path(__file__).resolve().parent.parent # Load the verifier as a module (it's a script). Register in # sys.modules so dataclass decorators inside it can resolve back via # cls.__module__. spec = importlib.util.spec_from_file_location( "verify_catalog", REPO_ROOT / "tools" / "verify_catalog.py", ) verify_catalog = importlib.util.module_from_spec(spec) sys.modules["verify_catalog"] = verify_catalog spec.loader.exec_module(verify_catalog) # --------------------------------------------------------------------- # Test fixtures: a synthetic target spec on disk + a mock msfrpc client # --------------------------------------------------------------------- VALID_TARGET_SPEC = """ name = "fixture-target" description = "fixture for verifier tests" base_image = "alpine-3.21-virt" [promises] cve = "CVE-2014-6271" service_name = "apache" service_port = 80 service_proto = "tcp" vulnerable_software = "bash" vulnerable_version = "4.2" [containment] upstream_egress = false shared_filesystem = false unprivileged_qemu = true fresh_snapshot_per_episode = true """ VALID_MODULE_TOML = """ description = "fixture module" [module] type = "exploit" path = "multi/test/fixture" [module.options] RHOSTS = "{{ target_ip }}" RPORT = 80 [payload] path = "cmd/unix/bind_perl" [payload.options] LPORT = 4444 [session] type = "shell" [runtime] requires_bridge = false """ def _stage_repo(tmp_path: Path) -> Path: """Build a minimal repo skeleton with the fixture target + module.""" target_dir = tmp_path / "vm" / "targets" / "fixture-target" target_dir.mkdir(parents=True) (target_dir / "spec.toml").write_text(VALID_TARGET_SPEC) modules_dir = tmp_path / "exploits" / "modules" modules_dir.mkdir(parents=True) (modules_dir / "fixture-module.toml").write_text(VALID_MODULE_TOML) images_dir = tmp_path / "images" images_dir.mkdir() (images_dir / "fixture-target.qcow2").write_bytes(b"fake qcow2 bytes") return tmp_path class MockMSFClient: """Stand-in for MSFRpcClient that lets each test script the response shape — what session opens (or doesn't), what shell commands return.""" def __init__( self, *, sessions_at_arm: dict | None = None, sessions_after_fire: dict | None = None, shell_responses: dict[int, list[str]] | None = None, ) -> None: self._before = sessions_at_arm or {} self._after = sessions_after_fire or {} self._fired = False self._shell_buffers = { sid: list(resps) for sid, resps in (shell_responses or {}).items() } def login(self) -> None: pass def logout(self) -> None: pass def session_list(self) -> dict: return self._after if self._fired else self._before def module_execute(self, module_type, module_path, options): self._fired = True return {"job_id": 1, "uuid": "fake"} def session_shell_write(self, sid, data): return {} def session_shell_read(self, sid): if sid in self._shell_buffers and self._shell_buffers[sid]: return self._shell_buffers[sid].pop(0) return "" def session_stop(self, sid): return {} class MockBoot: pid = 12345 def terminate(self): pass def _mock_boot_fn_factory(host_port: int = 12345): boot = MockBoot() boot.host_port = host_port def _boot(image_path, spec, run_dir, log): # Mark the run_dir so we can assert it was created. run_dir.mkdir(parents=True, exist_ok=True) return boot return _boot, boot # --------------------------------------------------------------------- # Tests # --------------------------------------------------------------------- def test_happy_path_passes_every_step(tmp_path, monkeypatch): repo = _stage_repo(tmp_path) boot_fn, boot = _mock_boot_fn_factory() # Force the TCP probe to "succeed" by replacing _wait_for_tcp. monkeypatch.setattr(verify_catalog, "_wait_for_tcp", lambda host, port, timeout_s: True) msf = MockMSFClient( sessions_at_arm={}, sessions_after_fire={42: {"type": "shell"}}, shell_responses={ 42: [ "uid=0(root) gid=0(root)\n", # `id` "/tmp/cis490_verify_marker_xxx\nVERIFY_OK\n", # ls + echo ], }, ) result = verify_catalog.verify_module( repo_root=repo, module_name="fixture-module", target_name="fixture-target", images_dir=repo / "images", msf_password="x", boot_fn=boot_fn, msf_client_factory=lambda cfg: msf, ) assert result.overall_passed, result.to_dict() step_names = [s.name for s in result.steps] # All §4.3 steps present + passed. assert "load_specs" in step_names assert "image_present" in step_names assert "boot_target" in step_names assert "service_up" in step_names assert "module_fire" in step_names assert "session_open" in step_names assert "shell_roundtrip" in step_names assert "guest_artifact" in step_names assert all(s.passed for s in result.steps) def test_missing_module_fails_at_load(tmp_path): repo = _stage_repo(tmp_path) result = verify_catalog.verify_module( repo_root=repo, module_name="no-such-module", target_name="fixture-target", images_dir=repo / "images", msf_password="x", boot_fn=lambda *a, **kw: MockBoot(), msf_client_factory=lambda cfg: MockMSFClient(), ) assert not result.overall_passed assert "module config load failed" in result.error def test_missing_target_spec_fails_at_load(tmp_path): repo = _stage_repo(tmp_path) result = verify_catalog.verify_module( repo_root=repo, module_name="fixture-module", target_name="no-such-target", images_dir=repo / "images", msf_password="x", boot_fn=lambda *a, **kw: MockBoot(), msf_client_factory=lambda cfg: MockMSFClient(), ) assert not result.overall_passed assert "target spec load failed" in result.error def test_missing_image_fails_loudly(tmp_path): repo = _stage_repo(tmp_path) (repo / "images" / "fixture-target.qcow2").unlink() # drop the image result = verify_catalog.verify_module( repo_root=repo, module_name="fixture-module", target_name="fixture-target", images_dir=repo / "images", msf_password="x", boot_fn=lambda *a, **kw: MockBoot(), msf_client_factory=lambda cfg: MockMSFClient(), ) assert not result.overall_passed assert result.error == "image missing" image_step = next(s for s in result.steps if s.name == "image_present") assert not image_step.passed def test_session_open_timeout_recorded_as_failure(tmp_path, monkeypatch): repo = _stage_repo(tmp_path) boot_fn, _ = _mock_boot_fn_factory() monkeypatch.setattr(verify_catalog, "_wait_for_tcp", lambda host, port, timeout_s: True) # session.list never sees a new entry. msf = MockMSFClient(sessions_at_arm={}, sessions_after_fire={}) result = verify_catalog.verify_module( repo_root=repo, module_name="fixture-module", target_name="fixture-target", images_dir=repo / "images", msf_password="x", session_timeout_s=0.5, boot_fn=boot_fn, msf_client_factory=lambda cfg: msf, ) assert not result.overall_passed assert result.error == "session_open_timeout" session_step = next(s for s in result.steps if s.name == "session_open") assert not session_step.passed def test_shell_roundtrip_failure_short_circuits_artifact_check(tmp_path, monkeypatch): repo = _stage_repo(tmp_path) boot_fn, _ = _mock_boot_fn_factory() monkeypatch.setattr(verify_catalog, "_wait_for_tcp", lambda host, port, timeout_s: True) msf = MockMSFClient( sessions_at_arm={}, sessions_after_fire={1: {"type": "shell"}}, shell_responses={1: ["garbage no uid here\n"]}, ) result = verify_catalog.verify_module( repo_root=repo, module_name="fixture-module", target_name="fixture-target", images_dir=repo / "images", msf_password="x", boot_fn=boot_fn, msf_client_factory=lambda cfg: msf, ) assert not result.overall_passed assert "shell round-trip" in result.error # We never reached the guest_artifact step. assert not any(s.name == "guest_artifact" for s in result.steps) def test_service_never_comes_up_fails(tmp_path, monkeypatch): repo = _stage_repo(tmp_path) boot_fn, _ = _mock_boot_fn_factory() monkeypatch.setattr(verify_catalog, "_wait_for_tcp", lambda host, port, timeout_s: False) msf = MockMSFClient() result = verify_catalog.verify_module( repo_root=repo, module_name="fixture-module", target_name="fixture-target", images_dir=repo / "images", msf_password="x", boot_timeout_s=0.5, boot_fn=boot_fn, msf_client_factory=lambda cfg: msf, ) assert not result.overall_passed assert "service never came up" in result.error def test_boot_failure_propagates(tmp_path): repo = _stage_repo(tmp_path) def explode(*a, **kw): raise RuntimeError("qemu died") msf = MockMSFClient() result = verify_catalog.verify_module( repo_root=repo, module_name="fixture-module", target_name="fixture-target", images_dir=repo / "images", msf_password="x", boot_fn=explode, msf_client_factory=lambda cfg: msf, ) assert not result.overall_passed assert "qemu died" in result.error boot_step = next(s for s in result.steps if s.name == "boot_target") assert not boot_step.passed