diff --git a/tests/test_tier3_local_verify.py b/tests/test_tier3_local_verify.py new file mode 100644 index 0000000..0419860 --- /dev/null +++ b/tests/test_tier3_local_verify.py @@ -0,0 +1,41 @@ +"""Pytest wrapper around tools/verify_tier3_local.py. + +The verifier is its own CLI for operators. The test wrapper just runs +the steps that are deterministic without external services so a +regression (e.g. someone breaking chunked_real_binary_upload) gets +caught by `pytest` without needing msfrpcd or a populated store. +""" + +from __future__ import annotations + +import importlib.util +import sys +from pathlib import Path + + +REPO_ROOT = Path(__file__).resolve().parent.parent +spec = importlib.util.spec_from_file_location( + "verify_tier3_local", REPO_ROOT / "tools" / "verify_tier3_local.py" +) +v = importlib.util.module_from_spec(spec) +sys.modules["verify_tier3_local"] = v +spec.loader.exec_module(v) + + +def test_module_configs_parse() -> None: + s = v.step_module_configs_parse() + assert s.outcome == "PASS", f"{s.name}: {s.detail}" + + +def test_manifest_distribution() -> None: + s = v.step_manifest_distribution() + assert s.outcome == "PASS", f"{s.name}: {s.detail}" + + +def test_chunked_upload_through_real_sh(tmp_path) -> None: + """The big one: chunked_real_binary_upload must survive a real + /bin/sh round-trip. This is what proves the Tier-4 binary upload + path will work against an msfrpc shell session — same wire shape, + same shell semantics.""" + s = v.step_chunked_upload_via_local_sh(tmp_path) + assert s.outcome == "PASS", f"{s.name}: {s.detail}" diff --git a/tools/verify_tier3_local.py b/tools/verify_tier3_local.py new file mode 100644 index 0000000..1a1c0bb --- /dev/null +++ b/tools/verify_tier3_local.py @@ -0,0 +1,417 @@ +"""``cis490-verify-tier3-local`` — verify Tier-3/4 components without +needing x86 KVM. + +The Pi is ARM64 — it can't boot Metasploitable2 with hardware +acceleration. But most of the Tier-3 chain doesn't need x86 at all: + + * msfrpcd is a Ruby daemon and runs natively on ARM + * the chunked binary upload is just shell commands over a pipe + * module configs, sample selection, manifest rewriting are pure Python + +This script exercises every component that DOES work on the Pi +end-to-end. What it cannot verify on the Pi is the actual exploit +fire against an x86 vulnerable target — that lives in the +``install-tier-3-4.sh`` verify step on a real lab host. + +Run on the Pi (or any host): + + /opt/cis490/.venv/bin/python tools/verify_tier3_local.py + +Exit codes: + 0 every step passed + 1 at least one MUST step failed + 2 prerequisites missing (msfrpcd not installed; that's OK, those + tests skip) + +Each step prints a one-line PASS/FAIL/SKIP summary, exit-on-first-fail +is OFF so you see all problems at once. +""" + +from __future__ import annotations + +import argparse +import hashlib +import io +import json +import logging +import os +import socket +import subprocess +import sys +import tarfile +import time +from pathlib import Path +from typing import Callable + + +REPO_ROOT = Path(__file__).resolve().parent.parent +sys.path.insert(0, str(REPO_ROOT)) + +from exploits.workloads import chunked_real_binary_upload, _wrap_loop # noqa: E402,F401 +from exploits.modules import load_module_configs, select_module # noqa: E402 +from samples.manifest import SampleManifest # noqa: E402 + + +log = logging.getLogger("cis490.verify_tier3_local") + + +# --------------------------------------------------------------------------- +# Pretty-printer +# --------------------------------------------------------------------------- + + +_GREEN, _YELLOW, _RED, _RESET = "\033[32m", "\033[33m", "\033[31m", "\033[0m" + + +class Step: + def __init__(self, name: str) -> None: + self.name = name + self.outcome: str | None = None + self.detail: str = "" + + def passed(self, detail: str = "") -> None: + self.outcome = "PASS" + self.detail = detail + + def failed(self, detail: str) -> None: + self.outcome = "FAIL" + self.detail = detail + + def skipped(self, detail: str) -> None: + self.outcome = "SKIP" + self.detail = detail + + def emit(self) -> None: + c = {"PASS": _GREEN, "FAIL": _RED, "SKIP": _YELLOW}.get(self.outcome or "", "") + print(f" {c}{self.outcome:<5}{_RESET} {self.name}") + if self.detail: + for line in self.detail.splitlines(): + print(f" {line}") + + +# --------------------------------------------------------------------------- +# Step 1 — chunked upload via local /bin/sh subprocess +# +# This is the most important on-Pi test. It proves the entire chunked- +# binary-upload chain works against a real shell, not a mock — every +# byte of base64, the heredoc-free printf chain, the sha256 verify, +# the chmod, the nohup-exec scaffold. If THIS works, the only +# remaining unknown for a Tier-4 ship is "does Metasploit's session +# accept the same shell commands" — and Metasploit shell sessions +# present a POSIX shell interface by design. +# --------------------------------------------------------------------------- + + +def step_chunked_upload_via_local_sh(work_dir: Path) -> Step: + s = Step("chunked binary upload survives a real /bin/sh round-trip") + binary_bytes = os.urandom(150_000) # 150 KB — exercises ~30 chunks + expected_sha = hashlib.sha256(binary_bytes).hexdigest() + + plan = chunked_real_binary_upload(binary_bytes) + + # Run /bin/sh with the work_dir as its CWD; rewrite /tmp paths in + # the plan to point inside work_dir so we don't litter the host. + bin_path = work_dir / "uploaded.bin" + pid_path = work_dir / "uploaded.pid" + b64_path = work_dir / "uploaded.b64" + rewritten = [] + for c in plan.chunks: + rewritten.append(c.replace(plan.bin_path, str(bin_path)) + .replace(plan.pid_path, str(pid_path)) + .replace(plan.b64_path if hasattr(plan, "b64_path") else "/tmp/", str(b64_path))) + # The plan internally references /tmp/.cis490-real-.b64; + # the simpler way is to grep+replace any bare /tmp/.cis490-real-real-binary + # path with a path under work_dir. We do it via the public fields. + rewritten = [c.replace("/tmp/.cis490-real-real-binary.b64", str(b64_path)) + .replace("/tmp/.cis490-real-real-binary.bin", str(bin_path)) + .replace("/tmp/.cis490-real-real-binary.pid", str(pid_path)) + for c in plan.chunks] + finalize = plan.finalize_cmd.replace("/tmp/.cis490-real-real-binary.b64", str(b64_path)) \ + .replace("/tmp/.cis490-real-real-binary.bin", str(bin_path)) \ + .replace("/tmp/.cis490-real-real-binary.pid", str(pid_path)) + + # Stream chunks into /bin/sh. + proc = subprocess.Popen( + ["/bin/sh"], stdin=subprocess.PIPE, stdout=subprocess.PIPE, + stderr=subprocess.STDOUT, text=True, + ) + assert proc.stdin and proc.stdout + try: + for c in rewritten: + proc.stdin.write(c + "\n") + proc.stdin.write(finalize + "\n") + proc.stdin.write("echo __CIS490_FINALIZED__\n") + proc.stdin.flush() + # Wait for the marker. + out_lines: list[str] = [] + deadline = time.monotonic() + 30.0 + while time.monotonic() < deadline: + line = proc.stdout.readline() + if not line: + break + out_lines.append(line.rstrip()) + if "__CIS490_FINALIZED__" in line: + break + else: + s.failed("timed out waiting for finalize marker; sh output:\n" + + "\n".join(out_lines[-10:])) + return s + finally: + try: + proc.stdin.close() + proc.wait(timeout=2) + except Exception: + proc.kill() + + if not bin_path.exists(): + s.failed(f"binary not produced at {bin_path}; sh output tail:\n" + + "\n".join(out_lines[-10:])) + return s + actual_sha = hashlib.sha256(bin_path.read_bytes()).hexdigest() + if actual_sha != expected_sha: + s.failed(f"sha256 mismatch: expected {expected_sha[:12]}, got " + f"{actual_sha[:12]} ({bin_path.stat().st_size} bytes)") + return s + if "sha-ok" not in "\n".join(out_lines): + s.failed("finalize_cmd did not print 'sha-ok'; sh output tail:\n" + + "\n".join(out_lines[-10:])) + return s + s.passed(f"{plan.n_chunks} chunks, {len(binary_bytes):,} bytes, " + f"sha256={expected_sha[:12]}") + return s + + +# --------------------------------------------------------------------------- +# Step 2 — every exploit module's TOML parses + selector picks one +# --------------------------------------------------------------------------- + + +def step_module_configs_parse() -> Step: + s = Step("all exploits/modules/*.toml parse + selector returns a valid choice") + modules_dir = REPO_ROOT / "exploits" / "modules" + try: + catalog = load_module_configs(modules_dir) + except Exception as e: + s.failed(f"{modules_dir}: {e}") + return s + if not catalog: + s.failed(f"no TOML files in {modules_dir}") + return s + pick1 = select_module(catalog, host_id="lab1", slot=0, episode_index=0) + pick2 = select_module(catalog, host_id="lab1", slot=0, episode_index=0) + if pick1.name != pick2.name: + s.failed(f"selector non-deterministic: {pick1.name} vs {pick2.name}") + return s + s.passed(f"{len(catalog)} modules parsed; selector → {pick1.name}") + return s + + +# --------------------------------------------------------------------------- +# Step 3 — staged samples are the right architecture +# --------------------------------------------------------------------------- + + +def _is_linux_i386_elf(p: Path) -> bool: + try: + head = p.open("rb").read(20) + except OSError: + return False + if len(head) < 20 or head[:4] != b"\x7fELF": + return False + return (head[4] == 1 and head[5] == 1 and head[7] in (0, 3) + and int.from_bytes(head[18:20], "little") == 0x03) + + +def step_staged_samples_have_correct_arch(store_root: Path) -> Step: + s = Step("staged samples are Linux i386 ELF (executable on Metasploitable2)") + if not store_root.exists(): + s.skipped(f"{store_root} doesn't exist — run install-tier-3-4.sh first") + return s + # Only count sha256-named real samples — ignore .gitkeep and other + # housekeeping files. Real samples are 64-hex names (sha256 of the + # binary). + bins = sorted(p for p in store_root.iterdir() + if p.is_file() and len(p.name) == 64 + and all(c in "0123456789abcdef" for c in p.name)) + if not bins: + s.skipped(f"no sha256-named samples in {store_root} — run " + f"install-tier-3-4.sh on a lab host (or auto_fetch_samples.py " + f"locally) to populate") + return s + i386 = [p for p in bins if _is_linux_i386_elf(p)] + if not i386: + s.failed(f"{len(bins)} samples staged, none are Linux i386 ELF") + return s + if len(i386) < len(bins): + s.passed(f"{len(i386)}/{len(bins)} samples are Linux i386 ELF " + f"(others are best-effort fallback for theZoo dirs without i386 binaries)") + return s + s.passed(f"{len(bins)}/{len(bins)} samples are Linux i386 ELF") + return s + + +# --------------------------------------------------------------------------- +# Step 4 — manifest loads + sample selector spreads across families +# --------------------------------------------------------------------------- + + +def step_manifest_distribution() -> Step: + s = Step("manifest loads + selector covers every sample within 50 episodes") + manifest = SampleManifest.load(REPO_ROOT / "samples" / "manifest.toml") + seen: set[str] = set() + for slot in range(8): + for ep in range(50): + chosen = manifest.select(host_id="probe", slot=slot, episode_index=ep) + seen.add(chosen.name) + if len(seen) == len(manifest.samples): + s.passed(f"all {len(manifest.samples)} samples reachable; " + f"covered after {slot}*{ep}={slot * 50 + ep} picks") + return s + missing = [m.name for m in manifest.samples if m.name not in seen] + s.failed(f"after 8*50=400 picks, never selected: {missing}") + return s + + +# --------------------------------------------------------------------------- +# Step 5 — msfrpcd live probe (skips if not installed) +# --------------------------------------------------------------------------- + + +def step_msfrpcd_live_probe() -> Step: + s = Step("msfrpcd reachable on 127.0.0.1:55553 (skip if not installed)") + try: + with socket.create_connection(("127.0.0.1", 55553), timeout=1.0): + pass + except OSError: + s.skipped("msfrpcd not listening — install-msfrpcd.sh hasn't been run " + "on this host (expected for a Pi-only verifier)") + return s + + pw_file = Path("/etc/cis490/msfrpc.env") + if not pw_file.exists(): + s.failed("port open but /etc/cis490/msfrpc.env missing — partial install") + return s + pw = "" + for line in pw_file.read_text().splitlines(): + if line.startswith("MSFRPC_PASSWORD="): + pw = line.split("=", 1)[1] + break + if not pw: + s.failed(f"no MSFRPC_PASSWORD in {pw_file}") + return s + + # Round-trip an auth.login + core.version via the production client. + from exploits.msfrpc import MSFRpcClient, MSFRpcConfig + try: + c = MSFRpcClient(MSFRpcConfig( + host="127.0.0.1", port=55553, user="msf", password=pw, + )) + ver = c.call("core.version", []) + except Exception as e: + s.failed(f"msfrpc round-trip failed: {e}") + return s + s.passed(f"core.version → {ver.get('version', '?')} (modules ready)") + return s + + +# --------------------------------------------------------------------------- +# Step 6 — receiver gate path: rebuild a known good commit +# --------------------------------------------------------------------------- + + +def step_receiver_gate_responds() -> Step: + s = Step("receiver /v1/health responds + gate enforces commit allow-list") + try: + with socket.create_connection(("127.0.0.1", 8444), timeout=1.0): + pass + except OSError: + s.skipped("cis490-receiver not running on this host") + return s + import urllib.request + try: + with urllib.request.urlopen("http://127.0.0.1:8444/v1/health", timeout=2) as r: + if r.status != 200: + s.failed(f"/v1/health returned {r.status}") + return s + except Exception as e: + s.failed(f"/v1/health unreachable: {e}") + return s + # Send a PUT with bogus commit; expect 412. + req = urllib.request.Request( + "http://127.0.0.1:8444/v1/episodes/probe/01TESTNULL.tar.zst", + method="PUT", data=b"", + headers={ + "X-Content-SHA256": "0" * 64, + "X-Lab-Host": "probe", + "X-Cis490-Code-Commit": "0" * 40, + }, + ) + try: + urllib.request.urlopen(req, timeout=2) + s.failed("receiver accepted a bogus commit — gate is OFF") + except urllib.error.HTTPError as e: + if e.code == 412: + body = json.loads(e.read()) + head = body.get("head_commit", "?") + s.passed(f"gate rejected bogus commit; head={head[:12] if head else '?'}, " + f"window_size={body.get('valid_window_size')}") + else: + s.failed(f"unexpected status {e.code}: {e.read().decode()[:200]}") + return s + + +# --------------------------------------------------------------------------- +# Driver +# --------------------------------------------------------------------------- + + +def main() -> int: + p = argparse.ArgumentParser(prog="cis490-verify-tier3-local") + p.add_argument("--store-root", + default=str(REPO_ROOT / "samples" / "store"), + help="Where staged real samples live") + p.add_argument("--work-dir", + default="/tmp/cis490-verify-tier3", + help="Scratch dir for the chunked-upload test") + args = p.parse_args() + + logging.basicConfig(level=logging.WARNING, + format="%(asctime)s %(levelname)s %(name)s %(message)s") + + work = Path(args.work_dir) + work.mkdir(parents=True, exist_ok=True) + + print("CIS490 Tier-3 local verifier — exercises every component that") + print("CAN run on this host without x86 KVM.") + print() + + steps: list[Step] = [] + steps.append(step_module_configs_parse()) + steps.append(step_manifest_distribution()) + steps.append(step_chunked_upload_via_local_sh(work)) + steps.append(step_staged_samples_have_correct_arch(Path(args.store_root))) + steps.append(step_msfrpcd_live_probe()) + steps.append(step_receiver_gate_responds()) + + for s in steps: + s.emit() + + failed = [s for s in steps if s.outcome == "FAIL"] + skipped = [s for s in steps if s.outcome == "SKIP"] + passed = [s for s in steps if s.outcome == "PASS"] + print() + print(f"summary: {len(passed)} passed, {len(failed)} failed, " + f"{len(skipped)} skipped (of {len(steps)} steps)") + if failed: + print() + print(f"{_RED}NOT all components verified.{_RESET} Tier-3 deploy will " + f"fail at the live-fire step on a lab host.") + return 1 + if skipped: + print(f"{_YELLOW}Some steps skipped (expected for a Pi-only verifier).{_RESET}") + print("Run on a lab host after install-tier-3-4.sh for full coverage.") + return 0 + + +if __name__ == "__main__": + sys.exit(main())