PIPELINE.md §1 (default-to-removal), §4.3 (catalog admission), §10 (every dishonest label is a poisoned training example). Empirical evidence on commits 4ab5477→c41763b: samba_usermap_script fired its bind_perl payload but the framework's bind handler never managed to connect to the guest's listening port within session_open_timeout_s=30 (or even with WfsDelay=30 bumped on the framework side). All 67 attempts in the §3 probe ended in session_open_timeout. Yet the schedule clock was still writing `infected_running` labels for the failed exploit — exactly the §10 poisoned-example pattern. Until §5 step 3 builds an in-house target VM and step 4 re-admits modules with `verified_against` recorded (§4.3), the production catalog should consist of zero verified Tier-3 modules. That's the state after this removal: the four remaining modules (vsftpd_234_backdoor, distccd_command_exec, php_cgi_arg_injection, unreal_ircd_3281_backdoor) are all `requires_bridge=true`, which the fleet picker filters out unconditionally (the post-revert behavior from commit 0390eb2). Net effect: production runs Tier-2 only, producing honest Tier-2 episodes and zero dishonest Tier-3 infected_running labels. Test fixture updated to inject synthetic in-memory ModuleConfigs instead of loading from disk, so Tier-3 dispatch logic stays tested even though no production module qualifies. test_exploits asserts the new "every shipped module is requires_bridge until §4.3 admits something verified" invariant — flips into a tripwire if anyone reintroduces an unverified non-bridge module. 229 passed. Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
487 lines
18 KiB
Python
487 lines
18 KiB
Python
"""Tests for the Tier-3 exploit driver and its module loader.

The msfrpc transport itself is exercised against a fake client so the
suite runs in-process. A live-msfrpcd integration test is out of
scope here — the wire format is small and the high-value coverage is
the phase-to-action mapping plus the events the driver emits.
"""
|
|
|
|
from __future__ import annotations
|
|
|
|
import json
|
|
from pathlib import Path
|
|
from typing import Any
|
|
|
|
import pytest
|
|
|
|
from exploits.driver import DriverConfig, MSFExploitDriver
|
|
from exploits.modules import ModuleConfig, load_module_config
|
|
|
|
|
|
REPO_ROOT = Path(__file__).resolve().parent.parent
|
|
MODULES_DIR = REPO_ROOT / "exploits" / "modules"
|
|
|
|
|
|
# -----------------------------------------------------------------------
|
|
# Module config loader
|
|
# -----------------------------------------------------------------------
|
|
|
|
def test_module_catalog_only_contains_unverified_modules() -> None:
    """All currently-shipped Metasploitable2 modules are bridge-only and
    none has been re-verified end-to-end since the §3 probe surfaced
    that no Tier-3 module reliably lands sessions against the
    SourceForge Metasploitable2 image. Per PIPELINE.md §4.3 admission
    criteria, the catalog should consist only of verified modules; the
    interim correct state is "every shipped module is requires_bridge,
    so the picker filters them all out and Tier-3 doesn't run." This
    keeps the dataset honest until §5 step 3 builds a target VM and
    step 4 re-admits modules with `verified_against` recorded.

    Updated 2026-05-04 after removing samba_usermap_script (commit
    c41763b empirical evidence: bind_perl handler couldn't connect
    after exploit_fire even with WfsDelay=30, producing dishonest
    infected_running labels per §10)."""
    from exploits.modules import load_module_configs

    catalog = load_module_configs(MODULES_DIR)
    # The failure message lists every (name, requires_bridge) pair so the
    # offending module is visible directly in the pytest output.
    assert all(m.requires_bridge for m in catalog.values()), (
        "every currently-shipped module must be requires_bridge=true so "
        "the production picker drops all of them — keeps Tier-3 honest "
        "until a verified module is admitted (§4.3). Modules in catalog: "
        f"{[(n, m.requires_bridge) for n, m in catalog.items()]}"
    )
|
|
|
|
|
|
def test_load_vsftpd_module_config_round_trip() -> None:
    """Load the shipped vsftpd module TOML and check each parsed field."""
    cfg = load_module_config(MODULES_DIR / "vsftpd_234_backdoor.toml")
    assert cfg.name == "vsftpd_234_backdoor"
    assert cfg.module_type == "exploit"
    assert cfg.module_path == "unix/ftp/vsftpd_234_backdoor"
    assert cfg.options["RPORT"] == 21
    # The raw option keeps its template placeholder until render_options runs.
    assert cfg.options["RHOSTS"] == "{{ target_ip }}"
    assert cfg.payload_path == "cmd/unix/interact"
|
|
|
|
|
|
def test_render_options_substitutes_target_ip() -> None:
    """render_options must fill RHOSTS with the target IP, leave RPORT
    untouched, and expose the payload path under the PAYLOAD key."""
    cfg = load_module_config(MODULES_DIR / "vsftpd_234_backdoor.toml")
    rendered = cfg.render_options(target_ip="10.200.0.10")
    assert rendered["RHOSTS"] == "10.200.0.10"
    assert rendered["RPORT"] == 21
    assert rendered["PAYLOAD"] == "cmd/unix/interact"
|
|
|
|
|
|
def test_select_module_is_deterministic() -> None:
    """Two select_module calls with identical arguments must return the
    very same catalog entry (identity, not just equality)."""
    from exploits.modules import load_module_configs, select_module

    catalog = load_module_configs(MODULES_DIR)
    a = select_module(catalog, host_id="lab-7", slot=2, episode_index=11)
    b = select_module(catalog, host_id="lab-7", slot=2, episode_index=11)
    assert a is b
|
|
|
|
|
|
def test_select_module_diversifies_across_hosts() -> None:
    """Across 20 slots, two different host ids must pick the same module
    fewer than 15 times; otherwise host_id is not varying the selection."""
    from exploits.modules import load_module_configs, select_module

    catalog = load_module_configs(MODULES_DIR)
    # Count the slots where both hosts landed on the identical entry.
    matches = sum(
        select_module(catalog, host_id="alice", slot=slot, episode_index=0)
        is select_module(catalog, host_id="bob", slot=slot, episode_index=0)
        for slot in range(20)
    )
    assert matches < 15, "host_id seed isn't producing module variety"
|
|
|
|
|
|
def test_select_module_walks_catalog() -> None:
    """Over 200 consecutive episode indices for one host/slot, every
    module name in the catalog must be selected at least once."""
    from exploits.modules import load_module_configs, select_module

    catalog = load_module_configs(MODULES_DIR)
    seen = {
        select_module(catalog, host_id="lab-x", slot=0, episode_index=ep).name
        for ep in range(200)
    }
    assert seen == set(catalog.keys()), \
        f"only saw {len(seen)}/{len(catalog)} modules across 200 episodes"
|
|
|
|
|
|
def test_module_target_port_pulls_rport() -> None:
    """module_target_port must report the expected port for each of the
    four shipped modules."""
    from exploits.modules import load_module_configs, module_target_port

    catalog = load_module_configs(MODULES_DIR)
    assert module_target_port(catalog["vsftpd_234_backdoor"]) == 21
    assert module_target_port(catalog["distccd_command_exec"]) == 3632
    assert module_target_port(catalog["php_cgi_arg_injection"]) == 80
    assert module_target_port(catalog["unreal_ircd_3281_backdoor"]) == 6667
|
|
|
|
|
|
def test_render_options_handles_both_brace_styles(tmp_path: Path) -> None:
    """Both `{{target_ip}}` and `{{ target_ip }}` placeholders must be
    replaced by render_options.

    Writes a throwaway TOML config into pytest's tmp_path so the test
    does not depend on any shipped module file."""
    p = tmp_path / "x.toml"
    p.write_text(
        '[module]\n'
        'type = "exploit"\n'
        'path = "unix/ftp/example"\n'
        '[module.options]\n'
        'RHOSTS = "{{target_ip}}"\n'
        'LHOST = "{{ target_ip }}"\n'
    )
    cfg = load_module_config(p)
    rendered = cfg.render_options(target_ip="10.0.0.5")
    assert rendered["RHOSTS"] == "10.0.0.5"
    assert rendered["LHOST"] == "10.0.0.5"
|
|
|
|
|
|
def test_load_rejects_missing_module_path(tmp_path: Path) -> None:
    """A config with no `path` under [module] must raise ValueError whose
    message matches "module.path"."""
    p = tmp_path / "bad.toml"
    p.write_text('[module]\ntype = "exploit"\n')
    with pytest.raises(ValueError, match="module.path"):
        load_module_config(p)
|
|
|
|
|
|
def test_load_rejects_unknown_module_type(tmp_path: Path) -> None:
    """A config whose module type is "evil" must raise ValueError whose
    message matches "module.type"."""
    p = tmp_path / "bad.toml"
    p.write_text(
        '[module]\ntype = "evil"\npath = "unix/ftp/x"\n'
    )
    with pytest.raises(ValueError, match="module.type"):
        load_module_config(p)
|
|
|
|
|
|
# -----------------------------------------------------------------------
|
|
# Exploit driver — phase transitions against a fake MSFRpcClient
|
|
# -----------------------------------------------------------------------
|
|
|
|
class FakeMSFRpcClient:
    """Stand-in that records every method called and lets a test
    script the apparent state of msfrpcd (sessions, return values).

    Every public method appends ``(name, args, kwargs)`` to ``calls``
    before doing anything else, so tests can assert on call order and
    arguments. All return values are canned; nothing touches the network.
    """

    def __init__(self, *, sessions_after_fire: dict[int, dict[str, Any]] | None = None) -> None:
        """Create a fake with no sessions.

        ``sessions_after_fire`` is the session table that becomes visible
        once ``module_execute`` has been called; ``None`` is treated as an
        empty table.
        """
        self.calls: list[tuple[str, tuple, dict]] = []
        self.logged_in = False
        self._fired = False
        self._sessions: dict[int, dict[str, Any]] = {}
        self._sessions_after_fire = sessions_after_fire or {}
        # (session id, newline-terminated data) for every shell write.
        self.shell_writes: list[tuple[int, str]] = []

    def _record(self, name: str, *args, **kwargs) -> None:
        """Append one call record to ``calls``."""
        self.calls.append((name, args, kwargs))

    def login(self) -> None:
        """Record the call and mark the client as logged in."""
        self._record("login")
        self.logged_in = True

    def logout(self) -> None:
        """Record the call and mark the client as logged out."""
        self._record("logout")
        self.logged_in = False

    def session_list(self) -> dict[int, dict[str, Any]]:
        """Record the call and return a shallow copy of the session table."""
        self._record("session_list")
        return dict(self._sessions)

    def module_execute(self, mtype: str, mname: str, opts: dict) -> dict:
        """Record the call, flag the fake as fired, and expose the scripted
        sessions. Always returns the same canned job descriptor."""
        self._record("module_execute", mtype, mname, opts)
        self._fired = True
        # Simulate sessions appearing after the exploit fires.
        self._sessions = dict(self._sessions_after_fire)
        return {"job_id": 7, "uuid": "fake-uuid"}

    def job_stop(self, job_id) -> dict:
        """Record the call and report success."""
        self._record("job_stop", job_id)
        return {"result": "success"}

    def session_shell_write(self, sid: int, data: str) -> dict:
        """Record the call (with the data exactly as passed), then store the
        newline-terminated form in ``shell_writes``.

        The returned ``write_count`` is the length of the stored string,
        rendered as a string.
        """
        self._record("session_shell_write", sid, data)
        if not data.endswith("\n"):
            data = data + "\n"
        self.shell_writes.append((sid, data))
        return {"write_count": str(len(data))}

    def session_shell_read(self, sid: int) -> str:
        """Record the call and return a fixed root ``id``-style line."""
        self._record("session_shell_read", sid)
        return "uid=0(root) gid=0(root)\n"

    def session_stop(self, sid: int) -> dict:
        """Record the call, drop ``sid`` from the session table if present,
        and report success."""
        self._record("session_stop", sid)
        self._sessions.pop(sid, None)
        return {"result": "success"}
|
|
|
|
|
|
def _make_driver(
    sessions_after_fire: dict[int, dict[str, Any]] | None = None,
    target_ip: str = "10.200.0.10",
) -> tuple[MSFExploitDriver, FakeMSFRpcClient, list[tuple[str, dict]]]:
    """Build an MSFExploitDriver for the vsftpd module on top of a fake client.

    Returns ``(driver, client, events)``. ``events`` is the list that the
    driver's emit callback appends ``(event_name, extra_kwargs)`` tuples
    to, so tests can inspect what was emitted and in which order.
    """
    cfg = load_module_config(MODULES_DIR / "vsftpd_234_backdoor.toml")
    client = FakeMSFRpcClient(sessions_after_fire=sessions_after_fire)
    events: list[tuple[str, dict]] = []

    def emit(event: str, **extra: Any) -> None:
        events.append((event, extra))

    driver = MSFExploitDriver(
        client=client,  # type: ignore[arg-type]
        module=cfg,
        cfg=DriverConfig(
            target_ip=target_ip,
            session_open_timeout_s=0.5,  # tests must not block
        ),
        emit_event=emit,
    )
    return driver, client, events
|
|
|
|
|
|
def test_driver_setup_authenticates_and_snapshots_sessions() -> None:
    """setup() must log in, remember session ids that already existed, and
    emit `driver_setup` first with the module path and target IP."""
    driver, client, events = _make_driver()
    client._sessions = {99: {"type": "shell"}}  # pre-existing session
    driver.setup()
    assert client.logged_in is True
    assert driver._sessions_seen_at_arm == {99}
    assert events[0][0] == "driver_setup"
    assert events[0][1]["module"] == "unix/ftp/vsftpd_234_backdoor"
    assert events[0][1]["target_ip"] == "10.200.0.10"
|
|
|
|
|
|
def test_full_phase_walk_emits_expected_event_order() -> None:
    """Drive the full phase sequence (with two infected_running/dormant
    cycles) and check event ordering, event counts, and that the exploit
    was fired exactly once with the rendered options."""
    driver, client, events = _make_driver(
        sessions_after_fire={1: {"type": "shell", "tunnel_peer": "10.200.0.10:21"}},
    )
    driver.setup()
    for phase in [
        "clean", "armed", "infecting",
        "infected_running", "dormant",
        "infected_running", "dormant",
        "clean",
    ]:
        driver.set_phase(phase)
    driver.teardown()

    names = [e[0] for e in events]
    # Order matters: fire comes before session_open, which comes before
    # workload, which comes before kill+logout.
    assert names.index("exploit_fire") < names.index("session_open")
    assert names.index("session_open") < names.index("session_landing_probe")
    assert names.index("session_landing_probe") < names.index("sample_executed")
    assert names.count("sample_executed") == 2  # two infected_running phases
    assert names.count("session_dormant") == 2
    assert "session_killed" in names

    # Driver should have asked the FakeClient to fire exactly once.
    fire_calls = [c for c in client.calls if c[0] == "module_execute"]
    assert len(fire_calls) == 1
    # Positional args recorded by the fake are (mtype, mname, opts).
    _, args, _ = fire_calls[0]
    assert args[1] == "unix/ftp/vsftpd_234_backdoor"
    assert args[2]["RHOSTS"] == "10.200.0.10"
    assert args[2]["PAYLOAD"] == "cmd/unix/interact"
|
|
|
|
|
|
def test_session_open_timeout_emits_timeout_event() -> None:
    """When no session shows up after the exploit fires, the driver must
    emit `session_open_timeout` and must not emit `session_open`."""
    # No sessions ever appear after fire.
    driver, client, events = _make_driver(sessions_after_fire={})
    driver.setup()
    driver.set_phase("armed")
    driver.set_phase("infecting")
    names = [e[0] for e in events]
    assert "session_open_timeout" in names
    assert "session_open" not in names
|
|
|
|
|
|
def test_workload_phases_are_no_op_without_session() -> None:
    """If the infecting phase never produced a session, the later
    infected_running and dormant phases must not write to any shell."""
    driver, client, events = _make_driver(sessions_after_fire={})
    driver.setup()
    driver.set_phase("armed")
    driver.set_phase("infecting")  # times out, no session
    driver.set_phase("infected_running")
    driver.set_phase("dormant")
    # No shell writes should have happened.
    assert client.shell_writes == []
|
|
|
|
|
|
def test_arm_is_idempotent() -> None:
    """Entering the `armed` phase twice in a row must result in exactly
    one `module_execute` call on the client."""
    driver, client, events = _make_driver(
        sessions_after_fire={1: {"type": "shell"}},
    )
    driver.setup()
    driver.set_phase("armed")
    driver.set_phase("armed")
    fire_calls = [c for c in client.calls if c[0] == "module_execute"]
    assert len(fire_calls) == 1
|
|
|
|
|
|
def test_teardown_kills_session_and_logs_out() -> None:
    """After a session has landed, teardown() must call `session_stop`,
    log the client out, and emit `session_killed`."""
    driver, client, events = _make_driver(
        sessions_after_fire={1: {"type": "shell"}},
    )
    driver.setup()
    driver.set_phase("armed")
    driver.set_phase("infecting")
    driver.teardown()
    assert any(c[0] == "session_stop" for c in client.calls)
    assert client.logged_in is False
    assert any(e[0] == "session_killed" for e in events)
|
|
|
|
|
|
# -----------------------------------------------------------------------
|
|
# Driver wired into a real EpisodeRunner — events land in events.jsonl
|
|
# -----------------------------------------------------------------------
|
|
|
|
# -----------------------------------------------------------------------
|
|
# Driver v2 — sample-profile-driven workloads
|
|
# -----------------------------------------------------------------------
|
|
|
|
def test_v2_uses_profile_workload_for_cpu_saturate() -> None:
    """When constructed with a Sample, the driver should send the
    profile's start_cmd at infected_running rather than the v1
    yes-loop. The actual command body is owned by exploits.workloads
    and tested there; here we just confirm dispatch."""
    from samples.manifest import Sample as _Sample

    cfg = load_module_config(MODULES_DIR / "vsftpd_234_backdoor.toml")
    client = FakeMSFRpcClient(
        sessions_after_fire={1: {"type": "shell", "tunnel_peer": "x:21"}},
    )
    events: list[tuple[str, dict]] = []
    sample = _Sample(
        name="xmrig-cryptominer",
        family="XMRig",
        category="cryptominer",
        profile="cpu-saturate",
    )

    driver = MSFExploitDriver(
        client=client,  # type: ignore[arg-type]
        module=cfg,
        cfg=DriverConfig(target_ip="10.200.0.10", session_open_timeout_s=0.5),
        emit_event=lambda ev, **kw: events.append((ev, kw)),
        sample=sample,
    )
    driver.setup()
    driver.set_phase("armed")
    driver.set_phase("infecting")
    driver.set_phase("infected_running")
    driver.set_phase("dormant")
    driver.teardown()

    # The shell command sent at infected_running should be the
    # profile's multi-line wrapper — NOT the v1 single-yes line.
    # A write only counts as the v1 loop if it contains the yes-loop
    # text without the "cis490-workload" marker.
    starts = [w for (_, w) in client.shell_writes if "yes > /dev/null" in w and "cis490-workload" not in w]
    assert starts == [], "v2 driver must not send the v1 yes-loop when a Sample is supplied"

    # The driver_setup event records sample + workload metadata.
    setup_events = [kw for (e, kw) in events if e == "driver_setup"]
    assert setup_events
    assert setup_events[0]["sample"] == "xmrig-cryptominer"
    assert setup_events[0]["sample_kind"] == "mimic"
    assert setup_events[0]["workload_profile"] == "cpu-saturate"

    # sample_executed carries the profile name + description.
    se = [kw for (e, kw) in events if e == "sample_executed"]
    assert se
    assert se[0]["profile"] == "cpu-saturate"
    assert se[0]["sample"] == "xmrig-cryptominer"
|
|
|
|
|
|
def test_v2_distinct_workloads_per_profile() -> None:
    """Two different profiles must produce *different* shell commands.
    This is the property that gives the ML model varied envelopes to
    learn from."""
    from exploits.workloads import all_profiles, workload_for
    from samples.manifest import Sample as _Sample

    profiles = all_profiles()
    assert len(profiles) >= 4
    seen_starts: set[str] = set()
    for p in profiles:
        s = _Sample(name=f"x-{p}", family="X", category="rat", profile=p)
        w = workload_for(s)
        assert w is not None
        seen_starts.add(w.start_cmd)
    # Every profile must have a distinct start_cmd.
    assert len(seen_starts) == len(profiles), \
        "two profiles produced the same workload — ML diversity is at risk"
|
|
|
|
|
|
def test_v2_unknown_profile_falls_back_to_cpu_saturate() -> None:
    """A Sample whose profile name is not recognised must still get a
    workload, and that workload's profile must be "cpu-saturate"."""
    from exploits.workloads import workload_for
    from samples.manifest import Sample as _Sample

    s = _Sample(name="weird", family="X", category="rat", profile="not-a-real-profile")
    w = workload_for(s)
    assert w is not None
    assert w.profile == "cpu-saturate"
|
|
|
|
|
|
def test_v1_path_still_works_when_no_sample() -> None:
    """Ensure backwards compat: a driver constructed without a sample
    uses the original yes-loop workload."""
    cfg = load_module_config(MODULES_DIR / "vsftpd_234_backdoor.toml")
    client = FakeMSFRpcClient(sessions_after_fire={1: {"type": "shell"}})
    driver = MSFExploitDriver(
        client=client,  # type: ignore[arg-type]
        module=cfg,
        cfg=DriverConfig(target_ip="10.200.0.10", session_open_timeout_s=0.5),
        # Events are irrelevant here; swallow them.
        emit_event=lambda *a, **kw: None,
    )
    driver.setup()
    driver.set_phase("armed")
    driver.set_phase("infecting")
    driver.set_phase("infected_running")
    driver.teardown()
    assert any("yes > /dev/null" in w for (_, w) in client.shell_writes)
|
|
|
|
|
|
def test_driver_events_persist_to_events_jsonl(tmp_path: Path) -> None:
    """When the driver is connected to a real EpisodeRunner, the
    events it emits must show up in the episode's events.jsonl with
    monotonic-clock timestamps (so labels and exploit events can be
    correlated downstream)."""
    import os

    from orchestrator.episode import EpisodeConfig, EpisodeRunner

    cfg = load_module_config(MODULES_DIR / "vsftpd_234_backdoor.toml")
    client = FakeMSFRpcClient(
        sessions_after_fire={1: {"type": "shell", "tunnel_peer": "x:21"}},
    )

    # (phase name, duration) pairs; durations are tiny so the test stays fast.
    schedule = [
        ("clean", 0.05),
        ("armed", 0.05),
        ("infecting", 0.05),
        ("infected_running", 0.05),
        ("dormant", 0.05),
        ("clean", 0.05),
    ]
    ec = EpisodeConfig(
        target_pid=os.getpid(),
        duration_s=sum(d for _, d in schedule),
        interval_ms=20,
        data_root=tmp_path,
        phase_schedule=schedule,
    )
    runner = EpisodeRunner(ec)
    driver = MSFExploitDriver(
        client=client,  # type: ignore[arg-type]
        module=cfg,
        cfg=DriverConfig(target_ip="10.200.0.10", session_open_timeout_s=0.5),
        emit_event=runner.emit_event,
    )
    # Phase changes from the runner are forwarded straight to the driver.
    runner.on_phase = driver.set_phase
    driver.setup()
    try:
        result = runner.run()
    finally:
        driver.teardown()

    # One JSON object per line of events.jsonl.
    events = [
        json.loads(raw_line)
        for raw_line in (result.episode_dir / "events.jsonl").read_text().splitlines()
    ]
    names = [e["event"] for e in events]
    assert "snapshot_load" in names
    assert "driver_setup" in names
    assert "exploit_fire" in names
    assert "session_open" in names
    assert "sample_executed" in names
    assert "session_dormant" in names
    assert "episode_end" in names

    # Driver events must carry monotonic timestamps in episode-relative
    # order (snapshot_load is essentially at origin, exploit_fire later,
    # session_open later still, episode_end last).
    # If an event name occurs more than once, the last occurrence wins here.
    by_name = {e["event"]: e for e in events}
    assert by_name["snapshot_load"]["t_mono_ns"] < 1_000_000  # <1ms after origin
    assert by_name["exploit_fire"]["t_mono_ns"] > by_name["snapshot_load"]["t_mono_ns"]
    assert by_name["session_open"]["t_mono_ns"] >= by_name["exploit_fire"]["t_mono_ns"]
    assert by_name["episode_end"]["t_mono_ns"] >= by_name["session_open"]["t_mono_ns"]
|