"""TOML loader for exploit-module configs. Each ``exploits/modules/*.toml`` describes one Metasploit module — its path, the options to set, the payload to use, and how the driver should treat the resulting session. The driver consumes ``ModuleConfig`` objects; the TOML files are the on-disk source of truth. Why TOML and not msfconsole ``.rc`` scripts? ``.rc`` scripts are imperative and assume an interactive console; the driver needs the *structured* options to push them through msfrpc. TOML is the simplest way to express a small typed map of options — and it round-trips cleanly into ``meta.json`` for episode reproducibility. Per-(host, slot, episode) selection mirrors the sample-manifest selector: we want different vulnerabilities exercised across hosts and waves so the trained model sees a diverse corpus of ``armed → infecting`` transition shapes, not just the same FTP backdoor every run. """ from __future__ import annotations import hashlib import tomllib from dataclasses import dataclass, field from pathlib import Path from typing import Any _VALID_MODULE_TYPES = {"exploit", "auxiliary", "post"} @dataclass(frozen=True) class ModuleConfig: name: str # short id, e.g. "vsftpd_234_backdoor" module_type: str # "exploit" | "auxiliary" | "post" module_path: str # e.g. "unix/ftp/vsftpd_234_backdoor" options: dict[str, Any] = field(default_factory=dict) payload_path: str | None = None # e.g. "cmd/unix/interact" payload_options: dict[str, Any] = field(default_factory=dict) expected_session_type: str = "shell" # what we'll get on success description: str = "" # When true the module's payload uses a callback channel (reverse # or bind shell) and won't land a session under SLIRP+restrict=on. # The fleet runner skips these unless BRIDGE is set so episodes # that fire them actually produce data. requires_bridge: bool = False # Guest ports the fleet must also hostfwd (in addition to RPORT). # Used for bind-shell payloads where the handler connects to a # separate port. Fleet calculates per-slot host ports and sets # FLEET_PAYLOAD_LPORT so the driver can override LPORT at fire time. extra_target_ports: tuple[int, ...] = () def render_options(self, *, target_ip: str) -> dict[str, Any]: """Substitute ``{{ target_ip }}`` placeholders in options. Module configs use Jinja-style placeholders for any value that isn't known until episode time (RHOSTS, LHOST, etc.). Today the only supported placeholder is ``target_ip``; if more are needed later, generalize here.""" out: dict[str, Any] = {} for k, v in self.options.items(): if isinstance(v, str) and "{{" in v: out[k] = ( v.replace("{{ target_ip }}", target_ip) .replace("{{target_ip}}", target_ip) ) else: out[k] = v # MSF requires PAYLOAD as a top-level option even though we # carry it in a separate field on the config. if self.payload_path: out["PAYLOAD"] = self.payload_path for k, v in self.payload_options.items(): if isinstance(v, str) and "{{" in v: v = ( v.replace("{{ target_ip }}", target_ip) .replace("{{target_ip}}", target_ip) ) out[k] = v return out def load_module_config(path: Path) -> ModuleConfig: raw = tomllib.loads(path.read_text()) mod = raw.get("module") or {} module_path = mod.get("path") module_type = mod.get("type", "exploit") if not isinstance(module_path, str) or not module_path: raise ValueError(f"{path}: module.path must be a non-empty string") if module_type not in _VALID_MODULE_TYPES: raise ValueError( f"{path}: module.type {module_type!r} not in {_VALID_MODULE_TYPES}" ) options = (raw.get("module", {}).get("options") or {}) | (raw.get("options") or {}) payload = raw.get("payload") or {} return ModuleConfig( name=path.stem, module_type=module_type, module_path=module_path, options=dict(options), payload_path=payload.get("path"), payload_options=dict(payload.get("options") or {}), expected_session_type=raw.get("session", {}).get("type", "shell"), description=raw.get("description", ""), requires_bridge=bool(raw.get("runtime", {}).get("requires_bridge", False)), extra_target_ports=tuple( int(p) for p in raw.get("runtime", {}).get("extra_target_ports", []) ), ) def load_module_configs(directory: Path) -> dict[str, ModuleConfig]: """Load every ``*.toml`` under ``directory``, keyed by short name.""" return { p.stem: load_module_config(p) for p in sorted(directory.glob("*.toml")) } def select_module( catalog: dict[str, ModuleConfig], *, host_id: str, slot: int, episode_index: int, ) -> ModuleConfig: """Deterministic per-(host, slot, ep) module selector. Mirrors SampleManifest.select() so the entry vector rotates the same way the post-infection workload does. Two hosts hash to different modules at the same slot/episode (collision rate ~1/N); a single host walks the full catalog within ~len(catalog) episodes. Inputs reduce to a SHA-256 keyed lookup so runs replay bit-identically given the same (host, slot, ep) tuple.""" if not catalog: raise ValueError("module catalog is empty") keys = sorted(catalog.keys()) seed = f"module|{host_id}|{slot}|{episode_index}".encode() h = hashlib.sha256(seed).digest() idx = int.from_bytes(h[:8], "big") % len(keys) return catalog[keys[idx]] def module_target_port(module: ModuleConfig) -> int | None: """Pull the RPORT off a module config. Used by the fleet runner to wire the launcher's hostfwd to the right service inside the target VM (vsftpd:21, samba:139, php-cgi:80, distccd:3632, unrealircd:6667).""" rport = module.options.get("RPORT") if isinstance(rport, int): return rport if isinstance(rport, str) and rport.isdigit(): return int(rport) return None