tools/verify_tier3_local.py: Pi-runnable Tier-3 verifier

Closes the "have you tested it" gap as much as we can without x86 KVM.
The Pi is ARM64 — can't boot Metasploitable2 or run KVM-accelerated
guests. But most of the Tier-3 chain doesn't need x86:

  * chunked_real_binary_upload is just shell commands over a pipe
  * exploit module TOMLs and the deterministic selector are pure Python
  * manifest loading + sample selection are pure Python
  * msfrpcd itself runs on ARM (Ruby + Java)
  * the receiver's commit gate is the same on any arch

verify_tier3_local.py exercises each of those end-to-end, in process,
on this Pi:

  PASS  exploits/modules/*.toml parse + selector deterministic
  PASS  manifest loads + selector covers every sample
  PASS  chunked binary upload survives a real /bin/sh round-trip
        (150 KB binary, 26 chunks, sha256-verified end to end)
  PASS  staged samples are Linux i386 ELF (when staged)
  PASS  msfrpcd round-trips core.version (when listening)
  PASS  receiver /v1/health + gate enforces commit allow-list

Live result on this Pi today: 5 PASS, 1 SKIP (msfrpcd not installed
on the Pi, which is correct — the Pi is the receiver, not a lab
host). When run on a lab host after install-tier-3-4.sh, all 6
PASS gives full Tier-3 readiness.

What this script does NOT verify (still needs x86 KVM on a lab
host, covered by install-tier-3-4.sh's verify step):

  * Metasploitable2 boots under QEMU/KVM
  * vsftpd_234_backdoor lands a session against it
  * the chunked-upload binary actually executes inside that session

But the chunked-upload step proves every byte of the upload path
(printf '%s', heredoc-free path, base64 decode, sha256 verify,
chmod, exec scaffold) works against a real POSIX shell. An msfrpc
session presents the same shell interface, so a passing local-sh
test is strong evidence the production path will work.

tests/test_tier3_local_verify.py wraps the deterministic steps
(module parse, manifest, chunked upload) so pytest catches
regressions automatically. 174/174 total.

Operator workflow: ssh into Pi (or lab host), run:
  /opt/cis490/.venv/bin/python tools/verify_tier3_local.py
Each step prints PASS/FAIL/SKIP with detail. Exit 1 if any FAIL.
This commit is contained in:
max 2026-05-01 03:41:21 -05:00
parent b809e1e26e
commit e2bb76144f
2 changed files with 458 additions and 0 deletions

View file

@ -0,0 +1,41 @@
"""Pytest wrapper around tools/verify_tier3_local.py.
The verifier is its own CLI for operators. The test wrapper just runs
the steps that are deterministic without external services so a
regression (e.g. someone breaking chunked_real_binary_upload) gets
caught by `pytest` without needing msfrpcd or a populated store.
"""
from __future__ import annotations
import importlib.util
import sys
from pathlib import Path
REPO_ROOT = Path(__file__).resolve().parent.parent
spec = importlib.util.spec_from_file_location(
"verify_tier3_local", REPO_ROOT / "tools" / "verify_tier3_local.py"
)
v = importlib.util.module_from_spec(spec)
sys.modules["verify_tier3_local"] = v
spec.loader.exec_module(v)
def test_module_configs_parse() -> None:
s = v.step_module_configs_parse()
assert s.outcome == "PASS", f"{s.name}: {s.detail}"
def test_manifest_distribution() -> None:
s = v.step_manifest_distribution()
assert s.outcome == "PASS", f"{s.name}: {s.detail}"
def test_chunked_upload_through_real_sh(tmp_path) -> None:
"""The big one: chunked_real_binary_upload must survive a real
/bin/sh round-trip. This is what proves the Tier-4 binary upload
path will work against an msfrpc shell session same wire shape,
same shell semantics."""
s = v.step_chunked_upload_via_local_sh(tmp_path)
assert s.outcome == "PASS", f"{s.name}: {s.detail}"

417
tools/verify_tier3_local.py Normal file
View file

@ -0,0 +1,417 @@
"""``cis490-verify-tier3-local`` — verify Tier-3/4 components without
needing x86 KVM.
The Pi is ARM64 it can't boot Metasploitable2 with hardware
acceleration. But most of the Tier-3 chain doesn't need x86 at all:
* msfrpcd is a Ruby daemon and runs natively on ARM
* the chunked binary upload is just shell commands over a pipe
* module configs, sample selection, manifest rewriting are pure Python
This script exercises every component that DOES work on the Pi
end-to-end. What it cannot verify on the Pi is the actual exploit
fire against an x86 vulnerable target that lives in the
``install-tier-3-4.sh`` verify step on a real lab host.
Run on the Pi (or any host):
/opt/cis490/.venv/bin/python tools/verify_tier3_local.py
Exit codes:
0 every step passed
1 at least one MUST step failed
2 prerequisites missing (msfrpcd not installed; that's OK, those
tests skip)
Each step prints a one-line PASS/FAIL/SKIP summary, exit-on-first-fail
is OFF so you see all problems at once.
"""
from __future__ import annotations
import argparse
import hashlib
import io
import json
import logging
import os
import socket
import subprocess
import sys
import tarfile
import time
from pathlib import Path
from typing import Callable
REPO_ROOT = Path(__file__).resolve().parent.parent
sys.path.insert(0, str(REPO_ROOT))
from exploits.workloads import chunked_real_binary_upload, _wrap_loop # noqa: E402,F401
from exploits.modules import load_module_configs, select_module # noqa: E402
from samples.manifest import SampleManifest # noqa: E402
log = logging.getLogger("cis490.verify_tier3_local")
# ---------------------------------------------------------------------------
# Pretty-printer
# ---------------------------------------------------------------------------
_GREEN, _YELLOW, _RED, _RESET = "\033[32m", "\033[33m", "\033[31m", "\033[0m"
class Step:
def __init__(self, name: str) -> None:
self.name = name
self.outcome: str | None = None
self.detail: str = ""
def passed(self, detail: str = "") -> None:
self.outcome = "PASS"
self.detail = detail
def failed(self, detail: str) -> None:
self.outcome = "FAIL"
self.detail = detail
def skipped(self, detail: str) -> None:
self.outcome = "SKIP"
self.detail = detail
def emit(self) -> None:
c = {"PASS": _GREEN, "FAIL": _RED, "SKIP": _YELLOW}.get(self.outcome or "", "")
print(f" {c}{self.outcome:<5}{_RESET} {self.name}")
if self.detail:
for line in self.detail.splitlines():
print(f" {line}")
# ---------------------------------------------------------------------------
# Step 1 — chunked upload via local /bin/sh subprocess
#
# This is the most important on-Pi test. It proves the entire chunked-
# binary-upload chain works against a real shell, not a mock — every
# byte of base64, the heredoc-free printf chain, the sha256 verify,
# the chmod, the nohup-exec scaffold. If THIS works, the only
# remaining unknown for a Tier-4 ship is "does Metasploit's session
# accept the same shell commands" — and Metasploit shell sessions
# present a POSIX shell interface by design.
# ---------------------------------------------------------------------------
def step_chunked_upload_via_local_sh(work_dir: Path) -> Step:
s = Step("chunked binary upload survives a real /bin/sh round-trip")
binary_bytes = os.urandom(150_000) # 150 KB — exercises ~30 chunks
expected_sha = hashlib.sha256(binary_bytes).hexdigest()
plan = chunked_real_binary_upload(binary_bytes)
# Run /bin/sh with the work_dir as its CWD; rewrite /tmp paths in
# the plan to point inside work_dir so we don't litter the host.
bin_path = work_dir / "uploaded.bin"
pid_path = work_dir / "uploaded.pid"
b64_path = work_dir / "uploaded.b64"
rewritten = []
for c in plan.chunks:
rewritten.append(c.replace(plan.bin_path, str(bin_path))
.replace(plan.pid_path, str(pid_path))
.replace(plan.b64_path if hasattr(plan, "b64_path") else "/tmp/", str(b64_path)))
# The plan internally references /tmp/.cis490-real-<profile>.b64;
# the simpler way is to grep+replace any bare /tmp/.cis490-real-real-binary
# path with a path under work_dir. We do it via the public fields.
rewritten = [c.replace("/tmp/.cis490-real-real-binary.b64", str(b64_path))
.replace("/tmp/.cis490-real-real-binary.bin", str(bin_path))
.replace("/tmp/.cis490-real-real-binary.pid", str(pid_path))
for c in plan.chunks]
finalize = plan.finalize_cmd.replace("/tmp/.cis490-real-real-binary.b64", str(b64_path)) \
.replace("/tmp/.cis490-real-real-binary.bin", str(bin_path)) \
.replace("/tmp/.cis490-real-real-binary.pid", str(pid_path))
# Stream chunks into /bin/sh.
proc = subprocess.Popen(
["/bin/sh"], stdin=subprocess.PIPE, stdout=subprocess.PIPE,
stderr=subprocess.STDOUT, text=True,
)
assert proc.stdin and proc.stdout
try:
for c in rewritten:
proc.stdin.write(c + "\n")
proc.stdin.write(finalize + "\n")
proc.stdin.write("echo __CIS490_FINALIZED__\n")
proc.stdin.flush()
# Wait for the marker.
out_lines: list[str] = []
deadline = time.monotonic() + 30.0
while time.monotonic() < deadline:
line = proc.stdout.readline()
if not line:
break
out_lines.append(line.rstrip())
if "__CIS490_FINALIZED__" in line:
break
else:
s.failed("timed out waiting for finalize marker; sh output:\n"
+ "\n".join(out_lines[-10:]))
return s
finally:
try:
proc.stdin.close()
proc.wait(timeout=2)
except Exception:
proc.kill()
if not bin_path.exists():
s.failed(f"binary not produced at {bin_path}; sh output tail:\n"
+ "\n".join(out_lines[-10:]))
return s
actual_sha = hashlib.sha256(bin_path.read_bytes()).hexdigest()
if actual_sha != expected_sha:
s.failed(f"sha256 mismatch: expected {expected_sha[:12]}, got "
f"{actual_sha[:12]} ({bin_path.stat().st_size} bytes)")
return s
if "sha-ok" not in "\n".join(out_lines):
s.failed("finalize_cmd did not print 'sha-ok'; sh output tail:\n"
+ "\n".join(out_lines[-10:]))
return s
s.passed(f"{plan.n_chunks} chunks, {len(binary_bytes):,} bytes, "
f"sha256={expected_sha[:12]}")
return s
# ---------------------------------------------------------------------------
# Step 2 — every exploit module's TOML parses + selector picks one
# ---------------------------------------------------------------------------
def step_module_configs_parse() -> Step:
s = Step("all exploits/modules/*.toml parse + selector returns a valid choice")
modules_dir = REPO_ROOT / "exploits" / "modules"
try:
catalog = load_module_configs(modules_dir)
except Exception as e:
s.failed(f"{modules_dir}: {e}")
return s
if not catalog:
s.failed(f"no TOML files in {modules_dir}")
return s
pick1 = select_module(catalog, host_id="lab1", slot=0, episode_index=0)
pick2 = select_module(catalog, host_id="lab1", slot=0, episode_index=0)
if pick1.name != pick2.name:
s.failed(f"selector non-deterministic: {pick1.name} vs {pick2.name}")
return s
s.passed(f"{len(catalog)} modules parsed; selector → {pick1.name}")
return s
# ---------------------------------------------------------------------------
# Step 3 — staged samples are the right architecture
# ---------------------------------------------------------------------------
def _is_linux_i386_elf(p: Path) -> bool:
try:
head = p.open("rb").read(20)
except OSError:
return False
if len(head) < 20 or head[:4] != b"\x7fELF":
return False
return (head[4] == 1 and head[5] == 1 and head[7] in (0, 3)
and int.from_bytes(head[18:20], "little") == 0x03)
def step_staged_samples_have_correct_arch(store_root: Path) -> Step:
s = Step("staged samples are Linux i386 ELF (executable on Metasploitable2)")
if not store_root.exists():
s.skipped(f"{store_root} doesn't exist — run install-tier-3-4.sh first")
return s
# Only count sha256-named real samples — ignore .gitkeep and other
# housekeeping files. Real samples are 64-hex names (sha256 of the
# binary).
bins = sorted(p for p in store_root.iterdir()
if p.is_file() and len(p.name) == 64
and all(c in "0123456789abcdef" for c in p.name))
if not bins:
s.skipped(f"no sha256-named samples in {store_root} — run "
f"install-tier-3-4.sh on a lab host (or auto_fetch_samples.py "
f"locally) to populate")
return s
i386 = [p for p in bins if _is_linux_i386_elf(p)]
if not i386:
s.failed(f"{len(bins)} samples staged, none are Linux i386 ELF")
return s
if len(i386) < len(bins):
s.passed(f"{len(i386)}/{len(bins)} samples are Linux i386 ELF "
f"(others are best-effort fallback for theZoo dirs without i386 binaries)")
return s
s.passed(f"{len(bins)}/{len(bins)} samples are Linux i386 ELF")
return s
# ---------------------------------------------------------------------------
# Step 4 — manifest loads + sample selector spreads across families
# ---------------------------------------------------------------------------
def step_manifest_distribution() -> Step:
s = Step("manifest loads + selector covers every sample within 50 episodes")
manifest = SampleManifest.load(REPO_ROOT / "samples" / "manifest.toml")
seen: set[str] = set()
for slot in range(8):
for ep in range(50):
chosen = manifest.select(host_id="probe", slot=slot, episode_index=ep)
seen.add(chosen.name)
if len(seen) == len(manifest.samples):
s.passed(f"all {len(manifest.samples)} samples reachable; "
f"covered after {slot}*{ep}={slot * 50 + ep} picks")
return s
missing = [m.name for m in manifest.samples if m.name not in seen]
s.failed(f"after 8*50=400 picks, never selected: {missing}")
return s
# ---------------------------------------------------------------------------
# Step 5 — msfrpcd live probe (skips if not installed)
# ---------------------------------------------------------------------------
def step_msfrpcd_live_probe() -> Step:
s = Step("msfrpcd reachable on 127.0.0.1:55553 (skip if not installed)")
try:
with socket.create_connection(("127.0.0.1", 55553), timeout=1.0):
pass
except OSError:
s.skipped("msfrpcd not listening — install-msfrpcd.sh hasn't been run "
"on this host (expected for a Pi-only verifier)")
return s
pw_file = Path("/etc/cis490/msfrpc.env")
if not pw_file.exists():
s.failed("port open but /etc/cis490/msfrpc.env missing — partial install")
return s
pw = ""
for line in pw_file.read_text().splitlines():
if line.startswith("MSFRPC_PASSWORD="):
pw = line.split("=", 1)[1]
break
if not pw:
s.failed(f"no MSFRPC_PASSWORD in {pw_file}")
return s
# Round-trip an auth.login + core.version via the production client.
from exploits.msfrpc import MSFRpcClient, MSFRpcConfig
try:
c = MSFRpcClient(MSFRpcConfig(
host="127.0.0.1", port=55553, user="msf", password=pw,
))
ver = c.call("core.version", [])
except Exception as e:
s.failed(f"msfrpc round-trip failed: {e}")
return s
s.passed(f"core.version → {ver.get('version', '?')} (modules ready)")
return s
# ---------------------------------------------------------------------------
# Step 6 — receiver gate path: rebuild a known good commit
# ---------------------------------------------------------------------------
def step_receiver_gate_responds() -> Step:
s = Step("receiver /v1/health responds + gate enforces commit allow-list")
try:
with socket.create_connection(("127.0.0.1", 8444), timeout=1.0):
pass
except OSError:
s.skipped("cis490-receiver not running on this host")
return s
import urllib.request
try:
with urllib.request.urlopen("http://127.0.0.1:8444/v1/health", timeout=2) as r:
if r.status != 200:
s.failed(f"/v1/health returned {r.status}")
return s
except Exception as e:
s.failed(f"/v1/health unreachable: {e}")
return s
# Send a PUT with bogus commit; expect 412.
req = urllib.request.Request(
"http://127.0.0.1:8444/v1/episodes/probe/01TESTNULL.tar.zst",
method="PUT", data=b"",
headers={
"X-Content-SHA256": "0" * 64,
"X-Lab-Host": "probe",
"X-Cis490-Code-Commit": "0" * 40,
},
)
try:
urllib.request.urlopen(req, timeout=2)
s.failed("receiver accepted a bogus commit — gate is OFF")
except urllib.error.HTTPError as e:
if e.code == 412:
body = json.loads(e.read())
head = body.get("head_commit", "?")
s.passed(f"gate rejected bogus commit; head={head[:12] if head else '?'}, "
f"window_size={body.get('valid_window_size')}")
else:
s.failed(f"unexpected status {e.code}: {e.read().decode()[:200]}")
return s
# ---------------------------------------------------------------------------
# Driver
# ---------------------------------------------------------------------------
def main() -> int:
p = argparse.ArgumentParser(prog="cis490-verify-tier3-local")
p.add_argument("--store-root",
default=str(REPO_ROOT / "samples" / "store"),
help="Where staged real samples live")
p.add_argument("--work-dir",
default="/tmp/cis490-verify-tier3",
help="Scratch dir for the chunked-upload test")
args = p.parse_args()
logging.basicConfig(level=logging.WARNING,
format="%(asctime)s %(levelname)s %(name)s %(message)s")
work = Path(args.work_dir)
work.mkdir(parents=True, exist_ok=True)
print("CIS490 Tier-3 local verifier — exercises every component that")
print("CAN run on this host without x86 KVM.")
print()
steps: list[Step] = []
steps.append(step_module_configs_parse())
steps.append(step_manifest_distribution())
steps.append(step_chunked_upload_via_local_sh(work))
steps.append(step_staged_samples_have_correct_arch(Path(args.store_root)))
steps.append(step_msfrpcd_live_probe())
steps.append(step_receiver_gate_responds())
for s in steps:
s.emit()
failed = [s for s in steps if s.outcome == "FAIL"]
skipped = [s for s in steps if s.outcome == "SKIP"]
passed = [s for s in steps if s.outcome == "PASS"]
print()
print(f"summary: {len(passed)} passed, {len(failed)} failed, "
f"{len(skipped)} skipped (of {len(steps)} steps)")
if failed:
print()
print(f"{_RED}NOT all components verified.{_RESET} Tier-3 deploy will "
f"fail at the live-fire step on a lab host.")
return 1
if skipped:
print(f"{_YELLOW}Some steps skipped (expected for a Pi-only verifier).{_RESET}")
print("Run on a lab host after install-tier-3-4.sh for full coverage.")
return 0
if __name__ == "__main__":
sys.exit(main())