"""``cis490-verify-tier3-local`` — verify Tier-3/4 components without needing x86 KVM. The Pi is ARM64 — it can't boot Metasploitable2 with hardware acceleration. But most of the Tier-3 chain doesn't need x86 at all: * msfrpcd is a Ruby daemon and runs natively on ARM * the chunked binary upload is just shell commands over a pipe * module configs, sample selection, manifest rewriting are pure Python This script exercises every component that DOES work on the Pi end-to-end. What it cannot verify on the Pi is the actual exploit fire against an x86 vulnerable target — that lives in the ``install-tier-3-4.sh`` verify step on a real lab host. Run on the Pi (or any host): /opt/cis490/.venv/bin/python tools/verify_tier3_local.py Exit codes: 0 every step passed 1 at least one MUST step failed 2 prerequisites missing (msfrpcd not installed; that's OK, those tests skip) Each step prints a one-line PASS/FAIL/SKIP summary, exit-on-first-fail is OFF so you see all problems at once. """ from __future__ import annotations import argparse import hashlib import io import json import logging import os import socket import subprocess import sys import tarfile import time from pathlib import Path from typing import Callable REPO_ROOT = Path(__file__).resolve().parent.parent sys.path.insert(0, str(REPO_ROOT)) from exploits.workloads import chunked_real_binary_upload, _wrap_loop # noqa: E402,F401 from exploits.modules import load_module_configs, select_module # noqa: E402 from samples.manifest import SampleManifest # noqa: E402 log = logging.getLogger("cis490.verify_tier3_local") # --------------------------------------------------------------------------- # Pretty-printer # --------------------------------------------------------------------------- _GREEN, _YELLOW, _RED, _RESET = "\033[32m", "\033[33m", "\033[31m", "\033[0m" class Step: def __init__(self, name: str) -> None: self.name = name self.outcome: str | None = None self.detail: str = "" def passed(self, detail: str = "") -> None: self.outcome = "PASS" self.detail = detail def failed(self, detail: str) -> None: self.outcome = "FAIL" self.detail = detail def skipped(self, detail: str) -> None: self.outcome = "SKIP" self.detail = detail def emit(self) -> None: c = {"PASS": _GREEN, "FAIL": _RED, "SKIP": _YELLOW}.get(self.outcome or "", "") print(f" {c}{self.outcome:<5}{_RESET} {self.name}") if self.detail: for line in self.detail.splitlines(): print(f" {line}") # --------------------------------------------------------------------------- # Step 1 — chunked upload via local /bin/sh subprocess # # This is the most important on-Pi test. It proves the entire chunked- # binary-upload chain works against a real shell, not a mock — every # byte of base64, the heredoc-free printf chain, the sha256 verify, # the chmod, the nohup-exec scaffold. If THIS works, the only # remaining unknown for a Tier-4 ship is "does Metasploit's session # accept the same shell commands" — and Metasploit shell sessions # present a POSIX shell interface by design. # --------------------------------------------------------------------------- def step_chunked_upload_via_local_sh(work_dir: Path) -> Step: s = Step("chunked binary upload survives a real /bin/sh round-trip") binary_bytes = os.urandom(150_000) # 150 KB — exercises ~30 chunks expected_sha = hashlib.sha256(binary_bytes).hexdigest() plan = chunked_real_binary_upload(binary_bytes) # Run /bin/sh with the work_dir as its CWD; rewrite /tmp paths in # the plan to point inside work_dir so we don't litter the host. bin_path = work_dir / "uploaded.bin" pid_path = work_dir / "uploaded.pid" b64_path = work_dir / "uploaded.b64" rewritten = [] for c in plan.chunks: rewritten.append(c.replace(plan.bin_path, str(bin_path)) .replace(plan.pid_path, str(pid_path)) .replace(plan.b64_path if hasattr(plan, "b64_path") else "/tmp/", str(b64_path))) # The plan internally references /tmp/.cis490-real-.b64; # the simpler way is to grep+replace any bare /tmp/.cis490-real-real-binary # path with a path under work_dir. We do it via the public fields. rewritten = [c.replace("/tmp/.cis490-real-real-binary.b64", str(b64_path)) .replace("/tmp/.cis490-real-real-binary.bin", str(bin_path)) .replace("/tmp/.cis490-real-real-binary.pid", str(pid_path)) for c in plan.chunks] finalize = plan.finalize_cmd.replace("/tmp/.cis490-real-real-binary.b64", str(b64_path)) \ .replace("/tmp/.cis490-real-real-binary.bin", str(bin_path)) \ .replace("/tmp/.cis490-real-real-binary.pid", str(pid_path)) # Stream chunks into /bin/sh. proc = subprocess.Popen( ["/bin/sh"], stdin=subprocess.PIPE, stdout=subprocess.PIPE, stderr=subprocess.STDOUT, text=True, ) assert proc.stdin and proc.stdout try: for c in rewritten: proc.stdin.write(c + "\n") proc.stdin.write(finalize + "\n") proc.stdin.write("echo __CIS490_FINALIZED__\n") proc.stdin.flush() # Wait for the marker. out_lines: list[str] = [] deadline = time.monotonic() + 30.0 while time.monotonic() < deadline: line = proc.stdout.readline() if not line: break out_lines.append(line.rstrip()) if "__CIS490_FINALIZED__" in line: break else: s.failed("timed out waiting for finalize marker; sh output:\n" + "\n".join(out_lines[-10:])) return s finally: try: proc.stdin.close() proc.wait(timeout=2) except Exception: proc.kill() if not bin_path.exists(): s.failed(f"binary not produced at {bin_path}; sh output tail:\n" + "\n".join(out_lines[-10:])) return s actual_sha = hashlib.sha256(bin_path.read_bytes()).hexdigest() if actual_sha != expected_sha: s.failed(f"sha256 mismatch: expected {expected_sha[:12]}, got " f"{actual_sha[:12]} ({bin_path.stat().st_size} bytes)") return s if "sha-ok" not in "\n".join(out_lines): s.failed("finalize_cmd did not print 'sha-ok'; sh output tail:\n" + "\n".join(out_lines[-10:])) return s s.passed(f"{plan.n_chunks} chunks, {len(binary_bytes):,} bytes, " f"sha256={expected_sha[:12]}") return s # --------------------------------------------------------------------------- # Step 2 — every exploit module's TOML parses + selector picks one # --------------------------------------------------------------------------- def step_module_configs_parse() -> Step: s = Step("all exploits/modules/*.toml parse + selector returns a valid choice") modules_dir = REPO_ROOT / "exploits" / "modules" try: catalog = load_module_configs(modules_dir) except Exception as e: s.failed(f"{modules_dir}: {e}") return s if not catalog: s.failed(f"no TOML files in {modules_dir}") return s pick1 = select_module(catalog, host_id="lab1", slot=0, episode_index=0) pick2 = select_module(catalog, host_id="lab1", slot=0, episode_index=0) if pick1.name != pick2.name: s.failed(f"selector non-deterministic: {pick1.name} vs {pick2.name}") return s s.passed(f"{len(catalog)} modules parsed; selector → {pick1.name}") return s # --------------------------------------------------------------------------- # Step 3 — staged samples are the right architecture # --------------------------------------------------------------------------- def _is_linux_i386_elf(p: Path) -> bool: try: head = p.open("rb").read(20) except OSError: return False if len(head) < 20 or head[:4] != b"\x7fELF": return False return (head[4] == 1 and head[5] == 1 and head[7] in (0, 3) and int.from_bytes(head[18:20], "little") == 0x03) def step_staged_samples_have_correct_arch(store_root: Path) -> Step: s = Step("staged samples are Linux i386 ELF (executable on Metasploitable2)") if not store_root.exists(): s.skipped(f"{store_root} doesn't exist — run install-tier-3-4.sh first") return s # Only count sha256-named real samples — ignore .gitkeep and other # housekeeping files. Real samples are 64-hex names (sha256 of the # binary). bins = sorted(p for p in store_root.iterdir() if p.is_file() and len(p.name) == 64 and all(c in "0123456789abcdef" for c in p.name)) if not bins: s.skipped(f"no sha256-named samples in {store_root} — run " f"install-tier-3-4.sh on a lab host (or auto_fetch_samples.py " f"locally) to populate") return s i386 = [p for p in bins if _is_linux_i386_elf(p)] if not i386: s.failed(f"{len(bins)} samples staged, none are Linux i386 ELF") return s if len(i386) < len(bins): s.passed(f"{len(i386)}/{len(bins)} samples are Linux i386 ELF " f"(others are best-effort fallback for theZoo dirs without i386 binaries)") return s s.passed(f"{len(bins)}/{len(bins)} samples are Linux i386 ELF") return s # --------------------------------------------------------------------------- # Step 4 — manifest loads + sample selector spreads across families # --------------------------------------------------------------------------- def step_manifest_distribution() -> Step: s = Step("manifest loads + selector covers every sample within 50 episodes") manifest = SampleManifest.load(REPO_ROOT / "samples" / "manifest.toml") seen: set[str] = set() for slot in range(8): for ep in range(50): chosen = manifest.select(host_id="probe", slot=slot, episode_index=ep) seen.add(chosen.name) if len(seen) == len(manifest.samples): s.passed(f"all {len(manifest.samples)} samples reachable; " f"covered after {slot}*{ep}={slot * 50 + ep} picks") return s missing = [m.name for m in manifest.samples if m.name not in seen] s.failed(f"after 8*50=400 picks, never selected: {missing}") return s # --------------------------------------------------------------------------- # Step 5 — msfrpcd live probe (skips if not installed) # --------------------------------------------------------------------------- def step_msfrpcd_live_probe() -> Step: s = Step("msfrpcd reachable on 127.0.0.1:55553 (skip if not installed)") try: with socket.create_connection(("127.0.0.1", 55553), timeout=1.0): pass except OSError: s.skipped("msfrpcd not listening — install-msfrpcd.sh hasn't been run " "on this host (expected for a Pi-only verifier)") return s pw_file = Path("/etc/cis490/msfrpc.env") if not pw_file.exists(): s.failed("port open but /etc/cis490/msfrpc.env missing — partial install") return s pw = "" for line in pw_file.read_text().splitlines(): if line.startswith("MSFRPC_PASSWORD="): pw = line.split("=", 1)[1] break if not pw: s.failed(f"no MSFRPC_PASSWORD in {pw_file}") return s # Round-trip an auth.login + core.version via the production client. from exploits.msfrpc import MSFRpcClient, MSFRpcConfig try: c = MSFRpcClient(MSFRpcConfig( host="127.0.0.1", port=55553, user="msf", password=pw, )) ver = c.call("core.version", []) except Exception as e: s.failed(f"msfrpc round-trip failed: {e}") return s s.passed(f"core.version → {ver.get('version', '?')} (modules ready)") return s # --------------------------------------------------------------------------- # Step 6 — receiver gate path: rebuild a known good commit # --------------------------------------------------------------------------- def step_receiver_gate_responds() -> Step: s = Step("receiver /v1/health responds + gate enforces commit allow-list") try: with socket.create_connection(("127.0.0.1", 8444), timeout=1.0): pass except OSError: s.skipped("cis490-receiver not running on this host") return s import urllib.request try: with urllib.request.urlopen("http://127.0.0.1:8444/v1/health", timeout=2) as r: if r.status != 200: s.failed(f"/v1/health returned {r.status}") return s except Exception as e: s.failed(f"/v1/health unreachable: {e}") return s # Send a PUT with bogus commit; expect 412. req = urllib.request.Request( "http://127.0.0.1:8444/v1/episodes/probe/01TESTNULL.tar.zst", method="PUT", data=b"", headers={ "X-Content-SHA256": "0" * 64, "X-Lab-Host": "probe", "X-Cis490-Code-Commit": "0" * 40, }, ) try: urllib.request.urlopen(req, timeout=2) s.failed("receiver accepted a bogus commit — gate is OFF") except urllib.error.HTTPError as e: if e.code == 412: body = json.loads(e.read()) head = body.get("head_commit", "?") s.passed(f"gate rejected bogus commit; head={head[:12] if head else '?'}, " f"window_size={body.get('valid_window_size')}") else: s.failed(f"unexpected status {e.code}: {e.read().decode()[:200]}") return s # --------------------------------------------------------------------------- # Driver # --------------------------------------------------------------------------- def main() -> int: p = argparse.ArgumentParser(prog="cis490-verify-tier3-local") p.add_argument("--store-root", default=str(REPO_ROOT / "samples" / "store"), help="Where staged real samples live") p.add_argument("--work-dir", default="/tmp/cis490-verify-tier3", help="Scratch dir for the chunked-upload test") args = p.parse_args() logging.basicConfig(level=logging.WARNING, format="%(asctime)s %(levelname)s %(name)s %(message)s") work = Path(args.work_dir) work.mkdir(parents=True, exist_ok=True) print("CIS490 Tier-3 local verifier — exercises every component that") print("CAN run on this host without x86 KVM.") print() steps: list[Step] = [] steps.append(step_module_configs_parse()) steps.append(step_manifest_distribution()) steps.append(step_chunked_upload_via_local_sh(work)) steps.append(step_staged_samples_have_correct_arch(Path(args.store_root))) steps.append(step_msfrpcd_live_probe()) steps.append(step_receiver_gate_responds()) for s in steps: s.emit() failed = [s for s in steps if s.outcome == "FAIL"] skipped = [s for s in steps if s.outcome == "SKIP"] passed = [s for s in steps if s.outcome == "PASS"] print() print(f"summary: {len(passed)} passed, {len(failed)} failed, " f"{len(skipped)} skipped (of {len(steps)} steps)") if failed: print() print(f"{_RED}NOT all components verified.{_RESET} Tier-3 deploy will " f"fail at the live-fire step on a lab host.") return 1 if skipped: print(f"{_YELLOW}Some steps skipped (expected for a Pi-only verifier).{_RESET}") print("Run on a lab host after install-tier-3-4.sh for full coverage.") return 0 if __name__ == "__main__": sys.exit(main())