CIS490/orchestrator/fleet.py
elliott 656a015443 fix: fleet tier3 port formula produces privileged ports, boot timeout too tight
Two bugs causing all tier3 slots to fail:

1. PORT_BASE = target_port + slot * 1000 → slot 0 with samba (139)
   and php (80) produced host ports < 1024. cis490 lacks
   CAP_NET_BIND_SERVICE; QEMU's SLIRP hostfwd silently skipped the
   bind, making the service unreachable. Changed to:
   host_port = (target_port % 1000) + 2000 + slot * 1000
   so the minimum is always ≥ 2000 regardless of module RPORT.

2. --target-boot-timeout was never passed to run_tier3_demo.py,
   so it used the 180 s default. 7 concurrent VMs under I/O
   contention need more time; now passes 300 s explicitly.

Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
2026-05-01 19:56:19 -06:00

474 lines
16 KiB
Python

"""Fleet runner — concurrent VM episodes with resource awareness.

The lab host detects its own capacity, picks how many VMs to run in
parallel without driving the box into swap or starving the host
itself, and runs that many episodes simultaneously. Each slot gets a
distinct ``Sample`` from the manifest (deterministically chosen by
host_id, slot index and episode index), so every concurrent VM produces
novel, labelable data.

Capacity heuristic — defaults documented inline so they're auditable::

    cores_total      = os.cpu_count()
    cores_reserved   = max(1, cores_total // 8)        # host + collectors
    ram_per_vm_mib   = 320                             # Alpine fits in 256, but leave
                                                       # 64 for overhead (qemu+ovmf)
    ram_headroom_mib = max(1024, ram_total_mib // 8)   # never starve the host
    max_by_cores     = cores_total - cores_reserved
    max_by_ram       = (ram_available_mib - ram_headroom_mib) // ram_per_vm_mib
    max_by_load      = max(1, max_by_cores // 2)       # if load_1m / cores_total > 0.75
                       else max_by_cores

The smallest of these wins. The rationale string is logged and saved
into each episode's meta.json under ``fleet`` so post-hoc analysis
can correlate "this episode was run when 6 VMs were concurrent" with
its observed envelope.
"""
from __future__ import annotations
import logging
import os
import shutil
import signal
import subprocess
import threading
import time
from concurrent.futures import ThreadPoolExecutor, as_completed
from dataclasses import dataclass, field
from pathlib import Path
from exploits.modules import (
ModuleConfig, load_module_configs, module_target_port, select_module,
)
from samples.manifest import Sample, SampleManifest
log = logging.getLogger("cis490.fleet")
def _msfrpcd_available(host: str = "127.0.0.1", port: int = 55553) -> bool:
"""True when msfrpcd is listening — gate for the Tier-3 default.
A Tier-2 fallback runs when msfrpcd isn't there (still useful
data, just labeled with no-exploit so the trainer can filter)."""
import socket as _sk
try:
with _sk.create_connection((host, port), timeout=0.3):
return True
except OSError:
return False
@dataclass(frozen=True)
class FleetCapacity:
    """Immutable snapshot of the host's VM-concurrency budget.

    Built by detect_capacity(). Every intermediate figure is kept alongside
    the final ``max_concurrent`` so the decision can be audited later.
    """

    # Core budget.
    cores_total: int
    cores_reserved: int
    # Memory budget, in MiB.
    ram_total_mib: int
    ram_available_mib: int
    ram_per_vm_mib: int
    ram_headroom_mib: int
    # 1-minute load average at detection time.
    load_1m: float
    # The individual caps and the concurrency that was settled on.
    max_by_cores: int
    max_by_ram: int
    max_by_load: int
    max_concurrent: int
    # Human-readable summary of the inputs and the binding cap.
    rationale: str

    def to_dict(self) -> dict:
        """Return every field as a plain dict, in declaration order."""
        keys = (
            "cores_total",
            "cores_reserved",
            "ram_total_mib",
            "ram_available_mib",
            "ram_per_vm_mib",
            "ram_headroom_mib",
            "load_1m",
            "max_by_cores",
            "max_by_ram",
            "max_by_load",
            "max_concurrent",
            "rationale",
        )
        return {key: getattr(self, key) for key in keys}
@dataclass
class FleetConfig:
    """Configuration shared by every slot of a FleetRunner."""
    # Mixed into deterministic sample/module selection (manifest.select,
    # select_module) and exported to children as FLEET_HOST_ID.
    host_id: str
    # Repo checkout: child cwd, location of tools/*.py and the optional .venv.
    repo_root: Path
    # Passed to the demo drivers as --data-root; fleet-logs/ is created under it.
    data_root: Path
    # Sample source; select() is called once per slot per wave.
    manifest: SampleManifest
    # Module catalog for Tier-3 dispatch. Required for fleet-driven
    # exploit-fire variety; empty catalog forces Tier-2 fallback.
    modules: dict[str, ModuleConfig] = field(default_factory=dict)
    # VM resource shape — must match what the launcher requests. Passed to
    # detect_capacity() to size the RAM-based cap.
    ram_per_vm_mib: int = 320
    # Cap concurrency below the calculated max (e.g. for a smoke test).
    # The effective slot count is min(calculated, override).
    max_concurrent_override: int | None = None
    # When True, keep only slots whose selected sample has kind == "real".
    # Other slots are dropped for that wave (not re-rolled). A wave with no
    # real samples is skipped entirely.
    require_real_samples: bool = False
    # Force Tier-2 even when msfrpcd is up; used by tests + dev runs
    # that want a no-exploit baseline.
    force_tier2: bool = False
    # msfrpcd endpoint probed before a slot chooses Tier 3.
    # NOTE(review): in this file these values are only used for that
    # reachability probe; nothing visible exports them into the tier-3
    # driver's environment. Confirm how run_tier3_demo.py locates msfrpcd.
    msfrpcd_host: str = "127.0.0.1"
    msfrpcd_port: int = 55553
def _read_meminfo() -> dict[str, int]:
out: dict[str, int] = {}
try:
with open("/proc/meminfo") as f:
for line in f:
k, _, rest = line.partition(":")
v = rest.strip()
if v.endswith(" kB"):
try:
out[k] = int(v[:-3]) * 1024
except ValueError:
pass
except OSError:
pass
return out
def _read_loadavg() -> float:
try:
with open("/proc/loadavg") as f:
return float(f.read().split()[0])
except (OSError, ValueError, IndexError):
return 0.0
def detect_capacity(*, ram_per_vm_mib: int = 320) -> FleetCapacity:
    """Measure this host and derive how many VMs it can run concurrently.

    Three caps are computed and the smallest wins:

    * ``max_by_cores``: ``os.cpu_count()`` minus a reserve of
      ``max(1, cores_total // 8)`` cores for the host and collectors.
    * ``max_by_ram``: available RAM minus a headroom of
      ``max(1024, ram_total_mib // 8)`` MiB, divided by *ram_per_vm_mib*.
      A non-positive *ram_per_vm_mib* disables this cap (it then equals
      ``max_by_cores``).
    * ``max_by_load``: ``max(1, max_by_cores // 2)`` when the 1-minute load
      average exceeds 0.75 per core, else ``max_by_cores``.

    Available RAM comes from ``MemAvailable``. When that field is missing,
    it is approximated as ``MemFree + Buffers + Cached``. It falls back to
    ``MemTotal`` only if none of those are present.

    Args:
        ram_per_vm_mib: Expected RAM footprint of one VM, in MiB.

    Returns:
        A FleetCapacity holding every intermediate value. Its ``rationale``
        is a one-line summary (also logged at INFO) that names the binding cap.
    """
    cores_total = os.cpu_count() or 1
    # Reserve at least 1 core, more if the host has many.
    cores_reserved = max(1, cores_total // 8)

    mem = _read_meminfo()
    ram_total_b = mem.get("MemTotal", 0)
    if not ram_total_b:
        # _read_meminfo() returns {} when /proc/meminfo can't be read. Without
        # MemTotal the RAM cap below is usually 0, so say so loudly instead
        # of silently refusing to run.
        log.warning(
            "capacity: MemTotal not found in /proc/meminfo; treating host RAM as 0 MiB"
        )
    ram_avail_b = mem.get("MemAvailable")
    if ram_avail_b is None:
        # MemAvailable only exists on newer kernels. Estimate it the way older
        # free(1) did, rather than assuming all of MemTotal is free. That
        # assumption would over-commit VMs and push the host into swap.
        legacy = [mem[key] for key in ("MemFree", "Buffers", "Cached") if key in mem]
        ram_avail_b = sum(legacy) if legacy else ram_total_b
    ram_total_mib = ram_total_b // (1024 * 1024)
    ram_available_mib = ram_avail_b // (1024 * 1024)
    # Keep at least 1/8 of total RAM (and never less than 1 GiB) for the host.
    ram_headroom_mib = max(1024, ram_total_mib // 8)
    load_1m = _read_loadavg()

    max_by_cores = max(0, cores_total - cores_reserved)
    if ram_per_vm_mib <= 0:
        max_by_ram = max_by_cores
    else:
        max_by_ram = max(0, (ram_available_mib - ram_headroom_mib) // ram_per_vm_mib)
    # Load-based cap: if the host is already busy, run half as many VMs
    # (floor 1). cores_total >= 1 here, so the division is safe.
    if load_1m / cores_total > 0.75:
        max_by_load = max(1, max_by_cores // 2)
    else:
        max_by_load = max_by_cores

    # Every cap is already >= 0, so the minimum is too.
    max_concurrent = min(max_by_cores, max_by_ram, max_by_load)
    # Name the cap that bound the result; ties prefer cores, then ram, then load.
    if max_concurrent == max_by_cores:
        binding = "cores"
    elif max_concurrent == max_by_ram:
        binding = "ram"
    else:
        binding = "load"
    rationale = (
        f"cores_total={cores_total} reserved={cores_reserved} "
        f"ram_avail_mib={ram_available_mib} headroom={ram_headroom_mib} "
        f"per_vm={ram_per_vm_mib} load_1m={load_1m:.2f} "
        f"-> max_concurrent={max_concurrent} (binding={binding})"
    )
    log.info("capacity: %s", rationale)
    return FleetCapacity(
        cores_total=cores_total,
        cores_reserved=cores_reserved,
        ram_total_mib=ram_total_mib,
        ram_available_mib=ram_available_mib,
        ram_per_vm_mib=ram_per_vm_mib,
        ram_headroom_mib=ram_headroom_mib,
        load_1m=load_1m,
        max_by_cores=max_by_cores,
        max_by_ram=max_by_ram,
        max_by_load=max_by_load,
        max_concurrent=max_concurrent,
        rationale=rationale,
    )
# ---------------------------------------------------------------------------
# Per-slot episode execution
# ---------------------------------------------------------------------------
@dataclass
class SlotResult:
    """Outcome of one episode run in one fleet slot."""
    # Slot index within the wave.
    slot: int
    sample_name: str
    sample_kind: str
    # NOTE(review): always None as populated in this file; the child's
    # episode id is not read back.
    episode_id: str | None
    # Child exit code; -1 when the child could not be run at all.
    rc: int
    # Wall-clock seconds (time.monotonic) spent on the child.
    duration_s: float
    # "tier3" when dispatched to run_tier3_demo.py (exploit fire);
    # "tier2" for the no-exploit fallback.
    tier: str = "tier2"
    module_name: str | None = None  # exploit module identifier (Tier 3 only)
    # Error text when the child could not be run; None otherwise.
    error: str | None = None
    # Free-form extras; not populated anywhere in this file.
    extra: dict = field(default_factory=dict)
def _run_slot(
cfg: FleetConfig,
slot: int,
sample: Sample,
episode_index: int,
capacity: FleetCapacity,
) -> SlotResult:
"""Run one episode in a dedicated slot.
Dispatch:
- Tier 3 (default when msfrpcd is listening AND a module catalog
is populated): real exploit fire via run_tier3_demo.py with a
deterministically-selected module + sample.
- Tier 2 (fallback): no exploit; the controller drives a labeled
workload directly via the serial console. Recorded in
SlotResult.tier so trainers can filter the no-exploit episodes.
"""
# Per-slot run dir keeps QEMU sockets + pidfiles isolated. Without
# this, parallel slots rmtree each other's run dir mid-boot.
run_dir_base = "/tmp/cis490-vm-fleet"
# Decide tier.
bridge_iface = os.environ.get("BRIDGE") or None
# Filter the catalog to modules that can actually fire under the
# current launcher mode. Reverse / bind shells require the host-
# only bridge (no SLIRP+restrict=on guest egress), so skip those
# when BRIDGE isn't set; otherwise the exploit fires but the
# session never lands and the episode degenerates to a 30 s
# session_open_timeout.
if cfg.modules:
if bridge_iface:
usable_modules = dict(cfg.modules)
else:
usable_modules = {
k: v for k, v in cfg.modules.items() if not v.requires_bridge
}
else:
usable_modules = {}
tier3_ready = (
not cfg.force_tier2
and bool(usable_modules)
and _msfrpcd_available(cfg.msfrpcd_host, cfg.msfrpcd_port)
)
env = os.environ.copy()
env["SLOT"] = str(slot)
env["SAMPLE_NAME"] = sample.name
env["SAMPLE_PROFILE"] = sample.profile
env["SAMPLE_KIND"] = sample.kind
env["FLEET_HOST_ID"] = cfg.host_id
env["FLEET_EPISODE_INDEX"] = str(episode_index)
env["FLEET_MAX_CONCURRENT"] = str(capacity.max_concurrent)
venv_py = cfg.repo_root / ".venv" / "bin" / "python"
py = str(venv_py) if venv_py.exists() else "python3"
log_dir = cfg.data_root / "fleet-logs"
log_dir.mkdir(parents=True, exist_ok=True)
out_log = log_dir / f"slot-{slot}-ep-{episode_index}.log"
if tier3_ready:
module = select_module(
usable_modules,
host_id=cfg.host_id, slot=slot, episode_index=episode_index,
)
target_port = module_target_port(module) or 21
# Per-slot runner dir for the target VM.
run_dir = f"{run_dir_base}-target-{slot}"
env["RUN_DIR"] = run_dir
# Each slot gets a unique host-side hostfwd port so concurrent
# targets don't collide on the loopback port. Base at 2000+
# (target_port % 1000) so privileged-port modules (samba/139,
# php/80, vsftpd/21) never try to bind a port < 1024 on the
# host — cis490 user lacks CAP_NET_BIND_SERVICE.
host_port = (target_port % 1000) + 2000 + slot * 1000
env["PORT_BASE"] = str(host_port)
if bridge_iface:
env["BRIDGE"] = bridge_iface
cmd = [
py,
str(cfg.repo_root / "tools" / "run_tier3_demo.py"),
"--data-root", str(cfg.data_root),
"--run-dir", run_dir,
"--module", module.name,
"--sample", sample.name,
"--target-port", str(host_port),
# Concurrent VMs contend on I/O during boot; 300 s gives
# a full fleet of 7 slots room to start their services.
"--target-boot-timeout", "300",
]
tier = "tier3"
module_name: str | None = module.name
else:
run_dir = f"{run_dir_base}-{slot}"
env["RUN_DIR"] = run_dir
cmd = [
py,
str(cfg.repo_root / "tools" / "run_real_vm_demo.py"),
"--data-root", str(cfg.data_root),
"--run-dir", run_dir,
"--sample", sample.name,
]
tier = "tier2"
module_name = None
if not cfg.force_tier2 and not cfg.modules:
log.warning("slot=%d falling back to Tier 2: empty module catalog", slot)
elif not cfg.force_tier2:
log.warning("slot=%d falling back to Tier 2: msfrpcd unreachable at %s:%d",
slot, cfg.msfrpcd_host, cfg.msfrpcd_port)
log.info(
"slot=%d ep=%d tier=%s sample=%s module=%s run_dir=%s",
slot, episode_index, tier, sample.name, module_name, run_dir,
)
started = time.monotonic()
try:
with out_log.open("ab") as logf:
proc = subprocess.run(
cmd,
cwd=str(cfg.repo_root),
env=env,
stdout=logf,
stderr=subprocess.STDOUT,
check=False,
)
rc = proc.returncode
err = None
except (OSError, subprocess.SubprocessError) as e:
rc = -1
err = str(e)
duration = time.monotonic() - started
return SlotResult(
slot=slot,
sample_name=sample.name,
sample_kind=sample.kind,
episode_id=None,
rc=rc,
duration_s=duration,
tier=tier,
module_name=module_name,
error=err,
)
# ---------------------------------------------------------------------------
# FleetRunner
# ---------------------------------------------------------------------------
@dataclass
class FleetRunResult:
    """Aggregate outcome of FleetRunner.run()."""
    # Capacity snapshot that sized the run (detected or caller-supplied).
    capacity: FleetCapacity
    # One entry per slot per wave, in completion order (not slot order).
    slots: list[SlotResult]
    # Wall-clock seconds across all waves; 0.0 when capacity was zero and
    # nothing ran.
    total_duration_s: float
class FleetRunner:
    """Run waves of concurrent VM episodes sized to this host's capacity.

    Each wave runs as follows:
    - One sample is selected per slot from ``cfg.manifest``.
    - All slots run in parallel via _run_slot().
    - The runner waits for every slot to finish before starting the next wave.
    """

    def __init__(self, cfg: FleetConfig) -> None:
        self.cfg = cfg
        # Set by stop(); checked only between waves.
        self._stop = threading.Event()

    def stop(self) -> None:
        """Ask the runner to stop before the next wave starts.

        Slots already running in the current wave are not interrupted.
        """
        self._stop.set()

    def run(
        self,
        *,
        episodes: int = 1,
        episode_index_base: int = 0,
        capacity_override: FleetCapacity | None = None,
    ) -> FleetRunResult:
        """Run up to *episodes* waves and collect every slot's result.

        Args:
            episodes: Number of waves to run (each wave = one episode per slot).
            episode_index_base: Offset added to the wave number to form the
                episode index used for deterministic sample/module selection.
            capacity_override: Use this capacity instead of probing the host.

        Returns:
            A FleetRunResult whose ``slots`` lists results in completion
            order. If a slot's worker raises, that slot is recorded with
            ``rc=-1`` and the exception text in ``error`` rather than
            aborting the run and discarding results already collected.
        """
        if capacity_override is not None:
            capacity = capacity_override
        else:
            capacity = detect_capacity(ram_per_vm_mib=self.cfg.ram_per_vm_mib)
        n_slots = capacity.max_concurrent
        if self.cfg.max_concurrent_override is not None:
            n_slots = min(n_slots, self.cfg.max_concurrent_override)
        if n_slots <= 0:
            log.warning(
                "fleet capacity is zero (%s); cannot run", capacity.rationale,
            )
            return FleetRunResult(
                capacity=capacity, slots=[], total_duration_s=0.0,
            )
        log.info(
            "fleet host=%s slots=%d episodes=%d manifest_size=%d",
            self.cfg.host_id, n_slots, episodes, len(self.cfg.manifest),
        )
        all_results: list[SlotResult] = []
        t_start = time.monotonic()
        for ep in range(episodes):
            if self._stop.is_set():
                break
            episode_index = episode_index_base + ep
            slot_samples = [
                self.cfg.manifest.select(
                    host_id=self.cfg.host_id,
                    slot=slot,
                    episode_index=episode_index,
                )
                for slot in range(n_slots)
            ]
            if self.cfg.require_real_samples:
                # Drop (rather than re-roll) slots whose sample isn't real.
                slot_samples = [s for s in slot_samples if s.kind == "real"]
                if not slot_samples:
                    log.warning("require_real_samples: no real samples in manifest; skipping wave")
                    continue
            log.info(
                "wave %d/%d: %s",
                ep + 1, episodes,
                [(i, s.name, s.kind) for i, s in enumerate(slot_samples)],
            )
            with ThreadPoolExecutor(max_workers=n_slots) as pool:
                # Map each future back to its slot so a failure can still be
                # attributed to the right slot/sample.
                futures = {
                    pool.submit(
                        _run_slot, self.cfg, slot, sample, episode_index, capacity,
                    ): (slot, sample)
                    for slot, sample in enumerate(slot_samples)
                }
                for fut in as_completed(futures):
                    slot, sample = futures[fut]
                    try:
                        res = fut.result()
                    except Exception as exc:
                        # Worker boundary: one broken slot must not discard the
                        # rest of the wave. Tier/module are unknown here, so
                        # SlotResult's defaults are kept for them.
                        log.exception(
                            "slot %d sample=%s raised before finishing",
                            slot, sample.name,
                        )
                        res = SlotResult(
                            slot=slot,
                            sample_name=sample.name,
                            sample_kind=sample.kind,
                            episode_id=None,
                            rc=-1,
                            duration_s=0.0,
                            error=f"{type(exc).__name__}: {exc}",
                        )
                    log.info(
                        "slot %d sample=%s rc=%d duration=%.1fs",
                        res.slot, res.sample_name, res.rc, res.duration_s,
                    )
                    all_results.append(res)
        total = time.monotonic() - t_start
        return FleetRunResult(
            capacity=capacity,
            slots=all_results,
            total_duration_s=total,
        )
# ---------------------------------------------------------------------------
# Friendly capacity report (used by tools/run_fleet.py --capacity)
# ---------------------------------------------------------------------------
def capacity_report(*, ram_per_vm_mib: int = 320) -> str:
    """Return a short, human-readable, multi-line summary of host capacity.

    Intended for ``tools/run_fleet.py --capacity``.

    Args:
        ram_per_vm_mib: Forwarded to detect_capacity(), so the report can
            reflect a FleetConfig.ram_per_vm_mib other than the default
            320 MiB. Omitting it reproduces the previous output exactly.
    """
    c = detect_capacity(ram_per_vm_mib=ram_per_vm_mib)
    return (
        f"cores: {c.cores_total} (reserve {c.cores_reserved})\n"
        f"ram: {c.ram_total_mib} MiB total, {c.ram_available_mib} MiB available "
        f"(headroom {c.ram_headroom_mib} MiB, per-vm {c.ram_per_vm_mib} MiB)\n"
        f"load: 1m={c.load_1m:.2f}\n"
        f"caps: by_cores={c.max_by_cores}, by_ram={c.max_by_ram}, "
        f"by_load={c.max_by_load}\n"
        f"--> max_concurrent VMs: {c.max_concurrent}\n"
    )