CIS490/orchestrator/episode.py
Max Gorog 4ab5477226 PIPELINE §5 step 1: fix four root-cause defects
Diagnoses + fixes for the silent-collector / never-lands-session
failures that the 200-episode quality probe surfaced (§3 evidence).
All four address the producer; no compensating layers added.

perf collector (rows_perf=0 on 100% of episodes):
  - perf stat -j writes to stderr by default with -p; we read stdout.
    Add --log-fd 1 so JSON reaches stdout where the parser sees it.
  - Event names come back annotated with the privilege scope perf
    actually measured ("cycles:u" under perf_event_paranoid=2). Strip
    the suffix so _build_row's plain-name lookups hit. Without this
    every metric was None even when perf reported real numbers.
  - tests/test_collectors_emit.py covers the regression with a real
    busy-loop fixture; emit-test discipline per §4.4.
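
The suffix strip described above can be sketched as follows. This is an illustration only, not the code in collectors/perf_qemu.py: `strip_scope_suffix` and `parse_perf_line` are hypothetical names, and the "event" / "counter-value" keys are assumed to be what `perf stat -j` emits on the host in question.

```python
from __future__ import annotations

import json

# Scope modifiers from perf-list(1): user, kernel, hypervisor, guest, host.
_SCOPE_MODIFIERS = frozenset("ukhGH")


def strip_scope_suffix(event: str) -> str:
    """Return ``event`` without a trailing scope modifier such as ``:u``."""
    base, sep, suffix = event.rpartition(":")
    # Strip only when everything after the last colon is a scope modifier,
    # so tracepoint names like "sched:sched_switch" pass through unchanged.
    if sep and suffix and set(suffix) <= _SCOPE_MODIFIERS:
        return base
    return event


def parse_perf_line(line: str) -> tuple[str, float | None]:
    """Parse one JSON line into (plain event name, counter value or None)."""
    rec = json.loads(line)
    name = strip_scope_suffix(rec["event"])
    try:
        value = float(rec["counter-value"])
    except (KeyError, TypeError, ValueError):
        value = None  # e.g. "<not counted>" or a missing field
    return name, value
```

With this shape, a row builder that looks up "cycles" finds the value that perf reported as "cycles:u".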

guest-agent collector (rows_guest=0 on 100% of episodes):
  - Alpine cloud image doesn't ship python3, so the in-guest agent's
    `#!/usr/bin/env python3` shebang silently fails. Add packages:
    [python3] to cidata user-data so cloud-init installs it before
    the OpenRC service starts.
  - Guest agent now exits nonzero (was: silent stdout fallback) when
    /dev/virtio-ports/cis490.guest.agent is missing, so OpenRC
    reports the failure to /var/log/cis490-agent.log instead of the
    bytes vanishing into the void. Refs §1.
  - Host-side collector emits guest_agent_connected /
    guest_agent_first_byte / guest_agent_silent_window into the
    orchestrator's events.jsonl. Future episodes show the in-guest
    failure mode per-episode instead of inferring from rows_guest=0.

k-gamingcom missing qmp/netflow/pcap (also affected elliott on
  Tier-3 episodes — was misclassified as host divergence):
  - tools/run_tier3_demo.py was building EpisodeConfig WITHOUT
    qmp_socket / guest_agent_socket / bridge_iface — even though
    launch_target.sh creates the underlying chardevs and BRIDGE
    supplies the iface. tools/run_real_vm_demo.py wires them
    correctly; Tier-3 had a copy-paste gap.
  - tests/test_collectors_emit.py adds a source-grep regression so
    the wiring stays honest.

samba_usermap_script never lands session (0/67 in §3 probe):
  - Bind handler default WfsDelay (~5s) gives up before bind_perl on
    Metasploitable2 has finished forking + binding LPORT under
    SLIRP+hostfwd. Bump to 30s; matches session_open_timeout_s in
    exploits/driver.py so framework + driver agree on the wait
    budget. Add ConnectTimeout=15 so the handler's bind connect has
    retry budget instead of one-shot.
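
One way to keep the two wait budgets from drifting apart again is to derive the handler options from the driver's timeout. A minimal sketch, assuming the driver timeout is 30 s as stated above; `SESSION_OPEN_TIMEOUT_S`, `bind_handler_options` and `as_set_commands` are hypothetical helpers, not names from exploits/driver.py.

```python
from __future__ import annotations

# Assumed to mirror session_open_timeout_s in exploits/driver.py.
SESSION_OPEN_TIMEOUT_S = 30


def bind_handler_options(
    wait_budget_s: int = SESSION_OPEN_TIMEOUT_S,
    connect_timeout_s: int = 15,
) -> dict[str, int]:
    """Build handler options so WfsDelay equals the driver's wait budget."""
    if connect_timeout_s > wait_budget_s:
        raise ValueError("ConnectTimeout must fit inside the wait budget")
    return {"WfsDelay": wait_budget_s, "ConnectTimeout": connect_timeout_s}


def as_set_commands(options: dict[str, int]) -> list[str]:
    """Render the options as msfconsole ``set`` lines."""
    return [f"set {key} {value}" for key, value in options.items()]
```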

orchestrator/fleet.py: usable_modules + BRIDGE handling were both
  unconditional, so:
  - With BRIDGE set, requires_bridge modules were still being
    dropped — picker only ever returned samba_usermap_script across
    every slot/episode (the test_fleet_uses_all_modules_when_bridge_set
    failure on HEAD).
  - env.pop("BRIDGE") fired even when BRIDGE was the operator's
    explicit setup, breaking modules that need bridge mode (vsftpd
    backdoor on hardcoded port 6200, distccd, etc.).
  Both made conditional on bridge_set so the picker walks the full
  catalog under bridge mode and SLIRP-only modules still get a
  clean SLIRP env when BRIDGE is unset.
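
The conditional behaviour can be sketched as below. This is not the code in orchestrator/fleet.py: the `Module` dataclass, both function signatures, and the module name "vsftpd_backdoor" are assumptions made for illustration.

```python
from __future__ import annotations

from dataclasses import dataclass


@dataclass(frozen=True)
class Module:
    name: str
    requires_bridge: bool = False


def usable_modules(catalog: list[Module], bridge_set: bool) -> list[Module]:
    """Full catalog under bridge mode; SLIRP-safe modules otherwise."""
    if bridge_set:
        return list(catalog)
    return [m for m in catalog if not m.requires_bridge]


def child_env(env: dict[str, str], bridge_set: bool) -> dict[str, str]:
    """Copy ``env``; drop BRIDGE only when the operator did not set it up."""
    out = dict(env)
    if not bridge_set:
        out.pop("BRIDGE", None)
    return out
```

Both decisions hang off the same `bridge_set` flag, so the picker and the launch environment cannot disagree about which mode is active.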

receiver/app.py: half-pregnant v2 schema state in HEAD. app.py was
  calling store.ingest_stream(episode_type=..., benign_profile=...)
  with kwargs whose matching store.py change was still in the WIP
  stash. Removed v2 awareness from app.py so v1 episodes (what the
  producer ships today) get accepted again. SCHEMA_VERSION default
  reset to 1 to match.

229 passed, 0 failed. (HEAD had 15 failures, all linked to the
half-pregnant v2 state above.)

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
2026-05-03 17:05:25 -05:00


"""EpisodeRunner — single-episode driver.

Writes the full per-episode directory shape from ``docs/data-model.md``:

    data/episodes/<ulid>/
        meta.json
        events.jsonl
        labels.jsonl
        telemetry-proc.jsonl
        done.marker

Two modes:

1. **Single-phase** (no ``phase_schedule`` set): labels the whole window
   ``clean``. Useful for v0 sanity tests against any pid.
2. **Scheduled** (``phase_schedule`` set on the config): walks a list of
   ``(phase, duration_s)`` tuples, emitting one label per transition. An
   optional ``on_phase`` callback fires at each transition, used to drive an
   external load mimic (or, later, the real exploit/sample driver).

Real VM bring-up will arrive as a third mode that boots a guest, fires an
exploit, and adapts the schedule based on observed events
(``session_open``, ``sample_executed``, ...).
"""
from __future__ import annotations

import json
import logging
import os
import subprocess
import threading
import time
from dataclasses import dataclass, field
from datetime import datetime, timezone
from pathlib import Path
from typing import Callable

from collectors import guest_agent, pcap, perf_qemu, proc_qemu, qmp
from samples.manifest import Sample

from .ulid import new_ulid

# Repo root for the version probe: orchestrator/episode.py lives at
# <repo>/orchestrator/episode.py.
_REPO_ROOT = Path(__file__).resolve().parent.parent

# Cached so we don't fork `git` on every episode.
_CODE_VERSION_CACHE: dict | None = None


def _resolve_code_version() -> dict:
    """Return a small dict identifying the code that produced this episode.

    Order of resolution:
      1. ``$INSTALL_ROOT/VERSION`` (written by install-lab-host.sh at
         install time; typical production path, since /opt/cis490
         doesn't carry a .git/ dir)
      2. ``git rev-parse HEAD`` from the repo root (dev clones)
      3. ``{"commit": "unknown"}`` so meta.json always has the field

    Output shape (always present):
        {"commit": "<40-hex>" | "unknown",
         "branch": "<name>" | None,
         "dirty": bool | None,
         "source": "VERSION-file" | "git" | "unknown"}

    Result is cached at module level so per-episode meta emission is
    free after the first read."""
    global _CODE_VERSION_CACHE
    if _CODE_VERSION_CACHE is not None:
        return _CODE_VERSION_CACHE

    # 1. VERSION file (production install).
    for cand in (_REPO_ROOT / "VERSION", Path("/opt/cis490/VERSION")):
        if cand.is_file():
            try:
                v = json.loads(cand.read_text())
                if isinstance(v, dict) and v.get("commit"):
                    v.setdefault("source", "VERSION-file")
                    _CODE_VERSION_CACHE = v
                    return v
            except (json.JSONDecodeError, OSError):
                pass

    # 2. git rev-parse from repo root (dev clones).
    try:
        commit = subprocess.run(
            ["git", "-C", str(_REPO_ROOT), "rev-parse", "HEAD"],
            capture_output=True, text=True, timeout=2, check=True,
        ).stdout.strip()
        branch = subprocess.run(
            ["git", "-C", str(_REPO_ROOT), "rev-parse", "--abbrev-ref", "HEAD"],
            capture_output=True, text=True, timeout=2,
        ).stdout.strip() or None
        # `git status --porcelain` is empty iff the working tree is clean.
        porcelain = subprocess.run(
            ["git", "-C", str(_REPO_ROOT), "status", "--porcelain"],
            capture_output=True, text=True, timeout=2,
        ).stdout
        _CODE_VERSION_CACHE = {
            "commit": commit,
            "branch": branch,
            "dirty": bool(porcelain.strip()),
            "source": "git",
        }
        return _CODE_VERSION_CACHE
    except (subprocess.SubprocessError, FileNotFoundError, OSError):
        pass

    # 3. Fallback so meta.json always carries the field.
    _CODE_VERSION_CACHE = {
        "commit": "unknown", "branch": None, "dirty": None, "source": "unknown",
    }
    return _CODE_VERSION_CACHE


log = logging.getLogger("cis490.orchestrator")

SCHEMA_VERSION = 1

PhaseSchedule = list[tuple[str, float]]
OnPhase = Callable[[str], None]


@dataclass
class EpisodeConfig:
    target_pid: int
    duration_s: float
    data_root: Path
    interval_ms: int = 100
    image_name: str = "(stub)"
    snapshot_name: str = "(none-stub)"
    episode_id: str | None = None
    # When set, walk this schedule and ignore duration_s for sleep timing.
    # ``duration_s`` still goes in meta.schedule for record-keeping.
    phase_schedule: PhaseSchedule | None = None
    # Optional: paths to QEMU sockets exposed by the launcher. When
    # set, EpisodeRunner spins up additional collector threads.
    qmp_socket: Path | None = None
    qmp_interval_ms: int = 1000  # QMP queries are heavier than /proc reads
    guest_agent_socket: Path | None = None
    # Optional: bridge interface to capture per-episode pcap on. When
    # set, EpisodeRunner spawns tcpdump for the duration of the
    # schedule and bucketizes the result into netflow.jsonl on stop.
    bridge_iface: str | None = None
    bridge_ip: str = "10.200.0.1"
    pcap_snaplen: int = 256
    # Source 3: perf stat sampling. Disabled by default because perf
    # needs CAP_SYS_ADMIN or perf_event_paranoid <= 1; enable
    # explicitly per-episode when the host supports it.
    enable_perf: bool = False
    perf_interval_ms: int = 100
    # The Sample that drove this episode's workload selection. Stamped
    # into meta.json so trainers can join episodes by family / kind
    # without re-deriving from events. None = v1 yes-loop fallback.
    sample: Sample | None = None
    # The exploit module that fired (Tier 3+). Plain dict so the runner
    # doesn't need to import exploits.modules; populated by callers
    # that have a ModuleConfig in hand.
    exploit_meta: dict | None = None
    # Snapshot/revert (Tier 0+):
    #   revert_at_start: before any phase walks, loadvm <snapshot_name>.
    #     Use this to drop the guest back to a known-good baseline at
    #     the start of every episode in a long-lived-VM fleet loop.
    #   revert_at_end: after the schedule walks, loadvm <snapshot_name>
    #     so the next consumer of this VM starts clean too.
    revert_at_start: bool = False
    revert_at_end: bool = False


@dataclass
class EpisodeResult:
    episode_id: str
    episode_dir: Path
    rows_proc: int
    rows_qmp: int = 0
    rows_guest: int = 0
    rows_netflow: int = 0
    rows_perf: int = 0
    pcap_bytes: int = 0
    pid_disappeared: bool = False
    duration_observed_s: float = 0.0
    phases_observed: list[str] = field(default_factory=list)


class EpisodeRunner:
    def __init__(
        self,
        cfg: EpisodeConfig,
        on_phase: OnPhase | None = None,
    ) -> None:
        self.cfg = cfg
        self.on_phase = on_phase
        self.episode_id = cfg.episode_id or new_ulid()
        self.episode_dir: Path = cfg.data_root / "episodes" / self.episode_id
        # Create the dir up front so external drivers can call
        # emit_event() between construction and run(), e.g. an exploit
        # driver that writes a driver_setup event before the schedule
        # walks. The dir is otherwise empty until run() opens files.
        self.episode_dir.mkdir(parents=True, exist_ok=True)
        self._t_mono_origin_ns: int = 0
        self._stop = threading.Event()

    # ---- public ---------------------------------------------------------

    def run(self) -> EpisodeResult:
        self._t_mono_origin_ns = time.monotonic_ns()
        # snapshot_load is the marker for "episode clock = 0". Emit
        # BEFORE any file I/O: _write_meta() takes >1 ms on slow disks
        # (Refs spectral/CIS490#7).
        self.emit_event("snapshot_load", snapshot=self.cfg.snapshot_name)

        started_at_wall = datetime.now(timezone.utc).isoformat()
        meta = self._initial_meta(started_at_wall)
        self._write_meta(meta)

        # Snapshot revert at start: pause+restore the guest to a known
        # baseline before phase 0. Requires QMP and a savevm having
        # already taken place (the launcher is responsible for that).
        if self.cfg.revert_at_start and self.cfg.qmp_socket is not None:
            try:
                client = qmp.QMPClient(self.cfg.qmp_socket)
                client.connect()
                try:
                    out = client.loadvm(self.cfg.snapshot_name)
                    self.emit_event(
                        "snapshot_revert",
                        when="start",
                        snapshot=self.cfg.snapshot_name,
                        output=(out or "").strip()[:256],
                    )
                finally:
                    client.close()
            except Exception as e:
                log.warning("loadvm at start failed: %s", e)
                self.emit_event(
                    "snapshot_revert_failed",
                    when="start",
                    snapshot=self.cfg.snapshot_name,
                    error=str(e),
                )

        rows_holder: dict[str, int] = {"proc": 0, "qmp": 0, "guest": 0, "netflow": 0, "perf": 0}
        pcap_handle: pcap.CaptureHandle | None = None
        pcap_path = self.episode_dir / "network.pcap"
        netflow_path = self.episode_dir / "netflow.jsonl"

        if self.cfg.bridge_iface:
            try:
                pcap_handle = pcap.run_capture(
                    bridge=self.cfg.bridge_iface,
                    pcap_path=pcap_path,
                    snaplen=self.cfg.pcap_snaplen,
                )
                self.emit_event("pcap_started", iface=self.cfg.bridge_iface)
            except (OSError, FileNotFoundError) as e:
                log.warning("pcap capture not available on %s: %s",
                            self.cfg.bridge_iface, e)
                self.emit_event("pcap_unavailable",
                                iface=self.cfg.bridge_iface, error=str(e))

        def _proc_collector() -> None:
            rows_holder["proc"] = proc_qemu.run_loop(
                pid=self.cfg.target_pid,
                output_path=self.episode_dir / "telemetry-proc.jsonl",
                t_mono_origin_ns=self._t_mono_origin_ns,
                interval_ms=self.cfg.interval_ms,
                stop_event=self._stop,
            )

        def _qmp_collector() -> None:
            assert self.cfg.qmp_socket is not None
            rows_holder["qmp"] = qmp.run_loop(
                socket_path=self.cfg.qmp_socket,
                output_path=self.episode_dir / "telemetry-qmp.jsonl",
                t_mono_origin_ns=self._t_mono_origin_ns,
                interval_ms=self.cfg.qmp_interval_ms,
                stop_event=self._stop,
            )

        def _guest_collector() -> None:
            assert self.cfg.guest_agent_socket is not None
            rows_holder["guest"] = guest_agent.run_loop(
                socket_path=self.cfg.guest_agent_socket,
                output_path=self.episode_dir / "telemetry-guest.jsonl",
                t_mono_origin_ns=self._t_mono_origin_ns,
                stop_event=self._stop,
                # Pipe lifecycle events into the orchestrator's
                # events.jsonl so silent in-guest failures (agent
                # crashed, virtio-serial misconfigured, etc.) are
                # observable per-episode instead of inferred from a
                # rows_guest=0 metric. Refs PIPELINE.md §1 / §4.4.
                emit_event=self.emit_event,
            )

        def _perf_collector() -> None:
            rows_holder["perf"] = perf_qemu.run_loop(
                pid=self.cfg.target_pid,
                output_path=self.episode_dir / "telemetry-perf.jsonl",
                t_mono_origin_ns=self._t_mono_origin_ns,
                interval_ms=self.cfg.perf_interval_ms,
                stop_event=self._stop,
            )

        threads: list[threading.Thread] = []
        threads.append(threading.Thread(target=_proc_collector, daemon=True, name="proc_qemu"))
        if self.cfg.qmp_socket is not None:
            threads.append(threading.Thread(target=_qmp_collector, daemon=True, name="qmp"))
        if self.cfg.guest_agent_socket is not None:
            threads.append(threading.Thread(target=_guest_collector, daemon=True, name="guest_agent"))
        if self.cfg.enable_perf:
            threads.append(threading.Thread(target=_perf_collector, daemon=True, name="perf"))
        for t in threads:
            t.start()

        phases_observed: list[str] = []
        try:
            if self.cfg.phase_schedule:
                phases_observed = self._walk_schedule()
            else:
                self._emit_label(0, "clean", prev=None, reason="snapshot_loaded")
                phases_observed = ["clean"]
                self._stop.wait(timeout=self.cfg.duration_s)
        finally:
            # Optional revert before stopping collectors so the
            # transition shows up in their telemetry too; useful for
            # building "snapshot revert" as a labeled phase later.
            if self.cfg.revert_at_end and self.cfg.qmp_socket is not None:
                try:
                    client = qmp.QMPClient(self.cfg.qmp_socket)
                    client.connect()
                    try:
                        out = client.loadvm(self.cfg.snapshot_name)
                        self.emit_event(
                            "snapshot_revert",
                            when="end",
                            snapshot=self.cfg.snapshot_name,
                            output=(out or "").strip()[:256],
                        )
                    finally:
                        client.close()
                except Exception as e:
                    log.warning("loadvm at end failed: %s", e)
                    self.emit_event(
                        "snapshot_revert_failed",
                        when="end",
                        snapshot=self.cfg.snapshot_name,
                        error=str(e),
                    )
            self._stop.set()
            for t in threads:
                t.join(timeout=3.0)

        if pcap_handle is not None:
            rc = pcap.stop_capture(pcap_handle)
            self.emit_event("pcap_stopped", rc=rc,
                            pcap_bytes=pcap_path.stat().st_size if pcap_path.exists() else 0)
            rows_holder["netflow"] = pcap.bucketize(
                pcap_path, netflow_path,
                bucket_ms=100,
                t_mono_origin_ns=self._t_mono_origin_ns,
                bridge_ip=self.cfg.bridge_ip,
            )

        pid_alive = _pid_alive(self.cfg.target_pid)
        self.emit_event("episode_end", target_pid_alive=pid_alive)

        end_mono_ns = time.monotonic_ns() - self._t_mono_origin_ns
        meta["ended_at_wall"] = datetime.now(timezone.utc).isoformat()
        pcap_size = pcap_path.stat().st_size if pcap_path.exists() else 0
        meta["result"] = {
            "phases_observed": phases_observed,
            "rows_proc": rows_holder["proc"],
            "rows_qmp": rows_holder["qmp"],
            "rows_guest": rows_holder["guest"],
            "rows_perf": rows_holder["perf"],
            "rows_netflow": rows_holder["netflow"],
            "pcap_bytes": pcap_size,
            "pid_alive_at_end": pid_alive,
            "duration_observed_s": end_mono_ns / 1_000_000_000,
        }
        self._write_meta(meta)
        (self.episode_dir / "done.marker").touch()

        log.info(
            "episode %s complete: proc=%d qmp=%d guest=%d perf=%d netflow=%d pcap=%dB duration=%.2fs phases=%s",
            self.episode_id,
            rows_holder["proc"], rows_holder["qmp"], rows_holder["guest"],
            rows_holder["perf"], rows_holder["netflow"], pcap_size,
            end_mono_ns / 1e9,
            phases_observed,
        )
        return EpisodeResult(
            episode_id=self.episode_id,
            episode_dir=self.episode_dir,
            rows_proc=rows_holder["proc"],
            rows_qmp=rows_holder["qmp"],
            rows_guest=rows_holder["guest"],
            rows_netflow=rows_holder["netflow"],
            rows_perf=rows_holder["perf"],
            pcap_bytes=pcap_size,
            pid_disappeared=not pid_alive,
            duration_observed_s=end_mono_ns / 1_000_000_000,
            phases_observed=phases_observed,
        )

    def stop(self) -> None:
        self._stop.set()

    # ---- internals ------------------------------------------------------

    def _walk_schedule(self) -> list[str]:
        observed: list[str] = []
        prev: str | None = None
        for phase, dur in self.cfg.phase_schedule or []:
            if self._stop.is_set():
                break
            t_mono = time.monotonic_ns() - self._t_mono_origin_ns
            self._emit_label(t_mono, phase, prev=prev, reason="scheduled")
            self.emit_event("phase_transition", to=phase, prev=prev)
            if self.on_phase is not None:
                try:
                    self.on_phase(phase)
                except Exception:
                    log.exception("on_phase callback raised; continuing")
            observed.append(phase)
            prev = phase
            self._stop.wait(timeout=dur)
        return observed

    def _initial_meta(self, started_at_wall: str) -> dict:
        sample_meta: dict | None = None
        if self.cfg.sample is not None:
            s = self.cfg.sample
            sample_meta = {
                "name": s.name,
                "family": s.family,
                "category": s.category,
                "profile": s.profile,
                "kind": s.kind,
                "sha256": s.sha256,
            }
        return {
            "episode_id": self.episode_id,
            "schema_version": SCHEMA_VERSION,
            "code_version": _resolve_code_version(),
            "started_at_wall": started_at_wall,
            "ended_at_wall": None,
            "host_fingerprint": {
                "kernel": os.uname().release,
                "qemu_version": None,
            },
            "vm": {
                "image_name": self.cfg.image_name,
                "image_sha256": None,
                "snapshot_name": self.cfg.snapshot_name,
                "vcpus": None,
                "ram_mib": None,
                "target_pid": self.cfg.target_pid,
            },
            "exploit": self.cfg.exploit_meta,
            "sample": sample_meta,
            "schedule": {
                "baseline_seconds": self.cfg.duration_s,
                "interval_ms": self.cfg.interval_ms,
                "phase_schedule": self.cfg.phase_schedule,
            },
            "result": None,
        }

    def _write_meta(self, meta: dict) -> None:
        # Write to a sibling temp file, then rename over meta.json so
        # readers never see a half-written file.
        path = self.episode_dir / "meta.json"
        tmp = path.with_suffix(".json.partial")
        with tmp.open("w") as f:
            json.dump(meta, f, indent=2, sort_keys=True)
            f.write("\n")
        os.replace(tmp, path)

    def emit_event(self, event: str, **extra) -> None:
        """Append a row to events.jsonl. Public so external drivers
        (e.g. the MSF exploit driver) can stamp their own events with
        the same monotonic clock the orchestrator is using."""
        t_mono_ns = (
            time.monotonic_ns() - self._t_mono_origin_ns
            if self._t_mono_origin_ns
            else 0
        )
        row = {
            "t_mono_ns": t_mono_ns,
            "t_wall_ns": time.time_ns(),
            "event": event,
            **extra,
        }
        with (self.episode_dir / "events.jsonl").open("a") as f:
            f.write(json.dumps(row, sort_keys=True) + "\n")

    def _emit_label(
        self, t_mono_ns: int, phase: str, prev: str | None, reason: str
    ) -> None:
        row = {
            "t_mono_ns": t_mono_ns,
            "t_wall_ns": time.time_ns(),
            "phase": phase,
            "prev": prev,
            "reason": reason,
        }
        with (self.episode_dir / "labels.jsonl").open("a") as f:
            f.write(json.dumps(row, sort_keys=True) + "\n")


def _pid_alive(pid: int) -> bool:
    try:
        # Signal 0 performs the existence/permission check without
        # delivering a signal.
        os.kill(pid, 0)
        return True
    except ProcessLookupError:
        return False
    except PermissionError:
        # Pid exists but we can't signal it.
        return True