In the elliott-lab episode every phase had a median of 20% CPU because
the in-guest workload silently never fired — and nothing in
events.jsonl let anyone detect that from outside, so a trainer would
treat the labels as ground truth and learn that "all phases look identical".
This commit closes the audit gap so the failure is visible in meta:
orchestrator/episode.py
EpisodeConfig.sample: Sample | None — the manifest entry that
drove this episode's workload selection. Stamped into meta.sample
as {name, family, category, profile, kind, sha256} so trainers
can join cleanly without re-deriving from events. None means the
v1 yes-loop fallback path ran (and the trainer should treat the
episode with appropriate skepticism).
tools/vm_load_controller.py
VMLoadController gains an emit_event callable. Every phase now
emits a workload_* event into the runner's events.jsonl:
workload_setup:      login + initial cleanup OK
workload_killed:     clean / dormant phases. Dormant carries a
                     `pre_kill_probe` dict from inside the
                     guest (`pgrep -c yes`, `pgrep -c sh`,
                     /proc/loadavg) so the trainer can detect
                     the elliott-lab failure mode where the
                     workload never actually ran.
workload_armed:      armed handshake fired
workload_infecting:  dd urandom / payload write fired
workload_started:    infected_running command sent
workload_failed:     any of the above raised inside SerialClient
                     (timeout, EOF, partial login). The runner
                     would have silently swallowed the
                     exception via its on_phase try/except;
                     the audit row makes the failure detectable.
Exceptions in shell calls surface as workload_failed events but
do NOT propagate, matching the runner's existing on_phase
contract.
tools/run_real_vm_demo.py
Wires the controller's emit_event to the runner's emit_event via
a small forward-reference closure (controller is built before
runner; runner.emit_event needs to be the sink). Sample also
flows into EpisodeConfig.sample so meta.sample matches what the
controller actually ran.
Tests: 119 (was 106). New cases:
tests/test_vm_load_controller.py (11 tests against a FakeSerial)
- setup emits workload_setup
- infected_running runs the v1 yes-loop AND emits workload_started
- dormant probes BEFORE killing and stamps pre_kill_probe
- dormant probe records "yes=0" (the elliott-lab fingerprint)
- clean / armed / infecting all emit their respective events
- serial.run() exception → workload_failed event, no propagation
- sample-with-profile dispatches to exploits.workloads command
(NOT the v1 yes-loop)
- missing emit_event callback is a no-op (back-compat)
tests/test_episode.py (2 new)
- meta.sample carries name/family/category/profile/kind/sha256
when EpisodeConfig.sample is set
- meta.sample stays null in the v1 fallback path
Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
435 lines
17 KiB
Python
435 lines
17 KiB
Python
"""EpisodeRunner — single-episode driver.
|
|
|
|
Writes the full per-episode directory shape from ``docs/data-model.md``:
|
|
|
|
data/episodes/<ulid>/
|
|
meta.json
|
|
events.jsonl
|
|
labels.jsonl
|
|
telemetry-proc.jsonl
|
|
done.marker
|
|
|
|
Two modes:
|
|
|
|
1. **Single-phase** (no ``phase_schedule`` set) — labels the whole window
|
|
``clean``. Useful for v0 sanity tests against any pid.
|
|
|
|
2. **Scheduled** (``phase_schedule`` set on the config) — walks a list of
|
|
``(phase, duration_s)`` tuples, emitting one label per transition. An
|
|
optional ``on_phase`` callback fires at each transition, used to drive an
|
|
external load mimic (or, later, the real exploit/sample driver).
|
|
|
|
Real VM bring-up will arrive as a third mode that boots a guest, fires an
|
|
exploit, and adapts the schedule based on observed events
|
|
(``session_open``, ``sample_executed``, ...).
|
|
"""
|
|
|
|
from __future__ import annotations
|
|
|
|
import json
|
|
import logging
|
|
import os
|
|
import threading
|
|
import time
|
|
from dataclasses import dataclass, field
|
|
from datetime import datetime, timezone
|
|
from pathlib import Path
|
|
from typing import Callable
|
|
|
|
from collectors import guest_agent, pcap, perf_qemu, proc_qemu, qmp
|
|
from samples.manifest import Sample
|
|
|
|
from .ulid import new_ulid
|
|
|
|
|
|
# Logger used throughout the episode runner.
log: logging.Logger = logging.getLogger("cis490.orchestrator")

# Stamped into every meta.json as ``schema_version``.
SCHEMA_VERSION: int = 1

# Ordered (phase_name, duration_seconds) pairs walked by the runner.
PhaseSchedule = list[tuple[str, float]]

# Callback invoked with the phase name at each scheduled transition.
OnPhase = Callable[[str], None]
|
|
|
|
|
|
@dataclass
class EpisodeConfig:
    """Inputs for one EpisodeRunner episode.

    Only ``target_pid``, ``duration_s`` and ``data_root`` are required.
    Every optional collector, capture and snapshot feature stays off
    unless its field is set. Field order is part of the positional
    constructor signature — do not reorder.
    """

    # ---- required ----------------------------------------------------
    target_pid: int      # pid given to the proc/perf collectors and liveness check
    duration_s: float    # single-phase window; always recorded in meta.schedule
    data_root: Path      # episodes are written under <data_root>/episodes/<id>/

    # ---- bookkeeping -------------------------------------------------
    interval_ms: int = 100            # /proc sampling period
    image_name: str = "(stub)"
    snapshot_name: str = "(none-stub)"
    episode_id: str | None = None     # None -> a fresh ULID is generated

    # ---- schedule ----------------------------------------------------
    # A set schedule drives all sleep timing and duration_s is unused for
    # waiting; duration_s is still written to meta.schedule.
    phase_schedule: PhaseSchedule | None = None

    # ---- QEMU sockets (each one enables an extra collector thread) ---
    qmp_socket: Path | None = None
    qmp_interval_ms: int = 1000       # slower than /proc: QMP queries cost more
    guest_agent_socket: Path | None = None

    # ---- network capture ---------------------------------------------
    # Bridge to run tcpdump on for the episode; on stop the pcap is
    # bucketized into netflow.jsonl.
    bridge_iface: str | None = None
    bridge_ip: str = "10.200.0.1"
    pcap_snaplen: int = 256

    # ---- perf stat (source 3) ----------------------------------------
    # Off by default: perf needs CAP_SYS_ADMIN or
    # perf_event_paranoid <= 1, so opt in per host.
    enable_perf: bool = False
    perf_interval_ms: int = 100

    # ---- provenance --------------------------------------------------
    # Manifest entry that selected this episode's workload; copied into
    # meta.sample so trainers can join by family / kind. None means the
    # v1 yes-loop fallback ran.
    sample: Sample | None = None

    # ---- snapshot / revert (Tier 0+) ---------------------------------
    # revert_at_start: loadvm <snapshot_name> before the first phase, so a
    #   long-lived VM begins every episode from the known-good baseline.
    # revert_at_end: loadvm <snapshot_name> after the schedule, so the next
    #   consumer of the VM also starts clean.
    revert_at_start: bool = False
    revert_at_end: bool = False
|
|
|
|
|
|
@dataclass
class EpisodeResult:
    """Summary returned by ``EpisodeRunner.run()``.

    Row counts are per telemetry source; sources that were not enabled
    keep their 0 default. Field order is part of the positional
    constructor signature — do not reorder.
    """

    episode_id: str
    episode_dir: Path
    rows_proc: int

    # Optional sources: remain 0 unless the matching collector ran.
    rows_qmp: int = 0
    rows_guest: int = 0
    rows_netflow: int = 0
    rows_perf: int = 0

    pcap_bytes: int = 0                 # size of network.pcap at the end (0 if absent)
    pid_disappeared: bool = False       # target pid was gone when the episode ended
    duration_observed_s: float = 0.0    # elapsed monotonic seconds since run() began
    # Phases actually entered, in order; a fresh list per instance.
    phases_observed: list[str] = field(default_factory=list)
|
|
|
|
|
|
class EpisodeRunner:
    """Single-episode driver: collectors, phase schedule, labels, meta.

    Build with an ``EpisodeConfig`` (and optionally an ``on_phase``
    callback fired at every scheduled transition), then call ``run()``.
    ``stop()`` sets the shared stop event, which ends the current phase
    wait and is handed to every collector loop as its ``stop_event``.
    """

    def __init__(
        self,
        cfg: EpisodeConfig,
        on_phase: OnPhase | None = None,
    ) -> None:
        self.cfg = cfg
        self.on_phase = on_phase
        self.episode_id = cfg.episode_id or new_ulid()
        self.episode_dir: Path = cfg.data_root / "episodes" / self.episode_id
        # Create the dir up front so external drivers can call
        # emit_event() between construction and run() — e.g. an exploit
        # driver that writes a driver_setup event before the schedule
        # walks. The dir is otherwise empty until run() opens files.
        self.episode_dir.mkdir(parents=True, exist_ok=True)
        # Episode clock origin. Stays 0 until run() starts; emit_event()
        # stamps t_mono_ns=0 while it is 0.
        self._t_mono_origin_ns: int = 0
        # Single stop signal shared by the schedule walk and all collectors.
        self._stop = threading.Event()

    # ---- public ---------------------------------------------------------

    def run(self) -> EpisodeResult:
        """Run the episode end to end and return its summary.

        Sets the episode clock origin, writes an initial meta.json,
        optionally reverts the snapshot, starts the enabled collectors
        (and pcap), then walks the phase schedule — or, without one, a
        single ``clean`` window of ``duration_s``. Afterwards it stops
        everything, finalizes meta.json and touches done.marker.

        If the schedule walk raises, collectors and capture are still
        stopped, but the exception propagates. meta.json then keeps
        ``result: null`` and done.marker is not written.
        """
        self._t_mono_origin_ns = time.monotonic_ns()
        # snapshot_load is the marker for "episode clock = 0". Emit
        # BEFORE any file I/O — _write_meta() takes >1 ms on slow disks
        # (Refs spectral/CIS490#7).
        self.emit_event("snapshot_load", snapshot=self.cfg.snapshot_name)

        started_at_wall = datetime.now(timezone.utc).isoformat()
        meta = self._initial_meta(started_at_wall)
        self._write_meta(meta)

        # Snapshot revert at start: restore the guest to a known baseline
        # before phase 0. Requires QMP and a savevm having already taken
        # place (the launcher is responsible for that).
        if self.cfg.revert_at_start:
            self._revert_snapshot("start")

        rows_holder: dict[str, int] = {
            "proc": 0, "qmp": 0, "guest": 0, "netflow": 0, "perf": 0,
        }
        pcap_path = self.episode_dir / "network.pcap"
        netflow_path = self.episode_dir / "netflow.jsonl"
        pcap_handle = self._start_pcap(pcap_path)

        threads = self._collector_threads(rows_holder)
        for t in threads:
            t.start()

        phases_observed: list[str] = []
        try:
            if self.cfg.phase_schedule:
                phases_observed = self._walk_schedule()
            else:
                self._emit_label(0, "clean", prev=None, reason="snapshot_loaded")
                phases_observed = ["clean"]
                self._stop.wait(timeout=self.cfg.duration_s)
        finally:
            # Optional revert before stopping collectors so the
            # transition shows up in their telemetry too — useful for
            # building "snapshot revert" as a labeled phase later.
            if self.cfg.revert_at_end:
                self._revert_snapshot("end")

            self._stop.set()
            for t in threads:
                t.join(timeout=3.0)
            if pcap_handle is not None:
                rc = pcap.stop_capture(pcap_handle)
                self.emit_event("pcap_stopped", rc=rc,
                                pcap_bytes=self._file_size(pcap_path))
                rows_holder["netflow"] = pcap.bucketize(
                    pcap_path, netflow_path,
                    bucket_ms=100,
                    t_mono_origin_ns=self._t_mono_origin_ns,
                    bridge_ip=self.cfg.bridge_ip,
                )

        pid_alive = _pid_alive(self.cfg.target_pid)
        self.emit_event("episode_end", target_pid_alive=pid_alive)
        end_mono_ns = time.monotonic_ns() - self._t_mono_origin_ns
        duration_observed_s = end_mono_ns / 1_000_000_000

        meta["ended_at_wall"] = datetime.now(timezone.utc).isoformat()
        pcap_size = self._file_size(pcap_path)
        meta["result"] = {
            "phases_observed": phases_observed,
            "rows_proc": rows_holder["proc"],
            "rows_qmp": rows_holder["qmp"],
            "rows_guest": rows_holder["guest"],
            "rows_perf": rows_holder["perf"],
            "rows_netflow": rows_holder["netflow"],
            "pcap_bytes": pcap_size,
            "pid_alive_at_end": pid_alive,
            "duration_observed_s": duration_observed_s,
        }
        self._write_meta(meta)
        # Touched only after the final meta.json write.
        (self.episode_dir / "done.marker").touch()

        log.info(
            "episode %s complete: proc=%d qmp=%d guest=%d perf=%d netflow=%d pcap=%dB duration=%.2fs phases=%s",
            self.episode_id,
            rows_holder["proc"], rows_holder["qmp"], rows_holder["guest"],
            rows_holder["perf"], rows_holder["netflow"], pcap_size,
            duration_observed_s,
            phases_observed,
        )
        return EpisodeResult(
            episode_id=self.episode_id,
            episode_dir=self.episode_dir,
            rows_proc=rows_holder["proc"],
            rows_qmp=rows_holder["qmp"],
            rows_guest=rows_holder["guest"],
            rows_netflow=rows_holder["netflow"],
            rows_perf=rows_holder["perf"],
            pcap_bytes=pcap_size,
            pid_disappeared=not pid_alive,
            duration_observed_s=duration_observed_s,
            phases_observed=phases_observed,
        )

    def stop(self) -> None:
        """Request early termination of the running episode (thread-safe Event)."""
        self._stop.set()

    # ---- internals ------------------------------------------------------

    def _revert_snapshot(self, when: str) -> None:
        """loadvm ``cfg.snapshot_name`` over QMP and record the outcome.

        Emits ``snapshot_revert`` on success or ``snapshot_revert_failed``
        on any error; never raises, so a failed revert cannot abort the
        episode. ``when`` ("start" / "end") is stamped on the event.
        """
        if self.cfg.qmp_socket is None:
            # Revert requested but there is no channel to perform it —
            # make the skipped revert visible instead of silent.
            log.warning(
                "revert_at_%s requested but no qmp_socket configured; skipping",
                when,
            )
            return
        try:
            client = qmp.QMPClient(self.cfg.qmp_socket)
            client.connect()
            try:
                out = client.loadvm(self.cfg.snapshot_name)
                self.emit_event(
                    "snapshot_revert",
                    when=when,
                    snapshot=self.cfg.snapshot_name,
                    output=(out or "").strip()[:256],
                )
            finally:
                client.close()
        except Exception as e:
            log.warning("loadvm at %s failed: %s", when, e)
            self.emit_event(
                "snapshot_revert_failed",
                when=when,
                snapshot=self.cfg.snapshot_name,
                error=str(e),
            )

    def _start_pcap(self, pcap_path: Path) -> pcap.CaptureHandle | None:
        """Start the bridge capture if ``cfg.bridge_iface`` is set.

        Returns the capture handle. Returns None when no bridge is
        configured, or when capture could not start; the latter is
        recorded as a ``pcap_unavailable`` event.
        """
        if not self.cfg.bridge_iface:
            return None
        try:
            handle = pcap.run_capture(
                bridge=self.cfg.bridge_iface,
                pcap_path=pcap_path,
                snaplen=self.cfg.pcap_snaplen,
            )
        except OSError as e:  # includes FileNotFoundError (tcpdump missing)
            log.warning("pcap capture not available on %s: %s",
                        self.cfg.bridge_iface, e)
            self.emit_event("pcap_unavailable",
                            iface=self.cfg.bridge_iface, error=str(e))
            return None
        self.emit_event("pcap_started", iface=self.cfg.bridge_iface)
        return handle

    def _collector_threads(self, rows_holder: dict[str, int]) -> list[threading.Thread]:
        """Build (not start) one daemon thread per enabled telemetry source.

        Each thread stores its collector's return value (row count) into
        ``rows_holder`` under its source key when the loop returns.
        """

        def _proc_collector() -> None:
            rows_holder["proc"] = proc_qemu.run_loop(
                pid=self.cfg.target_pid,
                output_path=self.episode_dir / "telemetry-proc.jsonl",
                t_mono_origin_ns=self._t_mono_origin_ns,
                interval_ms=self.cfg.interval_ms,
                stop_event=self._stop,
            )

        def _qmp_collector() -> None:
            assert self.cfg.qmp_socket is not None
            rows_holder["qmp"] = qmp.run_loop(
                socket_path=self.cfg.qmp_socket,
                output_path=self.episode_dir / "telemetry-qmp.jsonl",
                t_mono_origin_ns=self._t_mono_origin_ns,
                interval_ms=self.cfg.qmp_interval_ms,
                stop_event=self._stop,
            )

        def _guest_collector() -> None:
            assert self.cfg.guest_agent_socket is not None
            rows_holder["guest"] = guest_agent.run_loop(
                socket_path=self.cfg.guest_agent_socket,
                output_path=self.episode_dir / "telemetry-guest.jsonl",
                t_mono_origin_ns=self._t_mono_origin_ns,
                stop_event=self._stop,
            )

        def _perf_collector() -> None:
            rows_holder["perf"] = perf_qemu.run_loop(
                pid=self.cfg.target_pid,
                output_path=self.episode_dir / "telemetry-perf.jsonl",
                t_mono_origin_ns=self._t_mono_origin_ns,
                interval_ms=self.cfg.perf_interval_ms,
                stop_event=self._stop,
            )

        threads = [threading.Thread(target=_proc_collector, daemon=True, name="proc_qemu")]
        if self.cfg.qmp_socket is not None:
            threads.append(threading.Thread(target=_qmp_collector, daemon=True, name="qmp"))
        if self.cfg.guest_agent_socket is not None:
            threads.append(threading.Thread(target=_guest_collector, daemon=True, name="guest_agent"))
        if self.cfg.enable_perf:
            threads.append(threading.Thread(target=_perf_collector, daemon=True, name="perf"))
        return threads

    @staticmethod
    def _file_size(path: Path) -> int:
        """Size of ``path`` in bytes, or 0 if it does not exist."""
        return path.stat().st_size if path.exists() else 0

    def _walk_schedule(self) -> list[str]:
        """Walk ``cfg.phase_schedule`` and return the phases entered.

        For each phase, this writes a label and a ``phase_transition``
        event, invokes ``on_phase``, then waits for the phase duration.
        Exceptions from ``on_phase`` are logged and swallowed. The walk
        ends early once ``stop()`` is called.
        """
        observed: list[str] = []
        prev: str | None = None
        for phase, dur in self.cfg.phase_schedule or []:
            if self._stop.is_set():
                break
            t_mono = time.monotonic_ns() - self._t_mono_origin_ns
            self._emit_label(t_mono, phase, prev=prev, reason="scheduled")
            self.emit_event("phase_transition", to=phase, prev=prev)
            if self.on_phase is not None:
                try:
                    self.on_phase(phase)
                except Exception:
                    log.exception("on_phase callback raised; continuing")
            observed.append(phase)
            prev = phase
            self._stop.wait(timeout=dur)
        return observed

    def _initial_meta(self, started_at_wall: str) -> dict:
        """Build the meta.json skeleton written at episode start.

        ``ended_at_wall`` and ``result`` start as None and are filled in
        by ``run()`` once the episode completes.
        """
        sample_meta: dict | None = None
        if self.cfg.sample is not None:
            s = self.cfg.sample
            sample_meta = {
                "name": s.name,
                "family": s.family,
                "category": s.category,
                "profile": s.profile,
                "kind": s.kind,
                "sha256": s.sha256,
            }
        return {
            "episode_id": self.episode_id,
            "schema_version": SCHEMA_VERSION,
            "started_at_wall": started_at_wall,
            "ended_at_wall": None,
            "host_fingerprint": {
                "kernel": os.uname().release,
                "qemu_version": None,
            },
            "vm": {
                "image_name": self.cfg.image_name,
                "image_sha256": None,
                "snapshot_name": self.cfg.snapshot_name,
                "vcpus": None,
                "ram_mib": None,
                "target_pid": self.cfg.target_pid,
            },
            "exploit": None,
            "sample": sample_meta,
            "schedule": {
                "baseline_seconds": self.cfg.duration_s,
                "interval_ms": self.cfg.interval_ms,
                "phase_schedule": self.cfg.phase_schedule,
            },
            "result": None,
        }

    def _write_meta(self, meta: dict) -> None:
        """Replace meta.json by writing a temp file then ``os.replace``-ing it.

        Readers therefore never see a half-written meta.json.
        """
        path = self.episode_dir / "meta.json"
        tmp = path.with_suffix(".json.partial")
        with tmp.open("w", encoding="utf-8") as f:
            json.dump(meta, f, indent=2, sort_keys=True)
            f.write("\n")
        os.replace(tmp, path)

    def emit_event(self, event: str, **extra) -> None:
        """Append a row to events.jsonl. Public so external drivers
        (e.g. the MSF exploit driver) can stamp their own events with
        the same monotonic clock the orchestrator is using.

        Events emitted before ``run()`` starts get ``t_mono_ns = 0``.
        """
        t_mono_ns = (
            time.monotonic_ns() - self._t_mono_origin_ns
            if self._t_mono_origin_ns
            else 0
        )
        row = {
            "t_mono_ns": t_mono_ns,
            "t_wall_ns": time.time_ns(),
            "event": event,
            **extra,
        }
        with (self.episode_dir / "events.jsonl").open("a", encoding="utf-8") as f:
            f.write(json.dumps(row, sort_keys=True) + "\n")

    def _emit_label(
        self, t_mono_ns: int, phase: str, prev: str | None, reason: str
    ) -> None:
        """Append one phase-transition row to labels.jsonl."""
        row = {
            "t_mono_ns": t_mono_ns,
            "t_wall_ns": time.time_ns(),
            "phase": phase,
            "prev": prev,
            "reason": reason,
        }
        with (self.episode_dir / "labels.jsonl").open("a", encoding="utf-8") as f:
            f.write(json.dumps(row, sort_keys=True) + "\n")
|
|
|
|
|
|
def _pid_alive(pid: int) -> bool:
|
|
try:
|
|
os.kill(pid, 0)
|
|
return True
|
|
except ProcessLookupError:
|
|
return False
|
|
except PermissionError:
|
|
# Pid exists but we can't signal it.
|
|
return True
|