"""``cis490-fleet`` — run as many concurrent labeled episodes as the host can handle, drawing samples and experiment shape from the canonical manifest at /manifest.toml. Per PIPELINE.md §4.1 the experiment manifest is the single source of truth for what the experiment is. There are NO `--manifest`, `--modules-dir`, `--ram-per-vm-mib`, `--max-concurrent`, `--max-tier3-slots`, `--force-tier2`, or `--require-real-samples` flags — those would all be per-host overrides of the canonical experiment shape, which §4.1 forbids. A host that can't load the canonical manifest exits 78 (§9, §4.7). Modes: --capacity Print the resource calculation and exit. No VMs spawned. --waves N Run N waves of episodes (one wave = max_concurrent episodes, each in its own slot). Default: 1. """ from __future__ import annotations import argparse import json import logging import os import signal import sys from pathlib import Path # Allow running as a script. sys.path.insert(0, str(Path(__file__).resolve().parent.parent)) from exploits.modules import load_module_configs # noqa: E402 from orchestrator.fleet import ( # noqa: E402 FleetConfig, FleetRunner, capacity_report, detect_capacity, ) from orchestrator.manifest import ManifestError, load_canonical # noqa: E402 from samples.manifest import SampleManifest # noqa: E402 # LSB-style sysadmin-error exit code: do NOT respawn (paired with # RestartPreventExitStatus=78 on cis490-orchestrator.service). EXIT_SYSADMIN_ERROR = 78 def main(argv: list[str] | None = None) -> int: p = argparse.ArgumentParser(prog="cis490-fleet") p.add_argument("--capacity", action="store_true", help="Print capacity calculation and exit") p.add_argument("--waves", type=int, default=1, help="Number of episode waves to run") p.add_argument("--data-root", default="data", help="Per-host writable directory for episode dirs") p.add_argument("--host-id", default=os.environ.get("FLEET_HOST_ID") or os.uname().nodename, help="Stable identity for this host (per-host)") p.add_argument("--log-level", default="INFO") args = p.parse_args(argv) logging.basicConfig( level=getattr(logging, args.log_level.upper(), logging.INFO), format="%(asctime)s %(levelname)s %(name)s %(message)s", ) log = logging.getLogger("cis490.fleet") repo_root = Path(__file__).resolve().parent.parent # Canonical manifest. PIPELINE.md §4.1 / §4.7 — failure to load is # a sysadmin error, not a runtime warning. Exit 78 so systemd's # RestartPreventExitStatus=78 keeps the unit stuck-and-loud rather # than respawning into the same broken state. try: experiment = load_canonical(repo_root) except ManifestError as e: log.error("canonical manifest failed to load: %s", e) log.error( "this host cannot run the experiment without a valid " "manifest.toml at %s. Per §4.1 there is no fallback. " "Exiting %d (sysadmin error).", repo_root / "manifest.toml", EXIT_SYSADMIN_ERROR, ) return EXIT_SYSADMIN_ERROR log.info("loaded experiment manifest: %s v%d (schema_version=%d)", experiment.name, 1, experiment.schema_version) if args.capacity: print(capacity_report(ram_per_vm_mib=experiment.ram_per_vm_mib)) return 0 samples_path = repo_root / experiment.samples_manifest_path samples = SampleManifest.load(samples_path) # Module catalog admission: only load modules whose name appears in # experiment.catalog. Modules without a catalog entry don't get # loaded into the runtime catalog regardless of whether their toml # is on disk. PIPELINE.md §4.3. modules_dir = repo_root / "exploits" / "modules" on_disk_modules = ( load_module_configs(modules_dir) if modules_dir.exists() else {} ) admitted_names = {entry.name for entry in experiment.catalog} modules = { name: cfg for name, cfg in on_disk_modules.items() if name in admitted_names } if admitted_names and (admitted_names - set(modules.keys())): missing = sorted(admitted_names - set(modules.keys())) log.error( "manifest.catalog references modules whose toml is missing " "from %s: %s. Refusing to start (§4.3).", modules_dir, missing, ) return EXIT_SYSADMIN_ERROR cfg = FleetConfig( host_id=args.host_id, repo_root=repo_root, data_root=Path(args.data_root).resolve(), experiment=experiment, samples=samples, modules=modules, ) runner = FleetRunner(cfg) def _stop(signum, frame): # noqa: ARG001 runner.stop() signal.signal(signal.SIGTERM, _stop) signal.signal(signal.SIGINT, _stop) result = runner.run(episodes=args.waves) print(json.dumps({ "host_id": args.host_id, "experiment": experiment.name, "capacity": result.capacity.to_dict(), "modules_loaded": sorted(modules.keys()), "slots": [ { "slot": s.slot, "sample": s.sample_name, "sample_kind": s.sample_kind, "tier": s.tier, "module": s.module_name, "rc": s.rc, "duration_s": s.duration_s, "error": s.error, } for s in result.slots ], "total_duration_s": result.total_duration_s, }, indent=2)) return 0 if all(s.rc == 0 for s in result.slots) else 1 if __name__ == "__main__": sys.exit(main())