"""EpisodeRunner — single-episode driver. Writes the full per-episode directory shape from ``docs/data-model.md``: data/episodes// meta.json events.jsonl labels.jsonl telemetry-proc.jsonl done.marker Two modes: 1. **Single-phase** (no ``phase_schedule`` set) — labels the whole window ``clean``. Useful for v0 sanity tests against any pid. 2. **Scheduled** (``phase_schedule`` set on the config) — walks a list of ``(phase, duration_s)`` tuples, emitting one label per transition. An optional ``on_phase`` callback fires at each transition, used to drive an external load mimic (or, later, the real exploit/sample driver). Real VM bring-up will arrive as a third mode that boots a guest, fires an exploit, and adapts the schedule based on observed events (``session_open``, ``sample_executed``, ...). """ from __future__ import annotations import json import logging import os import subprocess import threading import time from dataclasses import dataclass, field from datetime import datetime, timezone from pathlib import Path from typing import Callable from collectors import guest_agent, pcap, perf_qemu, proc_qemu, qmp from samples.manifest import Sample from .ulid import new_ulid # Repo root for the version probe — orchestrator/episode.py lives at # /orchestrator/episode.py. _REPO_ROOT = Path(__file__).resolve().parent.parent # Cached so we don't fork `git` on every episode. _CODE_VERSION_CACHE: dict | None = None def _resolve_code_version() -> dict: """Return a small dict identifying the code that produced this episode. Order of resolution: 1. ``$INSTALL_ROOT/VERSION`` (written by install-lab-host.sh at install time — typical production path, since /opt/cis490 doesn't carry a .git/ dir) 2. ``git rev-parse HEAD`` from the repo root (dev clones) 3. ``{"commit": "unknown"}`` so meta.json always has the field Output shape (always present): {"commit": "<40-hex>" | "unknown", "branch": "" | None, "dirty": bool | None, "source": "VERSION-file" | "git" | "unknown"} Result is cached at module level so per-episode meta emission is free after the first read.""" global _CODE_VERSION_CACHE if _CODE_VERSION_CACHE is not None: return _CODE_VERSION_CACHE # 1. VERSION file (production install). for cand in (_REPO_ROOT / "VERSION", Path("/opt/cis490/VERSION")): if cand.is_file(): try: v = json.loads(cand.read_text()) if isinstance(v, dict) and v.get("commit"): v.setdefault("source", "VERSION-file") _CODE_VERSION_CACHE = v return v except (json.JSONDecodeError, OSError): pass # 2. git rev-parse from repo root (dev clones). try: commit = subprocess.run( ["git", "-C", str(_REPO_ROOT), "rev-parse", "HEAD"], capture_output=True, text=True, timeout=2, check=True, ).stdout.strip() branch = subprocess.run( ["git", "-C", str(_REPO_ROOT), "rev-parse", "--abbrev-ref", "HEAD"], capture_output=True, text=True, timeout=2, ).stdout.strip() or None # `git status --porcelain` is empty iff the working tree is clean. porcelain = subprocess.run( ["git", "-C", str(_REPO_ROOT), "status", "--porcelain"], capture_output=True, text=True, timeout=2, ).stdout _CODE_VERSION_CACHE = { "commit": commit, "branch": branch, "dirty": bool(porcelain.strip()), "source": "git", } return _CODE_VERSION_CACHE except (subprocess.SubprocessError, FileNotFoundError, OSError): pass _CODE_VERSION_CACHE = { "commit": "unknown", "branch": None, "dirty": None, "source": "unknown", } return _CODE_VERSION_CACHE log = logging.getLogger("cis490.orchestrator") SCHEMA_VERSION = 1 PhaseSchedule = list[tuple[str, float]] OnPhase = Callable[[str], None] @dataclass class EpisodeConfig: target_pid: int duration_s: float data_root: Path interval_ms: int = 100 image_name: str = "(stub)" snapshot_name: str = "(none-stub)" episode_id: str | None = None # When set, walk this schedule and ignore duration_s for sleep timing. # ``duration_s`` still goes in meta.schedule for record-keeping. phase_schedule: PhaseSchedule | None = None # Optional: paths to QEMU sockets exposed by the launcher. When # set, EpisodeRunner spins up additional collector threads. qmp_socket: Path | None = None qmp_interval_ms: int = 1000 # QMP queries are heavier than /proc reads guest_agent_socket: Path | None = None # Optional: bridge interface to capture per-episode pcap on. When # set, EpisodeRunner spawns tcpdump for the duration of the # schedule and bucketizes the result into netflow.jsonl on stop. bridge_iface: str | None = None bridge_ip: str = "10.200.0.1" pcap_snaplen: int = 256 # Source 3: perf stat sampling. Disabled by default because perf # needs CAP_SYS_ADMIN or perf_event_paranoid <= 1; enable # explicitly per-episode when the host supports it. enable_perf: bool = False perf_interval_ms: int = 100 # The Sample that drove this episode's workload selection. Stamped # into meta.json so trainers can join episodes by family / kind # without re-deriving from events. None = v1 yes-loop fallback. sample: Sample | None = None # The exploit module that fired (Tier 3+). Plain dict so the runner # doesn't need to import exploits.modules; populated by callers # that have a ModuleConfig in hand. exploit_meta: dict | None = None # Snapshot/revert (Tier 0+): # revert_at_start — before any phase walks, loadvm . # Use this to drop the guest back to a known-good baseline at # the start of every episode in a long-lived-VM fleet loop. # revert_at_end — after the schedule walks, loadvm # so the next consumer of this VM starts clean too. revert_at_start: bool = False revert_at_end: bool = False @dataclass class EpisodeResult: episode_id: str episode_dir: Path rows_proc: int rows_qmp: int = 0 rows_guest: int = 0 rows_netflow: int = 0 rows_perf: int = 0 pcap_bytes: int = 0 pid_disappeared: bool = False duration_observed_s: float = 0.0 phases_observed: list[str] = field(default_factory=list) class EpisodeRunner: def __init__( self, cfg: EpisodeConfig, on_phase: OnPhase | None = None, ) -> None: self.cfg = cfg self.on_phase = on_phase self.episode_id = cfg.episode_id or new_ulid() self.episode_dir: Path = cfg.data_root / "episodes" / self.episode_id # Create the dir up front so external drivers can call # emit_event() between construction and run() — e.g. an exploit # driver that writes a driver_setup event before the schedule # walks. The dir is otherwise empty until run() opens files. self.episode_dir.mkdir(parents=True, exist_ok=True) self._t_mono_origin_ns: int = 0 self._stop = threading.Event() # ---- public --------------------------------------------------------- def run(self) -> EpisodeResult: self._t_mono_origin_ns = time.monotonic_ns() # snapshot_load is the marker for "episode clock = 0". Emit # BEFORE any file I/O — _write_meta() takes >1 ms on slow disks # (Refs spectral/CIS490#7). self.emit_event("snapshot_load", snapshot=self.cfg.snapshot_name) started_at_wall = datetime.now(timezone.utc).isoformat() meta = self._initial_meta(started_at_wall) self._write_meta(meta) # Snapshot revert at start: pause+restore the guest to a known # baseline before phase 0. Requires QMP and a savevm having # already taken place (the launcher is responsible for that). if self.cfg.revert_at_start and self.cfg.qmp_socket is not None: try: client = qmp.QMPClient(self.cfg.qmp_socket) client.connect() try: out = client.loadvm(self.cfg.snapshot_name) self.emit_event( "snapshot_revert", when="start", snapshot=self.cfg.snapshot_name, output=(out or "").strip()[:256], ) finally: client.close() except Exception as e: log.warning("loadvm at start failed: %s", e) self.emit_event( "snapshot_revert_failed", when="start", snapshot=self.cfg.snapshot_name, error=str(e), ) rows_holder: dict[str, int] = {"proc": 0, "qmp": 0, "guest": 0, "netflow": 0, "perf": 0} pcap_handle: pcap.CaptureHandle | None = None pcap_path = self.episode_dir / "network.pcap" netflow_path = self.episode_dir / "netflow.jsonl" if self.cfg.bridge_iface: try: pcap_handle = pcap.run_capture( bridge=self.cfg.bridge_iface, pcap_path=pcap_path, snaplen=self.cfg.pcap_snaplen, ) self.emit_event("pcap_started", iface=self.cfg.bridge_iface) except (OSError, FileNotFoundError) as e: log.warning("pcap capture not available on %s: %s", self.cfg.bridge_iface, e) self.emit_event("pcap_unavailable", iface=self.cfg.bridge_iface, error=str(e)) def _proc_collector() -> None: rows_holder["proc"] = proc_qemu.run_loop( pid=self.cfg.target_pid, output_path=self.episode_dir / "telemetry-proc.jsonl", t_mono_origin_ns=self._t_mono_origin_ns, interval_ms=self.cfg.interval_ms, stop_event=self._stop, ) def _qmp_collector() -> None: assert self.cfg.qmp_socket is not None rows_holder["qmp"] = qmp.run_loop( socket_path=self.cfg.qmp_socket, output_path=self.episode_dir / "telemetry-qmp.jsonl", t_mono_origin_ns=self._t_mono_origin_ns, interval_ms=self.cfg.qmp_interval_ms, stop_event=self._stop, ) def _guest_collector() -> None: assert self.cfg.guest_agent_socket is not None rows_holder["guest"] = guest_agent.run_loop( socket_path=self.cfg.guest_agent_socket, output_path=self.episode_dir / "telemetry-guest.jsonl", t_mono_origin_ns=self._t_mono_origin_ns, stop_event=self._stop, # Pipe lifecycle events into the orchestrator's # events.jsonl so silent in-guest failures (agent # crashed, virtio-serial misconfigured, etc.) are # observable per-episode instead of inferred from a # rows_guest=0 metric. Refs PIPELINE.md §1 / §4.4. emit_event=self.emit_event, ) def _perf_collector() -> None: rows_holder["perf"] = perf_qemu.run_loop( pid=self.cfg.target_pid, output_path=self.episode_dir / "telemetry-perf.jsonl", t_mono_origin_ns=self._t_mono_origin_ns, interval_ms=self.cfg.perf_interval_ms, stop_event=self._stop, # Per-episode visibility into perf lifecycle so silent # failures (binary missing, paranoid level too high, # all counters ) surface in events.jsonl # rather than only as rows_perf=0. Refs §1. emit_event=self.emit_event, ) threads: list[threading.Thread] = [] threads.append(threading.Thread(target=_proc_collector, daemon=True, name="proc_qemu")) if self.cfg.qmp_socket is not None: threads.append(threading.Thread(target=_qmp_collector, daemon=True, name="qmp")) if self.cfg.guest_agent_socket is not None: threads.append(threading.Thread(target=_guest_collector, daemon=True, name="guest_agent")) if self.cfg.enable_perf: threads.append(threading.Thread(target=_perf_collector, daemon=True, name="perf")) for t in threads: t.start() phases_observed: list[str] = [] try: if self.cfg.phase_schedule: phases_observed = self._walk_schedule() else: self._emit_label(0, "clean", prev=None, reason="snapshot_loaded") phases_observed = ["clean"] self._stop.wait(timeout=self.cfg.duration_s) finally: # Optional revert before stopping collectors so the # transition shows up in their telemetry too — useful for # building "snapshot revert" as a labeled phase later. if self.cfg.revert_at_end and self.cfg.qmp_socket is not None: try: client = qmp.QMPClient(self.cfg.qmp_socket) client.connect() try: out = client.loadvm(self.cfg.snapshot_name) self.emit_event( "snapshot_revert", when="end", snapshot=self.cfg.snapshot_name, output=(out or "").strip()[:256], ) finally: client.close() except Exception as e: log.warning("loadvm at end failed: %s", e) self.emit_event( "snapshot_revert_failed", when="end", snapshot=self.cfg.snapshot_name, error=str(e), ) self._stop.set() for t in threads: t.join(timeout=3.0) if pcap_handle is not None: rc = pcap.stop_capture(pcap_handle) self.emit_event("pcap_stopped", rc=rc, pcap_bytes=pcap_path.stat().st_size if pcap_path.exists() else 0) rows_holder["netflow"] = pcap.bucketize( pcap_path, netflow_path, bucket_ms=100, t_mono_origin_ns=self._t_mono_origin_ns, bridge_ip=self.cfg.bridge_ip, ) pid_alive = _pid_alive(self.cfg.target_pid) self.emit_event("episode_end", target_pid_alive=pid_alive) end_mono_ns = time.monotonic_ns() - self._t_mono_origin_ns meta["ended_at_wall"] = datetime.now(timezone.utc).isoformat() pcap_size = pcap_path.stat().st_size if pcap_path.exists() else 0 meta["result"] = { "phases_observed": phases_observed, "rows_proc": rows_holder["proc"], "rows_qmp": rows_holder["qmp"], "rows_guest": rows_holder["guest"], "rows_perf": rows_holder["perf"], "rows_netflow": rows_holder["netflow"], "pcap_bytes": pcap_size, "pid_alive_at_end": pid_alive, "duration_observed_s": end_mono_ns / 1_000_000_000, } self._write_meta(meta) (self.episode_dir / "done.marker").touch() log.info( "episode %s complete: proc=%d qmp=%d guest=%d perf=%d netflow=%d pcap=%dB duration=%.2fs phases=%s", self.episode_id, rows_holder["proc"], rows_holder["qmp"], rows_holder["guest"], rows_holder["perf"], rows_holder["netflow"], pcap_size, end_mono_ns / 1e9, phases_observed, ) return EpisodeResult( episode_id=self.episode_id, episode_dir=self.episode_dir, rows_proc=rows_holder["proc"], rows_qmp=rows_holder["qmp"], rows_guest=rows_holder["guest"], rows_netflow=rows_holder["netflow"], rows_perf=rows_holder["perf"], pcap_bytes=pcap_size, pid_disappeared=not pid_alive, duration_observed_s=end_mono_ns / 1_000_000_000, phases_observed=phases_observed, ) def stop(self) -> None: self._stop.set() # ---- internals ------------------------------------------------------ def _walk_schedule(self) -> list[str]: observed: list[str] = [] prev: str | None = None for phase, dur in self.cfg.phase_schedule or []: if self._stop.is_set(): break t_mono = time.monotonic_ns() - self._t_mono_origin_ns self._emit_label(t_mono, phase, prev=prev, reason="scheduled") self.emit_event("phase_transition", to=phase, prev=prev) if self.on_phase is not None: try: self.on_phase(phase) except Exception: log.exception("on_phase callback raised; continuing") observed.append(phase) prev = phase self._stop.wait(timeout=dur) return observed def _initial_meta(self, started_at_wall: str) -> dict: sample_meta: dict | None = None if self.cfg.sample is not None: s = self.cfg.sample sample_meta = { "name": s.name, "family": s.family, "category": s.category, "profile": s.profile, "kind": s.kind, "sha256": s.sha256, } return { "episode_id": self.episode_id, "schema_version": SCHEMA_VERSION, "code_version": _resolve_code_version(), "started_at_wall": started_at_wall, "ended_at_wall": None, "host_fingerprint": { "kernel": os.uname().release, "qemu_version": None, }, "vm": { "image_name": self.cfg.image_name, "image_sha256": None, "snapshot_name": self.cfg.snapshot_name, "vcpus": None, "ram_mib": None, "target_pid": self.cfg.target_pid, }, "exploit": self.cfg.exploit_meta, "sample": sample_meta, "schedule": { "baseline_seconds": self.cfg.duration_s, "interval_ms": self.cfg.interval_ms, "phase_schedule": self.cfg.phase_schedule, }, "result": None, } def _write_meta(self, meta: dict) -> None: path = self.episode_dir / "meta.json" tmp = path.with_suffix(".json.partial") with tmp.open("w") as f: json.dump(meta, f, indent=2, sort_keys=True) f.write("\n") os.replace(tmp, path) def emit_event(self, event: str, **extra) -> None: """Append a row to events.jsonl. Public so external drivers (e.g. the MSF exploit driver) can stamp their own events with the same monotonic clock the orchestrator is using.""" t_mono_ns = ( time.monotonic_ns() - self._t_mono_origin_ns if self._t_mono_origin_ns else 0 ) row = { "t_mono_ns": t_mono_ns, "t_wall_ns": time.time_ns(), "event": event, **extra, } with (self.episode_dir / "events.jsonl").open("a") as f: f.write(json.dumps(row, sort_keys=True) + "\n") def _emit_label( self, t_mono_ns: int, phase: str, prev: str | None, reason: str ) -> None: row = { "t_mono_ns": t_mono_ns, "t_wall_ns": time.time_ns(), "phase": phase, "prev": prev, "reason": reason, } with (self.episode_dir / "labels.jsonl").open("a") as f: f.write(json.dumps(row, sort_keys=True) + "\n") def _pid_alive(pid: int) -> bool: try: os.kill(pid, 0) return True except ProcessLookupError: return False except PermissionError: # Pid exists but we can't signal it. return True