Root causes and fixes documented in TIER3-BRINGUP.md. Summary:
1. BRIDGE env var leaked into Tier-3 subprocess → target VM used tap
instead of SLIRP; fix: env.pop("BRIDGE") in fleet _run_slot.
2. usable_modules filter conditioned on BRIDGE presence → bridge-requiring
modules selected on SLIRP runs; fix: always filter requires_bridge.
3. cmd/unix/interact creates no session.list entry → session_open_timeout
every episode; fix: switch samba_usermap_script to cmd/unix/bind_perl.
4. Per-slot LPORT hostfwd used wrong guest port (host:5444→guest:4444);
fix: extra_host_port:extra_host_port mapping so guest binds the
per-slot LPORT directly.
5. vsftpd backdoor port 6200 hardcoded → collision across concurrent slots;
fix: requires_bridge=true filters it from SLIRP fleet runs.
6. SLIRP false-positive in _wait_for_tcp → exploit fires before Samba
boots (~60 s too early); fix: replace TCP probe with serial console
_wait_for_serial_login that waits for actual "login:" prompt.
7. Stale QEMU survives orchestrator restart (start_new_session=True) →
holds hostfwd ports, new QEMU silently fails; fix: kill by pgid from
old pidfile before rmtree.
8. PORT_BASE default used privileged port 21; fix: default to 2021+slot*100.
9. msfrpcd 6.x returns bytes for all string values even with raw=False;
fix: MSFRpcClient._str() recursive decoder applied to all responses.
Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
489 lines
18 KiB
Python
489 lines
18 KiB
Python
"""Fleet runner — concurrent VM episodes with resource awareness.

The lab host detects its own capacity, picks how many VMs to run in
parallel without driving the box into swap or starving the host
itself, and runs that many episodes simultaneously. Each slot gets a
distinct ``Sample`` from the manifest (deterministically chosen by
host_id + slot index), so every concurrent VM produces novel,
labelable data.

Capacity heuristic — defaults documented inline so they're auditable:

    cores_total      = os.cpu_count()
    cores_reserved   = max(1, cores_total // 8)     # host + collectors
    ram_per_vm_mib   = 320                          # Alpine fits in 256
                                                    # but leave 64 for
                                                    # overhead (qemu+ovmf)
    ram_headroom_mib = max(1024, ram_total // 8)    # never starve host
    max_by_cores     = cores_total - cores_reserved
    max_by_ram       = (ram_available - ram_headroom) // ram_per_vm
    max_by_load      = if (load_1m / cores) > 0.75: tighter cap

The smallest of these wins. The reasoning string is logged + saved
into each episode's meta.json under ``fleet`` so post-hoc analysis
can correlate "this episode was run when 6 VMs were concurrent" with
its observed envelope.
"""
|
|
|
|
from __future__ import annotations
|
|
|
|
import logging
|
|
import os
|
|
import shutil
|
|
import signal
|
|
import subprocess
|
|
import threading
|
|
import time
|
|
from concurrent.futures import ThreadPoolExecutor, as_completed
|
|
from dataclasses import dataclass, field
|
|
from pathlib import Path
|
|
|
|
from exploits.modules import (
|
|
ModuleConfig, load_module_configs, module_target_port, select_module,
|
|
)
|
|
from samples.manifest import Sample, SampleManifest
|
|
|
|
|
|
log = logging.getLogger("cis490.fleet")
|
|
|
|
|
|
def _msfrpcd_available(host: str = "127.0.0.1", port: int = 55553) -> bool:
    """Report whether a TCP connection to ``host:port`` can be opened.

    Used as the gate for choosing Tier 3: when nothing is listening the
    caller falls back to Tier 2. The probe gives up after 0.3 seconds
    and any ``OSError`` (refused, unreachable, timed out) counts as
    "not available".
    """
    import socket

    reachable = False
    try:
        # The connection is only a liveness probe; it is closed as soon
        # as the ``with`` block exits.
        with socket.create_connection((host, port), timeout=0.3):
            reachable = True
    except OSError:
        reachable = False
    return reachable
|
|
|
|
|
|
@dataclass(frozen=True)
class FleetCapacity:
    """Immutable record of one capacity measurement and the caps derived from it."""

    # CPU inventory and the share held back from VMs.
    cores_total: int
    cores_reserved: int
    # Memory figures, all in MiB.
    ram_total_mib: int
    ram_available_mib: int
    ram_per_vm_mib: int
    ram_headroom_mib: int
    # One-minute load average at measurement time.
    load_1m: float
    # Individual caps; the final answer is stored in max_concurrent.
    max_by_cores: int
    max_by_ram: int
    max_by_load: int
    max_concurrent: int
    # Human-readable summary of how max_concurrent was reached.
    rationale: str

    def to_dict(self) -> dict:
        """Return every field as a plain ``dict`` keyed by field name."""
        keys = (
            "cores_total",
            "cores_reserved",
            "ram_total_mib",
            "ram_available_mib",
            "ram_per_vm_mib",
            "ram_headroom_mib",
            "load_1m",
            "max_by_cores",
            "max_by_ram",
            "max_by_load",
            "max_concurrent",
            "rationale",
        )
        return {key: getattr(self, key) for key in keys}
|
|
|
|
|
|
@dataclass
class FleetConfig:
    """Settings for a fleet run: identity, paths, sample source and tier policy."""

    host_id: str
    repo_root: Path
    data_root: Path
    manifest: SampleManifest

    # Exploit module catalog used for Tier-3 dispatch. When it is empty
    # the fleet falls back to Tier 2.
    modules: dict[str, ModuleConfig] = field(default_factory=dict)

    # Memory budget per VM in MiB; has to agree with what the launcher
    # actually asks for.
    ram_per_vm_mib: int = 320

    # Optional ceiling applied on top of the detected capacity
    # (for example a smoke test that should only use a slot or two).
    max_concurrent_override: int | None = None

    # When set, samples whose kind is not "real" are dropped from a wave.
    require_real_samples: bool = False

    # Run Tier 2 even if msfrpcd is reachable; intended for tests and
    # dev runs that want a no-exploit baseline.
    force_tier2: bool = False

    # Number of slots per wave allowed to run Tier 3: slots 0..N-1 are
    # eligible and the remainder use Tier 2. Metasploitable2 boot is
    # IO-bound, and running more than roughly 6 target VMs at once
    # saturates disk so every slot times out waiting for the guest
    # service. None means no cap.
    max_tier3_slots: int | None = None

    # Where msfrpcd is expected to listen (the tier-3 driver reads its
    # connection settings from the environment).
    msfrpcd_host: str = "127.0.0.1"
    msfrpcd_port: int = 55553
|
|
|
|
|
|
def _read_meminfo() -> dict[str, int]:
    """Parse ``/proc/meminfo`` into a mapping of field name to bytes.

    Only lines whose value ends in " kB" are kept; the number is
    multiplied by 1024. Lines that do not parse are skipped. If the file
    cannot be read, whatever was collected so far (possibly nothing) is
    returned.
    """
    sizes: dict[str, int] = {}
    try:
        with open("/proc/meminfo") as meminfo:
            for raw in meminfo:
                name, _sep, tail = raw.partition(":")
                amount = tail.strip()
                if not amount.endswith(" kB"):
                    continue
                try:
                    kib = int(amount[:-3])
                except ValueError:
                    continue
                sizes[name] = kib * 1024
    except OSError:
        # Best effort: callers treat missing keys as zero.
        return sizes
    return sizes
|
|
|
|
|
|
def _read_loadavg() -> float:
    """Return the first field of ``/proc/loadavg`` as a float.

    Falls back to 0.0 when the file is unreadable, empty, or its first
    field is not a number.
    """
    try:
        with open("/proc/loadavg") as loadavg:
            fields = loadavg.read().split()
        one_minute = float(fields[0])
    except (OSError, ValueError, IndexError):
        return 0.0
    return one_minute
|
|
|
|
|
|
def detect_capacity(*, ram_per_vm_mib: int = 320) -> FleetCapacity:
    """Measure the host and derive how many VMs may run concurrently.

    Three caps are computed and the smallest one wins:
      - cores: total cores minus a reserve of ``max(1, cores_total // 8)``;
      - ram: available MiB minus headroom (``max(1024, total // 8)``),
        divided by ``ram_per_vm_mib``; when ``ram_per_vm_mib`` is not
        positive this cap simply mirrors the core cap;
      - load: the core cap, halved (floor 1) when the 1-minute load per
        core exceeds 0.75.

    The summary line is logged at INFO level and stored in the returned
    ``FleetCapacity.rationale``.
    """
    bytes_per_mib = 1024 * 1024

    # CPU: always hold back at least one core, more on larger hosts.
    cores_total = os.cpu_count() or 1
    cores_reserved = max(1, cores_total // 8)
    max_by_cores = max(0, cores_total - cores_reserved)

    # Memory: MemAvailable falls back to MemTotal when it is missing.
    meminfo = _read_meminfo()
    total_bytes = meminfo.get("MemTotal", 0)
    available_bytes = meminfo.get("MemAvailable", total_bytes)
    ram_total_mib = total_bytes // bytes_per_mib
    ram_available_mib = available_bytes // bytes_per_mib
    # Keep at least 1 GiB, or an eighth of total RAM, out of the VMs' reach.
    ram_headroom_mib = max(1024, ram_total_mib // 8)
    if ram_per_vm_mib > 0:
        max_by_ram = max(
            0, (ram_available_mib - ram_headroom_mib) // ram_per_vm_mib
        )
    else:
        max_by_ram = max_by_cores

    # Load: an already-busy host gets half the core cap, never below 1.
    load_1m = _read_loadavg()
    host_is_busy = bool(cores_total) and load_1m / cores_total > 0.75
    max_by_load = max(1, max_by_cores // 2) if host_is_busy else max_by_cores

    caps = {"cores": max_by_cores, "ram": max_by_ram, "load": max_by_load}
    max_concurrent = max(0, min(caps.values()))

    # Name the first cap that produced the winning value; ties with the
    # core cap are attributed to "cores".
    binding = "cores"
    if max_concurrent < max_by_cores:
        binding = next(
            label for label, cap in caps.items() if cap == max_concurrent
        )

    rationale = (
        f"cores_total={cores_total} reserved={cores_reserved} "
        f"ram_avail_mib={ram_available_mib} headroom={ram_headroom_mib} "
        f"per_vm={ram_per_vm_mib} load_1m={load_1m:.2f} "
        f"-> max_concurrent={max_concurrent} (binding={binding})"
    )
    log.info("capacity: %s", rationale)

    return FleetCapacity(
        cores_total=cores_total,
        cores_reserved=cores_reserved,
        ram_total_mib=ram_total_mib,
        ram_available_mib=ram_available_mib,
        ram_per_vm_mib=ram_per_vm_mib,
        ram_headroom_mib=ram_headroom_mib,
        load_1m=load_1m,
        max_by_cores=max_by_cores,
        max_by_ram=max_by_ram,
        max_by_load=max_by_load,
        max_concurrent=max_concurrent,
        rationale=rationale,
    )
|
|
|
|
|
|
# ---------------------------------------------------------------------------
|
|
# Per-slot episode execution
|
|
# ---------------------------------------------------------------------------
|
|
|
|
|
|
@dataclass
class SlotResult:
    """Outcome of one episode run in one slot."""

    slot: int
    sample_name: str
    sample_kind: str
    episode_id: str | None
    # Child exit status.
    rc: int
    duration_s: float
    # "tier3" when an exploit fired; "tier2" otherwise.
    tier: str = "tier2"
    # Exploit module identifier; only meaningful for Tier 3.
    module_name: str | None = None
    # Error text, when one was captured.
    error: str | None = None
    extra: dict = field(default_factory=dict)
|
|
|
|
|
|
def _run_slot(
    cfg: FleetConfig,
    slot: int,
    sample: Sample,
    episode_index: int,
    capacity: FleetCapacity,
) -> SlotResult:
    """Execute a single episode for ``slot`` and report how it went.

    Tier selection:
      - Tier 3 runs ``tools/run_tier3_demo.py`` with a module picked by
        ``select_module``. It is chosen only when ``cfg.force_tier2`` is
        off, at least one module without ``requires_bridge`` exists,
        msfrpcd accepts a connection, and ``slot`` is below
        ``cfg.max_tier3_slots`` (when that cap is set).
      - Tier 2 runs ``tools/run_real_vm_demo.py`` otherwise, and the
        reason for the fallback is logged.

    The child inherits a copy of this process's environment plus the
    SLOT / SAMPLE_* / FLEET_* variables; its stdout and stderr are
    appended to ``<data_root>/fleet-logs/slot-<slot>-ep-<episode>.log``.

    Returns a ``SlotResult`` carrying the child's exit status, or
    ``rc=-1`` with ``error`` set when opening the log or running the
    child raised ``OSError`` / ``subprocess.SubprocessError``.
    """
    # Every slot gets its own run dir so parallel slots never remove
    # each other's QEMU sockets and pidfiles mid-boot.
    run_dir_base = "/tmp/cis490-vm-fleet"

    # --- Tier decision -----------------------------------------------
    # Tier-3 targets always use SLIRP+hostfwd so msfrpcd reaches the
    # guest over loopback; modules that need bridge egress are excluded.
    usable_modules: dict[str, ModuleConfig] = {}
    if cfg.modules:
        usable_modules = {
            name: mod
            for name, mod in cfg.modules.items()
            if not mod.requires_bridge
        }

    tier3_ready = False
    if not cfg.force_tier2 and usable_modules:
        # The msfrpcd probe runs before the slot-cap check.
        if _msfrpcd_available(cfg.msfrpcd_host, cfg.msfrpcd_port):
            tier3_ready = (
                cfg.max_tier3_slots is None or slot < cfg.max_tier3_slots
            )

    # --- Child environment -------------------------------------------
    env = os.environ.copy()
    env.update(
        {
            "SLOT": str(slot),
            "SAMPLE_NAME": sample.name,
            "SAMPLE_PROFILE": sample.profile,
            "SAMPLE_KIND": sample.kind,
            "FLEET_HOST_ID": cfg.host_id,
            "FLEET_EPISODE_INDEX": str(episode_index),
            "FLEET_MAX_CONCURRENT": str(capacity.max_concurrent),
        }
    )

    # Prefer the repo's virtualenv interpreter when it exists.
    venv_py = cfg.repo_root / ".venv" / "bin" / "python"
    if venv_py.exists():
        py = str(venv_py)
    else:
        py = "python3"

    log_dir = cfg.data_root / "fleet-logs"
    log_dir.mkdir(parents=True, exist_ok=True)
    out_log = log_dir / f"slot-{slot}-ep-{episode_index}.log"

    tools_dir = cfg.repo_root / "tools"
    module_name: str | None

    if tier3_ready:
        module = select_module(
            usable_modules,
            host_id=cfg.host_id,
            slot=slot,
            episode_index=episode_index,
        )
        guest_port = module_target_port(module) or 21
        # HOST_PORT is the unprivileged port QEMU hostfwd's to the guest
        # service. +2000 lifts the base ports above 1024 (vsftpd:21->2021,
        # http:80->2080, smb:139->2139, distcc:3632->5632,
        # irc:6667->8667); the per-slot offset keeps concurrent targets
        # from colliding on loopback.
        host_port = guest_port + 2000 + slot * 1000

        run_dir = f"{run_dir_base}-target-{slot}"
        env["RUN_DIR"] = run_dir
        env["PORT_BASE"] = str(host_port)

        # First pair is the main service; the rest are per-slot bind
        # ports for payloads (e.g. cmd/unix/bind_perl) that open their
        # own listener inside the guest.
        port_pairs = [f"{host_port}:{guest_port}"]
        for extra_guest_port in module.extra_target_ports:
            # FLEET_PAYLOAD_LPORT overrides the payload's LPORT so the
            # guest binds this exact per-slot port, which is why the
            # hostfwd uses the same number on both sides.
            slot_lport = extra_guest_port + slot * 1000
            port_pairs.append(f"{slot_lport}:{slot_lport}")
            env["FLEET_PAYLOAD_LPORT"] = str(slot_lport)
        env["TARGET_PORTS"] = ",".join(port_pairs)

        # Drop BRIDGE so launch_target.sh uses SLIRP+hostfwd rather than
        # tap; tap/bridge needs guest-IP discovery that isn't wired up.
        env.pop("BRIDGE", None)

        cmd = [
            py,
            str(tools_dir / "run_tier3_demo.py"),
            "--data-root", str(cfg.data_root),
            "--run-dir", run_dir,
            "--module", module.name,
            "--sample", sample.name,
            "--target-port", str(host_port),
            "--target-boot-timeout", "300",
        ]
        tier = "tier3"
        module_name = module.name
    else:
        run_dir = f"{run_dir_base}-{slot}"
        env["RUN_DIR"] = run_dir
        cmd = [
            py,
            str(tools_dir / "run_real_vm_demo.py"),
            "--data-root", str(cfg.data_root),
            "--run-dir", run_dir,
            "--sample", sample.name,
        ]
        tier = "tier2"
        module_name = None

        # Explain the fallback unless Tier 2 was explicitly requested.
        if not cfg.force_tier2:
            if not cfg.modules:
                log.warning("slot=%d falling back to Tier 2: empty module catalog", slot)
            elif not usable_modules:
                log.warning("slot=%d falling back to Tier 2: no non-bridge modules available", slot)
            elif cfg.max_tier3_slots is not None and slot >= cfg.max_tier3_slots:
                log.debug("slot=%d Tier 2 by max_tier3_slots=%d cap", slot, cfg.max_tier3_slots)
            else:
                log.warning("slot=%d falling back to Tier 2: msfrpcd unreachable at %s:%d",
                            slot, cfg.msfrpcd_host, cfg.msfrpcd_port)

    log.info(
        "slot=%d ep=%d tier=%s sample=%s module=%s run_dir=%s",
        slot, episode_index, tier, sample.name, module_name, run_dir,
    )

    # --- Run the child -----------------------------------------------
    rc = -1
    err: str | None = None
    started = time.monotonic()
    try:
        with out_log.open("ab") as sink:
            completed = subprocess.run(
                cmd,
                cwd=str(cfg.repo_root),
                env=env,
                stdout=sink,
                stderr=subprocess.STDOUT,
                check=False,
            )
    except (OSError, subprocess.SubprocessError) as exc:
        err = str(exc)
    else:
        rc = completed.returncode
    duration = time.monotonic() - started

    return SlotResult(
        slot=slot,
        sample_name=sample.name,
        sample_kind=sample.kind,
        episode_id=None,
        rc=rc,
        duration_s=duration,
        tier=tier,
        module_name=module_name,
        error=err,
    )
|
|
|
|
|
|
# ---------------------------------------------------------------------------
|
|
# FleetRunner
|
|
# ---------------------------------------------------------------------------
|
|
|
|
|
|
@dataclass
class FleetRunResult:
    """Aggregate outcome of a ``FleetRunner.run`` call."""

    # Capacity snapshot the run was sized from.
    capacity: FleetCapacity
    # One entry per executed slot, across all waves.
    slots: list[SlotResult]
    # Elapsed time for the whole run, in seconds.
    total_duration_s: float
|
|
|
|
|
|
class FleetRunner:
    """Run waves of concurrent episodes, one worker thread per slot."""

    def __init__(self, cfg: FleetConfig) -> None:
        self.cfg = cfg
        # Set by stop(); consulted before each wave begins.
        self._stop = threading.Event()

    def stop(self) -> None:
        """Ask the runner not to start any further waves.

        A wave that is already executing is allowed to finish.
        """
        self._stop.set()

    def run(
        self,
        *,
        episodes: int = 1,
        episode_index_base: int = 0,
        capacity_override: FleetCapacity | None = None,
    ) -> FleetRunResult:
        """Run ``episodes`` waves and collect every slot's result.

        Args:
            episodes: Number of waves to run.
            episode_index_base: Offset added to the wave number to form
                the episode index passed to sample selection and slots.
            capacity_override: Use this capacity instead of measuring
                the host with ``detect_capacity``.

        Returns:
            A ``FleetRunResult`` with the capacity used, all slot
            results in completion order, and the total elapsed seconds.
            When the slot count works out to zero or less, no wave runs
            and the result has an empty ``slots`` list.

        A slot whose worker raises is recorded as a ``SlotResult`` with
        ``rc=-1`` and ``error`` set instead of aborting the whole run.
        """
        capacity = capacity_override or detect_capacity(
            ram_per_vm_mib=self.cfg.ram_per_vm_mib,
        )
        n_slots = capacity.max_concurrent
        if self.cfg.max_concurrent_override is not None:
            n_slots = min(n_slots, self.cfg.max_concurrent_override)
        if n_slots <= 0:
            log.warning(
                "fleet capacity is zero (%s); cannot run", capacity.rationale,
            )
            return FleetRunResult(
                capacity=capacity, slots=[], total_duration_s=0.0,
            )

        log.info(
            "fleet host=%s slots=%d episodes=%d manifest_size=%d",
            self.cfg.host_id, n_slots, episodes, len(self.cfg.manifest),
        )

        all_results: list[SlotResult] = []
        t_start = time.monotonic()
        for ep in range(episodes):
            if self._stop.is_set():
                break
            episode_index = episode_index_base + ep
            slot_samples = [
                self.cfg.manifest.select(
                    host_id=self.cfg.host_id,
                    slot=slot,
                    episode_index=episode_index,
                )
                for slot in range(n_slots)
            ]
            if self.cfg.require_real_samples:
                # NOTE(review): filtering compacts the list, so the slot
                # number a sample runs in can differ from the slot it
                # was selected for — confirm this is intended.
                slot_samples = [s for s in slot_samples if s.kind == "real"]
                if not slot_samples:
                    log.warning("require_real_samples: no real samples in manifest; skipping wave")
                    continue

            log.info(
                "wave %d/%d: %s",
                ep + 1, episodes,
                [(i, s.name, s.kind) for i, s in enumerate(slot_samples)],
            )

            with ThreadPoolExecutor(max_workers=n_slots) as pool:
                # Remember which slot/sample each future belongs to so a
                # crashed worker can still be reported against its slot.
                pending = {
                    pool.submit(
                        _run_slot, self.cfg, slot, sample, episode_index, capacity,
                    ): (slot, sample)
                    for slot, sample in enumerate(slot_samples)
                }
                for fut in as_completed(pending):
                    slot, sample = pending[fut]
                    try:
                        res = fut.result()
                    except Exception as exc:
                        # Future.result() re-raises whatever the worker
                        # raised; without this one failing slot would
                        # discard every result gathered so far.
                        log.exception(
                            "slot %d sample=%s crashed", slot, sample.name,
                        )
                        res = SlotResult(
                            slot=slot,
                            sample_name=sample.name,
                            sample_kind=sample.kind,
                            episode_id=None,
                            rc=-1,
                            duration_s=0.0,
                            error=f"{type(exc).__name__}: {exc}",
                        )
                    log.info(
                        "slot %d sample=%s rc=%d duration=%.1fs",
                        res.slot, res.sample_name, res.rc, res.duration_s,
                    )
                    all_results.append(res)

        total = time.monotonic() - t_start
        return FleetRunResult(
            capacity=capacity,
            slots=all_results,
            total_duration_s=total,
        )
|
|
|
|
|
|
# ---------------------------------------------------------------------------
|
|
# Friendly capacity report (used by tools/run_fleet.py --capacity)
|
|
# ---------------------------------------------------------------------------
|
|
|
|
|
|
def capacity_report() -> str:
    """Return a multi-line, human-readable summary of ``detect_capacity()``.

    Each line ends with a newline, including the last one.
    """
    cap = detect_capacity()
    lines = [
        f"cores: {cap.cores_total} (reserve {cap.cores_reserved})",
        f"ram: {cap.ram_total_mib} MiB total, {cap.ram_available_mib} MiB available "
        f"(headroom {cap.ram_headroom_mib} MiB, per-vm {cap.ram_per_vm_mib} MiB)",
        f"load: 1m={cap.load_1m:.2f}",
        f"caps: by_cores={cap.max_by_cores}, by_ram={cap.max_by_ram}, "
        f"by_load={cap.max_by_load}",
        f"--> max_concurrent VMs: {cap.max_concurrent}",
    ]
    return "\n".join(lines) + "\n"
|