Two correctness fixes that the §4.5 event-driven labeller surfaced:
1. tools/run_real_vm_demo.py was hardcoding a Tier-3-shaped schedule
(clean → armed → infecting → infected_running → ...) for episodes
with no exploit firing. Pre-§4.5 those episodes wrote dishonest
`infected_running` labels from the schedule clock — exactly the §3
evidence pattern. Post-§4.5 they write `failed` at the infecting
transition (the justifying exploit_fire never arrives), which is
honest about what happened but useless for training.
The honest fix: Tier-2 episodes have a clean-only schedule. All
telemetry tagged `clean` because nothing infected anything. The
total duration matches the canonical Tier-3 schedule so episode
lengths are comparable across tiers — no length-bias in the
dataset (§10).
Helper `tier2_schedule_from(schedule)` in orchestrator/manifest.py
derives `[("clean", total_seconds)]` from the canonical schedule.
`tier3_schedule_from(schedule)` renders the legacy
`[(name, seconds)]` shape EpisodeConfig still expects.
Tier-2 demo (run_real_vm_demo.py) now calls tier2_schedule_from.
Tier-3 demo (run_tier3_demo.py) now calls tier3_schedule_from.
Drops the hardcoded DEFAULT_SCHEDULE constants from both — the
canonical manifest is the single source of truth (§4.1).
2. .gitignore now excludes /VERSION. The install-lab-host.sh stamp
writes /opt/cis490/VERSION so episodes can record code provenance
without /opt/cis490 carrying a .git directory. But /opt/cis490 IS
typically a git checkout on lab hosts (auto-update.sh pulls into
it), so writing VERSION leaves the working tree dirty and every
episode records meta.code_version.dirty=true. Rule 4 of the
PIPELINE.md §4.6 acceptance gate would then reject every episode
unless CIS490_ALLOW_DIRTY=1 is set, which would break the data flow.
Now VERSION is .gitignored: install-lab-host.sh stamps it, git
status doesn't see it, dirty=false, gate rule 4 passes naturally.
Together these two changes keep the data flowing AND honest. Tier-2
episodes pass with `phases=[clean]` and every collector emitting real
rows. Tier-3 episodes (none today; the catalog is empty) will walk the
full event-driven schedule once a verified module is re-admitted.
286 tests passing.
Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
398 lines
14 KiB
Python
398 lines
14 KiB
Python
"""Canonical experiment manifest (PIPELINE.md §4.1 / §13).
|
|
|
|
The manifest at `<repo_root>/manifest.toml` is the single source of
|
|
truth for what the experiment is: which collectors run, at what
|
|
cadence, against what targets, with which exploit modules in rotation,
|
|
walking which phase budget. Every lab host loads THIS file. There is
|
|
no per-host override flag, no `--manifest <path>` argument, no
|
|
fallback. A host that can't load and validate the canonical manifest
|
|
must exit 78 and ship zero episodes.
|
|
|
|
`load_canonical(repo_root)` reads from the fixed path and validates.
|
|
On any failure it raises `ManifestError`; callers translate that into
|
|
exit 78. `Manifest` is a frozen dataclass — once loaded the values
|
|
don't move under us mid-run.
|
|
|
|
Substantive amendments follow PIPELINE.md §16: operator sign-off,
|
|
landed in the same merge as the code change, with §8 decision tests
|
|
applied to the amendment itself.
|
|
"""
|
|
|
|
from __future__ import annotations

import math
import tomllib
from dataclasses import dataclass, field
from pathlib import Path
|
|
|
|
|
|
# The one fixed file name of the canonical manifest. load_canonical() joins
# it onto the resolved repo root; there is deliberately no override.
CANONICAL_FILENAME: str = "manifest.toml"
|
|
|
|
|
|
def tier3_schedule_from(schedule: "tuple[Phase, ...]") -> list[tuple[str, float]]:
    """Convert the canonical schedule to EpisodeConfig's legacy shape.

    EpisodeConfig.phase_schedule still takes a plain list of
    ``(phase_name, seconds)`` pairs; this emits one pair per canonical
    phase, in manifest order. That list *is* the Tier-3 walk
    (clean -> armed -> infecting -> infected_running -> ...). Because
    labels are event-driven (PIPELINE.md §4.5), each ``seconds`` value is
    only a time budget for the phase, not the source of its label.
    """
    rendered: list[tuple[str, float]] = []
    for phase in schedule:
        rendered.append((phase.name, phase.seconds))
    return rendered
|
|
|
|
|
|
def tier2_schedule_from(schedule: "tuple[Phase, ...]") -> list[tuple[str, float]]:
    """Collapse the canonical schedule into one ``clean`` phase for Tier-2.

    A Tier-2 episode fires no exploit, and no driver runs modules.
    Replaying the Tier-3 phase list on it would produce one of two bad
    outcomes:

    * dishonest ``infected_running`` labels under clock-driven labelling
      (the PIPELINE.md §3 evidence pattern), or
    * ``failed`` labels under event-driven labelling, which are useless
      for training.

    Instead the episode gets a single ``clean`` phase whose length is the
    sum of every canonical phase budget. Tier-2 and Tier-3 episodes
    therefore last equally long, so episode length carries no tier
    signal, and every Tier-2 row is honestly tagged ``clean``.
    """
    # Accumulate left-to-right starting from int 0, just as sum() does, so
    # the float total is bit-for-bit the same.
    budget = 0
    for phase in schedule:
        budget = budget + phase.seconds
    return [("clean", float(budget))]
|
|
|
|
# Closed vocabularies. Every name here must correspond to code that
# implements it. A manifest that names anything outside these sets is
# asking the orchestrator for something it does not know how to do, and
# _validate rejects it.
KNOWN_COLLECTORS: frozenset[str] = frozenset(
    "proc qmp perf guest_agent pcap netflow".split()
)

KNOWN_PHASES: frozenset[str] = frozenset(
    "clean armed infecting infected_running dormant failed".split()
)
|
|
|
|
|
|
class ManifestError(ValueError):
    """The canonical manifest is absent, unreadable, or invalid.

    Covers three cases: the file is missing, the file cannot be read or
    parsed, or the file violates the schema. The orchestrator maps this
    exception to exit code 78 (PIPELINE.md §4.7 / §9).
    """
|
|
|
|
|
|
@dataclass(frozen=True)
class Phase:
    """One entry of experiment.schedule.phases: a phase name and its budget."""

    # Phase name; load_canonical only accepts names in KNOWN_PHASES.
    name: str
    # Time budget for the phase, in seconds (validated > 0 on load).
    seconds: float
|
|
|
|
|
|
@dataclass(frozen=True)
class CollectorIntervals:
    """Collector cadences and capture sizing from collectors.intervals.

    Every field is a positive int; load_canonical rejects values <= 0.
    """

    # Sampling periods, in milliseconds.
    proc_ms: int
    qmp_ms: int
    perf_ms: int
    guest_agent_ms: int
    # Packet-capture snap length (presumably bytes per packet — confirm).
    pcap_snaplen: int
    # Width of a netflow aggregation bucket, in milliseconds.
    netflow_bucket_ms: int
|
|
|
|
|
|
@dataclass(frozen=True)
class FleetPolicy:
    """Fleet-wide concurrency limits from experiment.fleet.

    Both limits are >= 0, and 0 means "no cap".
    """

    # Upper bound on VMs running concurrently.
    max_concurrent_ceiling: int
    # Upper bound on concurrent Tier-3 slots.
    max_tier3_slots: int
|
|
|
|
|
|
@dataclass(frozen=True)
class CatalogEntry:
    """One exploit module in rotation (catalog.modules), with provenance."""

    # Module name.
    name: str
    # What the module was verified against (presumably a target image
    # name — confirm). Stored as given.
    verified_against: str
    # When it was last verified. Stored as a string; the format is not
    # checked here.
    last_verified: str
|
|
|
|
|
|
@dataclass(frozen=True)
class TargetSpec:
    """One target image entry from targets.images."""

    # Image name.
    image_name: str
    # Expected SHA-256 (presumably a hex digest of the built image —
    # confirm). The format is not validated on load.
    sha256: str
    # Script used to build the image.
    build_script: str
|
|
|
|
|
|
@dataclass(frozen=True)
|
|
class Manifest:
|
|
schema_version: int
|
|
name: str
|
|
ram_per_vm_mib: int
|
|
schedule: tuple[Phase, ...]
|
|
fleet: FleetPolicy
|
|
collectors_active: tuple[str, ...]
|
|
intervals: CollectorIntervals
|
|
catalog: tuple[CatalogEntry, ...]
|
|
targets: tuple[TargetSpec, ...]
|
|
samples_manifest_path: str
|
|
# Resolved repo root + manifest path so callers can stamp them
|
|
# into meta.json for provenance without re-deriving.
|
|
repo_root: Path = field(repr=False)
|
|
manifest_path: Path = field(repr=False)
|
|
|
|
def to_meta(self) -> dict:
|
|
"""Lightweight representation suitable for embedding in
|
|
meta.json so episodes carry their experiment provenance.
|
|
Excludes the resolved Path fields (host-specific paths don't
|
|
belong in the wire-format)."""
|
|
return {
|
|
"schema_version": self.schema_version,
|
|
"name": self.name,
|
|
"ram_per_vm_mib": self.ram_per_vm_mib,
|
|
"phases": [
|
|
{"name": p.name, "seconds": p.seconds} for p in self.schedule
|
|
],
|
|
"fleet": {
|
|
"max_concurrent_ceiling": self.fleet.max_concurrent_ceiling,
|
|
"max_tier3_slots": self.fleet.max_tier3_slots,
|
|
},
|
|
"collectors_active": list(self.collectors_active),
|
|
"intervals": {
|
|
"proc_ms": self.intervals.proc_ms,
|
|
"qmp_ms": self.intervals.qmp_ms,
|
|
"perf_ms": self.intervals.perf_ms,
|
|
"guest_agent_ms": self.intervals.guest_agent_ms,
|
|
"pcap_snaplen": self.intervals.pcap_snaplen,
|
|
"netflow_bucket_ms": self.intervals.netflow_bucket_ms,
|
|
},
|
|
"catalog": [
|
|
{"name": c.name, "verified_against": c.verified_against,
|
|
"last_verified": c.last_verified}
|
|
for c in self.catalog
|
|
],
|
|
"targets": [
|
|
{"image_name": t.image_name, "sha256": t.sha256,
|
|
"build_script": t.build_script}
|
|
for t in self.targets
|
|
],
|
|
}
|
|
|
|
|
|
def load_canonical(repo_root: Path | str) -> Manifest:
    """Load + validate `<repo_root>/manifest.toml`.

    There is no `manifest_path` parameter on purpose. Per §4.1 the
    canonical manifest lives at exactly one path, and callers that pass
    a path directly are off the runbook.

    The file is decoded as UTF-8, as the TOML spec requires, regardless
    of the host locale.

    Raises:
        ManifestError: on any failure — file missing, unreadable, not valid
            UTF-8, not valid TOML, or failing validation.

    A successful return guarantees that every field is present, every
    collector name is known, every phase name is known, and every
    catalog entry has both verified_against and last_verified.
    """
    repo_root = Path(repo_root).resolve()
    path = repo_root / CANONICAL_FILENAME
    if not path.exists():
        raise ManifestError(
            f"canonical manifest not found at {path}. "
            f"PIPELINE.md §4.1 requires exactly one manifest at the "
            f"repo root; this host cannot run the experiment without it."
        )
    try:
        # tomllib.load() takes a *binary* handle and decodes it as UTF-8 per
        # the TOML spec. The previous read_text() used the locale encoding
        # instead. Invalid bytes surface as UnicodeDecodeError, which is
        # neither OSError nor TOMLDecodeError, so it must be caught
        # explicitly; otherwise it escapes as a traceback rather than
        # ManifestError / exit 78.
        with path.open("rb") as fh:
            raw = tomllib.load(fh)
    except (OSError, UnicodeDecodeError, tomllib.TOMLDecodeError) as e:
        raise ManifestError(f"cannot parse {path}: {e}") from e

    return _validate(raw, repo_root, path)
|
|
|
|
|
|
def _validate(raw: dict, repo_root: Path, path: Path) -> Manifest:
    """Check a parsed manifest document and build the frozen `Manifest`.

    `raw` is the mapping returned by `tomllib`. `repo_root` and `path` are
    recorded on the result unchanged, for provenance. Sections are checked
    in document order (schema_version, name, experiment, collectors,
    catalog, targets, samples), so the first problem found is the one
    reported.

    Raises:
        ManifestError: on any missing field, wrong type, unknown collector
            or phase name, duplicate collector, or out-of-range value.
    """
    schema_version = _require_int(raw, "schema_version")
    if schema_version != 1:
        raise ManifestError(
            f"manifest schema_version={schema_version} not supported; "
            f"this orchestrator handles version 1 only. "
            f"Upgrade orchestrator or downgrade manifest to match."
        )
    name = _require_str(raw, "name")

    experiment = _require_dict(raw, "experiment")
    ram_per_vm_mib = _require_int(experiment, "ram_per_vm_mib")
    if ram_per_vm_mib <= 0:
        raise ManifestError(
            f"experiment.ram_per_vm_mib must be positive, got {ram_per_vm_mib}"
        )
    schedule = _validate_schedule(_require_dict(experiment, "schedule"))
    fleet = _validate_fleet(_require_dict(experiment, "fleet"))

    collectors_block = _require_dict(raw, "collectors")
    collectors_active = _validate_collectors_active(collectors_block)
    intervals = _validate_intervals(_require_dict(collectors_block, "intervals"))

    catalog = _validate_catalog(_require_dict(raw, "catalog"))
    targets = _validate_targets(_require_dict(raw, "targets"))

    samples_block = _require_dict(raw, "samples")
    samples_manifest_path = _require_str(samples_block, "manifest_path")

    return Manifest(
        schema_version=schema_version,
        name=name,
        ram_per_vm_mib=ram_per_vm_mib,
        schedule=schedule,
        fleet=fleet,
        collectors_active=collectors_active,
        intervals=intervals,
        catalog=catalog,
        targets=targets,
        samples_manifest_path=samples_manifest_path,
        repo_root=repo_root,
        manifest_path=path,
    )


def _validate_schedule(schedule_block: dict) -> tuple[Phase, ...]:
    """Validate experiment.schedule.phases into a non-empty tuple of Phase."""
    phases_raw = schedule_block.get("phases")
    if not isinstance(phases_raw, list) or not phases_raw:
        raise ManifestError(
            "experiment.schedule.phases must be a non-empty array"
        )
    phases: list[Phase] = []
    for i, p in enumerate(phases_raw):
        if not isinstance(p, dict):
            raise ManifestError(
                f"experiment.schedule.phases[{i}] must be a table"
            )
        pname = _require_str(p, "name", ctx=f"phases[{i}]")
        if pname not in KNOWN_PHASES:
            raise ManifestError(
                f"experiment.schedule.phases[{i}].name={pname!r} not in "
                f"KNOWN_PHASES {sorted(KNOWN_PHASES)}"
            )
        secs = _require_float(p, "seconds", ctx=f"phases[{i}]")
        # TOML allows `nan` and `inf` as float values. `nan <= 0` is False,
        # so the positivity check alone let both through, and a non-finite
        # budget would poison the tier2_schedule_from total.
        if not math.isfinite(secs):
            raise ManifestError(
                f"experiment.schedule.phases[{i}].seconds must be finite, "
                f"got {secs}"
            )
        if secs <= 0:
            raise ManifestError(
                f"experiment.schedule.phases[{i}].seconds must be > 0, "
                f"got {secs}"
            )
        phases.append(Phase(name=pname, seconds=secs))
    return tuple(phases)


def _validate_fleet(fleet_block: dict) -> FleetPolicy:
    """Validate experiment.fleet. Both ceilings must be >= 0 (0 = no cap)."""
    fleet = FleetPolicy(
        max_concurrent_ceiling=_require_int(fleet_block, "max_concurrent_ceiling"),
        max_tier3_slots=_require_int(fleet_block, "max_tier3_slots"),
    )
    if fleet.max_concurrent_ceiling < 0 or fleet.max_tier3_slots < 0:
        raise ManifestError(
            "experiment.fleet ceilings must be >= 0 (0 = no cap)"
        )
    return fleet


def _validate_collectors_active(collectors_block: dict) -> tuple[str, ...]:
    """Validate collectors.active: unique, known collector-name strings."""
    active_raw = collectors_block.get("active")
    if not isinstance(active_raw, list):
        raise ManifestError("collectors.active must be an array")
    # Type-check the entries before set() and the membership test. TOML
    # allows nested arrays and inline tables here; those are unhashable and
    # used to escape as TypeError instead of ManifestError.
    for c in active_raw:
        if not isinstance(c, str):
            raise ManifestError(
                f"collectors.active entries must be strings, got "
                f"{type(c).__name__}: {c!r}"
            )
    if len(set(active_raw)) != len(active_raw):
        raise ManifestError(
            f"collectors.active contains duplicates: {active_raw}"
        )
    for c in active_raw:
        if c not in KNOWN_COLLECTORS:
            raise ManifestError(
                f"collectors.active references unknown collector "
                f"{c!r}; known: {sorted(KNOWN_COLLECTORS)}"
            )
    return tuple(active_raw)


def _validate_intervals(intervals_block: dict) -> CollectorIntervals:
    """Validate collectors.intervals. Every field must be an int > 0."""
    names = (
        "proc_ms",
        "qmp_ms",
        "perf_ms",
        "guest_agent_ms",
        "pcap_snaplen",
        "netflow_bucket_ms",
    )
    # Presence and type of every field are checked first, then positivity,
    # matching the original check order.
    values = {fname: _require_int(intervals_block, fname) for fname in names}
    for fname, fval in values.items():
        if fval <= 0:
            raise ManifestError(
                f"collectors.intervals.{fname} must be > 0, got {fval}"
            )
    return CollectorIntervals(**values)


def _validate_catalog(catalog_block: dict) -> tuple[CatalogEntry, ...]:
    """Validate catalog.modules. Each entry needs all three string fields."""
    modules_raw = catalog_block.get("modules")
    if not isinstance(modules_raw, list):
        raise ManifestError("catalog.modules must be an array")
    catalog: list[CatalogEntry] = []
    for i, entry in enumerate(modules_raw):
        if not isinstance(entry, dict):
            raise ManifestError(f"catalog.modules[{i}] must be a table")
        ctx = f"catalog[{i}]"
        catalog.append(CatalogEntry(
            name=_require_str(entry, "name", ctx=ctx),
            verified_against=_require_str(entry, "verified_against", ctx=ctx),
            last_verified=_require_str(entry, "last_verified", ctx=ctx),
        ))
    return tuple(catalog)


def _validate_targets(targets_block: dict) -> tuple[TargetSpec, ...]:
    """Validate targets.images. Each entry needs all three string fields."""
    images_raw = targets_block.get("images")
    if not isinstance(images_raw, list):
        raise ManifestError("targets.images must be an array")
    targets: list[TargetSpec] = []
    for i, t in enumerate(images_raw):
        if not isinstance(t, dict):
            raise ManifestError(f"targets.images[{i}] must be a table")
        ctx = f"targets[{i}]"
        targets.append(TargetSpec(
            image_name=_require_str(t, "image_name", ctx=ctx),
            sha256=_require_str(t, "sha256", ctx=ctx),
            build_script=_require_str(t, "build_script", ctx=ctx),
        ))
    return tuple(targets)
|
|
|
|
|
|
# ---------- helpers --------------------------------------------------
|
|
|
|
|
|
def _require(d: dict, key: str, kind: type, *, ctx: str = "") -> object:
|
|
where = f"{ctx}." if ctx else ""
|
|
if key not in d:
|
|
raise ManifestError(f"missing required field {where}{key}")
|
|
v = d[key]
|
|
if not isinstance(v, kind):
|
|
raise ManifestError(
|
|
f"field {where}{key} must be {kind.__name__}, got {type(v).__name__}"
|
|
)
|
|
return v
|
|
|
|
|
|
def _require_str(d: dict, key: str, *, ctx: str = "") -> str:
    """Return ``d[key]`` after checking it is present and a ``str``.

    Thin wrapper over ``_require`` pinned to ``str``. Raises ManifestError
    on failure.
    """
    value = _require(d, key, str, ctx=ctx)
    return value  # type: ignore[return-value]
|
|
|
|
|
|
def _require_int(d: dict, key: str, *, ctx: str = "") -> int:
|
|
# tomllib parses both ints and floats; require strict int for fields
|
|
# that should be ints, accept int-valued floats for ergonomics.
|
|
where = f"{ctx}." if ctx else ""
|
|
if key not in d:
|
|
raise ManifestError(f"missing required field {where}{key}")
|
|
v = d[key]
|
|
if isinstance(v, bool): # bool is a subclass of int — reject explicitly
|
|
raise ManifestError(f"field {where}{key} must be int, got bool")
|
|
if isinstance(v, int):
|
|
return v
|
|
raise ManifestError(
|
|
f"field {where}{key} must be int, got {type(v).__name__}"
|
|
)
|
|
|
|
|
|
def _require_float(d: dict, key: str, *, ctx: str = "") -> float:
|
|
where = f"{ctx}." if ctx else ""
|
|
if key not in d:
|
|
raise ManifestError(f"missing required field {where}{key}")
|
|
v = d[key]
|
|
if isinstance(v, bool):
|
|
raise ManifestError(f"field {where}{key} must be number, got bool")
|
|
if isinstance(v, (int, float)):
|
|
return float(v)
|
|
raise ManifestError(
|
|
f"field {where}{key} must be number, got {type(v).__name__}"
|
|
)
|
|
|
|
|
|
def _require_dict(d: dict, key: str, *, ctx: str = "") -> dict:
    """Return the TOML table at ``d[key]``, checking it is present.

    Thin wrapper over ``_require`` pinned to ``dict``. Raises ManifestError
    if the key is absent or is not a table.
    """
    table = _require(d, key, dict, ctx=ctx)
    return table  # type: ignore[return-value]
|