CIS490/orchestrator/manifest.py
Max Gorog 207a902c3e PIPELINE §5 step 2: canonical manifest at <repo>/manifest.toml
The experiment is now defined by a single version-pinned file —
manifest.toml at the repo root. PIPELINE.md §4.1 / §13 / §16. Every
lab host loads THIS exact file; per-host overrides of experiment
shape are forbidden.

Drops the following per-host CLI overrides that previously violated
the canonical-manifest principle:
  * --manifest, --modules-dir       (paths now derived)
  * --ram-per-vm-mib                (in manifest.experiment)
  * --max-concurrent                (manifest.experiment.fleet.max_concurrent_ceiling)
  * --max-tier3-slots               (manifest.experiment.fleet.max_tier3_slots)
  * --force-tier2                   (not a §14 sanctioned override knob —
                                     ship empty catalog to disable Tier-3)
  * --require-real-samples          (sample-side concern; out of fleet scope)
  * tools/run_*_demo.py --manifest  (samples path now from canonical)

New surface:
  * manifest.toml                   — the single source of truth
  * orchestrator/manifest.py        — load_canonical() + Manifest dataclass
                                      with strict validation, raises
                                      ManifestError on any failure
  * EpisodeConfig.experiment_meta   — populated by run_*_demo.py from
                                      the canonical manifest; stamped
                                      into every episode's meta.json
                                      under "experiment" key for
                                      provenance
  * cis490-orchestrator.service     — RestartPreventExitStatus=78 so
                                      manifest-load failures stay
                                      stuck-and-loud (§9, §4.7)
  * install-lab-host.sh             — validates manifest.toml at
                                      install time; missing or invalid
                                      = die with clear message

Catalog admission semantics: only modules whose name appears in
manifest.catalog get loaded into the runtime catalog (§4.3 in
miniature, will tighten further in step 4 when verified_against /
last_verified actually gate admission). Missing toml for an admitted
name is a sysadmin error → exit 78.

Renames cfg.manifest → cfg.samples + adds cfg.experiment to
disambiguate sample-manifest from experiment-manifest. Rewrites
test_fleet.py fixture to construct synthetic Manifest objects so
test outcomes don't depend on the on-disk manifest.toml content.

12 new tests in tests/test_manifest.py: schema-version mismatch,
unknown collector, duplicate collector, unknown phase, negative
phase seconds, negative ram, missing catalog fields, json round-trip.

Local run: `python tools/run_fleet.py --capacity` correctly logs the
loaded manifest and prints capacity. 241 tests passing.

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
2026-05-04 01:25:01 -05:00

371 lines
13 KiB
Python

"""Canonical experiment manifest (PIPELINE.md §4.1 / §13).
The manifest at `<repo_root>/manifest.toml` is the single source of
truth for what the experiment is: which collectors run, at what
cadence, against what targets, with which exploit modules in rotation,
walking which phase budget. Every lab host loads THIS file. There is
no per-host override flag, no `--manifest <path>` argument, no
fallback. A host that can't load and validate the canonical manifest
must exit 78 and ship zero episodes.
`load_canonical(repo_root)` reads from the fixed path and validates.
On any failure it raises `ManifestError`; callers translate that into
exit 78. `Manifest` is a frozen dataclass — once loaded the values
don't move under us mid-run.
Substantive amendments follow PIPELINE.md §16: operator sign-off,
landed in the same merge as the code change, with §8 decision tests
applied to the amendment itself.
"""
from __future__ import annotations
import tomllib
from dataclasses import dataclass, field
from pathlib import Path
# File name of the canonical manifest; load_canonical() joins it onto the
# repo root to get the one path it reads.
CANONICAL_FILENAME = "manifest.toml"
# Closed enums — keep in sync with the corresponding code that
# implements each name. A name not in these sets means the manifest
# is asking for something the orchestrator doesn't know how to do.
# _validate() rejects any collectors.active entry not listed here.
KNOWN_COLLECTORS: frozenset[str] = frozenset({
    "proc",
    "qmp",
    "perf",
    "guest_agent",
    "pcap",
    "netflow",
})
# _validate() rejects any schedule phase whose name is not listed here.
KNOWN_PHASES: frozenset[str] = frozenset({
    "clean",
    "armed",
    "infecting",
    "infected_running",
    "dormant",
    "failed",
})
class ManifestError(ValueError):
    """Raised when the canonical manifest is missing, unreadable, or
    fails validation.

    The orchestrator translates this into exit 78
    (PIPELINE.md §4.7 / §9). It subclasses ValueError, so a handler for
    ValueError will also catch it.
    """
@dataclass(frozen=True)
class Phase:
    """One entry of `experiment.schedule.phases`: a phase name plus its
    `seconds` value. Immutable once constructed."""

    # _validate() only admits names that are members of KNOWN_PHASES.
    name: str
    # _validate() only admits values > 0 (ints in the TOML are converted
    # to float by _require_float).
    seconds: float
@dataclass(frozen=True)
class CollectorIntervals:
    """Values of the `[collectors.intervals]` table. Immutable.

    _validate() requires every field to be present, to be an int, and
    to be > 0.
    """

    # NOTE(review): the `_ms` suffix presumably means milliseconds —
    # confirm against the collectors that consume these values.
    proc_ms: int
    qmp_ms: int
    perf_ms: int
    guest_agent_ms: int
    # NOTE(review): presumably the pcap capture snapshot length — confirm.
    pcap_snaplen: int
    netflow_bucket_ms: int
@dataclass(frozen=True)
class FleetPolicy:
    """Values of the `[experiment.fleet]` table. Immutable.

    _validate() requires both fields to be ints >= 0; its error message
    documents 0 as meaning "no cap".
    """

    max_concurrent_ceiling: int
    max_tier3_slots: int
@dataclass(frozen=True)
class CatalogEntry:
    """One table from the `catalog.modules` array. Immutable.

    _validate() requires all three fields to be present and to be
    strings; it does not inspect their contents.
    """

    name: str
    verified_against: str
    last_verified: str
@dataclass(frozen=True)
class TargetSpec:
    """One table from the `targets.images` array. Immutable.

    _validate() requires all three fields to be present and to be
    strings; the `sha256` value is not format-checked there.
    """

    image_name: str
    sha256: str
    build_script: str
@dataclass(frozen=True)
class Manifest:
    """Immutable in-memory form of the canonical manifest.

    Instances are produced by `_validate()`; the dataclass is frozen so
    attribute assignment after construction raises.
    """

    schema_version: int
    name: str
    ram_per_vm_mib: int
    schedule: tuple[Phase, ...]
    fleet: FleetPolicy
    collectors_active: tuple[str, ...]
    intervals: CollectorIntervals
    catalog: tuple[CatalogEntry, ...]
    targets: tuple[TargetSpec, ...]
    samples_manifest_path: str
    # Where the manifest was loaded from. Kept on the object so callers
    # need not re-derive the paths; left out of repr().
    repo_root: Path = field(repr=False)
    manifest_path: Path = field(repr=False)

    def to_meta(self) -> dict:
        """Return a plain dict/list/scalar snapshot for meta.json.

        The result carries the experiment provenance of an episode.
        `repo_root` and `manifest_path` are deliberately left out
        (host-specific paths don't belong in the wire format);
        `samples_manifest_path` is not emitted either.
        """
        phase_rows = [
            {"name": phase.name, "seconds": phase.seconds}
            for phase in self.schedule
        ]

        policy = self.fleet
        fleet_row = {
            "max_concurrent_ceiling": policy.max_concurrent_ceiling,
            "max_tier3_slots": policy.max_tier3_slots,
        }

        cadence = self.intervals
        interval_row = {
            "proc_ms": cadence.proc_ms,
            "qmp_ms": cadence.qmp_ms,
            "perf_ms": cadence.perf_ms,
            "guest_agent_ms": cadence.guest_agent_ms,
            "pcap_snaplen": cadence.pcap_snaplen,
            "netflow_bucket_ms": cadence.netflow_bucket_ms,
        }

        catalog_rows = [
            {
                "name": module.name,
                "verified_against": module.verified_against,
                "last_verified": module.last_verified,
            }
            for module in self.catalog
        ]

        target_rows = [
            {
                "image_name": image.image_name,
                "sha256": image.sha256,
                "build_script": image.build_script,
            }
            for image in self.targets
        ]

        # Key order is part of the serialized output; keep it stable.
        return {
            "schema_version": self.schema_version,
            "name": self.name,
            "ram_per_vm_mib": self.ram_per_vm_mib,
            "phases": phase_rows,
            "fleet": fleet_row,
            "collectors_active": list(self.collectors_active),
            "intervals": interval_row,
            "catalog": catalog_rows,
            "targets": target_rows,
        }
def load_canonical(repo_root: Path | str) -> Manifest:
    """Load + validate `<repo_root>/manifest.toml`.

    There is no `manifest_path` parameter on purpose — per §4.1 the
    canonical manifest lives at exactly one path. Callers that pass the
    path directly are off the runbook.

    Args:
        repo_root: Repository root; resolved to an absolute path before
            the manifest file name is appended.

    Returns:
        The validated Manifest. Successful return guarantees every
        field is present, every collector name is known, every phase
        name is known, and every catalog entry has both
        verified_against and last_verified.

    Raises:
        ManifestError: if the file is missing, cannot be read, is not
            valid UTF-8, is not valid TOML, or fails validation.
    """
    repo_root = Path(repo_root).resolve()
    path = repo_root / CANONICAL_FILENAME
    if not path.exists():
        raise ManifestError(
            f"canonical manifest not found at {path}. "
            f"PIPELINE.md §4.1 requires exactly one manifest at the "
            f"repo root; this host cannot run the experiment without it."
        )
    try:
        # TOML is UTF-8 by specification. Read bytes and let tomllib
        # decode them so the result does not depend on the host's
        # locale encoding.
        with path.open("rb") as fh:
            raw = tomllib.load(fh)
    except (OSError, UnicodeDecodeError, tomllib.TOMLDecodeError) as e:
        # UnicodeDecodeError is neither an OSError nor a
        # TOMLDecodeError; without it here a mis-encoded file would
        # escape as a non-ManifestError and skip the exit-78 path.
        raise ManifestError(f"cannot parse {path}: {e}") from e
    return _validate(raw, repo_root, path)
def _validate(raw: dict, repo_root: Path, path: Path) -> Manifest:
    """Check the parsed TOML document and build the frozen Manifest.

    Sections are checked in a fixed order — schema_version, name,
    [experiment] (ram, schedule, fleet), [collectors] (active,
    intervals), [catalog], [targets], [samples] — and validation stops
    at the first problem found.

    Args:
        raw: The parsed TOML document (a dict of tables/values).
        repo_root: Resolved repository root; stored on the result.
        path: Path the document was read from; stored on the result.

    Returns:
        A fully populated Manifest.

    Raises:
        ManifestError: on the first missing field, wrongly typed field,
            unknown phase/collector name, duplicate collector, or
            out-of-range number.
    """
    # ---- top level ---------------------------------------------------
    schema_version = _require_int(raw, "schema_version")
    if schema_version != 1:
        raise ManifestError(
            f"manifest schema_version={schema_version} not supported; "
            f"this orchestrator handles version 1 only. "
            f"Upgrade orchestrator or downgrade manifest to match."
        )
    name = _require_str(raw, "name")

    # ---- [experiment] ------------------------------------------------
    experiment = _require_dict(raw, "experiment")
    ram_per_vm_mib = _require_int(experiment, "ram_per_vm_mib")
    if ram_per_vm_mib <= 0:
        raise ManifestError(
            f"experiment.ram_per_vm_mib must be positive, got {ram_per_vm_mib}"
        )

    schedule_block = _require_dict(experiment, "schedule")
    phases_raw = schedule_block.get("phases")
    if not isinstance(phases_raw, list) or not phases_raw:
        raise ManifestError(
            "experiment.schedule.phases must be a non-empty array"
        )
    phases: list[Phase] = []
    for i, p in enumerate(phases_raw):
        if not isinstance(p, dict):
            raise ManifestError(
                f"experiment.schedule.phases[{i}] must be a table"
            )
        pname = _require_str(p, "name", ctx=f"phases[{i}]")
        if pname not in KNOWN_PHASES:
            raise ManifestError(
                f"experiment.schedule.phases[{i}].name={pname!r} not in "
                f"KNOWN_PHASES {sorted(KNOWN_PHASES)}"
            )
        secs = _require_float(p, "seconds", ctx=f"phases[{i}]")
        if secs <= 0:
            raise ManifestError(
                f"experiment.schedule.phases[{i}].seconds must be > 0, "
                f"got {secs}"
            )
        # TOML has `nan` and `inf` float literals. Neither compares
        # <= 0, so both slip past the check above, yet neither is a
        # usable phase budget. `not (x < inf)` is true for exactly
        # those two values at this point.
        if not secs < float("inf"):
            raise ManifestError(
                f"experiment.schedule.phases[{i}].seconds must be a "
                f"finite number, got {secs}"
            )
        phases.append(Phase(name=pname, seconds=secs))

    fleet_block = _require_dict(experiment, "fleet")
    fleet = FleetPolicy(
        max_concurrent_ceiling=_require_int(fleet_block, "max_concurrent_ceiling"),
        max_tier3_slots=_require_int(fleet_block, "max_tier3_slots"),
    )
    if fleet.max_concurrent_ceiling < 0 or fleet.max_tier3_slots < 0:
        raise ManifestError(
            "experiment.fleet ceilings must be >= 0 (0 = no cap)"
        )

    # ---- [collectors] ------------------------------------------------
    collectors_block = _require_dict(raw, "collectors")
    active_raw = collectors_block.get("active")
    if not isinstance(active_raw, list):
        raise ManifestError("collectors.active must be an array")
    # TOML arrays may hold nested arrays or inline tables. Those are
    # unhashable, so the set()/frozenset membership checks below would
    # raise TypeError instead of ManifestError. Reject non-strings first.
    for i, c in enumerate(active_raw):
        if not isinstance(c, str):
            raise ManifestError(
                f"collectors.active[{i}] must be str, "
                f"got {type(c).__name__}"
            )
    if len(set(active_raw)) != len(active_raw):
        raise ManifestError(
            f"collectors.active contains duplicates: {active_raw}"
        )
    for c in active_raw:
        if c not in KNOWN_COLLECTORS:
            raise ManifestError(
                f"collectors.active references unknown collector "
                f"{c!r}; known: {sorted(KNOWN_COLLECTORS)}"
            )
    collectors_active = tuple(active_raw)

    intervals_block = _require_dict(collectors_block, "intervals")
    interval_names = (
        "proc_ms",
        "qmp_ms",
        "perf_ms",
        "guest_agent_ms",
        "pcap_snaplen",
        "netflow_bucket_ms",
    )
    # Presence/type of every field is checked before any range check,
    # so a missing field is reported ahead of an out-of-range sibling.
    interval_values = {
        fname: _require_int(intervals_block, fname) for fname in interval_names
    }
    for fname, fval in interval_values.items():
        if fval <= 0:
            raise ManifestError(
                f"collectors.intervals.{fname} must be > 0, got {fval}"
            )
    intervals = CollectorIntervals(**interval_values)

    # ---- [catalog] ---------------------------------------------------
    catalog_block = _require_dict(raw, "catalog")
    modules_raw = catalog_block.get("modules")
    if not isinstance(modules_raw, list):
        raise ManifestError("catalog.modules must be an array")
    catalog: list[CatalogEntry] = []
    for i, entry in enumerate(modules_raw):
        if not isinstance(entry, dict):
            raise ManifestError(f"catalog.modules[{i}] must be a table")
        cname = _require_str(entry, "name", ctx=f"catalog[{i}]")
        verified_against = _require_str(
            entry, "verified_against", ctx=f"catalog[{i}]"
        )
        last_verified = _require_str(
            entry, "last_verified", ctx=f"catalog[{i}]"
        )
        catalog.append(CatalogEntry(
            name=cname,
            verified_against=verified_against,
            last_verified=last_verified,
        ))

    # ---- [targets] ---------------------------------------------------
    targets_block = _require_dict(raw, "targets")
    images_raw = targets_block.get("images")
    if not isinstance(images_raw, list):
        raise ManifestError("targets.images must be an array")
    targets: list[TargetSpec] = []
    for i, t in enumerate(images_raw):
        if not isinstance(t, dict):
            raise ManifestError(f"targets.images[{i}] must be a table")
        targets.append(TargetSpec(
            image_name=_require_str(t, "image_name", ctx=f"targets[{i}]"),
            sha256=_require_str(t, "sha256", ctx=f"targets[{i}]"),
            build_script=_require_str(t, "build_script", ctx=f"targets[{i}]"),
        ))

    # ---- [samples] ---------------------------------------------------
    samples_block = _require_dict(raw, "samples")
    samples_manifest_path = _require_str(samples_block, "manifest_path")

    return Manifest(
        schema_version=schema_version,
        name=name,
        ram_per_vm_mib=ram_per_vm_mib,
        schedule=tuple(phases),
        fleet=fleet,
        collectors_active=collectors_active,
        intervals=intervals,
        catalog=tuple(catalog),
        targets=tuple(targets),
        samples_manifest_path=samples_manifest_path,
        repo_root=repo_root,
        manifest_path=path,
    )
# ---------- helpers --------------------------------------------------
def _require(d: dict, key: str, kind: type, *, ctx: str = "") -> object:
    """Return `d[key]` after checking it exists and is a `kind`.

    Args:
        d: Table to read from.
        key: Field name to look up.
        kind: Type the value must be an instance of.
        ctx: Optional dotted prefix used only in error messages.

    Raises:
        ManifestError: if `key` is absent or the value is not an
            instance of `kind`.
    """
    prefix = f"{ctx}." if ctx else ""
    if key in d:
        value = d[key]
        if isinstance(value, kind):
            return value
        raise ManifestError(
            f"field {prefix}{key} must be {kind.__name__}, got {type(value).__name__}"
        )
    raise ManifestError(f"missing required field {prefix}{key}")
def _require_str(d: dict, key: str, *, ctx: str = "") -> str:
    """Return `d[key]`, which must be present and a str.

    Delegates to `_require`; `ctx` is the prefix it uses in error
    messages.
    """
    text = _require(d, key, str, ctx=ctx)
    return text  # type: ignore[return-value]
def _require_int(d: dict, key: str, *, ctx: str = "") -> int:
    """Return `d[key]`, which must be present and a true int.

    Strict on purpose: floats are rejected even when they hold an
    integral value, and so are bools.

    Args:
        d: Table to read from.
        key: Field name to look up.
        ctx: Optional dotted prefix used only in error messages.

    Raises:
        ManifestError: if `key` is absent or the value is not an int.
    """
    label = f"{ctx}.{key}" if ctx else key
    if key not in d:
        raise ManifestError(f"missing required field {label}")
    value = d[key]
    # bool is a subclass of int, so it has to be excluded explicitly.
    if isinstance(value, int) and not isinstance(value, bool):
        return value
    raise ManifestError(
        f"field {label} must be int, got {type(value).__name__}"
    )
def _require_float(d: dict, key: str, *, ctx: str = "") -> float:
    """Return `d[key]` as a float; the value must be an int or a float.

    Ints are converted with `float()`. Bools are rejected.

    Args:
        d: Table to read from.
        key: Field name to look up.
        ctx: Optional dotted prefix used only in error messages.

    Raises:
        ManifestError: if `key` is absent or the value is not a number.
    """
    label = f"{ctx}.{key}" if ctx else key
    if key not in d:
        raise ManifestError(f"missing required field {label}")
    value = d[key]
    # bool is a subclass of int, so it has to be excluded explicitly.
    if isinstance(value, (int, float)) and not isinstance(value, bool):
        return float(value)
    raise ManifestError(
        f"field {label} must be number, got {type(value).__name__}"
    )
def _require_dict(d: dict, key: str, *, ctx: str = "") -> dict:
    """Return `d[key]`, which must be present and a dict (a TOML table).

    Delegates to `_require`; `ctx` is the prefix it uses in error
    messages.
    """
    table = _require(d, key, dict, ctx=ctx)
    return table  # type: ignore[return-value]