CIS490/exploits/msfrpc.py
Elliott Kolden b73f5559dc Tier-3 fixes: b'' probe false-positive, requires_bridge, msgpack
Bug 10: _wait_for_tcp returned on recv()→b'' (connection closed by peer),
falsely signalling service-ready. Only socket.timeout or non-empty data
are genuine ready signals; b'' now retries.

Bug 11: distccd_command_exec and unreal_ircd_3281_backdoor incorrectly
had requires_bridge=true. bind_perl payloads connect inward (host→guest
via hostfwd), not outward — no bridge egress needed. Both modules now
run on SLIRP-only fleet slots.

Bug 12: msgpack.unpackb crashed on integer session IDs from msfrpcd 6.x
(strict_map_key=True default). Added strict_map_key=False.

Bug 13 (documented): samba_usermap_script removed from catalog (NoReply
on every fire — already handled in dca6144 on origin/main).

Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
2026-05-05 15:15:18 -06:00

254 lines
9.3 KiB
Python

"""Tiny Metasploit RPC client — just enough for the Tier-3 driver.
We talk msgpack over HTTPS to ``msfrpcd``. The full MSF RPC surface is
huge; this client implements only the verbs we actually call:
auth.login — get a token
auth.logout — release the token
module.execute — fire an exploit (or aux) module by name
job.list / job.stop — manage the running module
session.list — see opened sessions, find the one we just opened
session.shell_write/read — run commands in a shell session
session.stop — kill a session at episode end
Why not pull in pymetasploit3? Two reasons:
- msfrpcd's protocol is small enough that owning it removes a third-party
dep (and a maintenance risk on a course project).
- the parts we need (session opening, shell commands, job lifecycle)
are simple, and we want full visibility into what's on the wire when
debugging an exploit fire.
The client is intentionally synchronous; the Tier-3 driver runs in the
orchestrator's main thread alongside the collector, and a session-open
poll of a few hundred milliseconds is well within budget.
"""
from __future__ import annotations
import http.client
import logging
import socket
import ssl
import time
from dataclasses import dataclass
from typing import Any
try:
import msgpack # type: ignore[import-untyped]
except ImportError as e: # pragma: no cover - import-time guard
raise ImportError(
"the msgpack package is required for the MSF RPC client. "
"install it with: pip install msgpack"
) from e
log = logging.getLogger("cis490.msfrpc")
class MSFRpcError(RuntimeError):
"""Raised when msfrpcd returns an error or a malformed response."""
@dataclass
class MSFRpcConfig:
host: str = "127.0.0.1"
port: int = 55553
user: str = "msf"
password: str = ""
ssl: bool = True
timeout_s: float = 30.0
# msfrpcd's default cert is self-signed — most callers will run
# against localhost where this is the right tradeoff. Override
# explicitly for any non-loopback host.
verify: bool = False
class MSFRpcClient:
"""Synchronous msfrpcd client. Token is acquired on ``login()`` and
re-used on every subsequent call. Not thread-safe; the driver owns
one client per episode."""
def __init__(self, cfg: MSFRpcConfig) -> None:
self.cfg = cfg
self._token: str | None = None
# ---- session management --------------------------------------------
def login(self) -> None:
resp = self._call_no_auth("auth.login", self.cfg.user, self.cfg.password)
if resp.get("result") != "success" or "token" not in resp:
raise MSFRpcError(f"auth.login failed: {resp!r}")
self._token = resp["token"]
log.info("msfrpc auth.login ok (token=%s...)", self._token[:8])
def logout(self) -> None:
if self._token is None:
return
try:
self._call("auth.logout", self._token)
except MSFRpcError as e:
log.warning("msfrpc auth.logout: %s", e)
finally:
self._token = None
# ---- modules --------------------------------------------------------
def module_execute(
self,
module_type: str,
module_name: str,
options: dict[str, Any],
) -> dict[str, Any]:
"""Fire a module. Returns ``{"job_id": int, "uuid": str}``."""
resp = self._call("module.execute", module_type, module_name, options)
if "job_id" not in resp:
raise MSFRpcError(f"module.execute returned no job_id: {resp!r}")
log.info(
"module.execute %s/%s -> job_id=%s uuid=%s resp=%r",
module_type, module_name, resp["job_id"], resp.get("uuid"), resp,
)
return resp
# ---- jobs -----------------------------------------------------------
def job_list(self) -> dict[str, str]:
return self._call("job.list")
def job_stop(self, job_id: int | str) -> dict[str, Any]:
# msfrpcd accepts the id as a string.
return self._call("job.stop", str(job_id))
# ---- sessions -------------------------------------------------------
def session_list(self) -> dict[int, dict[str, Any]]:
raw = self._call("session.list")
# msfrpcd keys session ids as ints in msgpack but some versions
# round-trip them as strings. Normalize.
out: dict[int, dict[str, Any]] = {}
for k, v in (raw or {}).items():
try:
out[int(k)] = v
except (TypeError, ValueError):
pass
return out
def session_shell_write(self, session_id: int, data: str) -> dict[str, Any]:
if not data.endswith("\n"):
data = data + "\n"
return self._call("session.shell_write", session_id, data)
def session_shell_read(self, session_id: int) -> str:
resp = self._call("session.shell_read", session_id)
return resp.get("data", "") if isinstance(resp, dict) else ""
def session_stop(self, session_id: int) -> dict[str, Any]:
return self._call("session.stop", session_id)
# ---- transport ------------------------------------------------------
def _call(self, method: str, *args: Any) -> dict[str, Any]:
if self._token is None:
raise MSFRpcError("not authenticated; call login() first")
return self._raw_call([method, self._token, *args])
def _call_no_auth(self, method: str, *args: Any) -> dict[str, Any]:
return self._raw_call([method, *args])
@staticmethod
def _str(v: Any) -> Any:
"""Decode bytes to str; recursively normalize dicts and lists.
msfrpcd (pacman metasploit 6.x) returns msgpack bin type for all
string values, so raw=False still gives bytes. Normalise the whole
response tree so callers can use plain str keys/values.
"""
if isinstance(v, bytes):
return v.decode("utf-8", errors="replace")
if isinstance(v, dict):
return {MSFRpcClient._str(k): MSFRpcClient._str(val) for k, val in v.items()}
if isinstance(v, list):
return [MSFRpcClient._str(i) for i in v]
return v
def _raw_call(self, payload: list[Any]) -> dict[str, Any]:
body = msgpack.packb(payload, use_bin_type=False)
conn = self._open_conn()
try:
conn.request(
"POST",
"/api/",
body=body,
headers={
"Content-Type": "binary/message-pack",
"Content-Length": str(len(body)),
"Connection": "close",
},
)
r = conn.getresponse()
raw = r.read()
if r.status != 200:
raise MSFRpcError(
f"msfrpcd HTTP {r.status} for {payload[0]!r}: {raw[:200]!r}"
)
except (socket.error, http.client.HTTPException) as e:
raise MSFRpcError(f"transport error calling {payload[0]!r}: {e}") from e
finally:
conn.close()
try:
decoded = self._str(msgpack.unpackb(raw, raw=False, strict_map_key=False))
except Exception as e:
raise MSFRpcError(f"could not decode msfrpcd response: {e}") from e
if isinstance(decoded, dict) and decoded.get("error") is True:
raise MSFRpcError(
f"{payload[0]!r}: {decoded.get('error_class')} "
f"{decoded.get('error_message')}"
)
if not isinstance(decoded, dict):
# session.list and friends can legitimately return {} or a dict,
# but never a non-dict — anything else is a protocol violation.
raise MSFRpcError(
f"unexpected response type for {payload[0]!r}: {type(decoded).__name__}"
)
return decoded
def _open_conn(self) -> http.client.HTTPConnection:
if self.cfg.ssl:
ctx = ssl.create_default_context()
if not self.cfg.verify:
ctx.check_hostname = False
ctx.verify_mode = ssl.CERT_NONE
return http.client.HTTPSConnection(
self.cfg.host, self.cfg.port,
timeout=self.cfg.timeout_s, context=ctx,
)
return http.client.HTTPConnection(
self.cfg.host, self.cfg.port, timeout=self.cfg.timeout_s,
)
def wait_for_new_session(
client: MSFRpcClient,
*,
seen: set[int],
timeout_s: float,
poll_s: float = 0.25,
) -> tuple[int, dict[str, Any]] | None:
"""Poll ``session.list`` until a session id we haven't seen before
appears, or until timeout. Returns ``(session_id, info)`` or None."""
log = __import__("logging").getLogger("cis490.msfrpc")
deadline = time.monotonic() + timeout_s
logged_empty = False
while time.monotonic() < deadline:
sessions = client.session_list()
if not logged_empty:
log.debug("wait_for_new_session: seen=%r current=%r", seen, list(sessions.keys()))
logged_empty = True
for sid, info in sessions.items():
if sid not in seen:
return sid, info
time.sleep(poll_s)
# Log final state on timeout
log.debug("wait_for_new_session timeout: final sessions=%r", client.session_list())
return None