CIS490/orchestrator/episode.py
Maximus Gorog 970698af83 Synthetic envelope demo: phase-driven load mimic + plotter
End-to-end pipeline now produces a labeled envelope from a single command.
Drives the orchestrator through an 8-phase XMRig-shaped schedule and
renders a 3-panel envelope (CPU%, RSS, IO write rate) with phase bands
sourced from labels.jsonl. Real telemetry, simulated load — validates the
collection + labeling shape before a real VM is involved.

Components:
- tools/load_mimic.py        phase-driven load generator. Reads phase
                             commands on stdin; CPU/IO behavior matches
                             the named phase (clean=idle, armed=light burst,
                             infecting=disk burst+CPU, infected_running=
                             CPU saturation+stratum-shaped writes,
                             dormant=quieter than clean).
- tools/run_envelope_demo.py spawns load_mimic, drives EpisodeRunner with
                             a default 85s schedule that includes the
                             classic infected_running → dormant → re-entry
                             pattern.
- tools/plot_envelope.py     reads telemetry + labels from an episode dir,
                             writes envelope.png with colored phase bands.

orchestrator: EpisodeRunner now takes an optional phase_schedule and an
on_phase callback. Walks the schedule emitting one label per transition.
Backwards-compatible — existing single-phase tests still green.

Doc fix (user pushback): README + architecture + threat-model no longer
imply the Pi5 is the deployment target. Pi5's actual role here is the
WireGuard-side collector for episode tarballs. Deployment target is
generic ("constrained Linux device"). The "gateway observer" concept
remains a deployment pattern, decoupled from the Pi5's collector role.

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
2026-04-28 23:53:20 -06:00

255 lines
7.9 KiB
Python

"""EpisodeRunner — single-episode driver.
Writes the full per-episode directory shape from ``docs/data-model.md``:
data/episodes/<ulid>/
meta.json
events.jsonl
labels.jsonl
telemetry-proc.jsonl
done.marker
Two modes:
1. **Single-phase** (no ``phase_schedule`` set) — labels the whole window
``clean``. Useful for v0 sanity tests against any pid.
2. **Scheduled** (``phase_schedule`` set on the config) — walks a list of
``(phase, duration_s)`` tuples, emitting one label per transition. An
optional ``on_phase`` callback fires at each transition, used to drive an
external load mimic (or, later, the real exploit/sample driver).
Real VM bring-up will arrive as a third mode that boots a guest, fires an
exploit, and adapts the schedule based on observed events
(``session_open``, ``sample_executed``, ...).
"""
from __future__ import annotations
import json
import logging
import os
import threading
import time
from dataclasses import dataclass, field
from datetime import datetime, timezone
from pathlib import Path
from typing import Callable
from collectors import proc_qemu
from .ulid import new_ulid
log = logging.getLogger("cis490.orchestrator")
SCHEMA_VERSION = 1
PhaseSchedule = list[tuple[str, float]]
OnPhase = Callable[[str], None]
@dataclass
class EpisodeConfig:
target_pid: int
duration_s: float
data_root: Path
interval_ms: int = 100
image_name: str = "(stub)"
snapshot_name: str = "(none-stub)"
episode_id: str | None = None
# When set, walk this schedule and ignore duration_s for sleep timing.
# ``duration_s`` still goes in meta.schedule for record-keeping.
phase_schedule: PhaseSchedule | None = None
@dataclass
class EpisodeResult:
episode_id: str
episode_dir: Path
rows_proc: int
pid_disappeared: bool
duration_observed_s: float
phases_observed: list[str] = field(default_factory=list)
class EpisodeRunner:
def __init__(
self,
cfg: EpisodeConfig,
on_phase: OnPhase | None = None,
) -> None:
self.cfg = cfg
self.on_phase = on_phase
self.episode_id = cfg.episode_id or new_ulid()
self.episode_dir: Path = cfg.data_root / "episodes" / self.episode_id
self._t_mono_origin_ns: int = 0
self._stop = threading.Event()
# ---- public ---------------------------------------------------------
def run(self) -> EpisodeResult:
self.episode_dir.mkdir(parents=True, exist_ok=True)
self._t_mono_origin_ns = time.monotonic_ns()
started_at_wall = datetime.now(timezone.utc).isoformat()
meta = self._initial_meta(started_at_wall)
self._write_meta(meta)
self._emit_event(0, "snapshot_load", snapshot=self.cfg.snapshot_name)
rows_holder: dict[str, int] = {"rows": 0}
def _collector() -> None:
rows_holder["rows"] = proc_qemu.run_loop(
pid=self.cfg.target_pid,
output_path=self.episode_dir / "telemetry-proc.jsonl",
t_mono_origin_ns=self._t_mono_origin_ns,
interval_ms=self.cfg.interval_ms,
stop_event=self._stop,
)
t = threading.Thread(target=_collector, daemon=True, name="proc_qemu")
t.start()
phases_observed: list[str] = []
try:
if self.cfg.phase_schedule:
phases_observed = self._walk_schedule()
else:
self._emit_label(0, "clean", prev=None, reason="snapshot_loaded")
phases_observed = ["clean"]
self._stop.wait(timeout=self.cfg.duration_s)
finally:
self._stop.set()
t.join(timeout=2.0)
end_mono_ns = time.monotonic_ns() - self._t_mono_origin_ns
pid_alive = _pid_alive(self.cfg.target_pid)
self._emit_event(
end_mono_ns,
"episode_end",
target_pid_alive=pid_alive,
)
meta["ended_at_wall"] = datetime.now(timezone.utc).isoformat()
meta["result"] = {
"phases_observed": phases_observed,
"rows_proc": rows_holder["rows"],
"pid_alive_at_end": pid_alive,
"duration_observed_s": end_mono_ns / 1_000_000_000,
}
self._write_meta(meta)
(self.episode_dir / "done.marker").touch()
log.info(
"episode %s complete: rows=%d duration=%.2fs phases=%s",
self.episode_id,
rows_holder["rows"],
end_mono_ns / 1e9,
phases_observed,
)
return EpisodeResult(
episode_id=self.episode_id,
episode_dir=self.episode_dir,
rows_proc=rows_holder["rows"],
pid_disappeared=not pid_alive,
duration_observed_s=end_mono_ns / 1_000_000_000,
phases_observed=phases_observed,
)
def stop(self) -> None:
self._stop.set()
# ---- internals ------------------------------------------------------
def _walk_schedule(self) -> list[str]:
observed: list[str] = []
prev: str | None = None
for phase, dur in self.cfg.phase_schedule or []:
if self._stop.is_set():
break
t_mono = time.monotonic_ns() - self._t_mono_origin_ns
self._emit_label(t_mono, phase, prev=prev, reason="scheduled")
self._emit_event(
t_mono, "phase_transition", to=phase, prev=prev
)
if self.on_phase is not None:
try:
self.on_phase(phase)
except Exception:
log.exception("on_phase callback raised; continuing")
observed.append(phase)
prev = phase
self._stop.wait(timeout=dur)
return observed
def _initial_meta(self, started_at_wall: str) -> dict:
return {
"episode_id": self.episode_id,
"schema_version": SCHEMA_VERSION,
"started_at_wall": started_at_wall,
"ended_at_wall": None,
"host_fingerprint": {
"kernel": os.uname().release,
"qemu_version": None,
},
"vm": {
"image_name": self.cfg.image_name,
"image_sha256": None,
"snapshot_name": self.cfg.snapshot_name,
"vcpus": None,
"ram_mib": None,
"target_pid": self.cfg.target_pid,
},
"exploit": None,
"sample": None,
"schedule": {
"baseline_seconds": self.cfg.duration_s,
"interval_ms": self.cfg.interval_ms,
"phase_schedule": self.cfg.phase_schedule,
},
"result": None,
}
def _write_meta(self, meta: dict) -> None:
path = self.episode_dir / "meta.json"
tmp = path.with_suffix(".json.partial")
with tmp.open("w") as f:
json.dump(meta, f, indent=2, sort_keys=True)
f.write("\n")
os.replace(tmp, path)
def _emit_event(self, t_mono_ns: int, event: str, **extra) -> None:
row = {
"t_mono_ns": t_mono_ns,
"t_wall_ns": time.time_ns(),
"event": event,
**extra,
}
with (self.episode_dir / "events.jsonl").open("a") as f:
f.write(json.dumps(row, sort_keys=True) + "\n")
def _emit_label(
self, t_mono_ns: int, phase: str, prev: str | None, reason: str
) -> None:
row = {
"t_mono_ns": t_mono_ns,
"t_wall_ns": time.time_ns(),
"phase": phase,
"prev": prev,
"reason": reason,
}
with (self.episode_dir / "labels.jsonl").open("a") as f:
f.write(json.dumps(row, sort_keys=True) + "\n")
def _pid_alive(pid: int) -> bool:
try:
os.kill(pid, 0)
return True
except ProcessLookupError:
return False
except PermissionError:
# Pid exists but we can't signal it.
return True