diff --git a/orchestrator/episode.py b/orchestrator/episode.py index 9dc4f23..64d8ae4 100644 --- a/orchestrator/episode.py +++ b/orchestrator/episode.py @@ -37,6 +37,7 @@ from pathlib import Path from typing import Callable from collectors import guest_agent, pcap, perf_qemu, proc_qemu, qmp +from samples.manifest import Sample from .ulid import new_ulid @@ -77,6 +78,10 @@ class EpisodeConfig: # explicitly per-episode when the host supports it. enable_perf: bool = False perf_interval_ms: int = 100 + # The Sample that drove this episode's workload selection. Stamped + # into meta.json so trainers can join episodes by family / kind + # without re-deriving from events. None = v1 yes-loop fallback. + sample: Sample | None = None # Snapshot/revert (Tier 0+): # revert_at_start — before any phase walks, loadvm . # Use this to drop the guest back to a known-good baseline at @@ -341,6 +346,17 @@ class EpisodeRunner: return observed def _initial_meta(self, started_at_wall: str) -> dict: + sample_meta: dict | None = None + if self.cfg.sample is not None: + s = self.cfg.sample + sample_meta = { + "name": s.name, + "family": s.family, + "category": s.category, + "profile": s.profile, + "kind": s.kind, + "sha256": s.sha256, + } return { "episode_id": self.episode_id, "schema_version": SCHEMA_VERSION, @@ -359,7 +375,7 @@ class EpisodeRunner: "target_pid": self.cfg.target_pid, }, "exploit": None, - "sample": None, + "sample": sample_meta, "schedule": { "baseline_seconds": self.cfg.duration_s, "interval_ms": self.cfg.interval_ms, diff --git a/tests/test_episode.py b/tests/test_episode.py index 1e33514..8e91e08 100644 --- a/tests/test_episode.py +++ b/tests/test_episode.py @@ -74,6 +74,57 @@ def test_episode_id_can_be_overridden(tmp_path: Path) -> None: assert result.episode_dir == tmp_path / "episodes" / "01TEST" +def test_meta_sample_records_full_sample_when_passed(tmp_path: Path) -> None: + """EpisodeConfig.sample → meta.sample carries identity + kind so + trainers can join 
episodes by family/sha256 without re-deriving + from events. With no Sample, meta.sample stays null.""" + import os as _os + + from samples.manifest import Sample + + s = Sample( + name="xmrig-cryptominer", + family="XMRig", + category="cryptominer", + profile="cpu-saturate", + sha256="abc" * 21 + "d", # 64 hex + source="MalwareBazaar", + ) + cfg = EpisodeConfig( + target_pid=_os.getpid(), + duration_s=0.1, + interval_ms=50, + data_root=tmp_path, + sample=s, + ) + result = EpisodeRunner(cfg).run() + + meta = json.loads((result.episode_dir / "meta.json").read_text()) + assert meta["sample"] is not None + assert meta["sample"]["name"] == "xmrig-cryptominer" + assert meta["sample"]["family"] == "XMRig" + assert meta["sample"]["category"] == "cryptominer" + assert meta["sample"]["profile"] == "cpu-saturate" + assert meta["sample"]["kind"] == "real" + assert meta["sample"]["sha256"] == "abc" * 21 + "d" + + +def test_meta_sample_is_null_for_v1_path(tmp_path: Path) -> None: + """No sample passed → the v1 fallback path. meta.sample stays + null so trainers can detect (and filter out) info-less runs.""" + import os as _os + + cfg = EpisodeConfig( + target_pid=_os.getpid(), + duration_s=0.1, + interval_ms=50, + data_root=tmp_path, + ) + result = EpisodeRunner(cfg).run() + meta = json.loads((result.episode_dir / "meta.json").read_text()) + assert meta["sample"] is None + + def test_episode_writes_done_marker_last(tmp_path: Path) -> None: """done.marker should not appear until meta.json has ended_at_wall set.""" cfg = EpisodeConfig( diff --git a/tests/test_vm_load_controller.py b/tests/test_vm_load_controller.py new file mode 100644 index 0000000..cb4b2d6 --- /dev/null +++ b/tests/test_vm_load_controller.py @@ -0,0 +1,213 @@ +"""Tests for VMLoadController against a fake SerialClient. + +The controller's only job is to translate phases into shell commands +on a serial console + emit audit events. 
The key invariants we +encode here come from the elliott-lab incident where every phase +median'd 20% CPU because the workload silently never fired: + + - every set_phase emits some event (so absence in events.jsonl is + a hard signal) + - infected_running emits workload_started AFTER sending the load + command + - dormant emits workload_killed WITH a pre_kill_probe so trainers + can detect "the workload was never running" + - exceptions in the shell call surface as workload_failed; they + do NOT propagate (the runner's on_phase callback would swallow + them anyway, but we want the audit row regardless) +""" + +from __future__ import annotations + +import sys +from pathlib import Path + +import pytest + +# Mirror the same path hack run_real_vm_demo.py uses so the tools/ +# module imports work. +ROOT = Path(__file__).resolve().parent.parent +sys.path.insert(0, str(ROOT)) +sys.path.insert(0, str(ROOT / "tools")) + +from samples.manifest import Sample +from vm_load_controller import VMLoadController # noqa: E402 + + +class FakeSerial: + """Records every shell command. 
Returns canned probe output.""" + + def __init__(self, probe_response: str = "yes=1\nsh=1\nloadavg=0.45") -> None: + self.calls: list[str] = [] + self.probe_response = probe_response + self.fail_on: list[str] = [] + + def run(self, cmd: str, timeout_s: float = 10.0) -> str: + self.calls.append(cmd) + for substr in self.fail_on: + if substr in cmd: + raise RuntimeError(f"fake-serial: failing on {substr!r}") + if "pgrep -c yes" in cmd or "pgrep -c sh" in cmd or "loadavg" in cmd: + return self.probe_response + return "" + + +# --------------------------------------------------------------------------- +# Event emission — the audit trail +# --------------------------------------------------------------------------- + + +def test_setup_emits_workload_setup_event() -> None: + serial = FakeSerial() + events: list[tuple[str, dict]] = [] + c = VMLoadController(serial, emit_event=lambda e, **kw: events.append((e, kw))) + c.setup() + names = [e for e, _ in events] + assert "workload_setup" in names + setup = next(kw for e, kw in events if e == "workload_setup") + assert setup["profile"] == "v1-yes" # no Sample → fallback path + assert setup["sample"] is None + + +def test_setup_records_profile_when_sample_present() -> None: + serial = FakeSerial() + s = Sample(name="x", family="X", category="rat", profile="cpu-saturate") + events: list[tuple[str, dict]] = [] + c = VMLoadController(serial, sample=s, emit_event=lambda e, **kw: events.append((e, kw))) + c.setup() + setup = next(kw for e, kw in events if e == "workload_setup") + assert setup["profile"] == "cpu-saturate" + assert setup["sample"] == "x" + + +def test_infected_running_emits_workload_started_after_command() -> None: + serial = FakeSerial() + events: list[tuple[str, dict]] = [] + c = VMLoadController(serial, emit_event=lambda e, **kw: events.append((e, kw))) + c.set_phase("infected_running") + + # The command was sent. 
+ assert any("yes > /dev/null" in cmd for cmd in serial.calls), \ + f"expected v1 yes-loop in serial calls; got {serial.calls}" + # And the audit event followed it. + started = [kw for e, kw in events if e == "workload_started"] + assert started, "workload_started event must fire" + assert started[0]["phase"] == "infected_running" + assert started[0]["profile"] == "v1-yes" + + +def test_dormant_probes_before_killing() -> None: + """The pre_kill_probe is the load-bearing diagnostic: it tells the + trainer whether the workload was actually running before we + killed it. If pgrep returns 0 yes processes, the previous + infected_running was a no-op and the episode is filterable.""" + serial = FakeSerial(probe_response="yes=2\nsh=1\nloadavg=1.32") + events: list[tuple[str, dict]] = [] + c = VMLoadController(serial, emit_event=lambda e, **kw: events.append((e, kw))) + c.set_phase("dormant") + + killed = [kw for e, kw in events if e == "workload_killed" and kw["phase"] == "dormant"] + assert killed, "dormant must emit workload_killed" + probe = killed[0].get("pre_kill_probe") + assert probe is not None + assert probe["yes"] == "2" + assert probe["loadavg"] == "1.32" + + +def test_dormant_probe_records_zero_when_workload_never_ran() -> None: + """The exact symptom from elliott-lab: dormant probe shows 0 + yes processes → trainer can flag this episode as workload-not-firing.""" + serial = FakeSerial(probe_response="yes=0\nsh=1\nloadavg=0.18") + events: list[tuple[str, dict]] = [] + c = VMLoadController(serial, emit_event=lambda e, **kw: events.append((e, kw))) + c.set_phase("dormant") + killed = next(kw for e, kw in events if e == "workload_killed" and kw["phase"] == "dormant") + assert killed["pre_kill_probe"]["yes"] == "0" + + +def test_clean_phase_emits_workload_killed() -> None: + serial = FakeSerial() + events: list[tuple[str, dict]] = [] + c = VMLoadController(serial, emit_event=lambda e, **kw: events.append((e, kw))) + c.set_phase("clean") + assert any( + e == 
"workload_killed" and kw["phase"] == "clean" for e, kw in events + ), "clean must emit workload_killed" + + +def test_armed_emits_workload_armed_with_handshake_command() -> None: + serial = FakeSerial() + events: list[tuple[str, dict]] = [] + c = VMLoadController(serial, emit_event=lambda e, **kw: events.append((e, kw))) + c.set_phase("armed") + assert any("armed-handshake" in cmd for cmd in serial.calls) + assert any(e == "workload_armed" for e, _ in events) + + +def test_infecting_emits_workload_infecting_with_dd() -> None: + serial = FakeSerial() + events: list[tuple[str, dict]] = [] + c = VMLoadController(serial, emit_event=lambda e, **kw: events.append((e, kw))) + c.set_phase("infecting") + assert any("dd if=/dev/urandom" in cmd for cmd in serial.calls) + assert any(e == "workload_infecting" for e, _ in events) + + +# --------------------------------------------------------------------------- +# Exception handling — failures must surface as events, not propagate +# --------------------------------------------------------------------------- + + +def test_command_failure_emits_workload_failed_and_does_not_raise() -> None: + """If the serial.run() raises (timeout, EOF, login bad), the + runner would silently swallow the exception. We want a hard + audit row in events.jsonl regardless.""" + serial = FakeSerial() + serial.fail_on = ["yes > /dev/null"] + events: list[tuple[str, dict]] = [] + c = VMLoadController(serial, emit_event=lambda e, **kw: events.append((e, kw))) + # Must NOT raise. 
+ c.set_phase("infected_running") + failed = [kw for e, kw in events if e == "workload_failed"] + assert failed, "expected workload_failed event" + assert failed[0]["phase"] == "infected_running" + assert "fake-serial" in failed[0]["error"] + + +# --------------------------------------------------------------------------- +# Profile dispatch — Sample-driven workload picks the right command +# --------------------------------------------------------------------------- + + +def test_sample_with_profile_uses_workloads_module_command() -> None: + """When constructed with a Sample, infected_running runs the + profile's start_cmd (from exploits.workloads) — NOT the v1 yes-loop.""" + s = Sample(name="x", family="X", category="cryptominer", profile="cpu-saturate") + serial = FakeSerial() + events: list[tuple[str, dict]] = [] + c = VMLoadController(serial, sample=s, emit_event=lambda e, **kw: events.append((e, kw))) + c.set_phase("infected_running") + + # The sample's workload script + the post-kill yes sweep both ran. + # The new workload is profile-shaped, not the simple yes-loop. + profile_command_seen = any(".cis490-workload-cpu-saturate" in cmd for cmd in serial.calls) + assert profile_command_seen, f"expected workload script in serial calls; got {serial.calls}" + started = next(kw for e, kw in events if e == "workload_started") + assert started["profile"] == "cpu-saturate" + assert started["sample"] == "x" + + +# --------------------------------------------------------------------------- +# Default emit (no callback supplied) is a no-op +# --------------------------------------------------------------------------- + + +def test_no_emit_callback_is_safe() -> None: + """Tests + code paths that don't pass an emitter shouldn't + crash. The default is a no-op lambda.""" + serial = FakeSerial() + c = VMLoadController(serial) + # Should not raise. 
+ c.setup() + c.set_phase("infected_running") + c.set_phase("dormant") + c.set_phase("clean") diff --git a/tools/run_real_vm_demo.py b/tools/run_real_vm_demo.py index f47f407..02be39d 100644 --- a/tools/run_real_vm_demo.py +++ b/tools/run_real_vm_demo.py @@ -169,7 +169,19 @@ def main() -> int: serial.connect() serial.login(boot_timeout_s=args.boot_timeout) - controller = VMLoadController(serial, sample=sample) + # Bind the controller to the runner's event log so workload + # success/failure shows up alongside phase_transition events. + # Sample also goes into EpisodeConfig below so meta.sample + # records what was supposed to run. NOTE(review): controller.setup() below runs while runner_for_emit["runner"] is still None, so the workload_setup event it emits is dropped by this lambda — confirm that is intended. + runner_for_emit = {"runner": None} + controller = VMLoadController( + serial, + sample=sample, + emit_event=lambda ev, **kw: ( + runner_for_emit["runner"].emit_event(ev, **kw) + if runner_for_emit["runner"] else None + ), + ) controller.setup() qmp_sock = run_dir / "qmp.sock" @@ -185,9 +197,15 @@ def main() -> int: qmp_socket=qmp_sock if qmp_sock.exists() else None, guest_agent_socket=agent_sock if agent_sock.exists() else None, bridge_iface=os.environ.get("BRIDGE") or None, + sample=sample, ) - result = EpisodeRunner(cfg, on_phase=controller.set_phase).run() + runner = EpisodeRunner(cfg, on_phase=controller.set_phase) + # Connect the controller's event sink to the runner now that + # both exist. (Forward-reference closure pattern keeps the + # constructor argument order natural.) 
+ runner_for_emit["runner"] = runner + result = runner.run() controller.teardown() serial.close() diff --git a/tools/vm_load_controller.py b/tools/vm_load_controller.py index c8edbb7..a8861d2 100644 --- a/tools/vm_load_controller.py +++ b/tools/vm_load_controller.py @@ -24,6 +24,7 @@ from __future__ import annotations import logging import sys from pathlib import Path +from typing import Callable from vm_serial import SerialClient @@ -37,6 +38,9 @@ from samples.manifest import Sample # noqa: E402 log = logging.getLogger("cis490.vm_load_controller") +EmitEvent = Callable[..., None] + + class VMLoadController: """Drives a real Alpine guest through the phase schedule for Tier 2 (no exploit). Workload is chosen by ``sample.profile`` — @@ -44,17 +48,37 @@ class VMLoadController: produces matched envelopes whether or not an exploit fires. Without a sample, falls back to the original cpu-saturate yes-loop - (the original Tier-2 demo behaviour).""" + (the original Tier-2 demo behaviour). - def __init__(self, serial: SerialClient, sample: Sample | None = None) -> None: + Every set_phase call for a recognised phase reports through the + ``emit_event`` callback: a workload_* event once the phase's command + has returned, workload_failed if it raised, and on ``dormant`` a + pre-kill process probe. Unrecognised phases only log a warning. Without those events, + silent failures (login partial, command swallowed by tty) produce + well-labeled but information-less episodes — see CIS490 history + where every phase showed a median of 20% CPU on elliott-lab.""" + + def __init__( + self, + serial: SerialClient, + sample: Sample | None = None, + emit_event: EmitEvent | None = None, + ) -> None: self.s = serial self.sample = sample self.workload: Workload | None = workload_for(sample) + # No-op default so callers don't have to thread an emitter. + self.emit: EmitEvent = emit_event or (lambda *a, **kw: None) def setup(self) -> None: # Kill any pre-existing load and clear scratch space. 
self._kill_load() self.s.run("rm -f /tmp/payload /tmp/armed.log; echo setup-ok") + self.emit( + "workload_setup", + profile=self.workload.profile if self.workload else "v1-yes", + sample=self.sample.name if self.sample else None, + ) def teardown(self) -> None: self._kill_load() @@ -64,27 +88,48 @@ class VMLoadController: def set_phase(self, phase: str) -> None: log.info("vm phase -> %s (profile=%s)", phase, self.workload.profile if self.workload else "v1") - if phase == "clean": - self._kill_load() - elif phase == "armed": - self.s.run("echo armed-handshake-$(date +%s) > /tmp/armed.log") - elif phase == "infecting": - self.s.run( - "dd if=/dev/urandom of=/tmp/payload bs=4k count=128 2>/dev/null && " - "chmod +x /tmp/payload" - ) - elif phase == "infected_running": - self._kill_load() - if self.workload is not None: - self.s.run(self.workload.start_cmd) - else: + try: + if phase == "clean": + self._kill_load() + self._emit_phase("workload_killed", phase) + elif phase == "armed": + self.s.run("echo armed-handshake-$(date +%s) > /tmp/armed.log") + self._emit_phase("workload_armed", phase) + elif phase == "infecting": self.s.run( - "nohup sh -c 'yes > /dev/null' /dev/null 2>&1 & disown" + "dd if=/dev/urandom of=/tmp/payload bs=4k count=128 2>/dev/null && " + "chmod +x /tmp/payload" ) - elif phase == "dormant": - self._kill_load() - else: - log.warning("unknown phase: %s", phase) + self._emit_phase("workload_infecting", phase) + elif phase == "infected_running": + self._kill_load() + if self.workload is not None: + self.s.run(self.workload.start_cmd) + else: + self.s.run( + "nohup sh -c 'yes > /dev/null' /dev/null 2>&1 & disown" + ) + self._emit_phase("workload_started", phase) + elif phase == "dormant": + # Probe BEFORE we kill so we see whether the workload + # was actually running. If the probe says nothing was + # running, the previous infected_running was a no-op + # and the trainer should filter this episode. 
+ probe = self._probe() + self._kill_load() + self._emit_phase("workload_killed", phase, pre_kill_probe=probe) + else: + log.warning("unknown phase: %s", phase) + except Exception as e: + # Don't propagate — the runner already swallows on_phase + # exceptions. But DO record so the episode is filterable. + log.exception("set_phase(%s) failed", phase) + self._emit_phase( + "workload_failed", + phase, + # _emit_phase adds profile + sample, matching every other event. + error=str(e)[:200], + ) # ---- internals ------------------------------------------------------ @@ -94,3 +139,33 @@ class VMLoadController: # Always sweep the v1 leftover commands too, in case we just # switched profiles mid-fleet-run. self.s.run("pkill yes 2>/dev/null; pkill stress-ng 2>/dev/null; true") + + def _probe(self) -> dict: + """Ask the guest what's actually running. Returns a small dict + the caller stamps into the event so trainers can detect the "workload didn't fire" case. + Counts use ``pgrep | wc -l``: ``pgrep -c`` exits 1 on zero matches (so ``|| echo 0`` doubled the output to "0 0") and is not provided by BusyBox.""" + try: + out = self.s.run( + "echo yes=$(pgrep yes 2>/dev/null | wc -l); " + "echo sh=$(pgrep sh 2>/dev/null | wc -l); " + "echo loadavg=$(awk '{print $1}' /proc/loadavg)" + ) + stats: dict = {} + for line in out.splitlines(): + line = line.strip() + if "=" not in line: + continue + k, _, v = line.partition("=") + stats[k.strip()] = v.strip() + return stats + except Exception as e: + return {"probe_error": str(e)[:120]} + + def _emit_phase(self, event: str, phase: str, **extra) -> None: + self.emit( + event, + phase=phase, + profile=self.workload.profile if self.workload else "v1-yes", + sample=self.sample.name if self.sample else None, + **extra, + )