CIS490/orchestrator/manifest.py
Max Gorog 3d4f282e9c Tier-2 episodes use clean-only schedule; .gitignore VERSION
Two correctness fixes that the §4.5 event-driven labeller surfaced:

1. tools/run_real_vm_demo.py was hardcoding a Tier-3-shaped schedule
   (clean → armed → infecting → infected_running → ...) for episodes
   with no exploit firing. Pre-§4.5 those episodes wrote dishonest
   `infected_running` labels from the schedule clock — exactly the §3
   evidence pattern. Post-§4.5 they write `failed` at the infecting
   transition (the justifying exploit_fire never arrives), which is
   honest about what happened but useless for training.

   The honest fix: Tier-2 episodes have a clean-only schedule. All
   telemetry tagged `clean` because nothing infected anything. The
   total duration matches the canonical Tier-3 schedule so episode
   lengths are comparable across tiers — no length-bias in the
   dataset (§10).

   Helper `tier2_schedule_from(schedule)` in orchestrator/manifest.py
   derives `[("clean", total_seconds)]` from the canonical schedule.
   `tier3_schedule_from(schedule)` renders the legacy
   `[(name, seconds)]` shape EpisodeConfig still expects.

   Tier-2 demo (run_real_vm_demo.py) now calls tier2_schedule_from.
   Tier-3 demo (run_tier3_demo.py) now calls tier3_schedule_from.
   Drops the hardcoded DEFAULT_SCHEDULE constants from both — the
   canonical manifest is the single source of truth (§4.1).

2. .gitignore now excludes /VERSION. The install-lab-host.sh stamp
   writes /opt/cis490/VERSION so episodes can record code provenance
   without /opt/cis490 carrying a .git directory. But /opt/cis490 IS
   typically a git checkout on lab hosts (auto-update.sh pulls into
   it), so writing VERSION leaves the working tree dirty and every
   episode records meta.code_version.dirty=true. Rule 4 of the
   PIPELINE.md §4.6 acceptance gate would then reject every episode
   unless CIS490_ALLOW_DIRTY=1 is set, which would break the data
   flow.

   Now VERSION is .gitignored: install-lab-host.sh stamps it, git
   status doesn't see it, dirty=false, gate rule 4 passes naturally.

These two changes together keep the data flowing AND honest. Tier-2
episodes pass with `phases=[clean]` + every collector emitting real
rows. Tier-3 episodes (none today, empty catalog) walk the full
event-driven schedule when a verified module gets re-admitted.
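
The schedule derivation described in item 1 can be sketched as below.
This is a standalone illustration, not the module itself: the two
helpers mirror the ones in orchestrator/manifest.py, while the phase
budgets in `canonical` are hypothetical values chosen only to make the
arithmetic visible.

```python
from dataclasses import dataclass


@dataclass(frozen=True)
class Phase:
    name: str
    seconds: float


def tier3_schedule_from(schedule):
    # Legacy [(name, seconds)] shape; durations are budgets, not labels.
    return [(p.name, p.seconds) for p in schedule]


def tier2_schedule_from(schedule):
    # One `clean` phase spanning the same total wall-clock time.
    total = sum(p.seconds for p in schedule)
    return [("clean", float(total))]


# Hypothetical budgets, for illustration only.
canonical = (
    Phase("clean", 60.0),
    Phase("armed", 10.0),
    Phase("infecting", 20.0),
    Phase("infected_running", 90.0),
)

tier3 = tier3_schedule_from(canonical)
tier2 = tier2_schedule_from(canonical)

# Both tiers cover the same total duration, so episode length carries
# no information about the tier.
assert sum(seconds for _, seconds in tier3) == tier2[0][1]
```

With these assumed budgets the Tier-2 schedule collapses to a single
180-second `clean` phase, while the Tier-3 schedule keeps all four
entries.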

286 tests passing.

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
2026-05-04 01:55:37 -05:00


"""Canonical experiment manifest (PIPELINE.md §4.1 / §13).

The manifest at `<repo_root>/manifest.toml` is the single source of
truth for what the experiment is: which collectors run, at what
cadence, against what targets, with which exploit modules in rotation,
walking which phase budget. Every lab host loads THIS file. There is
no per-host override flag, no `--manifest <path>` argument, no
fallback. A host that can't load and validate the canonical manifest
must exit 78 and ship zero episodes.

`load_canonical(repo_root)` reads from the fixed path and validates.
On any failure it raises `ManifestError`; callers translate that into
exit 78. `Manifest` is a frozen dataclass: once loaded the values
don't move under us mid-run.

Substantive amendments follow PIPELINE.md §16: operator sign-off,
landed in the same merge as the code change, with §8 decision tests
applied to the amendment itself.
"""

from __future__ import annotations

import tomllib
from dataclasses import dataclass, field
from pathlib import Path

CANONICAL_FILENAME = "manifest.toml"


def tier3_schedule_from(schedule: "tuple[Phase, ...]") -> list[tuple[str, float]]:
    """Render the canonical schedule as the legacy
    `[(name, seconds)]` format EpisodeConfig.phase_schedule expects.

    This IS the Tier-3 schedule: clean → armed → infecting →
    infected_running → ... per the canonical manifest. Phase labels
    are event-driven (PIPELINE.md §4.5) so durations are budgets, not
    label sources."""
    return [(p.name, p.seconds) for p in schedule]


def tier2_schedule_from(schedule: "tuple[Phase, ...]") -> list[tuple[str, float]]:
    """Tier-2 episodes have no exploit and no driver firing modules.

    Walking the Tier-3 phase set on a Tier-2 episode produces dishonest
    `infected_running` labels (PIPELINE.md §3 evidence, the original
    sin) under clock-driven labelling, OR `failed` labels under
    event-driven labelling (still useless for training).

    Honest fix: Tier-2 episodes ride a single `clean` phase for the
    same total wall-clock as the Tier-3 walk so episode lengths are
    comparable across tiers (no length-bias in the dataset). Every
    telemetry row on a Tier-2 episode is tagged `clean` because
    nothing infected anything."""
    total = sum(p.seconds for p in schedule)
    return [("clean", float(total))]


# Closed enums: keep in sync with the corresponding code that
# implements each name. A name not in these sets means the manifest
# is asking for something the orchestrator doesn't know how to do.
KNOWN_COLLECTORS: frozenset[str] = frozenset({
    "proc",
    "qmp",
    "perf",
    "guest_agent",
    "pcap",
    "netflow",
})

KNOWN_PHASES: frozenset[str] = frozenset({
    "clean",
    "armed",
    "infecting",
    "infected_running",
    "dormant",
    "failed",
})


class ManifestError(ValueError):
    """Raised when the canonical manifest is missing, unreadable, or
    fails validation. The orchestrator translates this into exit 78
    (PIPELINE.md §4.7 / §9)."""


@dataclass(frozen=True)
class Phase:
    name: str
    seconds: float


@dataclass(frozen=True)
class CollectorIntervals:
    proc_ms: int
    qmp_ms: int
    perf_ms: int
    guest_agent_ms: int
    pcap_snaplen: int
    netflow_bucket_ms: int


@dataclass(frozen=True)
class FleetPolicy:
    max_concurrent_ceiling: int
    max_tier3_slots: int


@dataclass(frozen=True)
class CatalogEntry:
    name: str
    verified_against: str
    last_verified: str


@dataclass(frozen=True)
class TargetSpec:
    image_name: str
    sha256: str
    build_script: str


@dataclass(frozen=True)
class Manifest:
    schema_version: int
    name: str
    ram_per_vm_mib: int
    schedule: tuple[Phase, ...]
    fleet: FleetPolicy
    collectors_active: tuple[str, ...]
    intervals: CollectorIntervals
    catalog: tuple[CatalogEntry, ...]
    targets: tuple[TargetSpec, ...]
    samples_manifest_path: str
    # Resolved repo root + manifest path so callers can stamp them
    # into meta.json for provenance without re-deriving.
    repo_root: Path = field(repr=False)
    manifest_path: Path = field(repr=False)

    def to_meta(self) -> dict:
        """Lightweight representation suitable for embedding in
        meta.json so episodes carry their experiment provenance.
        Excludes the resolved Path fields (host-specific paths don't
        belong in the wire-format)."""
        return {
            "schema_version": self.schema_version,
            "name": self.name,
            "ram_per_vm_mib": self.ram_per_vm_mib,
            "phases": [
                {"name": p.name, "seconds": p.seconds} for p in self.schedule
            ],
            "fleet": {
                "max_concurrent_ceiling": self.fleet.max_concurrent_ceiling,
                "max_tier3_slots": self.fleet.max_tier3_slots,
            },
            "collectors_active": list(self.collectors_active),
            "intervals": {
                "proc_ms": self.intervals.proc_ms,
                "qmp_ms": self.intervals.qmp_ms,
                "perf_ms": self.intervals.perf_ms,
                "guest_agent_ms": self.intervals.guest_agent_ms,
                "pcap_snaplen": self.intervals.pcap_snaplen,
                "netflow_bucket_ms": self.intervals.netflow_bucket_ms,
            },
            "catalog": [
                {"name": c.name, "verified_against": c.verified_against,
                 "last_verified": c.last_verified}
                for c in self.catalog
            ],
            "targets": [
                {"image_name": t.image_name, "sha256": t.sha256,
                 "build_script": t.build_script}
                for t in self.targets
            ],
        }


def load_canonical(repo_root: Path | str) -> Manifest:
    """Load + validate `<repo_root>/manifest.toml`. There is no
    `manifest_path` parameter on purpose: per §4.1 the canonical
    manifest lives at exactly one path. Callers that pass the path
    directly are off the runbook.

    Raises ManifestError on any failure. Successful return guarantees
    every field is present, every collector name is known, every
    phase name is known, and every catalog entry has both
    verified_against and last_verified.
    """
    repo_root = Path(repo_root).resolve()
    path = repo_root / CANONICAL_FILENAME
    if not path.exists():
        raise ManifestError(
            f"canonical manifest not found at {path}. "
            f"PIPELINE.md §4.1 requires exactly one manifest at the "
            f"repo root; this host cannot run the experiment without it."
        )
    try:
        # TOML is UTF-8 by specification, so don't depend on the host
        # locale. A decode failure must also surface as ManifestError.
        raw = tomllib.loads(path.read_text(encoding="utf-8"))
    except (OSError, UnicodeDecodeError, tomllib.TOMLDecodeError) as e:
        raise ManifestError(f"cannot parse {path}: {e}") from e
    return _validate(raw, repo_root, path)


def _validate(raw: dict, repo_root: Path, path: Path) -> Manifest:
    schema_version = _require_int(raw, "schema_version")
    if schema_version != 1:
        raise ManifestError(
            f"manifest schema_version={schema_version} not supported; "
            f"this orchestrator handles version 1 only. "
            f"Upgrade orchestrator or downgrade manifest to match."
        )
    name = _require_str(raw, "name")

    experiment = _require_dict(raw, "experiment")
    ram_per_vm_mib = _require_int(experiment, "ram_per_vm_mib")
    if ram_per_vm_mib <= 0:
        raise ManifestError(
            f"experiment.ram_per_vm_mib must be positive, got {ram_per_vm_mib}"
        )

    schedule_block = _require_dict(experiment, "schedule")
    phases_raw = schedule_block.get("phases")
    if not isinstance(phases_raw, list) or not phases_raw:
        raise ManifestError(
            "experiment.schedule.phases must be a non-empty array"
        )
    phases: list[Phase] = []
    for i, p in enumerate(phases_raw):
        if not isinstance(p, dict):
            raise ManifestError(
                f"experiment.schedule.phases[{i}] must be a table"
            )
        pname = _require_str(p, "name", ctx=f"phases[{i}]")
        if pname not in KNOWN_PHASES:
            raise ManifestError(
                f"experiment.schedule.phases[{i}].name={pname!r} not in "
                f"KNOWN_PHASES {sorted(KNOWN_PHASES)}"
            )
        secs = _require_float(p, "seconds", ctx=f"phases[{i}]")
        if secs <= 0:
            raise ManifestError(
                f"experiment.schedule.phases[{i}].seconds must be > 0, "
                f"got {secs}"
            )
        phases.append(Phase(name=pname, seconds=secs))

    fleet_block = _require_dict(experiment, "fleet")
    fleet = FleetPolicy(
        max_concurrent_ceiling=_require_int(fleet_block, "max_concurrent_ceiling"),
        max_tier3_slots=_require_int(fleet_block, "max_tier3_slots"),
    )
    if fleet.max_concurrent_ceiling < 0 or fleet.max_tier3_slots < 0:
        raise ManifestError(
            "experiment.fleet ceilings must be >= 0 (0 = no cap)"
        )

    collectors_block = _require_dict(raw, "collectors")
    active_raw = collectors_block.get("active")
    if not isinstance(active_raw, list):
        raise ManifestError("collectors.active must be an array")
    # Check names before de-duplicating: a non-string (possibly
    # unhashable) entry must surface as ManifestError, not as a
    # TypeError from set() or the frozenset membership test.
    for c in active_raw:
        if not isinstance(c, str) or c not in KNOWN_COLLECTORS:
            raise ManifestError(
                f"collectors.active references unknown collector "
                f"{c!r}; known: {sorted(KNOWN_COLLECTORS)}"
            )
    if len(set(active_raw)) != len(active_raw):
        raise ManifestError(
            f"collectors.active contains duplicates: {active_raw}"
        )
    collectors_active = tuple(active_raw)

    intervals_block = _require_dict(collectors_block, "intervals")
    intervals = CollectorIntervals(
        proc_ms=_require_int(intervals_block, "proc_ms"),
        qmp_ms=_require_int(intervals_block, "qmp_ms"),
        perf_ms=_require_int(intervals_block, "perf_ms"),
        guest_agent_ms=_require_int(intervals_block, "guest_agent_ms"),
        pcap_snaplen=_require_int(intervals_block, "pcap_snaplen"),
        netflow_bucket_ms=_require_int(intervals_block, "netflow_bucket_ms"),
    )
    for fname, fval in (
        ("proc_ms", intervals.proc_ms),
        ("qmp_ms", intervals.qmp_ms),
        ("perf_ms", intervals.perf_ms),
        ("guest_agent_ms", intervals.guest_agent_ms),
        ("pcap_snaplen", intervals.pcap_snaplen),
        ("netflow_bucket_ms", intervals.netflow_bucket_ms),
    ):
        if fval <= 0:
            raise ManifestError(
                f"collectors.intervals.{fname} must be > 0, got {fval}"
            )

    catalog_block = _require_dict(raw, "catalog")
    modules_raw = catalog_block.get("modules")
    if not isinstance(modules_raw, list):
        raise ManifestError("catalog.modules must be an array")
    catalog: list[CatalogEntry] = []
    for i, entry in enumerate(modules_raw):
        if not isinstance(entry, dict):
            raise ManifestError(f"catalog.modules[{i}] must be a table")
        cname = _require_str(entry, "name", ctx=f"catalog[{i}]")
        verified_against = _require_str(
            entry, "verified_against", ctx=f"catalog[{i}]"
        )
        last_verified = _require_str(
            entry, "last_verified", ctx=f"catalog[{i}]"
        )
        catalog.append(CatalogEntry(
            name=cname,
            verified_against=verified_against,
            last_verified=last_verified,
        ))

    targets_block = _require_dict(raw, "targets")
    images_raw = targets_block.get("images")
    if not isinstance(images_raw, list):
        raise ManifestError("targets.images must be an array")
    targets: list[TargetSpec] = []
    for i, t in enumerate(images_raw):
        if not isinstance(t, dict):
            raise ManifestError(f"targets.images[{i}] must be a table")
        targets.append(TargetSpec(
            image_name=_require_str(t, "image_name", ctx=f"targets[{i}]"),
            sha256=_require_str(t, "sha256", ctx=f"targets[{i}]"),
            build_script=_require_str(t, "build_script", ctx=f"targets[{i}]"),
        ))

    samples_block = _require_dict(raw, "samples")
    samples_manifest_path = _require_str(samples_block, "manifest_path")

    return Manifest(
        schema_version=schema_version,
        name=name,
        ram_per_vm_mib=ram_per_vm_mib,
        schedule=tuple(phases),
        fleet=fleet,
        collectors_active=collectors_active,
        intervals=intervals,
        catalog=tuple(catalog),
        targets=tuple(targets),
        samples_manifest_path=samples_manifest_path,
        repo_root=repo_root,
        manifest_path=path,
    )


# ---------- helpers --------------------------------------------------


def _require(d: dict, key: str, kind: type, *, ctx: str = "") -> object:
    where = f"{ctx}." if ctx else ""
    if key not in d:
        raise ManifestError(f"missing required field {where}{key}")
    v = d[key]
    if not isinstance(v, kind):
        raise ManifestError(
            f"field {where}{key} must be {kind.__name__}, got {type(v).__name__}"
        )
    return v


def _require_str(d: dict, key: str, *, ctx: str = "") -> str:
    return _require(d, key, str, ctx=ctx)  # type: ignore[return-value]


def _require_int(d: dict, key: str, *, ctx: str = "") -> int:
    # tomllib parses TOML integers as int and TOML floats as float.
    # Fields that should be ints require a strict int: floats are
    # rejected, including integer-valued ones such as 5.0.
    where = f"{ctx}." if ctx else ""
    if key not in d:
        raise ManifestError(f"missing required field {where}{key}")
    v = d[key]
    if isinstance(v, bool):  # bool is a subclass of int; reject explicitly
        raise ManifestError(f"field {where}{key} must be int, got bool")
    if isinstance(v, int):
        return v
    raise ManifestError(
        f"field {where}{key} must be int, got {type(v).__name__}"
    )


def _require_float(d: dict, key: str, *, ctx: str = "") -> float:
    where = f"{ctx}." if ctx else ""
    if key not in d:
        raise ManifestError(f"missing required field {where}{key}")
    v = d[key]
    if isinstance(v, bool):
        raise ManifestError(f"field {where}{key} must be number, got bool")
    if isinstance(v, (int, float)):
        return float(v)
    raise ManifestError(
        f"field {where}{key} must be number, got {type(v).__name__}"
    )


def _require_dict(d: dict, key: str, *, ctx: str = "") -> dict:
    return _require(d, key, dict, ctx=ctx)  # type: ignore[return-value]