"""Tier-3: real VM, real exploit, honest ``armed -> infecting`` transition. Boots the vulnerable target VM, drives an msfrpcd-fired exploit module against it, and lets the orchestrator's host /proc collector sample the qemu-system pid throughout. Compared to ``run_real_vm_demo.py``: the workload that crosses the ``armed -> infecting`` boundary is now generated by an actual exploit landing a session, not by a script in the guest. Prereqs: - vm/images/.qcow2 (e.g. Metasploitable2) - msfrpcd running locally: msfrpcd -P -U msf -a 127.0.0.1 -p 55553 - ``msgpack`` python package installed (added to runtime deps) Run: MSFRPC_PASSWORD= uv run python tools/run_tier3_demo.py \\ --module vsftpd_234_backdoor \\ --data-root data """ from __future__ import annotations import argparse import logging import os import signal import subprocess import sys import time from pathlib import Path # Allow running as a script. sys.path.insert(0, str(Path(__file__).resolve().parent.parent)) from collectors import qmp # noqa: E402 from exploits.driver import DriverConfig, MSFExploitDriver # noqa: E402 from exploits.modules import load_module_config # noqa: E402 from exploits.msfrpc import MSFRpcClient, MSFRpcConfig # noqa: E402 from orchestrator.episode import EpisodeConfig, EpisodeRunner # noqa: E402 from orchestrator.manifest import ( # noqa: E402 ManifestError, load_canonical, tier3_schedule_from, ) from samples.manifest import SampleManifest # noqa: E402 # Tier-3 schedule comes from the canonical manifest at episode-launch # time. Phase durations are budgets for the §4.5 event-driven labeller # (clean/armed orchestrator-emitted; infecting/infected_running gated # on exploit_fire / session_open events). Per-call lookup so a manifest # amendment takes effect on the next episode without a service # restart. 
def _wait_for_path(path: Path, timeout_s: float) -> None:
    """Block until *path* exists and holds non-blank text.

    The file is checked at least once, even when ``timeout_s`` is zero or
    negative, so an already-written file is never reported as missing.

    Args:
        path: File to poll (e.g. the QEMU pidfile).
        timeout_s: Polling budget in seconds.

    Raises:
        TimeoutError: the file is still missing/blank when the budget ends.
    """
    deadline = time.monotonic() + timeout_s
    while True:
        try:
            if path.exists() and path.read_text().strip():
                return
        except FileNotFoundError:
            # Removed between exists() and read_text(); treat as "not yet".
            pass
        remaining = deadline - time.monotonic()
        if remaining <= 0:
            break
        # Never sleep past the deadline; the last check lands right on it.
        time.sleep(min(0.2, remaining))
    raise TimeoutError(f"{path} never appeared within {timeout_s}s")


def _wait_for_tcp(host: str, port: int, timeout_s: float) -> None:
    """Probe a TCP port and block until a connection is accepted and the
    remote service is waiting for client input (recv timeout = success).

    SLIRP completes the TCP handshake before the guest OS boots, making a
    bare ``connect()`` unreliable. However, after the guest kernel and TCP
    stack are up, SLIRP forwards the connection to the guest. If the port
    is not yet open (service not started), the guest RSTs → OSError →
    retry. Once the service is listening and waiting for the client to
    speak first (e.g. Samba on 139), ``recv`` times out → we return.

    To avoid false-positives during early boot (before the guest TCP stack
    is running), callers should enforce a minimum wall-clock wait after
    QEMU starts before calling this function. With a 65 s floor,
    Metasploitable2's kernel and init are always up by the time we probe.

    At least one probe is always made, even when ``timeout_s`` is zero or
    negative, so the raised error always carries a real ``last`` cause.

    Args:
        host: Address to connect to.
        port: TCP port to probe.
        timeout_s: Retry budget in seconds. A single probe can take up to
            ~5 s (2 s connect + 3 s recv), so the call may overrun the
            budget by that much.

    Raises:
        TimeoutError: no probe succeeded within the budget.
    """
    import socket

    deadline = time.monotonic() + timeout_s
    last_err: Exception | None = None
    while True:
        try:
            with socket.create_connection((host, port), timeout=2.0) as s:
                # recv with a generous timeout. Three outcomes:
                #   socket.timeout → service is up, waiting for client data ✓
                #   data received  → service sent a banner; also up ✓
                #   OSError/reset  → port closed; retry
                s.settimeout(3.0)
                try:
                    data = s.recv(1)
                except socket.timeout:
                    return  # service alive, waiting for client data ✓
                if data:
                    return  # banner received ✓
                # b'' = connection closed by peer (service not ready) → retry
                last_err = OSError(
                    "connection closed by peer (service not ready)"
                )
        except OSError as e:
            last_err = e
        remaining = deadline - time.monotonic()
        if remaining <= 0:
            break
        # Back off between probes, but never sleep past the deadline.
        time.sleep(min(1.0, remaining))
    raise TimeoutError(
        f"target service {host}:{port} not reachable within {timeout_s}s "
        f"(last: {last_err})"
    )


# Metasploitable2 takes ~50-70 s to boot fully under normal load.
# SLIRP accepts TCP connections before the guest TCP stack is up, so we
# must wait at least this long before _wait_for_tcp will give a reliable
# signal. 65 s is a safe floor; the boot-timeout arg covers the rest.
_METASPLOITABLE2_MIN_BOOT_S: float = 65.0

# Snapshot tag used both for the pre-exploit savevm and for
# EpisodeConfig.snapshot_name; the two must agree for revert to work.
_BASELINE_SNAPSHOT: str = "baseline-v1"


def _build_parser() -> argparse.ArgumentParser:
    """Build the command-line parser for the Tier-3 demo."""
    parser = argparse.ArgumentParser(prog="run_tier3_demo")
    parser.add_argument("--data-root", default="data")
    parser.add_argument("--interval-ms", type=int, default=100)
    parser.add_argument(
        "--module",
        default="vsftpd_234_backdoor",
        help="Module config name in exploits/modules/<name>.toml",
    )
    parser.add_argument(
        "--target-ip",
        default="127.0.0.1",
        help="Address the exploit module sets RHOSTS to. With the SLIRP "
        "launcher (default), the guest's vulnerable port is hostfwd'd to "
        "loopback; on a host-only bridge, this is the guest's bridge IP.",
    )
    parser.add_argument(
        "--target-port",
        type=int,
        default=21,
        help="Probe port to wait on before firing the exploit",
    )
    parser.add_argument(
        "--run-dir",
        # Per-slot defaults so the fleet runner's parallel calls don't
        # collide on the same /tmp dir. See run_real_vm_demo.py for
        # the same fix.
        default=(
            os.environ.get("RUN_DIR")
            or f"/tmp/cis490-target-{os.environ.get('SLOT', '0')}"
        ),
        help="QEMU run dir (sockets + pidfile)",
    )
    parser.add_argument(
        "--msfrpc-host",
        default=os.environ.get("MSFRPC_HOST", "127.0.0.1"),
    )
    parser.add_argument(
        "--msfrpc-port",
        type=int,
        default=int(os.environ.get("MSFRPC_PORT", "55553")),
    )
    parser.add_argument(
        "--msfrpc-user",
        default=os.environ.get("MSFRPC_USER", "msf"),
    )
    parser.add_argument(
        "--keep-vm",
        action="store_true",
        help="leave the VM running after the episode finishes",
    )
    parser.add_argument(
        "--target-boot-timeout",
        type=float,
        default=180.0,
        help="how long to wait for the guest's vulnerable service to listen",
    )
    parser.add_argument(
        "--sample",
        default=os.environ.get("SAMPLE_NAME"),
        help="Pick a workload profile from the samples manifest by name. "
        "Fleet runner passes this via SAMPLE_NAME env. Without it, falls "
        "back to the v1 yes-loop.",
    )
    return parser


def _terminate_stale_vm(run_dir: Path, log: logging.Logger) -> None:
    """Best-effort SIGTERM of a QEMU left in *run_dir* by a previous wave."""
    old_pid_file = run_dir / "qemu.pid"
    if not old_pid_file.exists():
        return
    try:
        old_pid = int(old_pid_file.read_text().strip())
        old_pgid = os.getpgid(old_pid)
        if old_pgid == os.getpgrp():
            # A pidfile containing 0, or a recycled pid, can resolve to our
            # own process group; signalling it would kill this orchestrator.
            log.warning(
                "stale pidfile %s resolves to our own process group; "
                "not signalling",
                old_pid_file,
            )
            return
        os.killpg(old_pgid, signal.SIGTERM)
        time.sleep(1.5)  # give QEMU a moment to release sockets/locks
    except (ValueError, OSError) as e:
        # Garbage pidfile or process already gone: nothing to clean up.
        log.debug("stale VM cleanup skipped: %s", e)


def _save_baseline_snapshot(qmp_sock: Path, log: logging.Logger) -> None:
    """Best-effort ``savevm`` of the baseline snapshot over *qmp_sock*."""
    if not qmp_sock.exists():
        return
    try:
        _qmp = qmp.QMPClient(qmp_sock)
        _qmp.connect()
        try:
            out = _qmp.savevm(_BASELINE_SNAPSHOT)
            log.info("savevm baseline-v1 OK: %s", out.strip()[:160])
        finally:
            _qmp.close()
    except Exception as e:
        # Deliberately broad: the episode still runs without revert support.
        log.warning("savevm failed; revert_at_start unusable: %s", e)


def _shutdown_vm(qemu: subprocess.Popen, log: logging.Logger) -> None:
    """SIGTERM the launcher's process group, escalating to SIGKILL after 5 s."""
    log.info("shutting down VM (pid=%d)", qemu.pid)
    try:
        os.killpg(os.getpgid(qemu.pid), signal.SIGTERM)
    except ProcessLookupError:
        pass
    try:
        qemu.wait(timeout=5)
    except subprocess.TimeoutExpired:
        try:
            os.killpg(os.getpgid(qemu.pid), signal.SIGKILL)
        except ProcessLookupError:
            # Exited between the wait timing out and the kill; this runs in
            # a ``finally``, so raising here would mask the episode result.
            pass


def main() -> int:
    """Boot the target VM, fire the exploit module, and record one episode.

    Returns:
        ``0`` on success; ``2`` when ``MSFRPC_PASSWORD`` is unset, the module
        config file is missing, or ``--sample`` is not in the samples
        manifest; ``78`` (the sysexits.h ``EX_CONFIG`` value) when the
        canonical manifest fails to load.

    Raises:
        TimeoutError: the QEMU pidfile or the target service did not appear
            in time. The VM is still shut down unless ``--keep-vm`` is set.
    """
    args = _build_parser().parse_args()

    logging.basicConfig(
        level=logging.INFO,
        format="%(asctime)s %(levelname)s %(name)s %(message)s",
    )
    log = logging.getLogger("cis490.run_tier3_demo")

    msfrpc_password = os.environ.get("MSFRPC_PASSWORD")
    if not msfrpc_password:
        log.error("MSFRPC_PASSWORD env var must be set")
        return 2

    repo_root = Path(__file__).resolve().parent.parent
    launcher = repo_root / "vm" / "launch_target.sh"
    modules_dir = repo_root / "exploits" / "modules"
    module_path = modules_dir / f"{args.module}.toml"
    if not module_path.exists():
        log.error("no module config at %s", module_path)
        return 2

    # Canonical experiment manifest (PIPELINE.md §4.1).
    try:
        experiment = load_canonical(repo_root)
    except ManifestError as e:
        log.error("canonical manifest failed to load: %s", e)
        return 78
    samples_path = repo_root / experiment.samples_manifest_path

    module = load_module_config(module_path)
    log.info("module loaded: %s (%s)", module.name, module.module_path)

    sample = None
    if args.sample:
        samples = SampleManifest.load(samples_path)
        sample = next(
            (s for s in samples.samples if s.name == args.sample), None
        )
        if sample is None:
            log.error(
                "sample %r not in samples manifest %s",
                args.sample,
                samples_path,
            )
            return 2
        log.info(
            "sample=%s profile=%s kind=%s",
            sample.name,
            sample.profile,
            sample.kind,
        )

    run_dir = Path(args.run_dir)
    # Kill any QEMU still holding this slot's run_dir from a previous wave.
    # QEMU is started with start_new_session=True so it survives orchestrator
    # SIGTERM without explicit cleanup here.
    _terminate_stale_vm(run_dir, log)
    if run_dir.exists():
        import shutil

        # NOTE(review): this removes whatever --run-dir / RUN_DIR points at;
        # callers must only pass a dedicated scratch directory.
        shutil.rmtree(run_dir)
    run_dir.mkdir(parents=True, exist_ok=True)
    pid_file = run_dir / "qemu.pid"

    log.info("booting target VM via %s (RUN_DIR=%s)", launcher, run_dir)
    env = os.environ.copy()
    env["RUN_DIR"] = str(run_dir)
    qemu = subprocess.Popen(
        [str(launcher)],
        cwd=str(repo_root),
        env=env,
        stdout=subprocess.DEVNULL,
        stderr=subprocess.DEVNULL,
        start_new_session=True,
    )

    try:
        _wait_for_path(pid_file, timeout_s=15.0)
        qemu_pid = int(pid_file.read_text().strip())

        # Enforce a minimum boot floor before probing. SLIRP completes the
        # TCP handshake immediately even before the guest kernel loads, so
        # a bare connect() would always succeed. After the floor the guest
        # TCP stack is up and a recv-timeout from the probe means the
        # service is genuinely listening.
        boot_floor = _METASPLOITABLE2_MIN_BOOT_S
        if boot_floor > 0:
            log.info(
                "qemu pid = %d; waiting %.0fs for target OS to boot before probing",
                qemu_pid,
                boot_floor,
            )
            time.sleep(boot_floor)
        remaining = max(0.0, args.target_boot_timeout - boot_floor)
        log.info(
            "probing %s:%d (up to %.0fs remaining)",
            args.target_ip,
            args.target_port,
            remaining,
        )
        _wait_for_tcp(args.target_ip, args.target_port, timeout_s=remaining)
        log.info("target service is up")

        # Pre-exploit savevm so EpisodeConfig.revert_at_{start,end}
        # has a known-good baseline to load. Best-effort — we still
        # run the episode if savevm fails (just without revert
        # support). See run_real_vm_demo.py for the same pattern.
        qmp_sock = run_dir / "qmp.sock"
        _save_baseline_snapshot(qmp_sock, log)

        client = MSFRpcClient(
            MSFRpcConfig(
                host=args.msfrpc_host,
                port=args.msfrpc_port,
                user=args.msfrpc_user,
                password=msfrpc_password,
            )
        )

        # Wire the same collector sockets the Tier-2 path wires. Without
        # these, EpisodeConfig defaults to None and the qmp / guest-agent
        # / pcap collectors never start — even though launch_target.sh
        # creates the qmp.sock + agent.sock chardevs and BRIDGE supplies
        # the iface. Refs PIPELINE.md §4.4: a collector that appears
        # configured but emits zero rows is exactly the silent-downgrade
        # pattern §1 forbids.
        agent_sock = run_dir / "agent.sock"
        schedule = tier3_schedule_from(experiment.schedule)
        cfg = EpisodeConfig(
            target_pid=qemu_pid,
            duration_s=sum(d for _, d in schedule),
            interval_ms=args.interval_ms,
            data_root=Path(args.data_root),
            phase_schedule=schedule,
            image_name=module.name + "-target",
            snapshot_name=_BASELINE_SNAPSHOT,
            qmp_socket=qmp_sock if qmp_sock.exists() else None,
            guest_agent_socket=agent_sock if agent_sock.exists() else None,
            bridge_iface=os.environ.get("BRIDGE") or None,
            experiment_meta=experiment.to_meta(),
            # Source 3 (oracle): see equivalent in run_real_vm_demo.py.
            # Now that the perf collector parser is fixed, turn it on
            # in production rather than leave it silently disabled.
            enable_perf=True,
            sample=sample,
            exploit_meta={
                "framework": "metasploit",
                "module": module.module_path,
                "module_type": module.module_type,
                "module_name": module.name,
                "payload": module.payload_path,
                "rport": module.options.get("RPORT"),
                "rhost_template": module.options.get("RHOSTS"),
            },
        )
        runner = EpisodeRunner(cfg)

        driver = MSFExploitDriver(
            client=client,
            module=module,
            cfg=DriverConfig(
                target_ip=args.target_ip,
                # Override RPORT when target_port is an unprivileged host port
                # (i.e. fleet runner remapped the guest's privileged port to a
                # loopback port > 1024). When target_port == module RPORT the
                # caller wants direct guest access; leave RPORT unchanged.
                target_port=(
                    args.target_port if args.target_port > 1024 else None
                ),
                sample_store_root=repo_root / "samples" / "store",
            ),
            emit_event=runner.emit_event,
            sample=sample,
        )
        runner.on_phase = driver.set_phase

        driver.setup()
        try:
            result = runner.run()
        finally:
            driver.teardown()

        print()
        print(f"episode_id   = {result.episode_id}")
        print(f"path         = {result.episode_dir}")
        print(f"rows_proc    = {result.rows_proc}")
        print(f"phases       = {result.phases_observed}")
        print(f"module       = {module.module_path}")
        print()
        print("To plot:")
        print(f"  uv run python tools/plot_envelope.py {result.episode_dir}")
        return 0
    finally:
        if not args.keep_vm:
            _shutdown_vm(qemu, log)


if __name__ == "__main__":
    sys.exit(main())