CIS490/orchestrator/fleet.py
Max Gorog 207a902c3e PIPELINE §5 step 2: canonical manifest at <repo>/manifest.toml
The experiment is now defined by a single version-pinned file —
manifest.toml at the repo root. PIPELINE.md §4.1 / §13 / §16. Every
lab host loads THIS exact file; per-host overrides of experiment
shape are forbidden.

Drops the following per-host CLI overrides that previously violated
the canonical-manifest principle:
  * --manifest, --modules-dir       (paths now derived)
  * --ram-per-vm-mib                (in manifest.experiment)
  * --max-concurrent                (manifest.experiment.fleet.max_concurrent_ceiling)
  * --max-tier3-slots               (manifest.experiment.fleet.max_tier3_slots)
  * --force-tier2                   (not a §14 sanctioned override knob —
                                     ship empty catalog to disable Tier-3)
  * --require-real-samples          (sample-side concern; out of fleet scope)
  * tools/run_*_demo.py --manifest  (samples path now from canonical)

New surface:
  * manifest.toml                   — the single source of truth
  * orchestrator/manifest.py        — load_canonical() + Manifest dataclass
                                      with strict validation, raises
                                      ManifestError on any failure
  * EpisodeConfig.experiment_meta   — populated by run_*_demo.py from
                                      the canonical manifest; stamped
                                      into every episode's meta.json
                                      under "experiment" key for
                                      provenance
  * cis490-orchestrator.service     — RestartPreventExitStatus=78 so
                                      manifest-load failures stay
                                      stuck-and-loud (§9, §4.7)
  * install-lab-host.sh             — validates manifest.toml at
                                      install time; missing or invalid
                                      = die with clear message

Catalog admission semantics: only modules whose name appears in
manifest.catalog get loaded into the runtime catalog (§4.3 in
miniature, will tighten further in step 4 when verified_against /
last_verified actually gate admission). Missing toml for an admitted
name is a sysadmin error → exit 78.

Renames cfg.manifest → cfg.samples + adds cfg.experiment to
disambiguate sample-manifest from experiment-manifest. Rewrites
test_fleet.py fixture to construct synthetic Manifest objects so
test outcomes don't depend on the on-disk manifest.toml content.

12 new tests in tests/test_manifest.py: schema-version mismatch,
unknown collector, duplicate collector, unknown phase, negative
phase seconds, negative ram, missing catalog fields, json round-trip.

Local run: `python tools/run_fleet.py --capacity` correctly logs the
loaded manifest and prints capacity. 241 tests passing.

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
2026-05-04 01:25:01 -05:00

530 lines
20 KiB
Python

"""Fleet runner — concurrent VM episodes with resource awareness.
The lab host detects its own capacity, picks how many VMs to run in
parallel without driving the box into swap or starving the host
itself, and runs that many episodes simultaneously. Each slot gets a
distinct ``Sample`` from the manifest (deterministically chosen by
host_id + slot index), so every concurrent VM produces novel,
labelable data.
Capacity heuristic — defaults documented inline so they're auditable:

    cores_total      = os.cpu_count()
    cores_reserved   = max(1, cores_total // 8)        # host + collectors
    ram_per_vm_mib   = 320                             # Alpine fits in 256, but leave
                                                       # 64 for overhead (qemu+ovmf)
    ram_headroom_mib = max(1024, ram_total_mib // 8)   # never starve host
    max_by_cores     = cores_total - cores_reserved
    max_by_ram       = (ram_available_mib - ram_headroom_mib) // ram_per_vm_mib
    max_by_load      = max(1, max_by_cores // 2) if (load_1m / cores_total) > 0.75
                       else max_by_cores               # busy host: tighter cap

The smallest of these wins. The reasoning string is logged + saved
into each episode's meta.json under ``fleet`` so post-hoc analysis
can correlate "this episode was run when 6 VMs were concurrent" with
its observed envelope.
"""
from __future__ import annotations
import logging
import os
import shutil
import signal
import subprocess
import threading
import time
from concurrent.futures import ThreadPoolExecutor, as_completed
from dataclasses import dataclass, field
from pathlib import Path
from exploits.modules import (
ModuleConfig, load_module_configs, module_target_port, select_module,
)
from samples.manifest import Sample, SampleManifest
from .manifest import Manifest
log = logging.getLogger("cis490.fleet")
def _msfrpcd_available(host: str = "127.0.0.1", port: int = 55553) -> bool:
    """Report whether a TCP connection to ``host:port`` can be opened.

    This is the gate for the Tier-3 default: when nothing answers on the
    msfrpcd endpoint the caller falls back to Tier 2 (still useful data,
    just labeled with no-exploit so the trainer can filter it).

    The probe waits at most 0.3 s. Any ``OSError`` (refused, timed out,
    unreachable, ...) is reported as ``False`` rather than raised.
    """
    import socket

    try:
        probe = socket.create_connection((host, port), timeout=0.3)
        # Only reachability matters; release the connection immediately.
        probe.close()
    except OSError:
        return False
    return True
@dataclass(frozen=True)
class FleetCapacity:
    """Immutable snapshot of one capacity decision.

    Produced by ``detect_capacity``: the raw host measurements, the three
    per-resource caps derived from them, the final ``max_concurrent``
    (the smallest cap), and a human-readable ``rationale`` string.
    """

    # Host measurements.
    cores_total: int
    cores_reserved: int
    ram_total_mib: int
    ram_available_mib: int
    # Inputs to the RAM cap.
    ram_per_vm_mib: int
    ram_headroom_mib: int
    # 1-minute load average at detection time.
    load_1m: float
    # Per-resource caps and the resulting decision.
    max_by_cores: int
    max_by_ram: int
    max_by_load: int
    max_concurrent: int
    rationale: str

    def to_dict(self) -> dict:
        """Return the snapshot as a plain dict, keys in declaration order."""
        keys = (
            "cores_total",
            "cores_reserved",
            "ram_total_mib",
            "ram_available_mib",
            "ram_per_vm_mib",
            "ram_headroom_mib",
            "load_1m",
            "max_by_cores",
            "max_by_ram",
            "max_by_load",
            "max_concurrent",
            "rationale",
        )
        return {key: getattr(self, key) for key in keys}
@dataclass
class FleetConfig:
    """Everything one lab host needs to run the fleet.

    Three ingredients are combined here: (a) the canonical experiment
    manifest, (b) the host's identity (``host_id``), and (c) host-local
    paths (``data_root``, ``repo_root``). Anything that shapes the
    experiment is read through ``experiment`` (the canonical Manifest);
    per-host overrides of experiment shape are forbidden — see
    PIPELINE.md §4.1.
    """

    host_id: str
    repo_root: Path
    data_root: Path

    # The canonical experiment manifest. It drives the schedule,
    # ram_per_vm_mib, the active collector set, intervals, fleet
    # ceilings, the catalog and the targets.
    experiment: "Manifest"

    # The sample manifest, a separate file at samples/manifest.toml
    # holding the per-malware-family sample list. It rolls forward
    # independently of experiment shape, so adding a sample is a
    # one-line change rather than a canonical-manifest amendment.
    samples: SampleManifest

    # Module catalog used for Tier-3 dispatch. Fleet-driven exploit-fire
    # variety needs it; an empty catalog forces the Tier-2 fallback.
    # Admission is gated by the canonical manifest's catalog: only
    # modules whose toml exists AND whose name is in experiment.catalog
    # get loaded into this mapping.
    modules: dict[str, ModuleConfig] = field(default_factory=dict)

    # Where msfrpcd is reachable (read by the tier-3 driver via env).
    # Kept per-host because msfrpcd may be bound to a non-loopback
    # address on some hosts; this is NOT an experiment-shape parameter.
    msfrpcd_host: str = "127.0.0.1"
    msfrpcd_port: int = 55553

    # ---- experiment-shape values, always read from the manifest -----

    @property
    def ram_per_vm_mib(self) -> int:
        """RAM budget per VM in MiB, as set by the experiment manifest."""
        manifest = self.experiment
        return manifest.ram_per_vm_mib

    @property
    def max_concurrent_ceiling(self) -> int:
        """Hard upper bound on slots per wave; 0 means no ceiling, i.e.
        the capacity detector decides."""
        fleet = self.experiment.fleet
        return fleet.max_concurrent_ceiling

    @property
    def max_tier3_slots(self) -> int:
        """Upper bound on Tier-3 slots per wave; 0 means no cap."""
        fleet = self.experiment.fleet
        return fleet.max_tier3_slots
def _read_meminfo(path: str = "/proc/meminfo") -> dict[str, int]:
    """Parse a ``/proc/meminfo``-style file into ``{field: bytes}``.

    Args:
        path: File to read. Defaults to the live ``/proc/meminfo``; a
            different path lets callers and tests feed a captured file.

    Returns:
        A mapping from field name (the text before the first ``:``) to
        its value converted from kB to bytes. Only lines whose value ends
        in `` kB`` and parses as an integer are included; unit-less
        counters (e.g. ``HugePages_Total``) are skipped.

    Best-effort by design: if the file cannot be opened or read, whatever
    was parsed so far (possibly nothing) is returned instead of raising.
    """
    out: dict[str, int] = {}
    try:
        # Explicit encoding + errors="replace": a stray non-decodable byte
        # must not surface as an uncaught UnicodeDecodeError.
        with open(path, encoding="utf-8", errors="replace") as f:
            for line in f:
                key, _, rest = line.partition(":")
                value = rest.strip()
                if not value.endswith(" kB"):
                    continue
                try:
                    out[key] = int(value[:-3]) * 1024
                except ValueError:
                    # Malformed number: ignore this line, keep the rest.
                    continue
    except OSError:
        # Missing or unreadable file: report what we have.
        pass
    return out
def _read_loadavg(path: str = "/proc/loadavg") -> float:
    """Return the first field of a ``/proc/loadavg``-style file as a float.

    Args:
        path: File to read. Defaults to the live ``/proc/loadavg``; a
            different path lets callers and tests feed a captured file.

    Returns:
        The first whitespace-separated token parsed as ``float`` (the
        1-minute load average in the standard layout), or ``0.0`` when the
        file is missing, unreadable, empty, or does not start with a number.
    """
    try:
        with open(path, encoding="utf-8", errors="replace") as f:
            return float(f.read().split()[0])
    except (OSError, ValueError, IndexError):
        # Best-effort: an unknown load is treated as an idle host.
        return 0.0
def detect_capacity(*, ram_per_vm_mib: int = 320) -> FleetCapacity:
    """Measure the host and decide how many VMs may run concurrently.

    Three caps are computed and the smallest one wins:
      * cores: ``cores_total - cores_reserved`` where the reserve is
        ``max(1, cores_total // 8)``;
      * ram:   ``(MemAvailable - headroom) // ram_per_vm_mib`` with a
        headroom of ``max(1024 MiB, MemTotal // 8)``; when
        ``ram_per_vm_mib`` is not positive the core cap is used instead;
      * load:  half the core cap (never below 1) when the 1-minute load
        per core exceeds 0.75, otherwise the core cap.

    The decision and its inputs are logged and returned as a
    ``FleetCapacity``; ``max_concurrent`` may be 0.
    """
    bytes_per_mib = 1024 * 1024

    n_cores = os.cpu_count() or 1
    # Keep at least one core for the host; larger hosts keep an eighth.
    n_reserved = max(1, n_cores // 8)

    meminfo = _read_meminfo()
    total_bytes = meminfo.get("MemTotal", 0)
    # Without MemAvailable, fall back to treating all of MemTotal as free.
    avail_bytes = meminfo.get("MemAvailable", total_bytes)
    total_mib = total_bytes // bytes_per_mib
    avail_mib = avail_bytes // bytes_per_mib
    # Hold back an eighth of RAM, and never less than 1 GiB, for the host.
    headroom_mib = max(1024, total_mib // 8)

    load = _read_loadavg()

    by_cores = max(0, n_cores - n_reserved)

    if ram_per_vm_mib > 0:
        by_ram = max(0, (avail_mib - headroom_mib) // ram_per_vm_mib)
    else:
        by_ram = by_cores

    # A host that is already busy gets half the core cap, floor 1.
    host_is_busy = bool(n_cores) and load / n_cores > 0.75
    by_load = max(1, by_cores // 2) if host_is_busy else by_cores

    # Ordered so that ties resolve to the earliest resource listed.
    caps = {"cores": by_cores, "ram": by_ram, "load": by_load}
    limit = max(0, min(caps.values()))
    if limit < by_cores:
        binding = next(name for name, cap in caps.items() if cap == limit)
    else:
        binding = "cores"

    rationale = (
        f"cores_total={n_cores} reserved={n_reserved} "
        f"ram_avail_mib={avail_mib} headroom={headroom_mib} "
        f"per_vm={ram_per_vm_mib} load_1m={load:.2f} "
        f"-> max_concurrent={limit} (binding={binding})"
    )
    log.info("capacity: %s", rationale)

    return FleetCapacity(
        cores_total=n_cores,
        cores_reserved=n_reserved,
        ram_total_mib=total_mib,
        ram_available_mib=avail_mib,
        ram_per_vm_mib=ram_per_vm_mib,
        ram_headroom_mib=headroom_mib,
        load_1m=load,
        max_by_cores=by_cores,
        max_by_ram=by_ram,
        max_by_load=by_load,
        max_concurrent=limit,
        rationale=rationale,
    )
# ---------------------------------------------------------------------------
# Per-slot episode execution
# ---------------------------------------------------------------------------
@dataclass
class SlotResult:
    """Outcome of one episode run in one fleet slot."""

    # Slot index within the wave.
    slot: int
    # Name and kind of the sample assigned to the slot.
    sample_name: str
    sample_kind: str
    episode_id: str | None
    # Exit status of the episode process.
    rc: int
    duration_s: float
    # "tier3" when an exploit fired, otherwise "tier2".
    tier: str = "tier2"
    # Exploit module identifier (Tier 3 only).
    module_name: str | None = None
    # Failure description, if any.
    error: str | None = None
    # Free-form extras; each instance gets its own dict.
    extra: dict = field(default_factory=dict)
def _run_slot(
    cfg: FleetConfig,
    slot: int,
    sample: Sample,
    episode_index: int,
    capacity: FleetCapacity,
) -> SlotResult:
    """Launch one episode as a child process and wait for it to finish.

    Tier selection:
      - Tier 3 (default when msfrpcd is listening AND a usable module
        catalog is loaded AND the slot is under ``cfg.max_tier3_slots``):
        runs ``tools/run_tier3_demo.py`` with a module chosen by
        ``select_module`` for (host_id, slot, episode_index) plus the
        given sample — a real exploit fire.
      - Tier 2 (fallback): runs ``tools/run_real_vm_demo.py`` with the
        sample only — no exploit. The tier is recorded in
        ``SlotResult.tier`` so trainers can filter no-exploit episodes.

    The child's stdout and stderr are appended to
    ``<data_root>/fleet-logs/slot-<slot>-ep-<episode_index>.log``.

    Returns:
        A ``SlotResult`` whose ``rc`` is the child's exit status, or -1
        with ``error`` set when starting/running the child raised
        ``OSError`` or ``subprocess.SubprocessError``. ``episode_id`` is
        always ``None`` here.
    """
    # Every slot gets its own run dir so QEMU sockets and pidfiles stay
    # isolated; otherwise parallel slots rmtree each other's run dir
    # mid-boot.
    run_dir_base = "/tmp/cis490-vm-fleet"

    # Tier-3 modules come in two classes, told apart by `requires_bridge`:
    #   - SLIRP-friendly bind shells such as samba_usermap_script's
    #     cmd/unix/bind_perl (the handler connects in over hostfwd);
    #   - bridge-only modules (vsftpd's port-6200 backdoor, distccd,
    #     php_cgi, unreal_ircd) whose handler must reach the guest at
    #     its own bridge IP.
    # Bridge-only modules are dropped unconditionally: the rest of the
    # pipeline currently passes target_ip=127.0.0.1 (SLIRP loopback)
    # regardless of bridge mode, so they end in target_ip mismatches
    # that yield session_open_timeout AND a dishonest infected_running
    # label (PIPELINE.md §10). Once target-IP discovery from the guest's
    # bridge lease lands, this filter can depend on `bridge_set` again.
    # The 2026-05-03 vsftpd_234_backdoor episode (commit 4ab5477) on
    # k-gamingcom is the empirical evidence that the conditional
    # version produced poisoned labels.
    catalog = cfg.modules or {}
    usable_modules: dict[str, ModuleConfig] = {
        name: mod for name, mod in catalog.items() if not mod.requires_bridge
    }

    # Tier 3 needs, in this order: (a) at least one loaded module that
    # survived the requires_bridge filter, (b) a reachable msfrpcd, and
    # (c) a slot index below the manifest's max_tier3_slots cap (0 = no
    # cap). force_tier2 is no longer a sanctioned override knob per §14:
    # to disable Tier 3 globally ship an empty catalog (the current
    # state); per-slot control goes through the cap.
    tier3_ready = False
    if usable_modules and _msfrpcd_available(cfg.msfrpcd_host, cfg.msfrpcd_port):
        tier3_cap = cfg.max_tier3_slots
        tier3_ready = tier3_cap == 0 or slot < tier3_cap

    # NOTE(review): FLEET_MAX_CONCURRENT carries the detector's
    # capacity.max_concurrent. FleetRunner.run may clamp the actual slot
    # count further with the manifest ceiling — confirm consumers expect
    # the detector's value rather than the effective concurrency.
    env = {
        **os.environ,
        "SLOT": str(slot),
        "SAMPLE_NAME": sample.name,
        "SAMPLE_PROFILE": sample.profile,
        "SAMPLE_KIND": sample.kind,
        "FLEET_HOST_ID": cfg.host_id,
        "FLEET_EPISODE_INDEX": str(episode_index),
        "FLEET_MAX_CONCURRENT": str(capacity.max_concurrent),
    }

    # Prefer the repo's virtualenv interpreter when it exists.
    py = "python3"
    venv_py = cfg.repo_root / ".venv" / "bin" / "python"
    if venv_py.exists():
        py = str(venv_py)

    log_dir = cfg.data_root / "fleet-logs"
    log_dir.mkdir(parents=True, exist_ok=True)
    out_log = log_dir / f"slot-{slot}-ep-{episode_index}.log"

    module_name: str | None
    if tier3_ready:
        module = select_module(
            usable_modules,
            host_id=cfg.host_id, slot=slot, episode_index=episode_index,
        )
        guest_port = module_target_port(module) or 21
        # HOST_PORT is the unprivileged port QEMU hostfwd maps to the
        # guest service. The +2000 lifts every base port above 1024
        # (vsftpd:21->2021, http:80->2080, smb:139->2139,
        # distcc:3632->5632, irc:6667->8667); the per-slot offset keeps
        # concurrent targets from colliding on loopback.
        slot_offset = slot * 1000
        host_port = guest_port + 2000 + slot_offset

        # Dedicated runner dir for this slot's target VM.
        run_dir = f"{run_dir_base}-target-{slot}"
        env["RUN_DIR"] = run_dir
        env["PORT_BASE"] = str(host_port)

        # First the main service port pair, then one pair per extra bind
        # port for payloads such as cmd/unix/bind_perl that open a
        # separate listener in the guest.
        port_pairs = [f"{host_port}:{guest_port}"]
        for extra_guest_port in module.extra_target_ports:
            # Per-slot LPORT = base + slot*1000. FLEET_PAYLOAD_LPORT
            # overrides the payload's LPORT so the guest binds exactly
            # this port; hence hostfwd uses the same number on both
            # sides (the per-slot LPORT, not the module's base LPORT).
            lport = extra_guest_port + slot_offset
            port_pairs.append(f"{lport}:{lport}")
            env["FLEET_PAYLOAD_LPORT"] = str(lport)
        env["TARGET_PORTS"] = ",".join(port_pairs)

        # Tier 3 always runs on SLIRP+hostfwd. Remove BRIDGE so a host
        # that sets it for Tier 2 (pcap source 4) does not leak it into
        # the Tier-3 launch_target.sh, which would then try tap mode
        # without matching guest-IP discovery (same reason the
        # requires_bridge filter above stays strict).
        env.pop("BRIDGE", None)

        cmd = [
            py,
            str(cfg.repo_root / "tools" / "run_tier3_demo.py"),
            "--data-root", str(cfg.data_root),
            "--run-dir", run_dir,
            "--module", module.name,
            "--sample", sample.name,
            "--target-port", str(host_port),
            "--target-boot-timeout", "300",
        ]
        tier = "tier3"
        module_name = module.name
    else:
        run_dir = f"{run_dir_base}-{slot}"
        env["RUN_DIR"] = run_dir
        cmd = [
            py,
            str(cfg.repo_root / "tools" / "run_real_vm_demo.py"),
            "--data-root", str(cfg.data_root),
            "--run-dir", run_dir,
            "--sample", sample.name,
        ]
        tier = "tier2"
        module_name = None

        # Say why Tier 3 was not used; the first matching reason wins.
        if not cfg.modules:
            log.warning("slot=%d falling back to Tier 2: empty module catalog", slot)
        elif not usable_modules:
            log.warning("slot=%d falling back to Tier 2: no non-bridge modules available", slot)
        elif cfg.max_tier3_slots > 0 and slot >= cfg.max_tier3_slots:
            log.debug("slot=%d Tier 2 by max_tier3_slots=%d cap", slot, cfg.max_tier3_slots)
        else:
            log.warning("slot=%d falling back to Tier 2: msfrpcd unreachable at %s:%d",
                        slot, cfg.msfrpcd_host, cfg.msfrpcd_port)

    log.info(
        "slot=%d ep=%d tier=%s sample=%s module=%s run_dir=%s",
        slot, episode_index, tier, sample.name, module_name, run_dir,
    )

    started = time.monotonic()
    try:
        with out_log.open("ab") as logf:
            proc = subprocess.run(
                cmd,
                cwd=str(cfg.repo_root),
                env=env,
                stdout=logf,
                stderr=subprocess.STDOUT,
                check=False,
            )
    except (OSError, subprocess.SubprocessError) as exc:
        # Could not start (or lost) the child: report instead of raising.
        rc, err = -1, str(exc)
    else:
        rc, err = proc.returncode, None
    duration = time.monotonic() - started

    return SlotResult(
        slot=slot,
        sample_name=sample.name,
        sample_kind=sample.kind,
        episode_id=None,
        rc=rc,
        duration_s=duration,
        tier=tier,
        module_name=module_name,
        error=err,
    )
# ---------------------------------------------------------------------------
# FleetRunner
# ---------------------------------------------------------------------------
@dataclass
class FleetRunResult:
    """Aggregate outcome of one ``FleetRunner.run`` call."""

    # The capacity decision the run was based on.
    capacity: FleetCapacity
    # One entry per finished slot, across all waves, in completion order.
    slots: list[SlotResult]
    # Seconds spent running waves; 0.0 when nothing could be run.
    total_duration_s: float
class FleetRunner:
    """Runs waves of concurrent episodes on this host.

    One wave = one episode index, executed in ``n_slots`` parallel
    threads, each thread driving ``_run_slot`` for its own sample.
    """

    def __init__(self, cfg: FleetConfig) -> None:
        self.cfg = cfg
        self._stop = threading.Event()

    def stop(self) -> None:
        """Ask the runner to stop. The flag is checked only before a wave
        starts, so a wave already in flight runs to completion."""
        self._stop.set()

    def run(
        self,
        *,
        episodes: int = 1,
        episode_index_base: int = 0,
        capacity_override: FleetCapacity | None = None,
    ) -> FleetRunResult:
        """Run up to ``episodes`` waves and collect every slot's result.

        Args:
            episodes: Number of waves to run.
            episode_index_base: Episode index of the first wave; wave
                ``k`` uses ``episode_index_base + k``.
            capacity_override: Use this capacity instead of calling
                ``detect_capacity``.

        Returns:
            A ``FleetRunResult``. When the slot count is zero or less,
            no wave is run and ``slots`` is empty.
        """
        cfg = self.cfg

        capacity = capacity_override
        if not capacity:
            capacity = detect_capacity(ram_per_vm_mib=cfg.ram_per_vm_mib)

        # The capacity detector decides per host from the hardware; the
        # manifest ceiling (0 = none) caps that decision so the dataset
        # stays balanced across hosts of different sizes
        # (PIPELINE.md §4.1).
        ceiling = cfg.max_concurrent_ceiling
        detected = capacity.max_concurrent
        n_slots = min(detected, ceiling) if ceiling > 0 else detected

        if n_slots <= 0:
            log.warning(
                "fleet capacity is zero (%s); cannot run", capacity.rationale,
            )
            return FleetRunResult(
                capacity=capacity, slots=[], total_duration_s=0.0,
            )

        log.info(
            "fleet host=%s slots=%d episodes=%d samples=%d experiment=%s",
            cfg.host_id, n_slots, episodes, len(cfg.samples),
            cfg.experiment.name,
        )

        collected: list[SlotResult] = []
        t_start = time.monotonic()
        wave_indices = range(episode_index_base, episode_index_base + episodes)
        for wave_no, episode_index in enumerate(wave_indices, start=1):
            if self._stop.is_set():
                break

            # One sample per slot, chosen by (host_id, slot, episode_index).
            slot_samples = [
                cfg.samples.select(
                    host_id=cfg.host_id,
                    slot=slot,
                    episode_index=episode_index,
                )
                for slot in range(n_slots)
            ]
            log.info(
                "wave %d/%d: %s",
                wave_no, episodes,
                [(i, s.name, s.kind) for i, s in enumerate(slot_samples)],
            )

            with ThreadPoolExecutor(max_workers=n_slots) as pool:
                pending = [
                    pool.submit(
                        _run_slot, cfg, slot, sample, episode_index, capacity,
                    )
                    for slot, sample in enumerate(slot_samples)
                ]
                # Record results as slots finish, not in slot order.
                for done in as_completed(pending):
                    outcome = done.result()
                    log.info(
                        "slot %d sample=%s rc=%d duration=%.1fs",
                        outcome.slot, outcome.sample_name,
                        outcome.rc, outcome.duration_s,
                    )
                    collected.append(outcome)

        return FleetRunResult(
            capacity=capacity,
            slots=collected,
            total_duration_s=time.monotonic() - t_start,
        )
# ---------------------------------------------------------------------------
# Friendly capacity report (used by tools/run_fleet.py --capacity)
# ---------------------------------------------------------------------------
def capacity_report(*, ram_per_vm_mib: int = 320) -> str:
    """Run ``detect_capacity`` and format the result as a multi-line,
    human-readable report (each line newline-terminated)."""
    cap = detect_capacity(ram_per_vm_mib=ram_per_vm_mib)
    fragments = [
        f"cores: {cap.cores_total} (reserve {cap.cores_reserved})\n",
        f"ram: {cap.ram_total_mib} MiB total, {cap.ram_available_mib} MiB available ",
        f"(headroom {cap.ram_headroom_mib} MiB, per-vm {cap.ram_per_vm_mib} MiB)\n",
        f"load: 1m={cap.load_1m:.2f}\n",
        f"caps: by_cores={cap.max_by_cores}, by_ram={cap.max_by_ram}, ",
        f"by_load={cap.max_by_load}\n",
        f"--> max_concurrent VMs: {cap.max_concurrent}\n",
    ]
    return "".join(fragments)