"""Canonical experiment manifest (PIPELINE.md §4.1 / §13). The manifest at `/manifest.toml` is the single source of truth for what the experiment is: which collectors run, at what cadence, against what targets, with which exploit modules in rotation, walking which phase budget. Every lab host loads THIS file. There is no per-host override flag, no `--manifest ` argument, no fallback. A host that can't load and validate the canonical manifest must exit 78 and ship zero episodes. `load_canonical(repo_root)` reads from the fixed path and validates. On any failure it raises `ManifestError`; callers translate that into exit 78. `Manifest` is a frozen dataclass — once loaded the values don't move under us mid-run. Substantive amendments follow PIPELINE.md §16: operator sign-off, landed in the same merge as the code change, with §8 decision tests applied to the amendment itself. """ from __future__ import annotations import tomllib from dataclasses import dataclass, field from pathlib import Path CANONICAL_FILENAME = "manifest.toml" def tier3_schedule_from(schedule: "tuple[Phase, ...]") -> list[tuple[str, float]]: """Render the canonical schedule as the legacy `[(name, seconds)]` format EpisodeConfig.phase_schedule expects. This IS the Tier-3 schedule: clean → armed → infecting → infected_running → ... per the canonical manifest. Phase labels are event-driven (PIPELINE.md §4.5) so durations are budgets, not label sources.""" return [(p.name, p.seconds) for p in schedule] def tier2_schedule_from(schedule: "tuple[Phase, ...]") -> list[tuple[str, float]]: """Tier-2 episodes have no exploit and no driver firing modules. Walking the Tier-3 phase set on a Tier-2 episode produces dishonest `infected_running` labels (PIPELINE.md §3 evidence — the original sin) under clock-driven labelling, OR `failed` labels under event-driven labelling (still useless for training). Honest fix: Tier-2 episodes ride a single `clean` phase for the same total wall-clock as the Tier-3 walk so episode lengths are comparable across tiers (no length-bias in the dataset). Every telemetry row on a Tier-2 episode is tagged `clean` because nothing infected anything.""" total = sum(p.seconds for p in schedule) return [("clean", float(total))] # Closed enums — keep in sync with the corresponding code that # implements each name. A name not in these sets means the manifest # is asking for something the orchestrator doesn't know how to do. KNOWN_COLLECTORS: frozenset[str] = frozenset({ "proc", "qmp", "perf", "guest_agent", "pcap", "netflow", }) KNOWN_PHASES: frozenset[str] = frozenset({ "clean", "armed", "infecting", "infected_running", "dormant", "failed", }) class ManifestError(ValueError): """Raised when the canonical manifest is missing, unreadable, or fails validation. The orchestrator translates this into exit 78 (PIPELINE.md §4.7 / §9).""" @dataclass(frozen=True) class Phase: name: str seconds: float @dataclass(frozen=True) class CollectorIntervals: proc_ms: int qmp_ms: int perf_ms: int guest_agent_ms: int pcap_snaplen: int netflow_bucket_ms: int @dataclass(frozen=True) class FleetPolicy: max_concurrent_ceiling: int max_tier3_slots: int @dataclass(frozen=True) class CatalogEntry: name: str verified_against: str last_verified: str @dataclass(frozen=True) class TargetSpec: image_name: str sha256: str build_script: str @dataclass(frozen=True) class Manifest: schema_version: int name: str ram_per_vm_mib: int schedule: tuple[Phase, ...] fleet: FleetPolicy collectors_active: tuple[str, ...] intervals: CollectorIntervals catalog: tuple[CatalogEntry, ...] targets: tuple[TargetSpec, ...] samples_manifest_path: str # Resolved repo root + manifest path so callers can stamp them # into meta.json for provenance without re-deriving. repo_root: Path = field(repr=False) manifest_path: Path = field(repr=False) def to_meta(self) -> dict: """Lightweight representation suitable for embedding in meta.json so episodes carry their experiment provenance. Excludes the resolved Path fields (host-specific paths don't belong in the wire-format).""" return { "schema_version": self.schema_version, "name": self.name, "ram_per_vm_mib": self.ram_per_vm_mib, "phases": [ {"name": p.name, "seconds": p.seconds} for p in self.schedule ], "fleet": { "max_concurrent_ceiling": self.fleet.max_concurrent_ceiling, "max_tier3_slots": self.fleet.max_tier3_slots, }, "collectors_active": list(self.collectors_active), "intervals": { "proc_ms": self.intervals.proc_ms, "qmp_ms": self.intervals.qmp_ms, "perf_ms": self.intervals.perf_ms, "guest_agent_ms": self.intervals.guest_agent_ms, "pcap_snaplen": self.intervals.pcap_snaplen, "netflow_bucket_ms": self.intervals.netflow_bucket_ms, }, "catalog": [ {"name": c.name, "verified_against": c.verified_against, "last_verified": c.last_verified} for c in self.catalog ], "targets": [ {"image_name": t.image_name, "sha256": t.sha256, "build_script": t.build_script} for t in self.targets ], } def load_canonical(repo_root: Path | str) -> Manifest: """Load + validate `/manifest.toml`. There is no `manifest_path` parameter on purpose — per §4.1 the canonical manifest lives at exactly one path. Callers that pass the path directly are off the runbook. Raises ManifestError on any failure. Successful return guarantees every field is present, every collector name is known, every phase name is known, and every catalog entry has both verified_against and last_verified. """ repo_root = Path(repo_root).resolve() path = repo_root / CANONICAL_FILENAME if not path.exists(): raise ManifestError( f"canonical manifest not found at {path}. " f"PIPELINE.md §4.1 requires exactly one manifest at the " f"repo root; this host cannot run the experiment without it." ) try: raw = tomllib.loads(path.read_text()) except (OSError, tomllib.TOMLDecodeError) as e: raise ManifestError(f"cannot parse {path}: {e}") from e return _validate(raw, repo_root, path) def _validate(raw: dict, repo_root: Path, path: Path) -> Manifest: schema_version = _require_int(raw, "schema_version") if schema_version != 1: raise ManifestError( f"manifest schema_version={schema_version} not supported; " f"this orchestrator handles version 1 only. " f"Upgrade orchestrator or downgrade manifest to match." ) name = _require_str(raw, "name") experiment = _require_dict(raw, "experiment") ram_per_vm_mib = _require_int(experiment, "ram_per_vm_mib") if ram_per_vm_mib <= 0: raise ManifestError( f"experiment.ram_per_vm_mib must be positive, got {ram_per_vm_mib}" ) schedule_block = _require_dict(experiment, "schedule") phases_raw = schedule_block.get("phases") if not isinstance(phases_raw, list) or not phases_raw: raise ManifestError( "experiment.schedule.phases must be a non-empty array" ) phases: list[Phase] = [] for i, p in enumerate(phases_raw): if not isinstance(p, dict): raise ManifestError( f"experiment.schedule.phases[{i}] must be a table" ) pname = _require_str(p, "name", ctx=f"phases[{i}]") if pname not in KNOWN_PHASES: raise ManifestError( f"experiment.schedule.phases[{i}].name={pname!r} not in " f"KNOWN_PHASES {sorted(KNOWN_PHASES)}" ) secs = _require_float(p, "seconds", ctx=f"phases[{i}]") if secs <= 0: raise ManifestError( f"experiment.schedule.phases[{i}].seconds must be > 0, " f"got {secs}" ) phases.append(Phase(name=pname, seconds=secs)) fleet_block = _require_dict(experiment, "fleet") fleet = FleetPolicy( max_concurrent_ceiling=_require_int(fleet_block, "max_concurrent_ceiling"), max_tier3_slots=_require_int(fleet_block, "max_tier3_slots"), ) if fleet.max_concurrent_ceiling < 0 or fleet.max_tier3_slots < 0: raise ManifestError( "experiment.fleet ceilings must be >= 0 (0 = no cap)" ) collectors_block = _require_dict(raw, "collectors") active_raw = collectors_block.get("active") if not isinstance(active_raw, list): raise ManifestError("collectors.active must be an array") if len(set(active_raw)) != len(active_raw): raise ManifestError( f"collectors.active contains duplicates: {active_raw}" ) for c in active_raw: if c not in KNOWN_COLLECTORS: raise ManifestError( f"collectors.active references unknown collector " f"{c!r}; known: {sorted(KNOWN_COLLECTORS)}" ) collectors_active = tuple(active_raw) intervals_block = _require_dict(collectors_block, "intervals") intervals = CollectorIntervals( proc_ms=_require_int(intervals_block, "proc_ms"), qmp_ms=_require_int(intervals_block, "qmp_ms"), perf_ms=_require_int(intervals_block, "perf_ms"), guest_agent_ms=_require_int(intervals_block, "guest_agent_ms"), pcap_snaplen=_require_int(intervals_block, "pcap_snaplen"), netflow_bucket_ms=_require_int(intervals_block, "netflow_bucket_ms"), ) for fname, fval in ( ("proc_ms", intervals.proc_ms), ("qmp_ms", intervals.qmp_ms), ("perf_ms", intervals.perf_ms), ("guest_agent_ms", intervals.guest_agent_ms), ("pcap_snaplen", intervals.pcap_snaplen), ("netflow_bucket_ms", intervals.netflow_bucket_ms), ): if fval <= 0: raise ManifestError( f"collectors.intervals.{fname} must be > 0, got {fval}" ) catalog_block = _require_dict(raw, "catalog") modules_raw = catalog_block.get("modules") if not isinstance(modules_raw, list): raise ManifestError("catalog.modules must be an array") catalog: list[CatalogEntry] = [] for i, entry in enumerate(modules_raw): if not isinstance(entry, dict): raise ManifestError(f"catalog.modules[{i}] must be a table") cname = _require_str(entry, "name", ctx=f"catalog[{i}]") verified_against = _require_str( entry, "verified_against", ctx=f"catalog[{i}]" ) last_verified = _require_str( entry, "last_verified", ctx=f"catalog[{i}]" ) catalog.append(CatalogEntry( name=cname, verified_against=verified_against, last_verified=last_verified, )) targets_block = _require_dict(raw, "targets") images_raw = targets_block.get("images") if not isinstance(images_raw, list): raise ManifestError("targets.images must be an array") targets: list[TargetSpec] = [] for i, t in enumerate(images_raw): if not isinstance(t, dict): raise ManifestError(f"targets.images[{i}] must be a table") targets.append(TargetSpec( image_name=_require_str(t, "image_name", ctx=f"targets[{i}]"), sha256=_require_str(t, "sha256", ctx=f"targets[{i}]"), build_script=_require_str(t, "build_script", ctx=f"targets[{i}]"), )) samples_block = _require_dict(raw, "samples") samples_manifest_path = _require_str(samples_block, "manifest_path") return Manifest( schema_version=schema_version, name=name, ram_per_vm_mib=ram_per_vm_mib, schedule=tuple(phases), fleet=fleet, collectors_active=collectors_active, intervals=intervals, catalog=tuple(catalog), targets=tuple(targets), samples_manifest_path=samples_manifest_path, repo_root=repo_root, manifest_path=path, ) # ---------- helpers -------------------------------------------------- def _require(d: dict, key: str, kind: type, *, ctx: str = "") -> object: where = f"{ctx}." if ctx else "" if key not in d: raise ManifestError(f"missing required field {where}{key}") v = d[key] if not isinstance(v, kind): raise ManifestError( f"field {where}{key} must be {kind.__name__}, got {type(v).__name__}" ) return v def _require_str(d: dict, key: str, *, ctx: str = "") -> str: return _require(d, key, str, ctx=ctx) # type: ignore[return-value] def _require_int(d: dict, key: str, *, ctx: str = "") -> int: # tomllib parses both ints and floats; require strict int for fields # that should be ints, accept int-valued floats for ergonomics. where = f"{ctx}." if ctx else "" if key not in d: raise ManifestError(f"missing required field {where}{key}") v = d[key] if isinstance(v, bool): # bool is a subclass of int — reject explicitly raise ManifestError(f"field {where}{key} must be int, got bool") if isinstance(v, int): return v raise ManifestError( f"field {where}{key} must be int, got {type(v).__name__}" ) def _require_float(d: dict, key: str, *, ctx: str = "") -> float: where = f"{ctx}." if ctx else "" if key not in d: raise ManifestError(f"missing required field {where}{key}") v = d[key] if isinstance(v, bool): raise ManifestError(f"field {where}{key} must be number, got bool") if isinstance(v, (int, float)): return float(v) raise ManifestError( f"field {where}{key} must be number, got {type(v).__name__}" ) def _require_dict(d: dict, key: str, *, ctx: str = "") -> dict: return _require(d, key, dict, ctx=ctx) # type: ignore[return-value]