"""Tests for the §4.5 event-driven labeller in orchestrator/episode.py. The walker labels each phase ONLY when a justifying event is observed. Schedule clock is a budget, never a label source. Failures (timeout or terminal-failure event) emit `failed` and break the walker.""" from __future__ import annotations import json import os import threading import time from pathlib import Path import pytest from orchestrator.episode import ( PHASE_JUSTIFYING_EVENTS, EpisodeConfig, EpisodeRunner, TERMINAL_FAILURE_EVENTS, ) def _make_runner(tmp_path: Path, *, schedule, target_pid=None) -> EpisodeRunner: """Build a runner that won't actually walk collectors — we only care about the labeller. EpisodeConfig.target_pid points at the test process so _pid_alive returns True.""" cfg = EpisodeConfig( target_pid=target_pid or os.getpid(), duration_s=sum(d for _, d in schedule), data_root=tmp_path, phase_schedule=schedule, interval_ms=100, ) runner = EpisodeRunner(cfg) runner._t_mono_origin_ns = time.monotonic_ns() return runner def _read_labels(runner: EpisodeRunner) -> list[dict]: p = runner.episode_dir / "labels.jsonl" if not p.exists(): return [] return [json.loads(l) for l in p.read_text().splitlines() if l.strip()] # --------------------------------------------------------------------- # Justifier mapping invariants # --------------------------------------------------------------------- def test_canonical_phase_justifiers_present() -> None: """The canonical event-driven phases (infecting, infected_running) must have justifying events declared. clean / armed are orchestrator-emitted (None).""" assert PHASE_JUSTIFYING_EVENTS["clean"] is None assert PHASE_JUSTIFYING_EVENTS["armed"] is None assert "exploit_fire" in PHASE_JUSTIFYING_EVENTS["infecting"] assert "session_open" in PHASE_JUSTIFYING_EVENTS["infected_running"] def test_session_open_timeout_is_terminal() -> None: assert "session_open_timeout" in TERMINAL_FAILURE_EVENTS # --------------------------------------------------------------------- # wait_for_event — picks up already-arrived + future events # --------------------------------------------------------------------- def test_wait_for_event_returns_already_arrived(tmp_path) -> None: runner = _make_runner(tmp_path, schedule=[]) # Emit BEFORE waiting; the wait should return immediately. runner.emit_event("test_event", marker=1) ev = runner._wait_for_event("test_event", since_t_mono_ns=0, timeout_s=0.1) assert ev is not None assert ev["event"] == "test_event" assert ev["marker"] == 1 def test_wait_for_event_picks_up_future(tmp_path) -> None: runner = _make_runner(tmp_path, schedule=[]) fired = threading.Event() def fire_after_delay(): time.sleep(0.05) runner.emit_event("late_event", from_thread=True) fired.set() threading.Thread(target=fire_after_delay, daemon=True).start() ev = runner._wait_for_event("late_event", since_t_mono_ns=0, timeout_s=2.0) fired.wait(timeout=2.0) assert ev is not None assert ev["from_thread"] is True def test_wait_for_event_times_out(tmp_path) -> None: runner = _make_runner(tmp_path, schedule=[]) t0 = time.monotonic() ev = runner._wait_for_event("never_fires", since_t_mono_ns=0, timeout_s=0.2) elapsed = time.monotonic() - t0 assert ev is None assert 0.15 < elapsed < 1.0 # roughly the timeout, not minutes def test_wait_for_event_filters_by_since_threshold(tmp_path) -> None: runner = _make_runner(tmp_path, schedule=[]) runner.emit_event("e", v=1) # Emit puts t_mono_ns >= 0 in the row. since=10**18 (way in the # future) means no event qualifies. ev = runner._wait_for_event( "e", since_t_mono_ns=10**18, timeout_s=0.1, ) assert ev is None def test_wait_for_event_accepts_first_of_multiple_names(tmp_path) -> None: runner = _make_runner(tmp_path, schedule=[]) runner.emit_event("a") runner.emit_event("b") ev = runner._wait_for_event(("b", "a"), since_t_mono_ns=0, timeout_s=0.1) assert ev is not None # First chronologically among the matches; we emitted `a` first. assert ev["event"] == "a" # --------------------------------------------------------------------- # Walker behavior # --------------------------------------------------------------------- def test_walker_emits_orchestrator_phases_immediately(tmp_path) -> None: """clean and armed are orchestrator-emitted; their labels write on phase entry, not on event arrival.""" runner = _make_runner(tmp_path, schedule=[ ("clean", 0.05), ("armed", 0.05), ]) observed = runner._walk_schedule() assert observed == ["clean", "armed"] labels = _read_labels(runner) assert [l["phase"] for l in labels] == ["clean", "armed"] assert all(l["reason"] == "orchestrator_emitted" for l in labels) def test_walker_writes_infecting_only_on_exploit_fire(tmp_path) -> None: """infecting label must only appear when exploit_fire fires. Without the event, walker emits failed.""" runner = _make_runner(tmp_path, schedule=[ ("clean", 0.05), ("armed", 0.05), ("infecting", 0.1), ]) observed = runner._walk_schedule() assert observed == ["clean", "armed", "failed"] labels = _read_labels(runner) phase_names = [l["phase"] for l in labels] assert "infecting" not in phase_names assert phase_names[-1] == "failed" fail_label = labels[-1] assert "timeout_no_exploit_fire" in fail_label["reason"] def test_walker_writes_infecting_when_event_arrives(tmp_path) -> None: """When exploit_fire is emitted (during armed or infecting), the infecting label is written using the EVENT's t_mono.""" schedule = [ ("clean", 0.05), ("armed", 0.05), ("infecting", 0.5), ("infected_running", 0.5), ] runner = _make_runner(tmp_path, schedule=schedule) # Driver-side simulator: when armed phase begins, fire exploit; # when infecting begins, "open the session". def on_phase(p): if p == "armed": runner.emit_event("exploit_fire") elif p == "infecting": runner.emit_event("session_open", session_id=1) runner.on_phase = on_phase observed = runner._walk_schedule() assert observed == ["clean", "armed", "infecting", "infected_running"] labels = _read_labels(runner) phase_names = [l["phase"] for l in labels] assert phase_names == ["clean", "armed", "infecting", "infected_running"] # Reasons reflect event justification on the event-driven phases. assert labels[2]["reason"] == "event:exploit_fire" assert labels[3]["reason"] == "event:session_open" def test_walker_uses_event_t_mono_not_walker_t_mono(tmp_path) -> None: """When exploit_fire fires DURING armed phase (i.e. before the walker enters infecting), the infecting label must carry the EVENT's t_mono — not the time the walker noticed.""" schedule = [ ("clean", 0.02), ("armed", 0.5), # long armed; event fires early in it ("infecting", 0.5), ] runner = _make_runner(tmp_path, schedule=schedule) fired_at = [None] def on_phase(p): if p == "armed": fired_at[0] = time.monotonic_ns() - runner._t_mono_origin_ns runner.emit_event("exploit_fire") runner.on_phase = on_phase runner._walk_schedule() labels = _read_labels(runner) infecting = next(l for l in labels if l["phase"] == "infecting") # The infecting label's t_mono should be very close to fired_at, # not to when the walker entered infecting (which is much later). assert abs(infecting["t_mono_ns"] - fired_at[0]) < 5_000_000 # 5ms def test_walker_terminal_failure_event_short_circuits(tmp_path) -> None: """If session_open_timeout arrives during the wait for session_open, walker must emit `failed` (not infected_running) and stop walking.""" schedule = [ ("clean", 0.02), ("armed", 0.02), ("infecting", 0.5), ("infected_running", 0.5), ] runner = _make_runner(tmp_path, schedule=schedule) def on_phase(p): if p == "armed": runner.emit_event("exploit_fire") elif p == "infected_running": runner.emit_event("session_open_timeout", timeout_s=30) runner.on_phase = on_phase observed = runner._walk_schedule() assert observed == ["clean", "armed", "infecting", "failed"] labels = _read_labels(runner) fail_label = labels[-1] assert fail_label["phase"] == "failed" assert "session_open_timeout" in fail_label["reason"] def test_walker_stop_event_breaks_walk(tmp_path) -> None: schedule = [("clean", 0.5), ("armed", 0.5), ("infecting", 0.5)] runner = _make_runner(tmp_path, schedule=schedule) runner._stop.set() observed = runner._walk_schedule() assert observed == [] def test_walker_writes_phase_transition_events(tmp_path) -> None: """Each phase transition (orchestrator-emitted or event-driven) must also fire a phase_transition event into events.jsonl. The event-driven version carries `justified_by` so the §4.6 gate can cross-check.""" schedule = [("clean", 0.02), ("armed", 0.02), ("infecting", 0.5)] runner = _make_runner(tmp_path, schedule=schedule) def on_phase(p): if p == "armed": runner.emit_event("exploit_fire") runner.on_phase = on_phase runner._walk_schedule() events_path = runner.episode_dir / "events.jsonl" events = [json.loads(l) for l in events_path.read_text().splitlines() if l.strip()] transitions = [e for e in events if e["event"] == "phase_transition"] transition_phases = [e["to"] for e in transitions] assert transition_phases == ["clean", "armed", "infecting"] # Event-driven phase carries justified_by; orchestrator-emitted # phases don't. infecting_t = next(e for e in transitions if e["to"] == "infecting") assert infecting_t["justified_by"] == "exploit_fire" armed_t = next(e for e in transitions if e["to"] == "armed") assert "justified_by" not in armed_t