CIS490/exploits/driver.py
max 613c6fa223 Tier 3: msfrpc-driven exploit driver + first module config
Adds the Tier-3 exploit driver — an MSFExploitDriver that plugs into
EpisodeRunner.on_phase, fires a Metasploit module against a target VM
via msfrpcd, watches for the resulting session, and stamps each
transition (exploit_fire, session_open, session_landing_probe,
sample_executed, session_dormant, session_killed) into the episode's
events.jsonl on the orchestrator's monotonic clock.

What landed:
- exploits/msfrpc.py — minimal msgpack-over-HTTPS client (auth,
  module.execute, job/session lifecycle) so we don't depend on a
  third-party MSF wrapper.
- exploits/driver.py — phase-to-msfrpc adapter; idempotent fire,
  session-open polling with timeout, workload start/stop, teardown.
- exploits/modules.py + exploits/modules/vsftpd_234_backdoor.toml —
  TOML module configs with {{ target_ip }} placeholders, replacing the
  imperative .rc-script approach the README previously hinted at.
- vm/launch_target.sh — SLIRP+restrict=on launcher for the
  intentionally-vulnerable target VM (host can reach guest via
  hostfwd, guest cannot reach host or internet).
- tools/run_tier3_demo.py — end-to-end runner mirroring run_real_vm_demo.
- tests/test_exploits.py — 12 new tests against a fake MSFRpcClient,
  including an integration test that drives a real EpisodeRunner.

Plumbing changes:
- EpisodeRunner._emit_event → public emit_event, so external drivers
  share the runner's monotonic clock and events.jsonl.
- mkdir for episode_dir moved to __init__ so emit_event is callable
  before run() (driver_setup fires pre-schedule).

Status: driver + tests pass (40/40); end-to-end against a live msfrpcd
+ Metasploitable2 image is the next bring-up step.

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
2026-04-29 23:11:52 -05:00

205 lines
7.2 KiB
Python

"""Tier-3 exploit driver.
Plugged into ``EpisodeRunner`` as the ``on_phase`` callback. Translates
the closed phase enum into msfrpc actions:

    clean            — idle. (no-op; exploit hasn't fired yet)
    armed            — module loaded + options applied; module fires
                       with ``module.execute``. Driver records the fire
                       timestamp via ``emit_event`` so the labeler can
                       align ``armed`` with what's actually happening.
    infecting        — poll for a new session; on session_open, run a
                       one-shot landing command (``id`` or similar) so
                       we have a clear "session is responsive" event.
    infected_running — start observable workload inside the session.
    dormant          — kill the workload, leave the session alive.
    reverting        — kill session, snapshot revert handled by caller.

The events the driver writes match the schema in ``docs/data-model.md``:
``exploit_fire``, ``session_open``, ``sample_executed``, ``session_dormant``,
``session_killed``. It additionally emits ``driver_setup`` (from
``setup()``), ``session_landing_probe`` (after the landing command), and
``session_open_timeout`` (when no session appears in time).

The driver does NOT author exploits or pick payloads at runtime — those
choices live in ``exploits/modules/*.toml``. The driver is a pure
adapter between the phase machine and msfrpc.
"""
from __future__ import annotations
import logging
import time
from dataclasses import dataclass
from typing import Callable
from .modules import ModuleConfig
from .msfrpc import MSFRpcClient, wait_for_new_session
# Module-level logger shared by everything in the exploit driver.
log = logging.getLogger("cis490.exploits.driver")
# Type of the event sink handed to the driver. In this file it is always
# invoked as ``emit(event_name, **fields)`` and its return value is ignored.
EmitEvent = Callable[..., None]
@dataclass
class DriverConfig:
    """Per-episode settings consumed by :class:`MSFExploitDriver`."""

    # Address of the target VM; rendered into the module's options and
    # recorded in the ``driver_setup`` event.
    target_ip: str
    # Seconds to wait for a new session to appear before emitting
    # ``session_open_timeout`` and giving up.
    session_open_timeout_s: float = 30.0
    # Workload command used to mimic XMRig-class infected_running shape
    # in a real session. Kept simple on purpose — anything observable
    # from outside the guest works for the dataset; we'll drop in a
    # real sample at Tier 4.
    workload_cmd: str = "yes > /dev/null"
    # How we kill the workload at dormant time.
    workload_kill_cmd: str = "pkill yes; true"
class MSFExploitDriver:
    """Phase-to-msfrpc adapter. One instance per episode.

    The driver holds an msfrpc client, a module configuration and a
    :class:`DriverConfig`, and reacts to phase names passed to
    :meth:`set_phase`. Every transition it performs is reported through
    the ``emit_event`` callable supplied at construction time.
    """

    def __init__(
        self,
        client: MSFRpcClient,
        module: ModuleConfig,
        cfg: DriverConfig,
        emit_event: EmitEvent,
    ) -> None:
        """Store collaborators and reset per-episode state.

        Args:
            client: msfrpc client used for all RPC calls.
            module: module configuration (path, type, payload, options).
            cfg: driver settings (target IP, timeout, workload commands).
            emit_event: event sink, called as ``emit(name, **fields)``.
        """
        self.client = client
        self.module = module
        self.cfg = cfg
        self.emit = emit_event
        # Session ids that existed before we fired; anything outside this
        # set is treated as the session our exploit opened.
        self._sessions_seen_at_arm: set[int] = set()
        self._session_id: int | None = None
        self._job_id: int | str | None = None
        self._fired = False
        # Set once teardown() has completed so a repeat call is a no-op.
        self._torn_down = False

    # ---- lifecycle ------------------------------------------------------

    def setup(self) -> None:
        """Authenticate and snapshot the pre-existing session set so we
        can recognize a *new* session as the one we just opened.

        Emits ``driver_setup`` with the module path, payload path, target
        IP and the sorted list of pre-existing session ids.
        """
        self.client.login()
        # A fresh login means a later teardown() has real work to do again.
        self._torn_down = False
        self._sessions_seen_at_arm = set(self.client.session_list().keys())
        self.emit(
            "driver_setup",
            module=self.module.module_path,
            payload=self.module.payload_path,
            target_ip=self.cfg.target_ip,
            preexisting_sessions=sorted(self._sessions_seen_at_arm),
        )

    def teardown(self) -> None:
        """Stop our session and job (best effort), then log out.

        Idempotent: ``set_phase("reverting")`` calls this, and a caller
        may invoke it again during its own cleanup. The tracked session
        and job ids are cleared before the stop calls are issued, so a
        repeat call never re-stops a stale id or emits a second
        ``session_killed``; once logout has succeeded further calls
        return immediately.

        Failures of ``session_stop``/``job_stop`` are logged and
        swallowed; an exception raised by ``logout`` propagates.
        """
        if self._torn_down:
            return

        # Take ownership of the ids and forget them up front so that
        # neither a retry nor a late phase callback touches them again.
        session_id, self._session_id = self._session_id, None
        job_id, self._job_id = self._job_id, None

        if session_id is not None:
            try:
                self.client.session_stop(session_id)
                self.emit("session_killed", session_id=session_id)
            except Exception:
                log.exception("session.stop on %s", session_id)
        if job_id is not None:
            try:
                self.client.job_stop(job_id)
            except Exception:
                log.debug("job.stop on %s (often already gone)", job_id)
        self.client.logout()
        self._torn_down = True

    # ---- phase callback -------------------------------------------------

    def set_phase(self, phase: str) -> None:
        """Run the action associated with ``phase``.

        ``clean`` is a no-op; unknown phase names are logged at warning
        level and otherwise ignored.
        """
        log.info("driver phase -> %s", phase)
        if phase == "clean":
            return
        if phase == "armed":
            self._fire()
        elif phase == "infecting":
            self._await_session()
        elif phase == "infected_running":
            self._start_workload()
        elif phase == "dormant":
            self._stop_workload()
        elif phase == "reverting":
            self.teardown()
        else:
            log.warning("unknown phase: %s", phase)

    # ---- actions --------------------------------------------------------

    def _fire(self) -> None:
        """Execute the configured module once and remember its job id.

        ``exploit_fire`` is emitted *before* the RPC call so the event
        timestamp precedes anything the module does. Repeat calls after a
        successful fire are skipped.
        """
        if self._fired:
            log.debug("module already fired; skipping re-fire")
            return
        opts = self.module.render_options(target_ip=self.cfg.target_ip)
        # Keep the PASSWORD option out of the event log.
        # NOTE(review): only the exact key "PASSWORD" is filtered; other
        # credential-style option names would be logged as-is — confirm
        # that is acceptable for the module configs in use.
        self.emit(
            "exploit_fire",
            module=self.module.module_path,
            options={k: v for k, v in opts.items() if k != "PASSWORD"},
        )
        resp = self.client.module_execute(
            self.module.module_type, self.module.module_path, opts,
        )
        self._job_id = resp.get("job_id")
        self._fired = True

    def _await_session(self) -> None:
        """Wait for a session that was not present at setup time.

        On success records the session id, emits ``session_open`` and
        runs a best-effort landing probe (``id``) whose output, trimmed
        to 256 characters, is emitted as ``session_landing_probe``. On
        timeout emits ``session_open_timeout`` and leaves the driver
        without a session. Does nothing if a session is already tracked.
        """
        if self._session_id is not None:
            return
        result = wait_for_new_session(
            self.client,
            seen=self._sessions_seen_at_arm,
            timeout_s=self.cfg.session_open_timeout_s,
        )
        if result is None:
            self.emit(
                "session_open_timeout",
                module=self.module.module_path,
                timeout_s=self.cfg.session_open_timeout_s,
            )
            log.warning(
                "no session opened within %.1fs", self.cfg.session_open_timeout_s,
            )
            return
        sid, info = result
        self._session_id = sid
        self.emit(
            "session_open",
            session_id=sid,
            session_type=info.get("type"),
            tunnel_peer=info.get("tunnel_peer"),
        )
        # Landing probe so we have a known-good RTT marker on the wire.
        # A failed probe is logged but does not invalidate the session.
        try:
            self.client.session_shell_write(sid, "id")
            # Give the remote shell a moment to produce output before reading.
            time.sleep(0.5)
            out = self.client.session_shell_read(sid)
            self.emit("session_landing_probe", session_id=sid, output=out.strip()[:256])
        except Exception:
            log.exception("landing probe on session %s", sid)

    def _start_workload(self) -> None:
        """Launch the configured workload in the background of the session.

        Skipped with a warning when no session is tracked. Emits
        ``sample_executed`` with the unwrapped workload command.
        """
        if self._session_id is None:
            log.warning("infected_running with no session — skipping workload")
            return
        # Detach the workload from the shell's stdio so the write returns
        # promptly and the process does not depend on the session's streams.
        # NOTE(review): ``disown`` is not a POSIX ``sh`` builtin; confirm the
        # target shell accepts it (the job is already backgrounded by ``&``).
        self.client.session_shell_write(
            self._session_id,
            f"nohup sh -c {_shquote(self.cfg.workload_cmd)} </dev/null "
            f">/dev/null 2>&1 & disown",
        )
        self.emit(
            "sample_executed",
            session_id=self._session_id,
            command=self.cfg.workload_cmd,
        )

    def _stop_workload(self) -> None:
        """Send the workload kill command and emit ``session_dormant``.

        The session itself is left open. Does nothing when no session is
        tracked.
        """
        if self._session_id is None:
            return
        self.client.session_shell_write(
            self._session_id, self.cfg.workload_kill_cmd,
        )
        self.emit("session_dormant", session_id=self._session_id)
def _shquote(s: str) -> str:
    """Wrap ``s`` in single quotes for a POSIX shell.

    Each embedded single quote is rewritten as ``'\\''`` (close the
    quoted string, emit an escaped quote, reopen), and the result is
    enclosed in single quotes.
    """
    # Minimal POSIX single-quote escaping. The workload command is set
    # by us, not by anything user-controlled, so we just need to handle
    # embedded single quotes correctly for completeness.
    return "'" + s.replace("'", "'\\''") + "'"