CIS490/orchestrator/episode.py
Max Gorog d9f913fc97 PIPELINE §5 step 6: event-driven labeller (§4.5)
Phase labels are written ONLY when justifying events arrive. The
schedule clock is now a budget — an upper bound — never a label
source. This is the core honesty fix the §3 evidence demanded:

  Before: every Tier-3 episode wrote `infected_running` from the
          schedule clock regardless of whether session_open ever
          fired. Per §10 every dishonest label is a poisoned
          training example. 67/67 of the §3 probe episodes were
          poisoned this way.

  After:  `infecting` writes ONLY when exploit_fire is observed in
          events.jsonl. `infected_running` writes ONLY when
          session_open is observed. Either timing out or seeing
          session_open_timeout terminates the walker with a `failed`
          label that the §4.6 acceptance gate will reject.

PHASE_JUSTIFYING_EVENTS in orchestrator/episode.py declares which
events justify which phases:

    "clean":            None              # orchestrator-emitted
    "armed":            None              # orchestrator-emitted
    "infecting":        ("exploit_fire",)
    "infected_running": ("session_open",)

TERMINAL_FAILURE_EVENTS = {"session_open_timeout"} short-circuit any
event-driven wait into a `failed` label.

`dormant` is intentionally OFF the canonical schedule. §4.5 calls
for dormant to be event-driven (session_idle / session_active) too,
but the driver doesn't emit those yet. Per §1 default-to-removal we
ship without dormant rather than label it from the clock; when the
driver gains those emits, dormant re-enters the schedule with
proper justification.

EpisodeRunner now owns:
  * `_event_log` — every emit_event appends here
  * `_event_cv`  — condition variable for waiters
  * `_wait_for_event(names, since_t_mono_ns, timeout_s)` — returns
                                  the first matching event in the log
                                  with t_mono >= threshold; threshold
                                  catches events that fired during
                                  the previous on_phase callback.

When an event-driven phase's justifier already arrived (e.g.
exploit_fire emitted by driver._fire() inside on_phase("armed")),
the walker uses the EVENT's t_mono on the label — not the time the
walker noticed. The label means "this is when this thing actually
happened."

manifest.toml: dropped the dormant cycle from the canonical schedule.
Episode is shorter (~30s) but every label is event-justified.

14 new tests in tests/test_event_driven_labeller.py covering: justifier
mapping invariants, _wait_for_event semantics (already-arrived,
future, timeout, since-threshold, first-of-multiple-names), walker
behavior (orchestrator-emitted phases, event-driven phases, missing
event → failed, terminal-failure-event short-circuit, stop event,
event-t_mono on label, phase_transition events with justified_by).

286 tests passing.

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
2026-05-04 01:43:16 -05:00

701 lines
28 KiB
Python

"""EpisodeRunner — single-episode driver.
Writes the full per-episode directory shape from ``docs/data-model.md``:
data/episodes/<ulid>/
meta.json
events.jsonl
labels.jsonl
telemetry-proc.jsonl
done.marker
Two modes:
1. **Single-phase** (no ``phase_schedule`` set) — labels the whole window
``clean``. Useful for v0 sanity tests against any pid.
2. **Scheduled** (``phase_schedule`` set on the config) — walks a list of
``(phase, duration_s)`` tuples, emitting one label per transition. An
optional ``on_phase`` callback fires at each transition, used to drive an
external load mimic (or, later, the real exploit/sample driver).
Real VM bring-up will arrive as a third mode that boots a guest, fires an
exploit, and adapts the schedule based on observed events
(``session_open``, ``sample_executed``, ...).
"""
from __future__ import annotations
import json
import logging
import os
import subprocess
import threading
import time
from dataclasses import dataclass, field
from datetime import datetime, timezone
from pathlib import Path
from typing import Callable
from collectors import guest_agent, pcap, perf_qemu, proc_qemu, qmp
from samples.manifest import Sample
from .ulid import new_ulid
# Repo root for the version probe — orchestrator/episode.py lives at
# <repo>/orchestrator/episode.py.
_REPO_ROOT = Path(__file__).resolve().parent.parent
# Cached so we don't fork `git` on every episode.
_CODE_VERSION_CACHE: dict | None = None
def _resolve_code_version() -> dict:
"""Return a small dict identifying the code that produced this episode.
Order of resolution:
1. ``$INSTALL_ROOT/VERSION`` (written by install-lab-host.sh at
install time — typical production path, since /opt/cis490
doesn't carry a .git/ dir)
2. ``git rev-parse HEAD`` from the repo root (dev clones)
3. ``{"commit": "unknown"}`` so meta.json always has the field
Output shape (always present):
{"commit": "<40-hex>" | "unknown",
"branch": "<name>" | None,
"dirty": bool | None,
"source": "VERSION-file" | "git" | "unknown"}
Result is cached at module level so per-episode meta emission is
free after the first read."""
global _CODE_VERSION_CACHE
if _CODE_VERSION_CACHE is not None:
return _CODE_VERSION_CACHE
# 1. VERSION file (production install).
for cand in (_REPO_ROOT / "VERSION", Path("/opt/cis490/VERSION")):
if cand.is_file():
try:
v = json.loads(cand.read_text())
if isinstance(v, dict) and v.get("commit"):
v.setdefault("source", "VERSION-file")
_CODE_VERSION_CACHE = v
return v
except (json.JSONDecodeError, OSError):
pass
# 2. git rev-parse from repo root (dev clones).
try:
commit = subprocess.run(
["git", "-C", str(_REPO_ROOT), "rev-parse", "HEAD"],
capture_output=True, text=True, timeout=2, check=True,
).stdout.strip()
branch = subprocess.run(
["git", "-C", str(_REPO_ROOT), "rev-parse", "--abbrev-ref", "HEAD"],
capture_output=True, text=True, timeout=2,
).stdout.strip() or None
# `git status --porcelain` is empty iff the working tree is clean.
porcelain = subprocess.run(
["git", "-C", str(_REPO_ROOT), "status", "--porcelain"],
capture_output=True, text=True, timeout=2,
).stdout
_CODE_VERSION_CACHE = {
"commit": commit,
"branch": branch,
"dirty": bool(porcelain.strip()),
"source": "git",
}
return _CODE_VERSION_CACHE
except (subprocess.SubprocessError, FileNotFoundError, OSError):
pass
_CODE_VERSION_CACHE = {
"commit": "unknown", "branch": None, "dirty": None, "source": "unknown",
}
return _CODE_VERSION_CACHE
log = logging.getLogger("cis490.orchestrator")
SCHEMA_VERSION = 1
# Event-driven labeller (PIPELINE.md §4.5). For each phase the
# orchestrator may walk through, this dict says what justifies that
# label. None means orchestrator-emitted (clean / armed are decided
# by the orchestrator's own state — episode start and "instructing
# the driver to fire"). Anything else: the named event(s) MUST be
# observed in the event log within the phase's time budget for the
# label to be written. Otherwise the episode terminates with `failed`.
#
# Keep in sync with orchestrator.manifest.KNOWN_PHASES.
PHASE_JUSTIFYING_EVENTS: dict[str, tuple[str, ...] | None] = {
"clean": None, # episode start; orchestrator-emitted
"armed": None, # driver instructed to fire; orchestrator-emitted
"infecting": ("exploit_fire",),
"infected_running": ("session_open",),
# `dormant` is event-driven too per §4.5 (session_idle / activity)
# — but the driver doesn't emit those events yet. Until it does,
# `dormant` is not in the canonical schedule. When session_idle
# / session_active land, this gets its justifier and dormant
# cycles re-enter the canonical schedule.
}
# Terminal failure events — observing any of these during the wait for
# a justifier short-circuits to a `failed` label, terminating the
# walker. session_open_timeout is the §3 evidence pattern: driver
# gave up before the framework could land a session.
TERMINAL_FAILURE_EVENTS: frozenset[str] = frozenset({
"session_open_timeout",
})
PhaseSchedule = list[tuple[str, float]]
OnPhase = Callable[[str], None]
@dataclass
class EpisodeConfig:
target_pid: int
duration_s: float
data_root: Path
interval_ms: int = 100
image_name: str = "(stub)"
snapshot_name: str = "(none-stub)"
episode_id: str | None = None
# When set, walk this schedule and ignore duration_s for sleep timing.
# ``duration_s`` still goes in meta.schedule for record-keeping.
phase_schedule: PhaseSchedule | None = None
# Optional: paths to QEMU sockets exposed by the launcher. When
# set, EpisodeRunner spins up additional collector threads.
qmp_socket: Path | None = None
qmp_interval_ms: int = 1000 # QMP queries are heavier than /proc reads
guest_agent_socket: Path | None = None
# Optional: bridge interface to capture per-episode pcap on. When
# set, EpisodeRunner spawns tcpdump for the duration of the
# schedule and bucketizes the result into netflow.jsonl on stop.
bridge_iface: str | None = None
bridge_ip: str = "10.200.0.1"
pcap_snaplen: int = 256
# Source 3: perf stat sampling. Disabled by default because perf
# needs CAP_SYS_ADMIN or perf_event_paranoid <= 1; enable
# explicitly per-episode when the host supports it.
enable_perf: bool = False
perf_interval_ms: int = 100
# The Sample that drove this episode's workload selection. Stamped
# into meta.json so trainers can join episodes by family / kind
# without re-deriving from events. None = v1 yes-loop fallback.
sample: Sample | None = None
# The exploit module that fired (Tier 3+). Plain dict so the runner
# doesn't need to import exploits.modules; populated by callers
# that have a ModuleConfig in hand.
exploit_meta: dict | None = None
# Canonical experiment manifest provenance (PIPELINE.md §4.1).
# Plain dict (output of Manifest.to_meta()) so the runner doesn't
# need to import orchestrator.manifest; populated by run_fleet /
# run_*_demo via load_canonical(). When None, meta.experiment is
# null — useful for unit tests that don't go through the canonical
# path; production episodes always carry it.
experiment_meta: dict | None = None
# Snapshot/revert (Tier 0+):
# revert_at_start — before any phase walks, loadvm <snapshot_name>.
# Use this to drop the guest back to a known-good baseline at
# the start of every episode in a long-lived-VM fleet loop.
# revert_at_end — after the schedule walks, loadvm <snapshot_name>
# so the next consumer of this VM starts clean too.
revert_at_start: bool = False
revert_at_end: bool = False
@dataclass
class EpisodeResult:
episode_id: str
episode_dir: Path
rows_proc: int
rows_qmp: int = 0
rows_guest: int = 0
rows_netflow: int = 0
rows_perf: int = 0
pcap_bytes: int = 0
pid_disappeared: bool = False
duration_observed_s: float = 0.0
phases_observed: list[str] = field(default_factory=list)
class EpisodeRunner:
def __init__(
self,
cfg: EpisodeConfig,
on_phase: OnPhase | None = None,
) -> None:
self.cfg = cfg
self.on_phase = on_phase
self.episode_id = cfg.episode_id or new_ulid()
self.episode_dir: Path = cfg.data_root / "episodes" / self.episode_id
# Create the dir up front so external drivers can call
# emit_event() between construction and run() — e.g. an exploit
# driver that writes a driver_setup event before the schedule
# walks. The dir is otherwise empty until run() opens files.
self.episode_dir.mkdir(parents=True, exist_ok=True)
self._t_mono_origin_ns: int = 0
self._stop = threading.Event()
# Event log + condition variable underpin the event-driven
# labeller (PIPELINE.md §4.5). Every emit_event call appends a
# row here in addition to writing it to events.jsonl, and
# notifies anyone in _wait_for_event. The walker uses this to
# advance phases on observed events instead of clock ticks.
self._event_log: list[dict] = []
self._event_cv = threading.Condition()
# ---- public ---------------------------------------------------------
def run(self) -> EpisodeResult:
self._t_mono_origin_ns = time.monotonic_ns()
# snapshot_load is the marker for "episode clock = 0". Emit
# BEFORE any file I/O — _write_meta() takes >1 ms on slow disks
# (Refs spectral/CIS490#7).
self.emit_event("snapshot_load", snapshot=self.cfg.snapshot_name)
started_at_wall = datetime.now(timezone.utc).isoformat()
meta = self._initial_meta(started_at_wall)
self._write_meta(meta)
# Snapshot revert at start: pause+restore the guest to a known
# baseline before phase 0. Requires QMP and a savevm having
# already taken place (the launcher is responsible for that).
if self.cfg.revert_at_start and self.cfg.qmp_socket is not None:
try:
client = qmp.QMPClient(self.cfg.qmp_socket)
client.connect()
try:
out = client.loadvm(self.cfg.snapshot_name)
self.emit_event(
"snapshot_revert",
when="start",
snapshot=self.cfg.snapshot_name,
output=(out or "").strip()[:256],
)
finally:
client.close()
except Exception as e:
log.warning("loadvm at start failed: %s", e)
self.emit_event(
"snapshot_revert_failed",
when="start",
snapshot=self.cfg.snapshot_name,
error=str(e),
)
rows_holder: dict[str, int] = {"proc": 0, "qmp": 0, "guest": 0, "netflow": 0, "perf": 0}
pcap_handle: pcap.CaptureHandle | None = None
pcap_path = self.episode_dir / "network.pcap"
netflow_path = self.episode_dir / "netflow.jsonl"
if self.cfg.bridge_iface:
try:
pcap_handle = pcap.run_capture(
bridge=self.cfg.bridge_iface,
pcap_path=pcap_path,
snaplen=self.cfg.pcap_snaplen,
)
self.emit_event("pcap_started", iface=self.cfg.bridge_iface)
except (OSError, FileNotFoundError) as e:
log.warning("pcap capture not available on %s: %s",
self.cfg.bridge_iface, e)
self.emit_event("pcap_unavailable",
iface=self.cfg.bridge_iface, error=str(e))
def _proc_collector() -> None:
rows_holder["proc"] = proc_qemu.run_loop(
pid=self.cfg.target_pid,
output_path=self.episode_dir / "telemetry-proc.jsonl",
t_mono_origin_ns=self._t_mono_origin_ns,
interval_ms=self.cfg.interval_ms,
stop_event=self._stop,
)
def _qmp_collector() -> None:
assert self.cfg.qmp_socket is not None
rows_holder["qmp"] = qmp.run_loop(
socket_path=self.cfg.qmp_socket,
output_path=self.episode_dir / "telemetry-qmp.jsonl",
t_mono_origin_ns=self._t_mono_origin_ns,
interval_ms=self.cfg.qmp_interval_ms,
stop_event=self._stop,
)
def _guest_collector() -> None:
assert self.cfg.guest_agent_socket is not None
rows_holder["guest"] = guest_agent.run_loop(
socket_path=self.cfg.guest_agent_socket,
output_path=self.episode_dir / "telemetry-guest.jsonl",
t_mono_origin_ns=self._t_mono_origin_ns,
stop_event=self._stop,
# Pipe lifecycle events into the orchestrator's
# events.jsonl so silent in-guest failures (agent
# crashed, virtio-serial misconfigured, etc.) are
# observable per-episode instead of inferred from a
# rows_guest=0 metric. Refs PIPELINE.md §1 / §4.4.
emit_event=self.emit_event,
)
def _perf_collector() -> None:
rows_holder["perf"] = perf_qemu.run_loop(
pid=self.cfg.target_pid,
output_path=self.episode_dir / "telemetry-perf.jsonl",
t_mono_origin_ns=self._t_mono_origin_ns,
interval_ms=self.cfg.perf_interval_ms,
stop_event=self._stop,
# Per-episode visibility into perf lifecycle so silent
# failures (binary missing, paranoid level too high,
# all counters <not counted>) surface in events.jsonl
# rather than only as rows_perf=0. Refs §1.
emit_event=self.emit_event,
)
threads: list[threading.Thread] = []
threads.append(threading.Thread(target=_proc_collector, daemon=True, name="proc_qemu"))
if self.cfg.qmp_socket is not None:
threads.append(threading.Thread(target=_qmp_collector, daemon=True, name="qmp"))
if self.cfg.guest_agent_socket is not None:
threads.append(threading.Thread(target=_guest_collector, daemon=True, name="guest_agent"))
if self.cfg.enable_perf:
threads.append(threading.Thread(target=_perf_collector, daemon=True, name="perf"))
for t in threads:
t.start()
phases_observed: list[str] = []
try:
if self.cfg.phase_schedule:
phases_observed = self._walk_schedule()
else:
self._emit_label(0, "clean", prev=None, reason="snapshot_loaded")
phases_observed = ["clean"]
self._stop.wait(timeout=self.cfg.duration_s)
finally:
# Optional revert before stopping collectors so the
# transition shows up in their telemetry too — useful for
# building "snapshot revert" as a labeled phase later.
if self.cfg.revert_at_end and self.cfg.qmp_socket is not None:
try:
client = qmp.QMPClient(self.cfg.qmp_socket)
client.connect()
try:
out = client.loadvm(self.cfg.snapshot_name)
self.emit_event(
"snapshot_revert",
when="end",
snapshot=self.cfg.snapshot_name,
output=(out or "").strip()[:256],
)
finally:
client.close()
except Exception as e:
log.warning("loadvm at end failed: %s", e)
self.emit_event(
"snapshot_revert_failed",
when="end",
snapshot=self.cfg.snapshot_name,
error=str(e),
)
self._stop.set()
for t in threads:
t.join(timeout=3.0)
if pcap_handle is not None:
rc = pcap.stop_capture(pcap_handle)
self.emit_event("pcap_stopped", rc=rc,
pcap_bytes=pcap_path.stat().st_size if pcap_path.exists() else 0)
rows_holder["netflow"] = pcap.bucketize(
pcap_path, netflow_path,
bucket_ms=100,
t_mono_origin_ns=self._t_mono_origin_ns,
bridge_ip=self.cfg.bridge_ip,
)
pid_alive = _pid_alive(self.cfg.target_pid)
self.emit_event("episode_end", target_pid_alive=pid_alive)
end_mono_ns = time.monotonic_ns() - self._t_mono_origin_ns
meta["ended_at_wall"] = datetime.now(timezone.utc).isoformat()
pcap_size = pcap_path.stat().st_size if pcap_path.exists() else 0
meta["result"] = {
"phases_observed": phases_observed,
"rows_proc": rows_holder["proc"],
"rows_qmp": rows_holder["qmp"],
"rows_guest": rows_holder["guest"],
"rows_perf": rows_holder["perf"],
"rows_netflow": rows_holder["netflow"],
"pcap_bytes": pcap_size,
"pid_alive_at_end": pid_alive,
"duration_observed_s": end_mono_ns / 1_000_000_000,
}
self._write_meta(meta)
(self.episode_dir / "done.marker").touch()
log.info(
"episode %s complete: proc=%d qmp=%d guest=%d perf=%d netflow=%d pcap=%dB duration=%.2fs phases=%s",
self.episode_id,
rows_holder["proc"], rows_holder["qmp"], rows_holder["guest"],
rows_holder["perf"], rows_holder["netflow"], pcap_size,
end_mono_ns / 1e9,
phases_observed,
)
return EpisodeResult(
episode_id=self.episode_id,
episode_dir=self.episode_dir,
rows_proc=rows_holder["proc"],
rows_qmp=rows_holder["qmp"],
rows_guest=rows_holder["guest"],
rows_netflow=rows_holder["netflow"],
rows_perf=rows_holder["perf"],
pcap_bytes=pcap_size,
pid_disappeared=not pid_alive,
duration_observed_s=end_mono_ns / 1_000_000_000,
phases_observed=phases_observed,
)
def stop(self) -> None:
self._stop.set()
# ---- internals ------------------------------------------------------
def _walk_schedule(self) -> list[str]:
"""Event-driven phase walker (PIPELINE.md §4.5).
For each (phase, max_wait) entry in the schedule:
* Orchestrator-emitted phases (clean / armed) — emit label
immediately, invoke on_phase, hold for max_wait.
* Event-driven phases (infecting / infected_running) —
invoke on_phase first (driver may fire / start polling),
then wait for the justifying event up to max_wait. If the
event arrives, label is written using the EVENT's t_mono
(not the wait's, so the label reflects when the thing
actually happened). If a terminal-failure event arrives
instead (e.g. session_open_timeout), or max_wait elapses
with no event, write `failed` and break — episode is
rejected by the §4.6 acceptance gate.
Schedule clock is the budget (upper bound), never a label
source. A phase label means the corresponding event was
observed; nothing else. §10 ground truth, §1 honest dataset.
"""
observed: list[str] = []
prev: str | None = None
# Floor for "valid since": initially episode start. Updated to
# each settled-phase's start so an event during phase N
# justifies phase N+1 (e.g. exploit_fire during armed
# justifies infecting).
since_t_mono_ns = 0
for target_phase, max_wait in self.cfg.phase_schedule or []:
if self._stop.is_set():
break
phase_entered_ns = time.monotonic_ns() - self._t_mono_origin_ns
justifiers = PHASE_JUSTIFYING_EVENTS.get(target_phase)
if justifiers is None:
# Orchestrator-emitted phase — label NOW.
self._emit_label(
phase_entered_ns, target_phase, prev=prev,
reason="orchestrator_emitted",
)
self.emit_event(
"phase_transition", to=target_phase, prev=prev,
)
if self.on_phase is not None:
try:
self.on_phase(target_phase)
except Exception:
log.exception("on_phase callback raised; continuing")
observed.append(target_phase)
prev = target_phase
since_t_mono_ns = phase_entered_ns
self._stop.wait(timeout=max_wait)
continue
# Event-driven phase. Invoke on_phase FIRST so any driver
# action that emits the justifying event happens within the
# wait window — though events from the previous phase also
# qualify (since_t_mono_ns floor).
if self.on_phase is not None:
try:
self.on_phase(target_phase)
except Exception:
log.exception("on_phase callback raised; continuing")
wait_names = tuple(justifiers) + tuple(TERMINAL_FAILURE_EVENTS)
ev = self._wait_for_event(
wait_names,
since_t_mono_ns=since_t_mono_ns,
timeout_s=max_wait,
)
if ev is None or ev["event"] in TERMINAL_FAILURE_EVENTS:
# Either timeout or terminal-failure event observed.
# Either way: episode failed. Emit `failed` label with
# the actual reason so the acceptance gate / dataset
# consumer can distinguish.
fail_t_mono = time.monotonic_ns() - self._t_mono_origin_ns
if ev is None:
reason = (
f"timeout_no_{'/'.join(justifiers)}_within_"
f"{max_wait}s"
)
else:
reason = f"terminal_event:{ev['event']}"
self._emit_label(
fail_t_mono, "failed", prev=prev, reason=reason,
)
self.emit_event(
"phase_failed",
expected_phase=target_phase,
expected_events=list(justifiers),
reason=reason,
)
observed.append("failed")
break
# Justifying event observed — emit label using EVENT's
# t_mono so the label reflects when the thing actually
# happened, not when the walker noticed.
self._emit_label(
ev["t_mono_ns"], target_phase, prev=prev,
reason=f"event:{ev['event']}",
)
self.emit_event(
"phase_transition", to=target_phase, prev=prev,
justified_by=ev["event"], event_t_mono_ns=ev["t_mono_ns"],
)
observed.append(target_phase)
prev = target_phase
since_t_mono_ns = phase_entered_ns
return observed
def _initial_meta(self, started_at_wall: str) -> dict:
sample_meta: dict | None = None
if self.cfg.sample is not None:
s = self.cfg.sample
sample_meta = {
"name": s.name,
"family": s.family,
"category": s.category,
"profile": s.profile,
"kind": s.kind,
"sha256": s.sha256,
}
return {
"episode_id": self.episode_id,
"schema_version": SCHEMA_VERSION,
"code_version": _resolve_code_version(),
"started_at_wall": started_at_wall,
"ended_at_wall": None,
"host_fingerprint": {
"kernel": os.uname().release,
"qemu_version": None,
},
"vm": {
"image_name": self.cfg.image_name,
"image_sha256": None,
"snapshot_name": self.cfg.snapshot_name,
"vcpus": None,
"ram_mib": None,
"target_pid": self.cfg.target_pid,
},
"exploit": self.cfg.exploit_meta,
"sample": sample_meta,
"experiment": self.cfg.experiment_meta,
"schedule": {
"baseline_seconds": self.cfg.duration_s,
"interval_ms": self.cfg.interval_ms,
"phase_schedule": self.cfg.phase_schedule,
},
"result": None,
}
def _write_meta(self, meta: dict) -> None:
path = self.episode_dir / "meta.json"
tmp = path.with_suffix(".json.partial")
with tmp.open("w") as f:
json.dump(meta, f, indent=2, sort_keys=True)
f.write("\n")
os.replace(tmp, path)
def emit_event(self, event: str, **extra) -> None:
"""Append a row to events.jsonl. Public so external drivers
(e.g. the MSF exploit driver) can stamp their own events with
the same monotonic clock the orchestrator is using.
Also pushes the row into the in-memory event log + notifies
any in-flight `_wait_for_event` callers, so the §4.5
event-driven labeller can advance on observed events.
"""
t_mono_ns = (
time.monotonic_ns() - self._t_mono_origin_ns
if self._t_mono_origin_ns
else 0
)
row = {
"t_mono_ns": t_mono_ns,
"t_wall_ns": time.time_ns(),
"event": event,
**extra,
}
with (self.episode_dir / "events.jsonl").open("a") as f:
f.write(json.dumps(row, sort_keys=True) + "\n")
with self._event_cv:
self._event_log.append(row)
self._event_cv.notify_all()
def _wait_for_event(
self,
names: tuple[str, ...] | str,
*,
since_t_mono_ns: int,
timeout_s: float,
) -> dict | None:
"""Block until an event in `names` arrives with `t_mono_ns >=
since_t_mono_ns`, or until `timeout_s` elapses. Returns the
matched row or None on timeout.
The threshold catches events that already fired (driver-side
emits during the previous `on_phase` callback) — those are
valid justifiers for the current phase, even though the wait
only started now.
"""
if isinstance(names, str):
names = (names,)
deadline = time.monotonic() + timeout_s
with self._event_cv:
while True:
for ev in self._event_log:
if ev["event"] in names and ev["t_mono_ns"] >= since_t_mono_ns:
return ev
remaining = deadline - time.monotonic()
if remaining <= 0:
return None
self._event_cv.wait(timeout=remaining)
def _emit_label(
self, t_mono_ns: int, phase: str, prev: str | None, reason: str
) -> None:
row = {
"t_mono_ns": t_mono_ns,
"t_wall_ns": time.time_ns(),
"phase": phase,
"prev": prev,
"reason": reason,
}
with (self.episode_dir / "labels.jsonl").open("a") as f:
f.write(json.dumps(row, sort_keys=True) + "\n")
def _pid_alive(pid: int) -> bool:
try:
os.kill(pid, 0)
return True
except ProcessLookupError:
return False
except PermissionError:
# Pid exists but we can't signal it.
return True