CIS490/orchestrator/fleet.py
Max Gorog 0390eb20b6 fix: revert speculative fleet picker change — was producing dishonest labels
Empirical evidence from k-gamingcom (commit 4ab5477, 2026-05-03 22:20Z
vsftpd_234_backdoor episode): the picker selected vsftpd because BRIDGE
was set on that host. The exploit fires against target_ip=127.0.0.1
(SLIRP loopback) but vsftpd's hardcoded port-6200 backdoor is reachable
only at the guest's bridge IP. Result: session_open_timeout, AND a
schedule-clock-driven `infected_running` label was still written for
the failed exploit — exactly the §10 poisoned-training-example pattern.

Until guest-IP discovery for bridge mode is wired (a separate piece of
infrastructure), bridge-only modules can't actually reach their target
even when the operator sets BRIDGE for Tier-2's pcap source. Revert
the picker to its prior conservative form: drop requires_bridge modules
unconditionally regardless of BRIDGE state. Same for the BRIDGE env
strip in the Tier-3 launch path — it was correct as unconditional.

Replaces the two aspirational tests
(test_fleet_uses_all_modules_when_bridge_set,
test_fleet_propagates_bridge_env_to_runner) with their honest negatives
(test_tier3_drops_requires_bridge_modules_unconditionally,
test_tier3_strips_bridge_env_even_when_set). The previous tests asserted
behavior the rest of the pipeline can't deliver; they were false signals.

229 passed.

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
2026-05-03 17:58:43 -05:00

504 lines
18 KiB
Python

"""Fleet runner — concurrent VM episodes with resource awareness.
The lab host detects its own capacity, picks how many VMs to run in
parallel without driving the box into swap or starving the host
itself, and runs that many episodes simultaneously. Each slot gets a
distinct ``Sample`` from the manifest (deterministically chosen by
host_id + slot index), so every concurrent VM produces novel,
labelable data.
Capacity heuristic — defaults documented inline so they're auditable:
    cores_total      = os.cpu_count()
    cores_reserved   = max(1, cores_total // 8)      # host + collectors
    ram_per_vm_mib   = 320                           # Alpine fits in 256, but
                                                     # leave 64 for overhead
                                                     # (qemu+ovmf)
    ram_headroom_mib = max(1024, ram_total // 8)     # never starve host
    max_by_cores     = cores_total - cores_reserved
    max_by_ram       = (ram_available - ram_headroom) // ram_per_vm
    max_by_load      = max(1, max_by_cores // 2) if load_1m / cores_total > 0.75
                       else max_by_cores
The smallest of these wins. The reasoning string is logged + saved
into each episode's meta.json under ``fleet`` so post-hoc analysis
can correlate "this episode was run when 6 VMs were concurrent" with
its observed envelope.
"""
from __future__ import annotations
import logging
import os
import shutil
import signal
import subprocess
import threading
import time
from concurrent.futures import ThreadPoolExecutor, as_completed
from dataclasses import dataclass, field
from pathlib import Path
from exploits.modules import (
ModuleConfig, load_module_configs, module_target_port, select_module,
)
from samples.manifest import Sample, SampleManifest
log = logging.getLogger("cis490.fleet")
def _msfrpcd_available(host: str = "127.0.0.1", port: int = 55553) -> bool:
"""True when msfrpcd is listening — gate for the Tier-3 default.
A Tier-2 fallback runs when msfrpcd isn't there (still useful
data, just labeled with no-exploit so the trainer can filter)."""
import socket as _sk
try:
with _sk.create_connection((host, port), timeout=0.3):
return True
except OSError:
return False
@dataclass(frozen=True)
class FleetCapacity:
    """Immutable record of one capacity calculation.

    Holds both the raw inputs (cores, RAM, load) and the per-resource
    caps derived from them, plus the final ``max_concurrent`` and the
    human-readable ``rationale`` string that explains it.
    """

    # CPU inputs.
    cores_total: int
    cores_reserved: int
    # Memory inputs, all in MiB.
    ram_total_mib: int
    ram_available_mib: int
    ram_per_vm_mib: int
    ram_headroom_mib: int
    # 1-minute load average.
    load_1m: float
    # Per-resource caps on the number of concurrent VMs.
    max_by_cores: int
    max_by_ram: int
    max_by_load: int
    # Final cap and the explanation that goes with it.
    max_concurrent: int
    rationale: str

    def to_dict(self) -> dict:
        """Return every field as a plain ``dict`` keyed by field name."""
        keys = (
            "cores_total",
            "cores_reserved",
            "ram_total_mib",
            "ram_available_mib",
            "ram_per_vm_mib",
            "ram_headroom_mib",
            "load_1m",
            "max_by_cores",
            "max_by_ram",
            "max_by_load",
            "max_concurrent",
            "rationale",
        )
        return {key: getattr(self, key) for key in keys}
@dataclass
class FleetConfig:
    """Static settings for one fleet run on one lab host."""

    host_id: str
    repo_root: Path
    data_root: Path
    manifest: SampleManifest

    # Exploit module catalog used for Tier-3 dispatch. Needed for
    # fleet-driven exploit-fire variety; an empty catalog forces the
    # Tier-2 fallback.
    modules: dict[str, ModuleConfig] = field(default_factory=dict)

    # Per-VM RAM shape — has to match what the launcher requests.
    ram_per_vm_mib: int = 320

    # Optional ceiling below the calculated concurrency (e.g. for a
    # smoke test).
    max_concurrent_override: int | None = None

    # Skip episodes whose sample requires a real binary that's not
    # present.
    require_real_samples: bool = False

    # Run Tier-2 even when msfrpcd is up; for tests + dev runs that want
    # a no-exploit baseline.
    force_tier2: bool = False

    # How many slots per wave may run as Tier-3: slots 0..N-1 get
    # Tier-3, the rest fall back to Tier-2. Metasploitable2 boot is
    # IO-bound: running >~6 concurrent target VMs saturates disk and
    # causes all slots to time out waiting for the guest service to come
    # up. None = no cap (all eligible slots use Tier-3).
    max_tier3_slots: int | None = None

    # msfrpcd connectivity (read by the tier-3 driver via env).
    msfrpcd_host: str = "127.0.0.1"
    msfrpcd_port: int = 55553
def _read_meminfo() -> dict[str, int]:
out: dict[str, int] = {}
try:
with open("/proc/meminfo") as f:
for line in f:
k, _, rest = line.partition(":")
v = rest.strip()
if v.endswith(" kB"):
try:
out[k] = int(v[:-3]) * 1024
except ValueError:
pass
except OSError:
pass
return out
def _read_loadavg() -> float:
try:
with open("/proc/loadavg") as f:
return float(f.read().split()[0])
except (OSError, ValueError, IndexError):
return 0.0
def detect_capacity(*, ram_per_vm_mib: int = 320) -> FleetCapacity:
    """Work out how many VMs this host can run at once.

    Three independent caps are computed (cores, RAM, load) and the
    smallest one wins. The inputs and the winning cap are summarised in
    a rationale string that is logged and stored on the result.

    Args:
        ram_per_vm_mib: RAM budget per VM in MiB. A value <= 0 disables
            the RAM cap (it is set equal to the core cap).

    Returns:
        A ``FleetCapacity`` carrying the inputs, each cap, the final
        ``max_concurrent`` and the rationale.
    """
    bytes_per_mib = 1024 * 1024

    # CPU: keep at least one core back, more on hosts with many cores.
    cores_total = os.cpu_count() or 1
    cores_reserved = max(1, cores_total // 8)

    # Memory: fall back to MemTotal when MemAvailable is absent, and to
    # zero when meminfo could not be read at all.
    meminfo = _read_meminfo()
    ram_total_bytes = meminfo.get("MemTotal", 0)
    ram_available_bytes = meminfo.get("MemAvailable", ram_total_bytes)
    ram_total_mib = ram_total_bytes // bytes_per_mib
    ram_available_mib = ram_available_bytes // bytes_per_mib
    # Keep the larger of 1 GiB or 1/8 of total RAM free for the host.
    ram_headroom_mib = max(1024, ram_total_mib // 8)

    load_1m = _read_loadavg()

    max_by_cores = max(0, cores_total - cores_reserved)

    if ram_per_vm_mib > 0:
        spare_mib = ram_available_mib - ram_headroom_mib
        max_by_ram = max(0, spare_mib // ram_per_vm_mib)
    else:
        max_by_ram = max_by_cores

    # Load-based cap: if the host is already busy, halve the core cap
    # (but never go below 1).
    host_busy = bool(cores_total) and load_1m / cores_total > 0.75
    max_by_load = max(1, max_by_cores // 2) if host_busy else max_by_cores

    candidates = [max_by_cores, max_by_ram, max_by_load]
    max_concurrent = max(0, min(candidates))

    # Name the first cap that produced the minimum; "cores" when nothing
    # is tighter than the core cap.
    if max_concurrent < max_by_cores:
        binding = ("cores", "ram", "load")[candidates.index(max_concurrent)]
    else:
        binding = "cores"

    rationale = (
        f"cores_total={cores_total} reserved={cores_reserved} "
        f"ram_avail_mib={ram_available_mib} headroom={ram_headroom_mib} "
        f"per_vm={ram_per_vm_mib} load_1m={load_1m:.2f} "
        f"-> max_concurrent={max_concurrent} (binding={binding})"
    )
    log.info("capacity: %s", rationale)

    return FleetCapacity(
        cores_total=cores_total,
        cores_reserved=cores_reserved,
        ram_total_mib=ram_total_mib,
        ram_available_mib=ram_available_mib,
        ram_per_vm_mib=ram_per_vm_mib,
        ram_headroom_mib=ram_headroom_mib,
        load_1m=load_1m,
        max_by_cores=max_by_cores,
        max_by_ram=max_by_ram,
        max_by_load=max_by_load,
        max_concurrent=max_concurrent,
        rationale=rationale,
    )
# ---------------------------------------------------------------------------
# Per-slot episode execution
# ---------------------------------------------------------------------------
@dataclass
class SlotResult:
    """Outcome of a single slot's episode within one wave."""

    slot: int
    sample_name: str
    sample_kind: str
    episode_id: str | None
    # Child process return code.
    rc: int
    # Wall-clock seconds spent in the slot.
    duration_s: float
    # "tier3" when the slot was dispatched to the Tier-3 runner,
    # otherwise "tier2".
    tier: str = "tier2"
    # Exploit module identifier (Tier 3 only).
    module_name: str | None = None
    error: str | None = None
    extra: dict = field(default_factory=dict)
def _run_slot(
    cfg: FleetConfig,
    slot: int,
    sample: Sample,
    episode_index: int,
    capacity: FleetCapacity,
) -> SlotResult:
    """Run one episode in a dedicated slot and report the outcome.

    Dispatch:
      - Tier 3 (chosen when ``force_tier2`` is off, the catalog holds at
        least one module without ``requires_bridge``, the slot is under
        ``max_tier3_slots``, and msfrpcd is listening): real exploit
        fire via ``tools/run_tier3_demo.py`` with a
        deterministically-selected module + sample.
      - Tier 2 (fallback): no exploit; the controller drives a labeled
        workload directly via the serial console
        (``tools/run_real_vm_demo.py``). Recorded in ``SlotResult.tier``
        so trainers can filter the no-exploit episodes.

    The child's stdout and stderr are appended to
    ``<data_root>/fleet-logs/slot-<slot>-ep-<episode_index>.log``.

    Args:
        cfg: Fleet configuration (paths, module catalog, tier switches).
        slot: Zero-based slot index; drives the run dir and port offsets.
        sample: Manifest sample this slot should exercise.
        episode_index: Wave index; exported to the child and used for
            module selection and the log file name.
        capacity: Capacity snapshot; its ``max_concurrent`` is exported
            to the child as ``FLEET_MAX_CONCURRENT``.

    Returns:
        A ``SlotResult`` with the child's return code and wall-clock
        duration. When the log file cannot be opened or the child cannot
        be started, ``rc`` is -1 and ``error`` holds the exception text.
        ``episode_id`` is always ``None`` here.
    """
    # Per-slot run dir keeps QEMU sockets + pidfiles isolated. Without
    # this, parallel slots rmtree each other's run dir mid-boot.
    run_dir_base = "/tmp/cis490-vm-fleet"

    # Decide tier.
    # Tier-3 modules split into two classes by `requires_bridge`:
    #   - SLIRP-friendly bind shells like samba_usermap_script's
    #     cmd/unix/bind_perl (handler connects in over hostfwd).
    #   - Bridge-only modules (vsftpd's port-6200 backdoor, distccd,
    #     php_cgi, unreal_ircd) where the handler must reach the
    #     guest at its own bridge IP.
    # The bridge-only set is filtered out unconditionally because the
    # rest of the pipeline currently passes target_ip=127.0.0.1 (SLIRP
    # loopback) regardless of bridge mode, so bridge-only modules
    # land in target_ip mismatches that produce session_open_timeout
    # AND a dishonest infected_running label (PIPELINE.md §10). When
    # target-IP discovery from the guest's bridge lease lands, this
    # filter can be made conditional on `bridge_set` again. See the
    # 2026-05-03 vsftpd_234_backdoor episode (commit 4ab5477) on
    # k-gamingcom for the empirical evidence the conditional version
    # produced poisoned labels.
    usable_modules: dict[str, ModuleConfig] = (
        {k: v for k, v in cfg.modules.items() if not v.requires_bridge}
        if cfg.modules else {}
    )

    # Cheap local checks run first. The msfrpcd probe opens a TCP
    # connection, so it goes last and is skipped entirely for slots that
    # would fall back to Tier 2 anyway (forced, no modules, or capped).
    under_tier3_cap = (
        cfg.max_tier3_slots is None or slot < cfg.max_tier3_slots
    )
    tier3_ready = (
        not cfg.force_tier2
        and bool(usable_modules)
        and under_tier3_cap
        and _msfrpcd_available(cfg.msfrpcd_host, cfg.msfrpcd_port)
    )

    # The child inherits the parent environment plus the slot identity.
    env = os.environ.copy()
    env["SLOT"] = str(slot)
    env["SAMPLE_NAME"] = sample.name
    env["SAMPLE_PROFILE"] = sample.profile
    env["SAMPLE_KIND"] = sample.kind
    env["FLEET_HOST_ID"] = cfg.host_id
    env["FLEET_EPISODE_INDEX"] = str(episode_index)
    env["FLEET_MAX_CONCURRENT"] = str(capacity.max_concurrent)

    # Prefer the repo's virtualenv interpreter when it exists.
    venv_py = cfg.repo_root / ".venv" / "bin" / "python"
    py = str(venv_py) if venv_py.exists() else "python3"

    log_dir = cfg.data_root / "fleet-logs"
    log_dir.mkdir(parents=True, exist_ok=True)
    out_log = log_dir / f"slot-{slot}-ep-{episode_index}.log"

    if tier3_ready:
        module = select_module(
            usable_modules,
            host_id=cfg.host_id, slot=slot, episode_index=episode_index,
        )
        guest_port = module_target_port(module) or 21
        # HOST_PORT: unprivileged port QEMU hostfwd's to the guest service.
        # +2000 shifts all base ports above 1024 (vsftpd:21->2021,
        # http:80->2080, smb:139->2139, distcc:3632->5632, irc:6667->8667).
        # Slot offset prevents concurrent targets from colliding on loopback.
        host_port = guest_port + 2000 + slot * 1000

        # Per-slot runner dir for the target VM.
        run_dir = f"{run_dir_base}-target-{slot}"
        env["RUN_DIR"] = run_dir
        env["PORT_BASE"] = str(host_port)

        # Main service port pair, plus per-slot bind ports for payloads
        # like cmd/unix/bind_perl that open a separate listener in the guest.
        # Per-slot offset (base + slot*1000) prevents collisions.
        port_pairs = [f"{host_port}:{guest_port}"]
        for extra_guest_port in module.extra_target_ports:
            # Per-slot LPORT: base + slot*1000. FLEET_PAYLOAD_LPORT overrides
            # the payload's LPORT so the guest binds this exact port. The
            # hostfwd maps the same number on both sides because the guest's
            # bind port equals the per-slot LPORT (not the module's base LPORT).
            # With several extra ports the last one wins for the env var.
            extra_host_port = extra_guest_port + slot * 1000
            port_pairs.append(f"{extra_host_port}:{extra_host_port}")
            env["FLEET_PAYLOAD_LPORT"] = str(extra_host_port)
        env["TARGET_PORTS"] = ",".join(port_pairs)

        # Tier-3 always uses SLIRP+hostfwd. Strip BRIDGE so a host that
        # has BRIDGE set for Tier-2 (pcap source 4) doesn't accidentally
        # propagate it into the Tier-3 launch_target.sh, which would try
        # tap mode without the matching guest-IP discovery wired (same
        # reason the requires_bridge filter above has to stay strict).
        env.pop("BRIDGE", None)

        cmd = [
            py,
            str(cfg.repo_root / "tools" / "run_tier3_demo.py"),
            "--data-root", str(cfg.data_root),
            "--run-dir", run_dir,
            "--module", module.name,
            "--sample", sample.name,
            "--target-port", str(host_port),
            "--target-boot-timeout", "300",
        ]
        tier = "tier3"
        module_name: str | None = module.name
    else:
        run_dir = f"{run_dir_base}-{slot}"
        env["RUN_DIR"] = run_dir
        cmd = [
            py,
            str(cfg.repo_root / "tools" / "run_real_vm_demo.py"),
            "--data-root", str(cfg.data_root),
            "--run-dir", run_dir,
            "--sample", sample.name,
        ]
        tier = "tier2"
        module_name = None

        # Explain an unrequested fallback; a forced Tier 2 needs no note.
        if not cfg.force_tier2:
            if not cfg.modules:
                log.warning("slot=%d falling back to Tier 2: empty module catalog", slot)
            elif not usable_modules:
                log.warning("slot=%d falling back to Tier 2: no non-bridge modules available", slot)
            elif not under_tier3_cap:
                log.debug("slot=%d Tier 2 by max_tier3_slots=%d cap", slot, cfg.max_tier3_slots)
            else:
                # Every local precondition held, so the probe was the
                # one that failed.
                log.warning("slot=%d falling back to Tier 2: msfrpcd unreachable at %s:%d",
                            slot, cfg.msfrpcd_host, cfg.msfrpcd_port)

    log.info(
        "slot=%d ep=%d tier=%s sample=%s module=%s run_dir=%s",
        slot, episode_index, tier, sample.name, module_name, run_dir,
    )

    started = time.monotonic()
    try:
        # Append so earlier output for the same slot/episode is kept.
        with out_log.open("ab") as logf:
            proc = subprocess.run(
                cmd,
                cwd=str(cfg.repo_root),
                env=env,
                stdout=logf,
                stderr=subprocess.STDOUT,
                check=False,
            )
        rc = proc.returncode
        err = None
    except (OSError, subprocess.SubprocessError) as e:
        rc = -1
        err = str(e)
    duration = time.monotonic() - started

    return SlotResult(
        slot=slot,
        sample_name=sample.name,
        sample_kind=sample.kind,
        episode_id=None,
        rc=rc,
        duration_s=duration,
        tier=tier,
        module_name=module_name,
        error=err,
    )
# ---------------------------------------------------------------------------
# FleetRunner
# ---------------------------------------------------------------------------
@dataclass
class FleetRunResult:
    """Aggregate outcome of a fleet run."""

    # Capacity snapshot the run was sized with.
    capacity: FleetCapacity
    # One entry per executed slot, in completion order.
    slots: list[SlotResult]
    # Wall-clock seconds for the run.
    total_duration_s: float
class FleetRunner:
    """Runs waves of concurrent slot episodes for one ``FleetConfig``."""

    def __init__(self, cfg: FleetConfig) -> None:
        """Store the configuration and create the stop flag."""
        self.cfg = cfg
        self._stop = threading.Event()

    def stop(self) -> None:
        """Ask ``run`` to stop before starting its next wave."""
        self._stop.set()

    def run(
        self,
        *,
        episodes: int = 1,
        episode_index_base: int = 0,
        capacity_override: FleetCapacity | None = None,
    ) -> FleetRunResult:
        """Execute up to ``episodes`` waves and collect every slot result.

        Args:
            episodes: Number of waves to run.
            episode_index_base: Offset added to the wave number to form
                the episode index passed to sample selection and slots.
            capacity_override: Use this capacity instead of detecting it.

        Returns:
            A ``FleetRunResult``; ``slots`` is empty and the duration is
            0.0 when the effective slot count is zero or negative.
        """
        if capacity_override:
            capacity = capacity_override
        else:
            capacity = detect_capacity(ram_per_vm_mib=self.cfg.ram_per_vm_mib)

        slot_count = capacity.max_concurrent
        override = self.cfg.max_concurrent_override
        if override is not None:
            slot_count = min(slot_count, override)

        if slot_count <= 0:
            log.warning(
                "fleet capacity is zero (%s); cannot run", capacity.rationale,
            )
            return FleetRunResult(
                capacity=capacity, slots=[], total_duration_s=0.0,
            )

        log.info(
            "fleet host=%s slots=%d episodes=%d manifest_size=%d",
            self.cfg.host_id, slot_count, episodes, len(self.cfg.manifest),
        )

        collected: list[SlotResult] = []
        run_started = time.monotonic()

        for wave in range(episodes):
            # The stop flag is only honoured between waves; a wave that
            # has started always runs to completion.
            if self._stop.is_set():
                break
            episode_index = episode_index_base + wave

            wave_samples = []
            for slot in range(slot_count):
                wave_samples.append(
                    self.cfg.manifest.select(
                        host_id=self.cfg.host_id,
                        slot=slot,
                        episode_index=episode_index,
                    )
                )

            if self.cfg.require_real_samples:
                # Survivors are renumbered from slot 0 when submitted.
                wave_samples = [
                    picked for picked in wave_samples if picked.kind == "real"
                ]
                if not wave_samples:
                    log.warning("require_real_samples: no real samples in manifest; skipping wave")
                    continue

            log.info(
                "wave %d/%d: %s",
                wave + 1, episodes,
                [(i, s.name, s.kind) for i, s in enumerate(wave_samples)],
            )

            with ThreadPoolExecutor(max_workers=slot_count) as pool:
                pending = []
                for slot, sample in enumerate(wave_samples):
                    pending.append(
                        pool.submit(
                            _run_slot, self.cfg, slot, sample, episode_index, capacity,
                        )
                    )
                # Results are recorded in completion order, not slot order.
                for finished in as_completed(pending):
                    outcome = finished.result()
                    log.info(
                        "slot %d sample=%s rc=%d duration=%.1fs",
                        outcome.slot, outcome.sample_name, outcome.rc, outcome.duration_s,
                    )
                    collected.append(outcome)

        elapsed = time.monotonic() - run_started
        return FleetRunResult(
            capacity=capacity,
            slots=collected,
            total_duration_s=elapsed,
        )
# ---------------------------------------------------------------------------
# Friendly capacity report (used by tools/run_fleet.py --capacity)
# ---------------------------------------------------------------------------
def capacity_report() -> str:
    """Return a multi-line, human-readable summary of detected capacity.

    Runs ``detect_capacity()`` with its defaults and formats the cores,
    RAM, load, per-resource caps and the final VM count, one topic per
    line, ending with a newline.
    """
    cap = detect_capacity()
    sections = [
        f"cores: {cap.cores_total} (reserve {cap.cores_reserved})\n",
        f"ram: {cap.ram_total_mib} MiB total, {cap.ram_available_mib} MiB available "
        f"(headroom {cap.ram_headroom_mib} MiB, per-vm {cap.ram_per_vm_mib} MiB)\n",
        f"load: 1m={cap.load_1m:.2f}\n",
        f"caps: by_cores={cap.max_by_cores}, by_ram={cap.max_by_ram}, "
        f"by_load={cap.max_by_load}\n",
        f"--> max_concurrent VMs: {cap.max_concurrent}\n",
    ]
    return "".join(sections)