CIS490/orchestrator/episode.py
max a88ac83db0 Close out the deployment-readiness gaps
Closes the gaps surfaced by the "what is not implemented" audit so the
fleet is shippable end-to-end. Verified live on the Pi:
  - cis490-shipper --ping → HTTP 200 through Caddy + mTLS via the
    new wg-pki client CA leaf
  - real episode dir → tar+zstd → PUT → HTTP 201 stored
  - re-ship same bytes → 200 (idempotent)
  - re-ship different bytes under same id → 409 (conflict)
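
The three status codes above amount to a content-keyed idempotency check.
A minimal sketch of that decision, assuming the receiver keys stored
archives by episode id and compares a SHA-256 of the uploaded bytes. The
`put_episode` helper and the in-memory `store` dict are hypothetical and
are not the receiver's actual code:

```python
import hashlib


def put_episode(store: dict, episode_id: str, payload: bytes) -> int:
    """Return the HTTP status a receiver with these semantics would send.

    `store` maps episode id -> hex SHA-256 of the bytes already stored
    (a stand-in for whatever the real receiver persists).
    """
    digest = hashlib.sha256(payload).hexdigest()
    existing = store.get(episode_id)
    if existing is None:
        store[episode_id] = digest
        return 201  # first upload: stored
    if existing == digest:
        return 200  # same bytes again: idempotent no-op
    return 409  # same id, different bytes: conflict
```

Comparing digests rather than raw bytes keeps the check cheap for large
archives; the first stored copy is never overwritten.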

Changes:

orchestrator/episode.py
  - EpisodeConfig.revert_at_start / revert_at_end (Tier 0+ snapshot/
    revert per docs/architecture.md). When set + qmp_socket present,
    EpisodeRunner issues loadvm <snapshot_name> and emits
    snapshot_revert / snapshot_revert_failed events on the same
    monotonic clock as everything else.

collectors/qmp.py
  - savevm() / loadvm() helpers using human-monitor-command, plus a
    test against the fake QMP server.
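
For reference, a hedged sketch of how an HMP command such as loadvm can
be wrapped in QMP's `human-monitor-command`. The function names here are
illustrative and are not the helpers in collectors/qmp.py; a real client
must also complete the `qmp_capabilities` handshake before sending
commands:

```python
import json


def hmp_request(command_line: str) -> bytes:
    """Encode one HMP command line as a single QMP request."""
    request = {
        "execute": "human-monitor-command",
        "arguments": {"command-line": command_line},
    }
    return (json.dumps(request) + "\r\n").encode()


def loadvm_request(snapshot_name: str) -> bytes:
    return hmp_request(f"loadvm {snapshot_name}")


def savevm_request(snapshot_name: str) -> bytes:
    return hmp_request(f"savevm {snapshot_name}")


def parse_hmp_reply(line: bytes) -> str:
    """Return the HMP output text; raise if QMP itself reported an error."""
    reply = json.loads(line)
    if "error" in reply:
        raise RuntimeError(reply["error"].get("desc", "QMP error"))
    return reply.get("return", "")
```

HMP failures generally come back as text in the `return` string rather
than as a QMP `error` object, so a caller may still want to inspect the
returned output.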

exploits/workloads.py
  - chunked_real_binary_upload() returns a ChunkedUpload plan: 8 KiB
    base64 chunks (~6 KiB binary each) so msfrpc never sees a buffer-
    busting payload. Includes a finalize step that sha256-verifies on
    the guest before exec.
  - real_binary_workload() now wraps the chunked plan for backwards
    compat with single-shot callers.
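
The chunk arithmetic can be sketched as follows: 6144 bytes of binary
encode to exactly 8192 base64 characters, so every chunk stays at or
under 8 KiB. `ChunkedUploadSketch` and `plan_chunks` are hypothetical
names for illustration, not the real ChunkedUpload class:

```python
import base64
import hashlib
from dataclasses import dataclass

RAW_CHUNK_BYTES = 6144  # 6 KiB of binary -> exactly 8192 base64 chars


@dataclass
class ChunkedUploadSketch:
    chunks: list       # base64 text, each at most 8 KiB
    sha256_hex: str    # digest of the original binary, checked before exec


def plan_chunks(binary: bytes, raw_chunk_bytes: int = RAW_CHUNK_BYTES) -> ChunkedUploadSketch:
    """Split `binary` into base64 chunks and pin its SHA-256."""
    chunks = [
        base64.b64encode(binary[i:i + raw_chunk_bytes]).decode("ascii")
        for i in range(0, len(binary), raw_chunk_bytes)
    ]
    return ChunkedUploadSketch(chunks, hashlib.sha256(binary).hexdigest())


def reassemble(plan: ChunkedUploadSketch) -> bytes:
    """What the guest-side finalize step would reconstruct."""
    return b"".join(base64.b64decode(c) for c in plan.chunks)
```

Because each chunk is encoded independently, the guest can decode and
append chunks one at a time without buffering the whole payload.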

exploits/driver.py
  - Tier-4 dispatch walks the chunked plan in MSFExploitDriver:
    each chunk is a separate session_shell_write; finalize verifies;
    exec only runs on sha-ok. New events: real_binary_upload_begin,
    real_binary_verify, real_binary_aborted.
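
A rough sketch of that walk, with the session I/O abstracted behind
callables. The event names come from this commit; `walk_upload`, its
parameters, and the event fields are assumptions made for illustration,
not the MSFExploitDriver code:

```python
from typing import Callable


def walk_upload(
    chunks: list,
    expected_sha256: str,
    write_chunk: Callable[[bytes], None],
    guest_sha256: Callable[[], str],
    run_binary: Callable[[], None],
    emit_event: Callable[..., None],
) -> bool:
    """Write every chunk, verify on the guest, and exec only on a match."""
    emit_event("real_binary_upload_begin", chunks=len(chunks))
    for chunk in chunks:
        write_chunk(chunk)  # one shell write per chunk
    actual = guest_sha256()
    ok = actual == expected_sha256
    emit_event("real_binary_verify", ok=ok, sha256=actual)
    if not ok:
        emit_event("real_binary_aborted", reason="sha256_mismatch")
        return False
    run_binary()
    return True
```

Keeping the exec behind the digest check means a truncated or corrupted
upload never runs on the guest.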

etc/cis490-orchestrator.service
  - Reads /etc/cis490/lab-host.env (FLEET_HOST_ID + optional BRIDGE).
  - Grants AmbientCapabilities CAP_NET_RAW (tcpdump for source 4) +
    CAP_SYS_ADMIN + CAP_PERFMON (perf for source 3) so collectors
    work under hardening.

scripts/install-lab-host.sh
  - Writes /etc/cis490/lab-host.env on first install with FLEET_HOST_ID
    defaulting to `hostname -s`.
  - Best-effort: fetches the Alpine baseline qcow2 (sha512-pinned) and
    builds cidata.iso with the in-guest agent embedded; symlinks both
    into /opt/cis490/vm/images/ so launchers find them.

scripts/fetch-alpine-baseline.sh
  - Idempotent fetcher for the Alpine 3.21 cloud-init nocloud qcow2
    matching the sha512 in docs/sources.md.
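
The script itself is shell; the same idempotent, hash-pinned pattern can
be sketched in Python. `ensure_pinned` and its `download` callback are
hypothetical; the callback stands in for whatever actually fetches the
image:

```python
import hashlib
from pathlib import Path
from typing import Callable


def sha512_of(path: Path) -> str:
    """Stream the file through SHA-512 so large images are not read at once."""
    h = hashlib.sha512()
    with path.open("rb") as f:
        for block in iter(lambda: f.read(1 << 20), b""):
            h.update(block)
    return h.hexdigest()


def ensure_pinned(dest: Path, expected_sha512: str,
                  download: Callable[[Path], None]) -> bool:
    """Return True if a download happened, False if dest was already good."""
    if dest.exists() and sha512_of(dest) == expected_sha512:
        return False  # already present and matching: nothing to do
    tmp = dest.with_name(dest.name + ".partial")
    download(tmp)
    if sha512_of(tmp) != expected_sha512:
        tmp.unlink()
        raise ValueError(f"sha512 mismatch for {dest.name}")
    tmp.replace(dest)  # publish only after the digest checks out
    return True
```

Downloading to a `.partial` name and renaming after verification means
an interrupted or tampered fetch never leaves a bad image at the final
path.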

tools/plot_envelope.py
  - Rebuilt to render whatever telemetry the episode dir contains:
    proc → QMP block ops → perf IPC/miss-rate → bridge pkts/SYNs →
    guest agent load/mem. Missing sources are silently skipped.
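
A minimal sketch of the "render whatever is present" behaviour, using
the telemetry file names that orchestrator/episode.py writes. The
`load_available` helper is hypothetical and says nothing about how
plot_envelope.py is actually structured:

```python
import json
from pathlib import Path

# File names as written by EpisodeRunner; the dict keys are illustrative.
SOURCES = {
    "proc": "telemetry-proc.jsonl",
    "qmp": "telemetry-qmp.jsonl",
    "perf": "telemetry-perf.jsonl",
    "netflow": "netflow.jsonl",
    "guest": "telemetry-guest.jsonl",
}


def load_available(episode_dir: Path) -> dict:
    """Load every telemetry source that exists and has rows; skip the rest."""
    loaded = {}
    for name, filename in SOURCES.items():
        path = episode_dir / filename
        if not path.is_file():
            continue  # missing source: skip without complaint
        with path.open() as f:
            rows = [json.loads(line) for line in f if line.strip()]
        if rows:
            loaded[name] = rows
    return loaded
```

A plotting front end can then create one panel per key in the returned
dict, so an episode without perf or pcap data still renders.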

tools/index_reader.py
  - cis490-index CLI: filter receiver's index.jsonl by host / sample
    / time range, sort, count-by group. Closest thing to a query
    interface until we stand up Postgres/Timescale.
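
The filter and count-by operations can be sketched over JSON Lines
input. The field names `host_id`, `sample`, and `received_at` are
assumptions for illustration; the actual index.jsonl schema is not shown
in this commit:

```python
import json
from collections import Counter
from typing import Iterable, Iterator, Optional


def filter_index(
    lines: Iterable[str],
    host: Optional[str] = None,
    sample: Optional[str] = None,
    since: Optional[float] = None,
    until: Optional[float] = None,
) -> Iterator[dict]:
    """Yield index rows matching every filter that was supplied."""
    for line in lines:
        if not line.strip():
            continue
        row = json.loads(line)
        if host is not None and row.get("host_id") != host:
            continue
        if sample is not None and row.get("sample") != sample:
            continue
        ts = row.get("received_at", 0.0)
        if since is not None and ts < since:
            continue
        if until is not None and ts > until:
            continue
        yield row


def count_by(rows: Iterable[dict], key: str) -> Counter:
    """Group rows by one field and count them."""
    return Counter(str(row.get(key)) for row in rows)
```

Because `filter_index` is a generator, it can stream a large index file
line by line without loading it into memory.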

samples/README.md
  - Rewritten to match the new manifest schema, the kind=real vs mimic
    split, the per-(host, slot, ep) selection mechanic, and the
    chunked-upload safety story.

Tests: 106 pass (was 102). New cases:
  - test_qmp.py — savevm + loadvm (HMP wrapper + error path)
  - test_tier4.py — chunked plan splitting, sha-pinned finalize,
    end-to-end driver walks all chunks + verify + exec via the fake
    msfrpc client

Closes the "what is not implemented" punch list.

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
2026-04-30 00:31:55 -05:00


"""EpisodeRunner — single-episode driver.

Writes the full per-episode directory shape from ``docs/data-model.md``:

    data/episodes/<ulid>/
        meta.json
        events.jsonl
        labels.jsonl
        telemetry-proc.jsonl
        done.marker

Two modes:

1. **Single-phase** (no ``phase_schedule`` set): labels the whole window
   ``clean``. Useful for v0 sanity tests against any pid.
2. **Scheduled** (``phase_schedule`` set on the config): walks a list of
   ``(phase, duration_s)`` tuples, emitting one label per transition. An
   optional ``on_phase`` callback fires at each transition, used to drive an
   external load mimic (or, later, the real exploit/sample driver).

Real VM bring-up will arrive as a third mode that boots a guest, fires an
exploit, and adapts the schedule based on observed events
(``session_open``, ``sample_executed``, ...).
"""
from __future__ import annotations

import json
import logging
import os
import threading
import time
from dataclasses import dataclass, field
from datetime import datetime, timezone
from pathlib import Path
from typing import Callable

from collectors import guest_agent, pcap, perf_qemu, proc_qemu, qmp

from .ulid import new_ulid

log = logging.getLogger("cis490.orchestrator")

SCHEMA_VERSION = 1

PhaseSchedule = list[tuple[str, float]]
OnPhase = Callable[[str], None]


@dataclass
class EpisodeConfig:
    target_pid: int
    duration_s: float
    data_root: Path
    interval_ms: int = 100
    image_name: str = "(stub)"
    snapshot_name: str = "(none-stub)"
    episode_id: str | None = None
    # When set, walk this schedule and ignore duration_s for sleep timing.
    # ``duration_s`` still goes in meta.schedule for record-keeping.
    phase_schedule: PhaseSchedule | None = None
    # Optional: paths to QEMU sockets exposed by the launcher. When
    # set, EpisodeRunner spins up additional collector threads.
    qmp_socket: Path | None = None
    qmp_interval_ms: int = 1000  # QMP queries are heavier than /proc reads
    guest_agent_socket: Path | None = None
    # Optional: bridge interface to capture per-episode pcap on. When
    # set, EpisodeRunner spawns tcpdump for the duration of the
    # schedule and bucketizes the result into netflow.jsonl on stop.
    bridge_iface: str | None = None
    bridge_ip: str = "10.200.0.1"
    pcap_snaplen: int = 256
    # Source 3: perf stat sampling. Disabled by default because perf
    # needs CAP_SYS_ADMIN or perf_event_paranoid <= 1; enable
    # explicitly per-episode when the host supports it.
    enable_perf: bool = False
    perf_interval_ms: int = 100
    # Snapshot/revert (Tier 0+):
    #   revert_at_start: before any phase walks, loadvm <snapshot_name>.
    #     Use this to drop the guest back to a known-good baseline at
    #     the start of every episode in a long-lived-VM fleet loop.
    #   revert_at_end: after the schedule walks, loadvm <snapshot_name>
    #     so the next consumer of this VM starts clean too.
    revert_at_start: bool = False
    revert_at_end: bool = False


@dataclass
class EpisodeResult:
    episode_id: str
    episode_dir: Path
    rows_proc: int
    rows_qmp: int = 0
    rows_guest: int = 0
    rows_netflow: int = 0
    rows_perf: int = 0
    pcap_bytes: int = 0
    pid_disappeared: bool = False
    duration_observed_s: float = 0.0
    phases_observed: list[str] = field(default_factory=list)


class EpisodeRunner:
    def __init__(
        self,
        cfg: EpisodeConfig,
        on_phase: OnPhase | None = None,
    ) -> None:
        self.cfg = cfg
        self.on_phase = on_phase
        self.episode_id = cfg.episode_id or new_ulid()
        self.episode_dir: Path = cfg.data_root / "episodes" / self.episode_id
        # Create the dir up front so external drivers can call
        # emit_event() between construction and run() (e.g. an exploit
        # driver that writes a driver_setup event before the schedule
        # walks). The dir is otherwise empty until run() opens files.
        self.episode_dir.mkdir(parents=True, exist_ok=True)
        self._t_mono_origin_ns: int = 0
        self._stop = threading.Event()

    # ---- public ---------------------------------------------------------

    def run(self) -> EpisodeResult:
        self._t_mono_origin_ns = time.monotonic_ns()
        started_at_wall = datetime.now(timezone.utc).isoformat()
        meta = self._initial_meta(started_at_wall)
        self._write_meta(meta)
        self.emit_event("snapshot_load", snapshot=self.cfg.snapshot_name)

        # Snapshot revert at start: pause+restore the guest to a known
        # baseline before phase 0. Requires QMP and a savevm having
        # already taken place (the launcher is responsible for that).
        if self.cfg.revert_at_start and self.cfg.qmp_socket is not None:
            try:
                client = qmp.QMPClient(self.cfg.qmp_socket)
                client.connect()
                try:
                    out = client.loadvm(self.cfg.snapshot_name)
                    self.emit_event(
                        "snapshot_revert",
                        when="start",
                        snapshot=self.cfg.snapshot_name,
                        output=(out or "").strip()[:256],
                    )
                finally:
                    client.close()
            except Exception as e:
                log.warning("loadvm at start failed: %s", e)
                self.emit_event(
                    "snapshot_revert_failed",
                    when="start",
                    snapshot=self.cfg.snapshot_name,
                    error=str(e),
                )

        rows_holder: dict[str, int] = {"proc": 0, "qmp": 0, "guest": 0, "netflow": 0, "perf": 0}
        pcap_handle: pcap.CaptureHandle | None = None
        pcap_path = self.episode_dir / "network.pcap"
        netflow_path = self.episode_dir / "netflow.jsonl"
        if self.cfg.bridge_iface:
            try:
                pcap_handle = pcap.run_capture(
                    bridge=self.cfg.bridge_iface,
                    pcap_path=pcap_path,
                    snaplen=self.cfg.pcap_snaplen,
                )
                self.emit_event("pcap_started", iface=self.cfg.bridge_iface)
            except (OSError, FileNotFoundError) as e:
                log.warning("pcap capture not available on %s: %s",
                            self.cfg.bridge_iface, e)
                self.emit_event("pcap_unavailable",
                                iface=self.cfg.bridge_iface, error=str(e))

        def _proc_collector() -> None:
            rows_holder["proc"] = proc_qemu.run_loop(
                pid=self.cfg.target_pid,
                output_path=self.episode_dir / "telemetry-proc.jsonl",
                t_mono_origin_ns=self._t_mono_origin_ns,
                interval_ms=self.cfg.interval_ms,
                stop_event=self._stop,
            )

        def _qmp_collector() -> None:
            assert self.cfg.qmp_socket is not None
            rows_holder["qmp"] = qmp.run_loop(
                socket_path=self.cfg.qmp_socket,
                output_path=self.episode_dir / "telemetry-qmp.jsonl",
                t_mono_origin_ns=self._t_mono_origin_ns,
                interval_ms=self.cfg.qmp_interval_ms,
                stop_event=self._stop,
            )

        def _guest_collector() -> None:
            assert self.cfg.guest_agent_socket is not None
            rows_holder["guest"] = guest_agent.run_loop(
                socket_path=self.cfg.guest_agent_socket,
                output_path=self.episode_dir / "telemetry-guest.jsonl",
                t_mono_origin_ns=self._t_mono_origin_ns,
                stop_event=self._stop,
            )

        def _perf_collector() -> None:
            rows_holder["perf"] = perf_qemu.run_loop(
                pid=self.cfg.target_pid,
                output_path=self.episode_dir / "telemetry-perf.jsonl",
                t_mono_origin_ns=self._t_mono_origin_ns,
                interval_ms=self.cfg.perf_interval_ms,
                stop_event=self._stop,
            )

        threads: list[threading.Thread] = []
        threads.append(threading.Thread(target=_proc_collector, daemon=True, name="proc_qemu"))
        if self.cfg.qmp_socket is not None:
            threads.append(threading.Thread(target=_qmp_collector, daemon=True, name="qmp"))
        if self.cfg.guest_agent_socket is not None:
            threads.append(threading.Thread(target=_guest_collector, daemon=True, name="guest_agent"))
        if self.cfg.enable_perf:
            threads.append(threading.Thread(target=_perf_collector, daemon=True, name="perf"))
        for t in threads:
            t.start()

        phases_observed: list[str] = []
        try:
            if self.cfg.phase_schedule:
                phases_observed = self._walk_schedule()
            else:
                self._emit_label(0, "clean", prev=None, reason="snapshot_loaded")
                phases_observed = ["clean"]
                self._stop.wait(timeout=self.cfg.duration_s)
        finally:
            # Optional revert before stopping collectors so the
            # transition shows up in their telemetry too; useful for
            # building "snapshot revert" as a labeled phase later.
            if self.cfg.revert_at_end and self.cfg.qmp_socket is not None:
                try:
                    client = qmp.QMPClient(self.cfg.qmp_socket)
                    client.connect()
                    try:
                        out = client.loadvm(self.cfg.snapshot_name)
                        self.emit_event(
                            "snapshot_revert",
                            when="end",
                            snapshot=self.cfg.snapshot_name,
                            output=(out or "").strip()[:256],
                        )
                    finally:
                        client.close()
                except Exception as e:
                    log.warning("loadvm at end failed: %s", e)
                    self.emit_event(
                        "snapshot_revert_failed",
                        when="end",
                        snapshot=self.cfg.snapshot_name,
                        error=str(e),
                    )
            self._stop.set()
            for t in threads:
                t.join(timeout=3.0)
            if pcap_handle is not None:
                rc = pcap.stop_capture(pcap_handle)
                self.emit_event("pcap_stopped", rc=rc,
                                pcap_bytes=pcap_path.stat().st_size if pcap_path.exists() else 0)
                rows_holder["netflow"] = pcap.bucketize(
                    pcap_path, netflow_path,
                    bucket_ms=100,
                    t_mono_origin_ns=self._t_mono_origin_ns,
                    bridge_ip=self.cfg.bridge_ip,
                )

        pid_alive = _pid_alive(self.cfg.target_pid)
        self.emit_event("episode_end", target_pid_alive=pid_alive)
        end_mono_ns = time.monotonic_ns() - self._t_mono_origin_ns
        meta["ended_at_wall"] = datetime.now(timezone.utc).isoformat()
        pcap_size = pcap_path.stat().st_size if pcap_path.exists() else 0
        meta["result"] = {
            "phases_observed": phases_observed,
            "rows_proc": rows_holder["proc"],
            "rows_qmp": rows_holder["qmp"],
            "rows_guest": rows_holder["guest"],
            "rows_perf": rows_holder["perf"],
            "rows_netflow": rows_holder["netflow"],
            "pcap_bytes": pcap_size,
            "pid_alive_at_end": pid_alive,
            "duration_observed_s": end_mono_ns / 1_000_000_000,
        }
        self._write_meta(meta)
        (self.episode_dir / "done.marker").touch()
        log.info(
            "episode %s complete: proc=%d qmp=%d guest=%d perf=%d netflow=%d pcap=%dB duration=%.2fs phases=%s",
            self.episode_id,
            rows_holder["proc"], rows_holder["qmp"], rows_holder["guest"],
            rows_holder["perf"], rows_holder["netflow"], pcap_size,
            end_mono_ns / 1e9,
            phases_observed,
        )
        return EpisodeResult(
            episode_id=self.episode_id,
            episode_dir=self.episode_dir,
            rows_proc=rows_holder["proc"],
            rows_qmp=rows_holder["qmp"],
            rows_guest=rows_holder["guest"],
            rows_netflow=rows_holder["netflow"],
            rows_perf=rows_holder["perf"],
            pcap_bytes=pcap_size,
            pid_disappeared=not pid_alive,
            duration_observed_s=end_mono_ns / 1_000_000_000,
            phases_observed=phases_observed,
        )

    def stop(self) -> None:
        self._stop.set()

    # ---- internals ------------------------------------------------------

    def _walk_schedule(self) -> list[str]:
        observed: list[str] = []
        prev: str | None = None
        for phase, dur in self.cfg.phase_schedule or []:
            if self._stop.is_set():
                break
            t_mono = time.monotonic_ns() - self._t_mono_origin_ns
            self._emit_label(t_mono, phase, prev=prev, reason="scheduled")
            self.emit_event("phase_transition", to=phase, prev=prev)
            if self.on_phase is not None:
                try:
                    self.on_phase(phase)
                except Exception:
                    log.exception("on_phase callback raised; continuing")
            observed.append(phase)
            prev = phase
            self._stop.wait(timeout=dur)
        return observed

    def _initial_meta(self, started_at_wall: str) -> dict:
        return {
            "episode_id": self.episode_id,
            "schema_version": SCHEMA_VERSION,
            "started_at_wall": started_at_wall,
            "ended_at_wall": None,
            "host_fingerprint": {
                "kernel": os.uname().release,
                "qemu_version": None,
            },
            "vm": {
                "image_name": self.cfg.image_name,
                "image_sha256": None,
                "snapshot_name": self.cfg.snapshot_name,
                "vcpus": None,
                "ram_mib": None,
                "target_pid": self.cfg.target_pid,
            },
            "exploit": None,
            "sample": None,
            "schedule": {
                "baseline_seconds": self.cfg.duration_s,
                "interval_ms": self.cfg.interval_ms,
                "phase_schedule": self.cfg.phase_schedule,
            },
            "result": None,
        }

    def _write_meta(self, meta: dict) -> None:
        # Write to a sibling temp file, then atomically swap it into place
        # so readers never see a half-written meta.json.
        path = self.episode_dir / "meta.json"
        tmp = path.with_suffix(".json.partial")
        with tmp.open("w") as f:
            json.dump(meta, f, indent=2, sort_keys=True)
            f.write("\n")
        os.replace(tmp, path)

    def emit_event(self, event: str, **extra) -> None:
        """Append a row to events.jsonl. Public so external drivers
        (e.g. the MSF exploit driver) can stamp their own events with
        the same monotonic clock the orchestrator is using."""
        t_mono_ns = (
            time.monotonic_ns() - self._t_mono_origin_ns
            if self._t_mono_origin_ns
            else 0
        )
        row = {
            "t_mono_ns": t_mono_ns,
            "t_wall_ns": time.time_ns(),
            "event": event,
            **extra,
        }
        with (self.episode_dir / "events.jsonl").open("a") as f:
            f.write(json.dumps(row, sort_keys=True) + "\n")

    def _emit_label(
        self, t_mono_ns: int, phase: str, prev: str | None, reason: str
    ) -> None:
        row = {
            "t_mono_ns": t_mono_ns,
            "t_wall_ns": time.time_ns(),
            "phase": phase,
            "prev": prev,
            "reason": reason,
        }
        with (self.episode_dir / "labels.jsonl").open("a") as f:
            f.write(json.dumps(row, sort_keys=True) + "\n")


def _pid_alive(pid: int) -> bool:
    try:
        os.kill(pid, 0)
        return True
    except ProcessLookupError:
        return False
    except PermissionError:
        # Pid exists but we can't signal it.
        return True
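
On the consumer side, the rows that `_emit_label` appends to
labels.jsonl can be used to look up which phase was active at a given
monotonic timestamp. A minimal sketch, assuming only the `t_mono_ns` and
`phase` fields written above; `load_labels` and `phase_at` are
hypothetical helpers and not part of this module:

```python
import bisect
import json
from pathlib import Path
from typing import Optional


def load_labels(episode_dir: Path) -> list:
    """Return (t_mono_ns, phase) pairs sorted by time."""
    with (episode_dir / "labels.jsonl").open() as f:
        rows = [json.loads(line) for line in f if line.strip()]
    return sorted((r["t_mono_ns"], r["phase"]) for r in rows)


def phase_at(labels: list, t_mono_ns: int) -> Optional[str]:
    """Phase in effect at t_mono_ns, or None before the first label."""
    starts = [t for t, _ in labels]
    i = bisect.bisect_right(starts, t_mono_ns) - 1
    return labels[i][1] if i >= 0 else None
```

Because every collector stamps rows relative to the same monotonic
origin, the same lookup can label telemetry rows from any source.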