PIPELINE §5 step 6: event-driven labeller (§4.5)

Phase labels are written ONLY when justifying events arrive. The
schedule clock is now a budget — an upper bound — never a label
source. This is the core honesty fix the §3 evidence demanded:

  Before: every Tier-3 episode wrote `infected_running` from the
          schedule clock regardless of whether session_open ever
          fired. Per §10 every dishonest label is a poisoned
          training example. 67/67 of the §3 probe episodes were
          poisoned this way.

  After:  `infecting` writes ONLY when exploit_fire is observed in
          events.jsonl. `infected_running` writes ONLY when
          session_open is observed. Either timing out or seeing
          session_open_timeout terminates the walker with a `failed`
          label that the §4.6 acceptance gate will reject.

PHASE_JUSTIFYING_EVENTS in orchestrator/episode.py declares which
events justify which phases:

    "clean":            None              # orchestrator-emitted
    "armed":            None              # orchestrator-emitted
    "infecting":        ("exploit_fire",)
    "infected_running": ("session_open",)

TERMINAL_FAILURE_EVENTS = {"session_open_timeout"} short-circuit any
event-driven wait into a `failed` label.

`dormant` is intentionally OFF the canonical schedule. §4.5 calls
for dormant to be event-driven (session_idle / session_active) too,
but the driver doesn't emit those yet. Per §1 default-to-removal we
ship without dormant rather than label it from the clock; when the
driver gains those emits, dormant re-enters the schedule with
proper justification.

EpisodeRunner now owns:
  * `_event_log` — every emit_event appends here
  * `_event_cv`  — condition variable for waiters
  * `_wait_for_event(names, since_t_mono_ns, timeout_s)` — returns
                                  the first matching event in the log
                                  with t_mono >= threshold; threshold
                                  catches events that fired during
                                  the previous on_phase callback.

When an event-driven phase's justifier already arrived (e.g.
exploit_fire emitted by driver._fire() inside on_phase("armed")),
the walker uses the EVENT's t_mono on the label — not the time the
walker noticed. The label means "this is when this thing actually
happened."

manifest.toml: dropped the dormant cycle from the canonical schedule.
Episode is shorter (~30s) but every label is event-justified.

14 new tests in tests/test_event_driven_labeller.py covering: justifier
mapping invariants, _wait_for_event semantics (already-arrived,
future, timeout, since-threshold, first-of-multiple-names), walker
behavior (orchestrator-emitted phases, event-driven phases, missing
event → failed, terminal-failure-event short-circuit, stop event,
event-t_mono on label, phase_transition events with justified_by).

286 tests passing.

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
This commit is contained in:
Max Gorog 2026-05-04 01:43:16 -05:00
parent 0d51b9b253
commit d9f913fc97
3 changed files with 464 additions and 22 deletions

View file

@ -34,7 +34,15 @@ ram_per_vm_mib = 320
# orchestrator will wait in that phase. Phase TRANSITIONS are written # orchestrator will wait in that phase. Phase TRANSITIONS are written
# from observed events (PIPELINE.md §4.5), not from this clock. The # from observed events (PIPELINE.md §4.5), not from this clock. The
# clock is just an upper bound: if no event fires within the budget, # clock is just an upper bound: if no event fires within the budget,
# the orchestrator advances and records the failure. # the orchestrator emits a `failed` label and the §4.6 acceptance
# gate rejects the episode.
#
# The dormant cycle (dormant → infected_running → dormant) is NOT
# in the canonical schedule yet because the driver doesn't emit
# session_idle / session_active events. Per §1 default-to-removal,
# we ship without dormant rather than label it from the clock; when
# the driver gains those emits, they re-enter §4.5 with proper
# justification.
[[experiment.schedule.phases]] [[experiment.schedule.phases]]
name = "clean" name = "clean"
seconds = 10.0 seconds = 10.0
@ -51,18 +59,6 @@ seconds = 5.0
name = "infected_running" name = "infected_running"
seconds = 25.0 seconds = 25.0
[[experiment.schedule.phases]]
name = "dormant"
seconds = 15.0
[[experiment.schedule.phases]]
name = "infected_running"
seconds = 20.0
[[experiment.schedule.phases]]
name = "dormant"
seconds = 5.0
[[experiment.schedule.phases]] [[experiment.schedule.phases]]
name = "clean" name = "clean"
seconds = 5.0 seconds = 5.0

View file

@ -120,6 +120,35 @@ log = logging.getLogger("cis490.orchestrator")
SCHEMA_VERSION = 1 SCHEMA_VERSION = 1
# Event-driven labeller (PIPELINE.md §4.5). For each phase the
# orchestrator may walk through, this dict says what justifies that
# label. None means orchestrator-emitted (clean / armed are decided
# by the orchestrator's own state — episode start and "instructing
# the driver to fire"). Anything else: the named event(s) MUST be
# observed in the event log within the phase's time budget for the
# label to be written. Otherwise the episode terminates with `failed`.
#
# Keep in sync with orchestrator.manifest.KNOWN_PHASES.
PHASE_JUSTIFYING_EVENTS: dict[str, tuple[str, ...] | None] = {
"clean": None, # episode start; orchestrator-emitted
"armed": None, # driver instructed to fire; orchestrator-emitted
"infecting": ("exploit_fire",),
"infected_running": ("session_open",),
# `dormant` is event-driven too per §4.5 (session_idle / activity)
# — but the driver doesn't emit those events yet. Until it does,
# `dormant` is not in the canonical schedule. When session_idle
# / session_active land, this gets its justifier and dormant
# cycles re-enter the canonical schedule.
}
# Terminal failure events — observing any of these during the wait for
# a justifier short-circuits to a `failed` label, terminating the
# walker. session_open_timeout is the §3 evidence pattern: driver
# gave up before the framework could land a session.
TERMINAL_FAILURE_EVENTS: frozenset[str] = frozenset({
"session_open_timeout",
})
PhaseSchedule = list[tuple[str, float]] PhaseSchedule = list[tuple[str, float]]
OnPhase = Callable[[str], None] OnPhase = Callable[[str], None]
@ -209,6 +238,13 @@ class EpisodeRunner:
self.episode_dir.mkdir(parents=True, exist_ok=True) self.episode_dir.mkdir(parents=True, exist_ok=True)
self._t_mono_origin_ns: int = 0 self._t_mono_origin_ns: int = 0
self._stop = threading.Event() self._stop = threading.Event()
# Event log + condition variable underpin the event-driven
# labeller (PIPELINE.md §4.5). Every emit_event call appends a
# row here in addition to writing it to events.jsonl, and
# notifies anyone in _wait_for_event. The walker uses this to
# advance phases on observed events instead of clock ticks.
self._event_log: list[dict] = []
self._event_cv = threading.Condition()
# ---- public --------------------------------------------------------- # ---- public ---------------------------------------------------------
@ -423,22 +459,117 @@ class EpisodeRunner:
# ---- internals ------------------------------------------------------ # ---- internals ------------------------------------------------------
def _walk_schedule(self) -> list[str]: def _walk_schedule(self) -> list[str]:
"""Event-driven phase walker (PIPELINE.md §4.5).
For each (phase, max_wait) entry in the schedule:
* Orchestrator-emitted phases (clean / armed) emit label
immediately, invoke on_phase, hold for max_wait.
* Event-driven phases (infecting / infected_running)
invoke on_phase first (driver may fire / start polling),
then wait for the justifying event up to max_wait. If the
event arrives, label is written using the EVENT's t_mono
(not the wait's, so the label reflects when the thing
actually happened). If a terminal-failure event arrives
instead (e.g. session_open_timeout), or max_wait elapses
with no event, write `failed` and break episode is
rejected by the §4.6 acceptance gate.
Schedule clock is the budget (upper bound), never a label
source. A phase label means the corresponding event was
observed; nothing else. §10 ground truth, §1 honest dataset.
"""
observed: list[str] = [] observed: list[str] = []
prev: str | None = None prev: str | None = None
for phase, dur in self.cfg.phase_schedule or []: # Floor for "valid since": initially episode start. Updated to
# each settled-phase's start so an event during phase N
# justifies phase N+1 (e.g. exploit_fire during armed
# justifies infecting).
since_t_mono_ns = 0
for target_phase, max_wait in self.cfg.phase_schedule or []:
if self._stop.is_set(): if self._stop.is_set():
break break
t_mono = time.monotonic_ns() - self._t_mono_origin_ns
self._emit_label(t_mono, phase, prev=prev, reason="scheduled") phase_entered_ns = time.monotonic_ns() - self._t_mono_origin_ns
self.emit_event("phase_transition", to=phase, prev=prev) justifiers = PHASE_JUSTIFYING_EVENTS.get(target_phase)
if justifiers is None:
# Orchestrator-emitted phase — label NOW.
self._emit_label(
phase_entered_ns, target_phase, prev=prev,
reason="orchestrator_emitted",
)
self.emit_event(
"phase_transition", to=target_phase, prev=prev,
)
if self.on_phase is not None:
try:
self.on_phase(target_phase)
except Exception:
log.exception("on_phase callback raised; continuing")
observed.append(target_phase)
prev = target_phase
since_t_mono_ns = phase_entered_ns
self._stop.wait(timeout=max_wait)
continue
# Event-driven phase. Invoke on_phase FIRST so any driver
# action that emits the justifying event happens within the
# wait window — though events from the previous phase also
# qualify (since_t_mono_ns floor).
if self.on_phase is not None: if self.on_phase is not None:
try: try:
self.on_phase(phase) self.on_phase(target_phase)
except Exception: except Exception:
log.exception("on_phase callback raised; continuing") log.exception("on_phase callback raised; continuing")
observed.append(phase)
prev = phase wait_names = tuple(justifiers) + tuple(TERMINAL_FAILURE_EVENTS)
self._stop.wait(timeout=dur) ev = self._wait_for_event(
wait_names,
since_t_mono_ns=since_t_mono_ns,
timeout_s=max_wait,
)
if ev is None or ev["event"] in TERMINAL_FAILURE_EVENTS:
# Either timeout or terminal-failure event observed.
# Either way: episode failed. Emit `failed` label with
# the actual reason so the acceptance gate / dataset
# consumer can distinguish.
fail_t_mono = time.monotonic_ns() - self._t_mono_origin_ns
if ev is None:
reason = (
f"timeout_no_{'/'.join(justifiers)}_within_"
f"{max_wait}s"
)
else:
reason = f"terminal_event:{ev['event']}"
self._emit_label(
fail_t_mono, "failed", prev=prev, reason=reason,
)
self.emit_event(
"phase_failed",
expected_phase=target_phase,
expected_events=list(justifiers),
reason=reason,
)
observed.append("failed")
break
# Justifying event observed — emit label using EVENT's
# t_mono so the label reflects when the thing actually
# happened, not when the walker noticed.
self._emit_label(
ev["t_mono_ns"], target_phase, prev=prev,
reason=f"event:{ev['event']}",
)
self.emit_event(
"phase_transition", to=target_phase, prev=prev,
justified_by=ev["event"], event_t_mono_ns=ev["t_mono_ns"],
)
observed.append(target_phase)
prev = target_phase
since_t_mono_ns = phase_entered_ns
return observed return observed
def _initial_meta(self, started_at_wall: str) -> dict: def _initial_meta(self, started_at_wall: str) -> dict:
@ -493,7 +624,12 @@ class EpisodeRunner:
def emit_event(self, event: str, **extra) -> None: def emit_event(self, event: str, **extra) -> None:
"""Append a row to events.jsonl. Public so external drivers """Append a row to events.jsonl. Public so external drivers
(e.g. the MSF exploit driver) can stamp their own events with (e.g. the MSF exploit driver) can stamp their own events with
the same monotonic clock the orchestrator is using.""" the same monotonic clock the orchestrator is using.
Also pushes the row into the in-memory event log + notifies
any in-flight `_wait_for_event` callers, so the §4.5
event-driven labeller can advance on observed events.
"""
t_mono_ns = ( t_mono_ns = (
time.monotonic_ns() - self._t_mono_origin_ns time.monotonic_ns() - self._t_mono_origin_ns
if self._t_mono_origin_ns if self._t_mono_origin_ns
@ -507,6 +643,38 @@ class EpisodeRunner:
} }
with (self.episode_dir / "events.jsonl").open("a") as f: with (self.episode_dir / "events.jsonl").open("a") as f:
f.write(json.dumps(row, sort_keys=True) + "\n") f.write(json.dumps(row, sort_keys=True) + "\n")
with self._event_cv:
self._event_log.append(row)
self._event_cv.notify_all()
def _wait_for_event(
self,
names: tuple[str, ...] | str,
*,
since_t_mono_ns: int,
timeout_s: float,
) -> dict | None:
"""Block until an event in `names` arrives with `t_mono_ns >=
since_t_mono_ns`, or until `timeout_s` elapses. Returns the
matched row or None on timeout.
The threshold catches events that already fired (driver-side
emits during the previous `on_phase` callback) those are
valid justifiers for the current phase, even though the wait
only started now.
"""
if isinstance(names, str):
names = (names,)
deadline = time.monotonic() + timeout_s
with self._event_cv:
while True:
for ev in self._event_log:
if ev["event"] in names and ev["t_mono_ns"] >= since_t_mono_ns:
return ev
remaining = deadline - time.monotonic()
if remaining <= 0:
return None
self._event_cv.wait(timeout=remaining)
def _emit_label( def _emit_label(
self, t_mono_ns: int, phase: str, prev: str | None, reason: str self, t_mono_ns: int, phase: str, prev: str | None, reason: str

View file

@ -0,0 +1,278 @@
"""Tests for the §4.5 event-driven labeller in orchestrator/episode.py.
The walker labels each phase ONLY when a justifying event is observed.
Schedule clock is a budget, never a label source. Failures (timeout
or terminal-failure event) emit `failed` and break the walker."""
from __future__ import annotations
import json
import os
import threading
import time
from pathlib import Path
import pytest
from orchestrator.episode import (
PHASE_JUSTIFYING_EVENTS, EpisodeConfig, EpisodeRunner,
TERMINAL_FAILURE_EVENTS,
)
def _make_runner(tmp_path: Path, *, schedule, target_pid=None) -> EpisodeRunner:
"""Build a runner that won't actually walk collectors — we only
care about the labeller. EpisodeConfig.target_pid points at the
test process so _pid_alive returns True."""
cfg = EpisodeConfig(
target_pid=target_pid or os.getpid(),
duration_s=sum(d for _, d in schedule),
data_root=tmp_path,
phase_schedule=schedule,
interval_ms=100,
)
runner = EpisodeRunner(cfg)
runner._t_mono_origin_ns = time.monotonic_ns()
return runner
def _read_labels(runner: EpisodeRunner) -> list[dict]:
p = runner.episode_dir / "labels.jsonl"
if not p.exists():
return []
return [json.loads(l) for l in p.read_text().splitlines() if l.strip()]
# ---------------------------------------------------------------------
# Justifier mapping invariants
# ---------------------------------------------------------------------
def test_canonical_phase_justifiers_present() -> None:
"""The canonical event-driven phases (infecting, infected_running)
must have justifying events declared. clean / armed are
orchestrator-emitted (None)."""
assert PHASE_JUSTIFYING_EVENTS["clean"] is None
assert PHASE_JUSTIFYING_EVENTS["armed"] is None
assert "exploit_fire" in PHASE_JUSTIFYING_EVENTS["infecting"]
assert "session_open" in PHASE_JUSTIFYING_EVENTS["infected_running"]
def test_session_open_timeout_is_terminal() -> None:
assert "session_open_timeout" in TERMINAL_FAILURE_EVENTS
# ---------------------------------------------------------------------
# wait_for_event — picks up already-arrived + future events
# ---------------------------------------------------------------------
def test_wait_for_event_returns_already_arrived(tmp_path) -> None:
runner = _make_runner(tmp_path, schedule=[])
# Emit BEFORE waiting; the wait should return immediately.
runner.emit_event("test_event", marker=1)
ev = runner._wait_for_event("test_event", since_t_mono_ns=0, timeout_s=0.1)
assert ev is not None
assert ev["event"] == "test_event"
assert ev["marker"] == 1
def test_wait_for_event_picks_up_future(tmp_path) -> None:
runner = _make_runner(tmp_path, schedule=[])
fired = threading.Event()
def fire_after_delay():
time.sleep(0.05)
runner.emit_event("late_event", from_thread=True)
fired.set()
threading.Thread(target=fire_after_delay, daemon=True).start()
ev = runner._wait_for_event("late_event", since_t_mono_ns=0, timeout_s=2.0)
fired.wait(timeout=2.0)
assert ev is not None
assert ev["from_thread"] is True
def test_wait_for_event_times_out(tmp_path) -> None:
runner = _make_runner(tmp_path, schedule=[])
t0 = time.monotonic()
ev = runner._wait_for_event("never_fires", since_t_mono_ns=0, timeout_s=0.2)
elapsed = time.monotonic() - t0
assert ev is None
assert 0.15 < elapsed < 1.0 # roughly the timeout, not minutes
def test_wait_for_event_filters_by_since_threshold(tmp_path) -> None:
runner = _make_runner(tmp_path, schedule=[])
runner.emit_event("e", v=1)
# Emit puts t_mono_ns >= 0 in the row. since=10**18 (way in the
# future) means no event qualifies.
ev = runner._wait_for_event(
"e", since_t_mono_ns=10**18, timeout_s=0.1,
)
assert ev is None
def test_wait_for_event_accepts_first_of_multiple_names(tmp_path) -> None:
runner = _make_runner(tmp_path, schedule=[])
runner.emit_event("a")
runner.emit_event("b")
ev = runner._wait_for_event(("b", "a"), since_t_mono_ns=0, timeout_s=0.1)
assert ev is not None
# First chronologically among the matches; we emitted `a` first.
assert ev["event"] == "a"
# ---------------------------------------------------------------------
# Walker behavior
# ---------------------------------------------------------------------
def test_walker_emits_orchestrator_phases_immediately(tmp_path) -> None:
"""clean and armed are orchestrator-emitted; their labels write
on phase entry, not on event arrival."""
runner = _make_runner(tmp_path, schedule=[
("clean", 0.05),
("armed", 0.05),
])
observed = runner._walk_schedule()
assert observed == ["clean", "armed"]
labels = _read_labels(runner)
assert [l["phase"] for l in labels] == ["clean", "armed"]
assert all(l["reason"] == "orchestrator_emitted" for l in labels)
def test_walker_writes_infecting_only_on_exploit_fire(tmp_path) -> None:
"""infecting label must only appear when exploit_fire fires.
Without the event, walker emits failed."""
runner = _make_runner(tmp_path, schedule=[
("clean", 0.05),
("armed", 0.05),
("infecting", 0.1),
])
observed = runner._walk_schedule()
assert observed == ["clean", "armed", "failed"]
labels = _read_labels(runner)
phase_names = [l["phase"] for l in labels]
assert "infecting" not in phase_names
assert phase_names[-1] == "failed"
fail_label = labels[-1]
assert "timeout_no_exploit_fire" in fail_label["reason"]
def test_walker_writes_infecting_when_event_arrives(tmp_path) -> None:
"""When exploit_fire is emitted (during armed or infecting), the
infecting label is written using the EVENT's t_mono."""
schedule = [
("clean", 0.05),
("armed", 0.05),
("infecting", 0.5),
("infected_running", 0.5),
]
runner = _make_runner(tmp_path, schedule=schedule)
# Driver-side simulator: when armed phase begins, fire exploit;
# when infecting begins, "open the session".
def on_phase(p):
if p == "armed":
runner.emit_event("exploit_fire")
elif p == "infecting":
runner.emit_event("session_open", session_id=1)
runner.on_phase = on_phase
observed = runner._walk_schedule()
assert observed == ["clean", "armed", "infecting", "infected_running"]
labels = _read_labels(runner)
phase_names = [l["phase"] for l in labels]
assert phase_names == ["clean", "armed", "infecting", "infected_running"]
# Reasons reflect event justification on the event-driven phases.
assert labels[2]["reason"] == "event:exploit_fire"
assert labels[3]["reason"] == "event:session_open"
def test_walker_uses_event_t_mono_not_walker_t_mono(tmp_path) -> None:
"""When exploit_fire fires DURING armed phase (i.e. before the
walker enters infecting), the infecting label must carry the
EVENT's t_mono — not the time the walker noticed."""
schedule = [
("clean", 0.02),
("armed", 0.5), # long armed; event fires early in it
("infecting", 0.5),
]
runner = _make_runner(tmp_path, schedule=schedule)
fired_at = [None]
def on_phase(p):
if p == "armed":
fired_at[0] = time.monotonic_ns() - runner._t_mono_origin_ns
runner.emit_event("exploit_fire")
runner.on_phase = on_phase
runner._walk_schedule()
labels = _read_labels(runner)
infecting = next(l for l in labels if l["phase"] == "infecting")
# The infecting label's t_mono should be very close to fired_at,
# not to when the walker entered infecting (which is much later).
assert abs(infecting["t_mono_ns"] - fired_at[0]) < 5_000_000 # 5ms
def test_walker_terminal_failure_event_short_circuits(tmp_path) -> None:
"""If session_open_timeout arrives during the wait for
session_open, walker must emit `failed` (not infected_running)
and stop walking."""
schedule = [
("clean", 0.02),
("armed", 0.02),
("infecting", 0.5),
("infected_running", 0.5),
]
runner = _make_runner(tmp_path, schedule=schedule)
def on_phase(p):
if p == "armed":
runner.emit_event("exploit_fire")
elif p == "infected_running":
runner.emit_event("session_open_timeout", timeout_s=30)
runner.on_phase = on_phase
observed = runner._walk_schedule()
assert observed == ["clean", "armed", "infecting", "failed"]
labels = _read_labels(runner)
fail_label = labels[-1]
assert fail_label["phase"] == "failed"
assert "session_open_timeout" in fail_label["reason"]
def test_walker_stop_event_breaks_walk(tmp_path) -> None:
schedule = [("clean", 0.5), ("armed", 0.5), ("infecting", 0.5)]
runner = _make_runner(tmp_path, schedule=schedule)
runner._stop.set()
observed = runner._walk_schedule()
assert observed == []
def test_walker_writes_phase_transition_events(tmp_path) -> None:
"""Each phase transition (orchestrator-emitted or event-driven)
must also fire a phase_transition event into events.jsonl. The
event-driven version carries `justified_by` so the §4.6 gate can
cross-check."""
schedule = [("clean", 0.02), ("armed", 0.02), ("infecting", 0.5)]
runner = _make_runner(tmp_path, schedule=schedule)
def on_phase(p):
if p == "armed":
runner.emit_event("exploit_fire")
runner.on_phase = on_phase
runner._walk_schedule()
events_path = runner.episode_dir / "events.jsonl"
events = [json.loads(l) for l in events_path.read_text().splitlines() if l.strip()]
transitions = [e for e in events if e["event"] == "phase_transition"]
transition_phases = [e["to"] for e in transitions]
assert transition_phases == ["clean", "armed", "infecting"]
# Event-driven phase carries justified_by; orchestrator-emitted
# phases don't.
infecting_t = next(e for e in transitions if e["to"] == "infecting")
assert infecting_t["justified_by"] == "exploit_fire"
armed_t = next(e for e in transitions if e["to"] == "armed")
assert "justified_by" not in armed_t