"""Tests for the Tier-4 path: - real_binary_workload constructs valid shell commands - Sample.binary_path resolves correctly - MSFExploitDriver.real-sample dispatch picks the upload+exec path when a binary is staged, mimic when it isn't - tools/fetch_sample input validation (we don't hit the live API) """ from __future__ import annotations import hashlib from pathlib import Path import pytest from exploits.driver import DriverConfig, MSFExploitDriver from exploits.modules import load_module_config from exploits.workloads import ( chunked_real_binary_upload, real_binary_workload, ) from samples.manifest import Sample REPO_ROOT = Path(__file__).resolve().parent.parent MODULES_DIR = REPO_ROOT / "exploits" / "modules" # Reuse the FakeMSFRpcClient from test_exploits.py. from tests.test_exploits import FakeMSFRpcClient # noqa: E402 # --------------------------------------------------------------------------- # real_binary_workload # --------------------------------------------------------------------------- def test_real_binary_workload_embeds_base64() -> None: payload = b"\x7fELF" + b"\x00" * 64 # tiny ELF-shaped header w = real_binary_workload(payload) # Start command bundles a chunked upload (printf '%s' '' >> file). # Pull all b64 segments out and confirm they round-trip. import base64 as _b64 import re matches = re.findall(r"printf '%s' '([A-Za-z0-9+/=]+)'", w.start_cmd) assert matches, "expected printf-based b64 chunks in start_cmd" decoded = _b64.b64decode("".join(matches)) assert decoded == payload def test_chunked_real_binary_upload_splits_correctly() -> None: """A binary larger than the chunk size should produce >1 chunks plus a finalize + exec. Each chunk's payload must be individually valid base64 and the concatenation must round-trip.""" import base64 as _b64 import hashlib as _hashlib import re # Build a payload large enough to force multiple chunks. payload = (b"\x90\xab" * 8000) plan = chunked_real_binary_upload(payload) assert plan.n_chunks >= 3 # 1 init + 2+ data chunks assert plan.expected_sha256 == _hashlib.sha256(payload).hexdigest() # Reconstruct from chunks. segs = [] for c in plan.chunks: m = re.search(r"printf '%s' '([A-Za-z0-9+/=]+)'", c) if m: segs.append(m.group(1)) assert segs, "no data chunks parsed" decoded = _b64.b64decode("".join(segs)) assert decoded == payload # finalize_cmd verifies the sha256 we computed. assert plan.expected_sha256 in plan.finalize_cmd assert "sha256sum" in plan.finalize_cmd def test_real_binary_workload_stop_kills_pidfile() -> None: w = real_binary_workload(b"x" * 16) assert "kill" in w.stop_cmd assert ".cis490-real" in w.stop_cmd def test_real_binary_workload_per_profile_isolation() -> None: a = real_binary_workload(b"\x00", sample=Sample(name="a", family="A", category="rat", profile="cpu-saturate")) b = real_binary_workload(b"\x00", sample=Sample(name="b", family="B", category="rat", profile="bursty-c2")) # Different profiles → different /tmp paths so concurrent samples # don't stomp each other in the same guest. assert a.profile != b.profile assert a.start_cmd != b.start_cmd # --------------------------------------------------------------------------- # Sample.binary_path # --------------------------------------------------------------------------- def test_binary_path_resolves_when_staged(tmp_path: Path) -> None: sha = "a" * 64 (tmp_path / sha).write_bytes(b"hello") s = Sample(name="x", family="X", category="rat", profile="cpu-saturate", sha256=sha) assert s.binary_path(tmp_path) == tmp_path / sha def test_binary_path_none_when_missing(tmp_path: Path) -> None: s = Sample(name="x", family="X", category="rat", profile="cpu-saturate", sha256="b" * 64) assert s.binary_path(tmp_path) is None def test_binary_path_none_for_mimic_sample(tmp_path: Path) -> None: s = Sample(name="x", family="X", category="rat", profile="cpu-saturate") assert s.binary_path(tmp_path) is None # --------------------------------------------------------------------------- # Driver dispatch # --------------------------------------------------------------------------- def test_driver_picks_real_binary_when_staged(tmp_path: Path) -> None: payload = b"\x7fELF\x02" + b"\x00" * 60 sha = hashlib.sha256(payload).hexdigest() (tmp_path / sha).write_bytes(payload) sample = Sample( name="real-x", family="X", category="rat", profile="cpu-saturate", sha256=sha, ) cfg = load_module_config(MODULES_DIR / "vsftpd_234_backdoor.toml") client = FakeMSFRpcClient(sessions_after_fire={1: {"type": "shell"}}) driver = MSFExploitDriver( client=client, # type: ignore[arg-type] module=cfg, cfg=DriverConfig( target_ip="10.200.0.10", session_open_timeout_s=0.5, sample_store_root=tmp_path, ), emit_event=lambda *a, **kw: None, sample=sample, ) # Driver picks the chunked-upload path. assert driver.workload is not None assert driver.workload.profile.startswith("real:") assert driver._chunked is not None assert driver._chunked.expected_sha256 == sha def test_driver_walks_chunked_upload_in_session(tmp_path: Path) -> None: """End-to-end: at infected_running, the driver should issue every chunk + finalize + exec as separate shell_write calls. The fake client records them in order so we can verify.""" payload = b"\xde\xad\xbe\xef" * 4096 # 16 KiB → multiple chunks sha = hashlib.sha256(payload).hexdigest() (tmp_path / sha).write_bytes(payload) sample = Sample( name="real-multi", family="X", category="rat", profile="bursty-c2", sha256=sha, ) cfg = load_module_config(MODULES_DIR / "vsftpd_234_backdoor.toml") # Patch the fake to return "sha-ok" so the verify step passes. client = FakeMSFRpcClient(sessions_after_fire={1: {"type": "shell"}}) client._verify_response = "sha-ok\n" real_read = client.session_shell_read def shell_read_with_verify(sid): # Return verify token after the finalize command — i.e. once # the most recent shell_write contained "sha256sum". last = client.shell_writes[-1][1] if client.shell_writes else "" if "sha256sum" in last: return "sha-ok\n" return real_read(sid) client.session_shell_read = shell_read_with_verify # type: ignore[assignment] events: list[tuple[str, dict]] = [] driver = MSFExploitDriver( client=client, # type: ignore[arg-type] module=cfg, cfg=DriverConfig( target_ip="10.200.0.10", session_open_timeout_s=0.5, sample_store_root=tmp_path, ), emit_event=lambda ev, **kw: events.append((ev, kw)), sample=sample, ) driver.setup() driver.set_phase("armed") driver.set_phase("infecting") driver.set_phase("infected_running") # All chunks + finalize + exec went through shell_write. writes = [w for (_, w) in client.shell_writes] n_printf = sum(1 for w in writes if w.startswith("printf '%s'")) n_finalize = sum(1 for w in writes if "sha256sum" in w) n_exec = sum(1 for w in writes if "nohup" in w and ".cis490-real" in w) assert n_printf >= 2, f"expected multiple chunks, saw {n_printf}" assert n_finalize == 1 assert n_exec == 1 # Events tell the same story. names = [e for (e, _) in events] assert "real_binary_upload_begin" in names assert "real_binary_verify" in names assert any(e == "sample_executed" and kw.get("kind") == "real" for (e, kw) in events) def test_driver_falls_back_to_mimic_when_real_binary_missing(tmp_path: Path) -> None: sample = Sample( name="real-but-missing", family="X", category="rat", profile="bursty-c2", sha256="c" * 64, ) cfg = load_module_config(MODULES_DIR / "vsftpd_234_backdoor.toml") client = FakeMSFRpcClient(sessions_after_fire={1: {"type": "shell"}}) driver = MSFExploitDriver( client=client, # type: ignore[arg-type] module=cfg, cfg=DriverConfig( target_ip="10.200.0.10", session_open_timeout_s=0.5, sample_store_root=tmp_path, # empty ), emit_event=lambda *a, **kw: None, sample=sample, ) # Mimic workload selected because the binary isn't staged. assert driver.workload is not None assert driver.workload.profile == "bursty-c2" assert "real:" not in driver.workload.profile # --------------------------------------------------------------------------- # Fetcher input validation # --------------------------------------------------------------------------- def test_fetch_sample_rejects_bad_sha(tmp_path: Path) -> None: from tools.fetch_sample import fetch_sample with pytest.raises(ValueError, match="64 hex chars"): fetch_sample("not-a-hash", tmp_path, api_key="x") def test_fetch_sample_returns_existing_when_hash_matches(tmp_path: Path) -> None: from tools.fetch_sample import fetch_sample payload = b"already staged bytes" sha = hashlib.sha256(payload).hexdigest() p = tmp_path / sha p.write_bytes(payload) # api_key is unused on the cached path; pass anything. out = fetch_sample(sha, tmp_path, api_key="ignored") assert out == p # File untouched. assert p.read_bytes() == payload