CIS490/tests/test_vm_load_controller.py
max d86502d950 workload audit trail: meta.sample + per-phase events + pre-kill probe
The elliott-lab episode showed every phase median'd 20% CPU because
the in-guest workload silently never fired — and there was no signal
in events.jsonl to detect that from outside, so a trainer would
treat the labels as ground truth and learn "all phases look identical".
This commit closes the audit gap so the failure is visible in meta:

orchestrator/episode.py
  EpisodeConfig.sample: Sample | None — the manifest entry that
  drove this episode's workload selection. Stamped into meta.sample
  as {name, family, category, profile, kind, sha256} so trainers
  can join cleanly without re-deriving from events. None means the
  v1 yes-loop fallback path ran (and the trainer should treat the
  episode with appropriate skepticism).

tools/vm_load_controller.py
  VMLoadController gains an emit_event callable. Every phase now
  emits a workload_* event into the runner's events.jsonl:
    workload_setup        login + initial cleanup OK
    workload_killed       clean / dormant. Dormant carries a
                          `pre_kill_probe` dict from inside the
                          guest (`pgrep -c yes`, `pgrep -c sh`,
                          /proc/loadavg) so the trainer can detect
                          the elliott-lab failure mode where the
                          workload never actually ran.
    workload_armed        armed handshake fired
    workload_infecting    dd urandom / payload write fired
    workload_started      infected_running command sent
    workload_failed       any of the above raised inside SerialClient
                          (timeout, EOF, partial login). The runner
                          would have silently swallowed the
                          exception via its on_phase try/except;
                          the audit row makes the failure detectable.
  Exceptions in shell calls surface as workload_failed events but
  do NOT propagate, matching the runner's existing on_phase
  contract.

tools/run_real_vm_demo.py
  Wires the controller's emit_event to the runner's emit_event via
  a small forward-reference closure (controller is built before
  runner; runner.emit_event needs to be the sink). Sample also
  flows into EpisodeConfig.sample so meta.sample matches what the
  controller actually ran.

Tests: 119 (was 106). New cases:
  tests/test_vm_load_controller.py  (11 tests against a FakeSerial)
    - setup emits workload_setup
    - infected_running runs the v1 yes-loop AND emits workload_started
    - dormant probes BEFORE killing and stamps pre_kill_probe
    - dormant probe records "yes=0" (the elliott-lab fingerprint)
    - clean / armed / infecting all emit their respective events
    - serial.run() exception → workload_failed event, no propagation
    - sample-with-profile dispatches to exploits.workloads command
      (NOT the v1 yes-loop)
    - missing emit_event callback is a no-op (back-compat)
  tests/test_episode.py  (2 new)
    - meta.sample carries name/family/category/profile/kind/sha256
      when EpisodeConfig.sample is set
    - meta.sample stays null in the v1 fallback path

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
2026-04-30 02:12:34 -05:00

213 lines
8.7 KiB
Python

"""Tests for VMLoadController against a fake SerialClient.
The controller's only job is to translate phases into shell commands
on a serial console + emit audit events. The key invariants we
encode here come from the elliott-lab incident where every phase
median'd 20% CPU because the workload silently never fired:
- every set_phase emits some event (so absence in events.jsonl is
a hard signal)
- infected_running emits workload_started AFTER sending the load
command
- dormant emits workload_killed WITH a pre_kill_probe so trainers
can detect "the workload was never running"
- exceptions in the shell call surface as workload_failed; they
do NOT propagate (the runner's on_phase callback would swallow
them anyway, but we want the audit row regardless)
"""
from __future__ import annotations
import sys
from pathlib import Path
import pytest
# Mirror the same path hack run_real_vm_demo.py uses so the tools/
# module imports work.
ROOT = Path(__file__).resolve().parent.parent
sys.path.insert(0, str(ROOT))
sys.path.insert(0, str(ROOT / "tools"))
from samples.manifest import Sample
from vm_load_controller import VMLoadController # noqa: E402
class FakeSerial:
"""Records every shell command. Returns canned probe output."""
def __init__(self, probe_response: str = "yes=1\nsh=1\nloadavg=0.45") -> None:
self.calls: list[str] = []
self.probe_response = probe_response
self.fail_on: list[str] = []
def run(self, cmd: str, timeout_s: float = 10.0) -> str:
self.calls.append(cmd)
for substr in self.fail_on:
if substr in cmd:
raise RuntimeError(f"fake-serial: failing on {substr!r}")
if "pgrep -c yes" in cmd or "pgrep -c sh" in cmd or "loadavg" in cmd:
return self.probe_response
return ""
# ---------------------------------------------------------------------------
# Event emission — the audit trail
# ---------------------------------------------------------------------------
def test_setup_emits_workload_setup_event() -> None:
serial = FakeSerial()
events: list[tuple[str, dict]] = []
c = VMLoadController(serial, emit_event=lambda e, **kw: events.append((e, kw)))
c.setup()
names = [e for e, _ in events]
assert "workload_setup" in names
setup = next(kw for e, kw in events if e == "workload_setup")
assert setup["profile"] == "v1-yes" # no Sample → fallback path
assert setup["sample"] is None
def test_setup_records_profile_when_sample_present() -> None:
serial = FakeSerial()
s = Sample(name="x", family="X", category="rat", profile="cpu-saturate")
events: list[tuple[str, dict]] = []
c = VMLoadController(serial, sample=s, emit_event=lambda e, **kw: events.append((e, kw)))
c.setup()
setup = next(kw for e, kw in events if e == "workload_setup")
assert setup["profile"] == "cpu-saturate"
assert setup["sample"] == "x"
def test_infected_running_emits_workload_started_after_command() -> None:
serial = FakeSerial()
events: list[tuple[str, dict]] = []
c = VMLoadController(serial, emit_event=lambda e, **kw: events.append((e, kw)))
c.set_phase("infected_running")
# The command was sent.
assert any("yes > /dev/null" in cmd for cmd in serial.calls), \
f"expected v1 yes-loop in serial calls; got {serial.calls}"
# And the audit event followed it.
started = [kw for e, kw in events if e == "workload_started"]
assert started, "workload_started event must fire"
assert started[0]["phase"] == "infected_running"
assert started[0]["profile"] == "v1-yes"
def test_dormant_probes_before_killing() -> None:
"""The pre_kill_probe is the load-bearing diagnostic: it tells the
trainer whether the workload was actually running before we
killed it. If pgrep returns 0 yes processes, the previous
infected_running was a no-op and the episode is filterable."""
serial = FakeSerial(probe_response="yes=2\nsh=1\nloadavg=1.32")
events: list[tuple[str, dict]] = []
c = VMLoadController(serial, emit_event=lambda e, **kw: events.append((e, kw)))
c.set_phase("dormant")
killed = [kw for e, kw in events if e == "workload_killed" and kw["phase"] == "dormant"]
assert killed, "dormant must emit workload_killed"
probe = killed[0].get("pre_kill_probe")
assert probe is not None
assert probe["yes"] == "2"
assert probe["loadavg"] == "1.32"
def test_dormant_probe_records_zero_when_workload_never_ran() -> None:
"""The exact symptom from elliott-lab: dormant probe shows 0
yes processes → trainer can flag this episode as workload-not-firing."""
serial = FakeSerial(probe_response="yes=0\nsh=1\nloadavg=0.18")
events: list[tuple[str, dict]] = []
c = VMLoadController(serial, emit_event=lambda e, **kw: events.append((e, kw)))
c.set_phase("dormant")
killed = next(kw for e, kw in events if e == "workload_killed" and kw["phase"] == "dormant")
assert killed["pre_kill_probe"]["yes"] == "0"
def test_clean_phase_emits_workload_killed() -> None:
serial = FakeSerial()
events: list[tuple[str, dict]] = []
c = VMLoadController(serial, emit_event=lambda e, **kw: events.append((e, kw)))
c.set_phase("clean")
assert any(
e == "workload_killed" and kw["phase"] == "clean" for e, kw in events
), "clean must emit workload_killed"
def test_armed_emits_workload_armed_with_handshake_command() -> None:
serial = FakeSerial()
events: list[tuple[str, dict]] = []
c = VMLoadController(serial, emit_event=lambda e, **kw: events.append((e, kw)))
c.set_phase("armed")
assert any("armed-handshake" in cmd for cmd in serial.calls)
assert any(e == "workload_armed" for e, _ in events)
def test_infecting_emits_workload_infecting_with_dd() -> None:
serial = FakeSerial()
events: list[tuple[str, dict]] = []
c = VMLoadController(serial, emit_event=lambda e, **kw: events.append((e, kw)))
c.set_phase("infecting")
assert any("dd if=/dev/urandom" in cmd for cmd in serial.calls)
assert any(e == "workload_infecting" for e, _ in events)
# ---------------------------------------------------------------------------
# Exception handling — failures must surface as events, not propagate
# ---------------------------------------------------------------------------
def test_command_failure_emits_workload_failed_and_does_not_raise() -> None:
"""If the serial.run() raises (timeout, EOF, login bad), the
runner would silently swallow the exception. We want a hard
audit row in events.jsonl regardless."""
serial = FakeSerial()
serial.fail_on = ["yes > /dev/null"]
events: list[tuple[str, dict]] = []
c = VMLoadController(serial, emit_event=lambda e, **kw: events.append((e, kw)))
# Must NOT raise.
c.set_phase("infected_running")
failed = [kw for e, kw in events if e == "workload_failed"]
assert failed, "expected workload_failed event"
assert failed[0]["phase"] == "infected_running"
assert "fake-serial" in failed[0]["error"]
# ---------------------------------------------------------------------------
# Profile dispatch — Sample-driven workload picks the right command
# ---------------------------------------------------------------------------
def test_sample_with_profile_uses_workloads_module_command() -> None:
"""When constructed with a Sample, infected_running runs the
profile's start_cmd (from exploits.workloads) — NOT the v1 yes-loop."""
s = Sample(name="x", family="X", category="cryptominer", profile="cpu-saturate")
serial = FakeSerial()
events: list[tuple[str, dict]] = []
c = VMLoadController(serial, sample=s, emit_event=lambda e, **kw: events.append((e, kw)))
c.set_phase("infected_running")
# The sample's workload script + the post-kill yes sweep both ran.
# The new workload is profile-shaped, not the simple yes-loop.
profile_command_seen = any(".cis490-workload-cpu-saturate" in cmd for cmd in serial.calls)
assert profile_command_seen, f"expected workload script in serial calls; got {serial.calls}"
started = next(kw for e, kw in events if e == "workload_started")
assert started["profile"] == "cpu-saturate"
assert started["sample"] == "x"
# ---------------------------------------------------------------------------
# Default emit (no callback supplied) is a no-op
# ---------------------------------------------------------------------------
def test_no_emit_callback_is_safe() -> None:
"""Tests + code paths that don't pass an emitter shouldn't
crash. The default is a no-op lambda."""
serial = FakeSerial()
c = VMLoadController(serial)
# Should not raise.
c.setup()
c.set_phase("infected_running")
c.set_phase("dormant")
c.set_phase("clean")