CIS490/tests/test_exploits.py
Max Gorog dca6144a4a catalog: remove samba_usermap_script — never landed sessions in prod
PIPELINE.md §1 (default-to-removal), §4.3 (catalog admission), §10
(every dishonest label is a poisoned training example).

Empirical evidence on commits 4ab5477..c41763b: samba_usermap_script
fired its bind_perl payload but the framework's bind handler never
managed to connect to the guest's listening port within
session_open_timeout_s=30 (or even with WfsDelay=30 bumped on the
framework side). All 67 attempts in the §3 probe ended in
session_open_timeout. Yet the schedule clock was still writing
`infected_running` labels for the failed exploit — exactly the §10
poisoned-example pattern.

Until §5 step 3 builds an in-house target VM and step 4 re-admits
modules with `verified_against` recorded (§4.3), the production
catalog should consist of zero verified Tier-3 modules. That's the
state after this removal: the four remaining modules
(vsftpd_234_backdoor, distccd_command_exec, php_cgi_arg_injection,
unreal_ircd_3281_backdoor) are all `requires_bridge=true`, which the
fleet picker filters out unconditionally (the post-revert behavior
from commit 0390eb2). Net effect: production runs Tier-2 only,
producing honest Tier-2 episodes and zero dishonest Tier-3
infected_running labels.

Test fixture updated to inject synthetic in-memory ModuleConfigs
instead of loading from disk, so Tier-3 dispatch logic stays tested
even though no production module qualifies. test_exploits asserts
the new "every shipped module is requires_bridge until §4.3 admits
something verified" invariant — flips into a tripwire if anyone
reintroduces an unverified non-bridge module.

229 passed.

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
2026-05-03 22:48:03 -05:00

487 lines
18 KiB
Python

"""Tests for the Tier-3 exploit driver and its module loader.
The msfrpc transport itself is exercised against a fake client so the
suite runs in-process. A live-msfrpcd integration test is out of
scope here — the wire format is small and the high-value coverage is
the phase-to-action mapping plus the events the driver emits.
"""
from __future__ import annotations
import json
from pathlib import Path
from typing import Any
import pytest
from exploits.driver import DriverConfig, MSFExploitDriver
from exploits.modules import ModuleConfig, load_module_config
REPO_ROOT = Path(__file__).resolve().parent.parent
MODULES_DIR = REPO_ROOT / "exploits" / "modules"
# -----------------------------------------------------------------------
# Module config loader
# -----------------------------------------------------------------------
def test_module_catalog_only_contains_unverified_modules() -> None:
    """Tripwire: every module shipped on disk must be ``requires_bridge``.

    All currently-shipped Metasploitable2 modules are bridge-only and
    none has been re-verified end-to-end since the §3 probe surfaced
    that no Tier-3 module reliably lands sessions against the
    SourceForge Metasploitable2 image. Per PIPELINE.md §4.3 admission
    criteria, the catalog should consist only of verified modules; the
    interim correct state is "every shipped module is requires_bridge,
    so the picker filters them all out and Tier-3 doesn't run." This
    keeps the dataset honest until §5 step 3 builds a target VM and
    step 4 re-admits modules with `verified_against` recorded.

    Updated 2026-05-04 after removing samba_usermap_script (commit
    c41763b empirical evidence: bind_perl handler couldn't connect
    after exploit_fire even with WfsDelay=30, producing dishonest
    infected_running labels per §10).
    """
    from exploits.modules import load_module_configs

    catalog = load_module_configs(MODULES_DIR)

    # all() is vacuously true for an empty iterable, so without this
    # guard a loader that found nothing would make the tripwire below
    # pass without checking a single module.
    assert catalog, f"no module configs were loaded from {MODULES_DIR}"

    assert all(m.requires_bridge for m in catalog.values()), (
        "every currently-shipped module must be requires_bridge=true so "
        "the production picker drops all of them — keeps Tier-3 honest "
        "until a verified module is admitted (§4.3). Modules in catalog: "
        f"{[(n, m.requires_bridge) for n, m in catalog.items()]}"
    )
def test_load_vsftpd_module_config_round_trip() -> None:
    """Load the vsftpd TOML from disk and check every parsed field."""
    module = load_module_config(MODULES_DIR / "vsftpd_234_backdoor.toml")

    # Identity and routing fields.
    assert module.name == "vsftpd_234_backdoor"
    assert module.module_type == "exploit"
    assert module.module_path == "unix/ftp/vsftpd_234_backdoor"

    # Options are kept unrendered at load time: the target placeholder
    # is still the literal template string.
    opts = module.options
    assert opts["RPORT"] == 21
    assert opts["RHOSTS"] == "{{ target_ip }}"

    assert module.payload_path == "cmd/unix/interact"
def test_render_options_substitutes_target_ip() -> None:
    """Rendering fills in the target IP and exposes the payload path."""
    toml_path = MODULES_DIR / "vsftpd_234_backdoor.toml"
    options = load_module_config(toml_path).render_options(target_ip="10.200.0.10")

    assert options["RHOSTS"] == "10.200.0.10"
    assert options["RPORT"] == 21
    assert options["PAYLOAD"] == "cmd/unix/interact"
def test_select_module_is_deterministic() -> None:
    """Identical selection inputs must yield the very same module object."""
    from exploits.modules import load_module_configs, select_module

    catalog = load_module_configs(MODULES_DIR)
    selection_key = {"host_id": "lab-7", "slot": 2, "episode_index": 11}

    first_pick = select_module(catalog, **selection_key)
    second_pick = select_module(catalog, **selection_key)

    assert first_pick is second_pick
def test_select_module_diversifies_across_hosts() -> None:
    """Two different host ids must not pick the same module in most slots."""
    from exploits.modules import load_module_configs, select_module

    catalog = load_module_configs(MODULES_DIR)

    # Count the slots (out of 20) where both hosts land on the same module.
    collisions = sum(
        select_module(catalog, host_id="alice", slot=slot, episode_index=0)
        is select_module(catalog, host_id="bob", slot=slot, episode_index=0)
        for slot in range(20)
    )

    assert collisions < 15, "host_id seed isn't producing module variety"
def test_select_module_walks_catalog() -> None:
    """Across 200 episodes on one host/slot, every catalog entry is picked."""
    from exploits.modules import load_module_configs, select_module

    catalog = load_module_configs(MODULES_DIR)
    seen = {
        select_module(catalog, host_id="lab-x", slot=0, episode_index=ep).name
        for ep in range(200)
    }

    assert seen == set(catalog.keys()), \
        f"only saw {len(seen)}/{len(catalog)} modules across 200 episodes"
def test_module_target_port_pulls_rport() -> None:
    """module_target_port returns the expected port for each shipped module."""
    from exploits.modules import load_module_configs, module_target_port

    catalog = load_module_configs(MODULES_DIR)
    expected_ports = {
        "vsftpd_234_backdoor": 21,
        "distccd_command_exec": 3632,
        "php_cgi_arg_injection": 80,
        "unreal_ircd_3281_backdoor": 6667,
    }

    for module_name, port in expected_ports.items():
        assert module_target_port(catalog[module_name]) == port
def test_render_options_handles_both_brace_styles(tmp_path: Path) -> None:
    """Both ``{{target_ip}}`` and ``{{ target_ip }}`` are substituted."""
    toml_text = (
        '[module]\n'
        'type = "exploit"\n'
        'path = "unix/ftp/example"\n'
        '[module.options]\n'
        'RHOSTS = "{{target_ip}}"\n'
        'LHOST = "{{ target_ip }}"\n'
    )
    config_path = tmp_path / "x.toml"
    config_path.write_text(toml_text)

    options = load_module_config(config_path).render_options(target_ip="10.0.0.5")

    # One option uses the tight spelling, the other the spaced one.
    assert options["RHOSTS"] == "10.0.0.5"
    assert options["LHOST"] == "10.0.0.5"
def test_load_rejects_missing_module_path(tmp_path: Path) -> None:
    """A config lacking ``module.path`` is rejected with ValueError."""
    incomplete_config = tmp_path / "bad.toml"
    incomplete_config.write_text('[module]\ntype = "exploit"\n')

    with pytest.raises(ValueError, match="module.path"):
        load_module_config(incomplete_config)
def test_load_rejects_unknown_module_type(tmp_path: Path) -> None:
    """A config whose ``module.type`` is not recognised raises ValueError."""
    toml_text = '[module]\ntype = "evil"\npath = "unix/ftp/x"\n'
    invalid_config = tmp_path / "bad.toml"
    invalid_config.write_text(toml_text)

    with pytest.raises(ValueError, match="module.type"):
        load_module_config(invalid_config)
# -----------------------------------------------------------------------
# Exploit driver — phase transitions against a fake MSFRpcClient
# -----------------------------------------------------------------------
class FakeMSFRpcClient:
"""Stand-in that records every method called and lets a test
script the apparent state of msfrpcd (sessions, return values)."""
def __init__(self, *, sessions_after_fire: dict[int, dict[str, Any]] | None = None) -> None:
self.calls: list[tuple[str, tuple, dict]] = []
self.logged_in = False
self._fired = False
self._sessions: dict[int, dict[str, Any]] = {}
self._sessions_after_fire = sessions_after_fire or {}
self.shell_writes: list[tuple[int, str]] = []
def _record(self, name: str, *args, **kwargs) -> None:
self.calls.append((name, args, kwargs))
def login(self) -> None:
self._record("login")
self.logged_in = True
def logout(self) -> None:
self._record("logout")
self.logged_in = False
def session_list(self) -> dict[int, dict[str, Any]]:
self._record("session_list")
return dict(self._sessions)
def module_execute(self, mtype: str, mname: str, opts: dict) -> dict:
self._record("module_execute", mtype, mname, opts)
self._fired = True
# Simulate sessions appearing after the exploit fires.
self._sessions = dict(self._sessions_after_fire)
return {"job_id": 7, "uuid": "fake-uuid"}
def job_stop(self, job_id) -> dict:
self._record("job_stop", job_id)
return {"result": "success"}
def session_shell_write(self, sid: int, data: str) -> dict:
self._record("session_shell_write", sid, data)
if not data.endswith("\n"):
data = data + "\n"
self.shell_writes.append((sid, data))
return {"write_count": str(len(data))}
def session_shell_read(self, sid: int) -> str:
self._record("session_shell_read", sid)
return "uid=0(root) gid=0(root)\n"
def session_stop(self, sid: int) -> dict:
self._record("session_stop", sid)
self._sessions.pop(sid, None)
return {"result": "success"}
def _make_driver(
    sessions_after_fire: dict[int, dict[str, Any]] | None = None,
    target_ip: str = "10.200.0.10",
) -> tuple[MSFExploitDriver, FakeMSFRpcClient, list[tuple[str, dict]]]:
    """Build a vsftpd driver wired to a fake client and an event recorder.

    Returns ``(driver, fake_client, events)``. ``events`` is the list
    that the driver's ``emit_event`` callback appends ``(name, extra)``
    tuples to.
    """
    recorded: list[tuple[str, dict]] = []

    def emit(event: str, **extra: Any) -> None:
        recorded.append((event, extra))

    fake_client = FakeMSFRpcClient(sessions_after_fire=sessions_after_fire)
    module_cfg = load_module_config(MODULES_DIR / "vsftpd_234_backdoor.toml")
    driver_cfg = DriverConfig(
        target_ip=target_ip,
        session_open_timeout_s=0.5,  # tests must not block
    )
    driver = MSFExploitDriver(
        client=fake_client,  # type: ignore[arg-type]
        module=module_cfg,
        cfg=driver_cfg,
        emit_event=emit,
    )
    return driver, fake_client, recorded
def test_driver_setup_authenticates_and_snapshots_sessions() -> None:
    """setup() logs in, snapshots existing session ids, emits driver_setup."""
    driver, client, events = _make_driver()
    # Seed a session that exists before the driver arms anything.
    client._sessions = {99: {"type": "shell"}}

    driver.setup()

    assert client.logged_in is True
    assert driver._sessions_seen_at_arm == {99}

    first_name, first_extra = events[0]
    assert first_name == "driver_setup"
    assert first_extra["module"] == "unix/ftp/vsftpd_234_backdoor"
    assert first_extra["target_ip"] == "10.200.0.10"
def test_full_phase_walk_emits_expected_event_order() -> None:
    """Walk a whole phase schedule and check event order and counts."""
    driver, client, events = _make_driver(
        sessions_after_fire={1: {"type": "shell", "tunnel_peer": "10.200.0.10:21"}},
    )
    phase_sequence = (
        "clean", "armed", "infecting",
        "infected_running", "dormant",
        "infected_running", "dormant",
        "clean",
    )

    driver.setup()
    for phase in phase_sequence:
        driver.set_phase(phase)
    driver.teardown()

    names = [name for name, _ in events]

    # Order matters: fire comes before session_open, which comes before
    # the landing probe, which comes before the first workload.
    expected_order = [
        "exploit_fire",
        "session_open",
        "session_landing_probe",
        "sample_executed",
    ]
    for earlier, later in zip(expected_order, expected_order[1:]):
        assert names.index(earlier) < names.index(later)

    assert names.count("sample_executed") == 2  # two infected_running phases
    assert names.count("session_dormant") == 2
    assert "session_killed" in names

    # Driver should have asked the FakeClient to fire exactly once.
    fire_calls = [call for call in client.calls if call[0] == "module_execute"]
    assert len(fire_calls) == 1

    fired_args = fire_calls[0][1]
    assert fired_args[1] == "unix/ftp/vsftpd_234_backdoor"
    assert fired_args[2]["RHOSTS"] == "10.200.0.10"
    assert fired_args[2]["PAYLOAD"] == "cmd/unix/interact"
def test_session_open_timeout_emits_timeout_event() -> None:
    """With no session after fire, only session_open_timeout is emitted."""
    # No sessions ever appear after fire.
    driver, client, events = _make_driver(sessions_after_fire={})

    driver.setup()
    for phase in ("armed", "infecting"):
        driver.set_phase(phase)

    emitted = [name for name, _ in events]
    assert "session_open_timeout" in emitted
    assert "session_open" not in emitted
def test_workload_phases_are_no_op_without_session() -> None:
    """Workload phases write nothing to a shell when no session opened."""
    driver, client, events = _make_driver(sessions_after_fire={})

    driver.setup()
    # "infecting" times out here because no session is scripted.
    for phase in ("armed", "infecting", "infected_running", "dormant"):
        driver.set_phase(phase)

    # No shell writes should have happened.
    assert client.shell_writes == []
def test_arm_is_idempotent() -> None:
    """Entering "armed" twice results in a single module_execute call."""
    driver, client, events = _make_driver(
        sessions_after_fire={1: {"type": "shell"}},
    )

    driver.setup()
    for _ in range(2):
        driver.set_phase("armed")

    executed = [name for name, _args, _kwargs in client.calls if name == "module_execute"]
    assert len(executed) == 1
def test_teardown_kills_session_and_logs_out() -> None:
    """teardown() stops the session, logs out, and emits session_killed."""
    driver, client, events = _make_driver(
        sessions_after_fire={1: {"type": "shell"}},
    )

    driver.setup()
    for phase in ("armed", "infecting"):
        driver.set_phase(phase)
    driver.teardown()

    called_methods = [call[0] for call in client.calls]
    emitted = [event[0] for event in events]

    assert "session_stop" in called_methods
    assert client.logged_in is False
    assert "session_killed" in emitted
# -----------------------------------------------------------------------
# Driver wired into a real EpisodeRunner — events land in events.jsonl
# (covered by test_driver_events_persist_to_events_jsonl, defined at the
# end of this file)
# -----------------------------------------------------------------------
# -----------------------------------------------------------------------
# Driver v2 — sample-profile-driven workloads
# -----------------------------------------------------------------------
def test_v2_uses_profile_workload_for_cpu_saturate() -> None:
    """A driver built with a Sample dispatches the profile workload.

    The v1 yes-loop must not be sent when a Sample is supplied. The
    actual command body is owned by exploits.workloads and tested
    there; this test only confirms dispatch and event metadata.
    """
    from samples.manifest import Sample as _Sample

    recorded: list[tuple[str, dict]] = []
    sample = _Sample(
        name="xmrig-cryptominer",
        family="XMRig",
        category="cryptominer",
        profile="cpu-saturate",
    )
    client = FakeMSFRpcClient(
        sessions_after_fire={1: {"type": "shell", "tunnel_peer": "x:21"}},
    )
    module_cfg = load_module_config(MODULES_DIR / "vsftpd_234_backdoor.toml")
    driver = MSFExploitDriver(
        client=client,  # type: ignore[arg-type]
        module=module_cfg,
        cfg=DriverConfig(target_ip="10.200.0.10", session_open_timeout_s=0.5),
        emit_event=lambda ev, **kw: recorded.append((ev, kw)),
        sample=sample,
    )

    driver.setup()
    for phase in ("armed", "infecting", "infected_running", "dormant"):
        driver.set_phase(phase)
    driver.teardown()

    # The shell command sent at infected_running should be the
    # profile's wrapper — NOT the bare v1 yes line. A write that
    # mentions "cis490-workload" is not counted as a v1 write.
    v1_writes = [
        text
        for _, text in client.shell_writes
        if "yes > /dev/null" in text and "cis490-workload" not in text
    ]
    assert v1_writes == [], "v2 driver must not send the v1 yes-loop when a Sample is supplied"

    # The driver_setup event records sample + workload metadata.
    setup_payloads = [extra for name, extra in recorded if name == "driver_setup"]
    assert setup_payloads
    setup_payload = setup_payloads[0]
    assert setup_payload["sample"] == "xmrig-cryptominer"
    assert setup_payload["sample_kind"] == "mimic"
    assert setup_payload["workload_profile"] == "cpu-saturate"

    # sample_executed carries the profile and sample names.
    executed_payloads = [extra for name, extra in recorded if name == "sample_executed"]
    assert executed_payloads
    assert executed_payloads[0]["profile"] == "cpu-saturate"
    assert executed_payloads[0]["sample"] == "xmrig-cryptominer"
def test_v2_distinct_workloads_per_profile() -> None:
    """Every workload profile must map to its own start command.

    This is the property that gives the ML model varied envelopes to
    learn from.
    """
    from exploits.workloads import all_profiles, workload_for
    from samples.manifest import Sample as _Sample

    profiles = all_profiles()
    assert len(profiles) >= 4

    distinct_cmds: set[str] = set()
    for profile_name in profiles:
        sample = _Sample(name=f"x-{profile_name}", family="X", category="rat", profile=profile_name)
        workload = workload_for(sample)
        assert workload is not None
        distinct_cmds.add(workload.start_cmd)

    # A set smaller than the profile list means two profiles collided.
    assert len(distinct_cmds) == len(profiles), \
        "two profiles produced the same workload — ML diversity is at risk"
def test_v2_unknown_profile_falls_back_to_cpu_saturate() -> None:
    """An unrecognised profile name resolves to the cpu-saturate workload."""
    from exploits.workloads import workload_for
    from samples.manifest import Sample as _Sample

    odd_sample = _Sample(name="weird", family="X", category="rat", profile="not-a-real-profile")
    workload = workload_for(odd_sample)

    assert workload is not None
    assert workload.profile == "cpu-saturate"
def test_v1_path_still_works_when_no_sample() -> None:
    """Backwards compatibility: without a sample, the yes-loop is sent."""
    client = FakeMSFRpcClient(sessions_after_fire={1: {"type": "shell"}})
    module_cfg = load_module_config(MODULES_DIR / "vsftpd_234_backdoor.toml")
    driver = MSFExploitDriver(
        client=client,  # type: ignore[arg-type]
        module=module_cfg,
        cfg=DriverConfig(target_ip="10.200.0.10", session_open_timeout_s=0.5),
        emit_event=lambda *a, **kw: None,
    )

    driver.setup()
    for phase in ("armed", "infecting", "infected_running"):
        driver.set_phase(phase)
    driver.teardown()

    written_texts = [text for _, text in client.shell_writes]
    assert any("yes > /dev/null" in text for text in written_texts)
def test_driver_events_persist_to_events_jsonl(tmp_path: Path) -> None:
    """Driver events emitted through a real EpisodeRunner reach events.jsonl.

    The persisted events must carry monotonic-clock timestamps so that
    labels and exploit events can be correlated downstream.
    """
    import os

    from orchestrator.episode import EpisodeConfig, EpisodeRunner

    schedule = [
        ("clean", 0.05),
        ("armed", 0.05),
        ("infecting", 0.05),
        ("infected_running", 0.05),
        ("dormant", 0.05),
        ("clean", 0.05),
    ]
    episode_cfg = EpisodeConfig(
        target_pid=os.getpid(),
        duration_s=sum(duration for _, duration in schedule),
        interval_ms=20,
        data_root=tmp_path,
        phase_schedule=schedule,
    )
    runner = EpisodeRunner(episode_cfg)

    client = FakeMSFRpcClient(
        sessions_after_fire={1: {"type": "shell", "tunnel_peer": "x:21"}},
    )
    module_cfg = load_module_config(MODULES_DIR / "vsftpd_234_backdoor.toml")
    driver = MSFExploitDriver(
        client=client,  # type: ignore[arg-type]
        module=module_cfg,
        cfg=DriverConfig(target_ip="10.200.0.10", session_open_timeout_s=0.5),
        emit_event=runner.emit_event,
    )
    # Phase changes from the runner drive the exploit driver.
    runner.on_phase = driver.set_phase

    driver.setup()
    try:
        result = runner.run()
    finally:
        driver.teardown()

    log_path = result.episode_dir / "events.jsonl"
    events = [json.loads(line) for line in log_path.read_text().splitlines()]
    names = [record["event"] for record in events]

    for required in (
        "snapshot_load",
        "driver_setup",
        "exploit_fire",
        "session_open",
        "sample_executed",
        "session_dormant",
        "episode_end",
    ):
        assert required in names

    # Driver events must carry monotonic timestamps in episode-relative
    # order (snapshot_load is essentially at origin, exploit_fire later,
    # session_open later still, episode_end last).
    by_name = {record["event"]: record for record in events}

    def stamp(event_name: str) -> int:
        return by_name[event_name]["t_mono_ns"]

    assert stamp("snapshot_load") < 1_000_000  # <1ms after origin
    assert stamp("exploit_fire") > stamp("snapshot_load")
    assert stamp("session_open") >= stamp("exploit_fire")
    assert stamp("episode_end") >= stamp("session_open")