The elliott-lab episode showed every phase median'd 20% CPU because
the in-guest workload silently never fired — and there was no signal
in events.jsonl to detect that from outside, so a trainer would
treat the labels as ground truth and learn "all phases look identical".
This commit closes the audit gap so the failure is visible in meta:
orchestrator/episode.py
EpisodeConfig.sample: Sample | None — the manifest entry that
drove this episode's workload selection. Stamped into meta.sample
as {name, family, category, profile, kind, sha256} so trainers
can join cleanly without re-deriving from events. None means the
v1 yes-loop fallback path ran (and the trainer should treat the
episode with appropriate skepticism).
tools/vm_load_controller.py
VMLoadController gains an emit_event callable. Every phase now
emits a workload_* event into the runner's events.jsonl:
workload_setup — login + initial cleanup OK
workload_killed — clean / dormant. Dormant carries a
`pre_kill_probe` dict from inside the
guest (`pgrep -c yes`, `pgrep -c sh`,
/proc/loadavg) so the trainer can detect
the elliott-lab failure mode where the
workload never actually ran.
workload_armed — armed handshake fired
workload_infecting — dd urandom / payload write fired
workload_started — infected_running command sent
workload_failed — any of the above raised inside SerialClient
(timeout, EOF, partial login). The runner
would have silently swallowed the
exception via its on_phase try/except;
the audit row makes the failure detectable.
Exceptions in shell calls surface as workload_failed events but
do NOT propagate, matching the runner's existing on_phase
contract.
tools/run_real_vm_demo.py
Wires the controller's emit_event to the runner's emit_event via
a small forward-reference closure (controller is built before
runner; runner.emit_event needs to be the sink). Sample also
flows into EpisodeConfig.sample so meta.sample matches what the
controller actually ran.
Tests: 119 (was 106). New cases:
tests/test_vm_load_controller.py (11 tests against a FakeSerial)
- setup emits workload_setup
- infected_running runs the v1 yes-loop AND emits workload_started
- dormant probes BEFORE killing and stamps pre_kill_probe
- dormant probe records "yes=0" (the elliott-lab fingerprint)
- clean / armed / infecting all emit their respective events
- serial.run() exception → workload_failed event, no propagation
- sample-with-profile dispatches to exploits.workloads command
(NOT the v1 yes-loop)
- missing emit_event callback is a no-op (back-compat)
tests/test_episode.py (2 new)
- meta.sample carries name/family/category/profile/kind/sha256
when EpisodeConfig.sample is set
- meta.sample stays null in the v1 fallback path
Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
171 lines
6.7 KiB
Python
171 lines
6.7 KiB
Python
"""In-guest load controller for tier-2 episodes.

Drives a real Alpine VM through the same phase schedule the orchestrator
follows, but the load this time is generated *inside* the guest by busybox
``yes`` / ``dd`` / a small marker file. The host /proc collector still
samples the qemu-system process from outside — what's "real" here is the
workload itself, not the orchestrator's view of it.

Phase commands (all run via the SerialClient):

    clean            — kill any running load, idle.
    armed            — small disk write (handshake-shape).
    infecting        — disk burst: 512 KiB urandom write to /tmp/payload.
    infected_running — background ``yes > /dev/null`` for sustained CPU.
    dormant          — kill background load (back to idle).

Designed to mimic the envelope of an XMRig-class compromise without
running real malware. Tier-3 will replace this with msf-driven exploit
fire and a real sample.
"""
|
|
|
|
from __future__ import annotations
|
|
|
|
import logging
|
|
import sys
|
|
from pathlib import Path
|
|
from typing import Callable
|
|
|
|
from vm_serial import SerialClient
|
|
|
|
# Allow running as a script (sibling of tools/).
|
|
sys.path.insert(0, str(Path(__file__).resolve().parent.parent))
|
|
|
|
from exploits.workloads import Workload, workload_for # noqa: E402
|
|
from samples.manifest import Sample # noqa: E402
|
|
|
|
|
|
# Module-level logger, registered under a fixed literal name (not ``__name__``).
log = logging.getLogger("cis490.vm_load_controller")
|
|
|
|
|
|
# Type of the event sink handed to VMLoadController: in this file it is called
# as ``emit(event_name, **fields)`` and its return value is ignored.
EmitEvent = Callable[..., None]
|
|
|
|
|
|
class VMLoadController:
    """Drive a real Alpine guest through the Tier-2 phase schedule (no exploit).

    The workload is chosen by ``sample.profile`` — same profile catalog as
    the Tier-3 driver so a fleet wave produces matched envelopes whether or
    not an exploit fires.  Without a sample, falls back to the original
    cpu-saturate yes-loop (the original Tier-2 demo behaviour).

    Every recognised ``set_phase`` call emits an event through ``emit`` so
    the episode can be audited for (a) whether the workload command actually
    got sent, (b) whether the guest acknowledged it, and (c) whether the
    expected process is running afterwards.  Without those events, silent
    failures (login partial, command swallowed by tty) produce well-labeled
    but information-less episodes — see CIS490 history where every phase
    median'd 20% CPU on elliott-lab.

    Event fields: every ``workload_*`` phase event carries ``phase``,
    ``profile`` and ``sample``; ``workload_failed`` additionally carries
    ``error`` and the dormant ``workload_killed`` carries ``pre_kill_probe``.
    """

    def __init__(
        self,
        serial: SerialClient,
        sample: Sample | None = None,
        emit_event: EmitEvent | None = None,
    ) -> None:
        """Bind the controller to a serial console.

        Args:
            serial: client whose ``run(cmd)`` executes a shell command
                inside the guest.
            sample: manifest entry selecting the workload; ``None`` selects
                the v1 yes-loop fallback.
            emit_event: sink called as ``emit_event(name, **fields)``.
                When omitted, events are silently dropped.
        """
        self.s = serial
        self.sample = sample
        self.workload: Workload | None = workload_for(sample)
        # No-op default so callers don't have to thread an emitter.
        self.emit: EmitEvent = emit_event or (lambda *a, **kw: None)

    def setup(self) -> None:
        """Kill any pre-existing load, clear scratch files, emit ``workload_setup``.

        Unlike ``set_phase``, exceptions from the serial client propagate.
        """
        self._kill_load()
        self.s.run("rm -f /tmp/payload /tmp/armed.log; echo setup-ok")
        self.emit(
            "workload_setup",
            profile=self._profile_label(),
            sample=self._sample_name(),
        )

    def teardown(self) -> None:
        """Stop whatever load is still running in the guest."""
        self._kill_load()

    # ---- phases ---------------------------------------------------------

    def set_phase(self, phase: str) -> None:
        """Run the in-guest command(s) for *phase* and emit the audit event.

        Any exception raised while handling the phase is logged and reported
        as a ``workload_failed`` event instead of propagating — the runner
        already swallows on_phase exceptions, so the event is what keeps the
        episode filterable.  An unknown phase only logs a warning.
        """
        log.info("vm phase -> %s (profile=%s)",
                 phase, self.workload.profile if self.workload else "v1")
        try:
            if phase == "clean":
                self._kill_load()
                self._emit_phase("workload_killed", phase)
            elif phase == "armed":
                self.s.run("echo armed-handshake-$(date +%s) > /tmp/armed.log")
                self._emit_phase("workload_armed", phase)
            elif phase == "infecting":
                # 128 x 4 KiB = 512 KiB disk burst.
                self.s.run(
                    "dd if=/dev/urandom of=/tmp/payload bs=4k count=128 2>/dev/null && "
                    "chmod +x /tmp/payload"
                )
                self._emit_phase("workload_infecting", phase)
            elif phase == "infected_running":
                self._kill_load()
                if self.workload is not None:
                    self.s.run(self.workload.start_cmd)
                else:
                    # NOTE(review): `disown` is not a POSIX builtin — confirm
                    # the guest's shell accepts it.
                    self.s.run(
                        "nohup sh -c 'yes > /dev/null' </dev/null >/dev/null 2>&1 & disown"
                    )
                self._emit_phase("workload_started", phase)
            elif phase == "dormant":
                # Probe BEFORE we kill so we see whether the workload was
                # actually running.  If the probe says nothing was running,
                # the previous infected_running was a no-op and the trainer
                # should filter this episode.
                probe = self._probe()
                self._kill_load()
                self._emit_phase("workload_killed", phase, pre_kill_probe=probe)
            else:
                log.warning("unknown phase: %s", phase)
        except Exception as e:
            # Don't propagate — the runner already swallows on_phase
            # exceptions.  But DO record so the episode is filterable.
            log.exception("set_phase(%s) failed", phase)
            # Routed through _emit_phase so the failure row carries the same
            # phase/profile/sample fields as every other workload_* event.
            self._emit_phase("workload_failed", phase, error=str(e)[:200])

    # ---- internals ------------------------------------------------------

    def _profile_label(self) -> str:
        """Profile name stamped into events; ``"v1-yes"`` for the fallback."""
        return self.workload.profile if self.workload else "v1-yes"

    def _sample_name(self) -> str | None:
        """Name of the driving sample, or ``None`` on the fallback path."""
        return self.sample.name if self.sample else None

    def _kill_load(self) -> None:
        """Stop the profile's workload, then sweep the v1 leftovers."""
        if self.workload is not None:
            self.s.run(self.workload.stop_cmd)
        # Always sweep the v1 leftover commands too, in case we just
        # switched profiles mid-fleet-run.
        self.s.run("pkill yes 2>/dev/null; pkill stress-ng 2>/dev/null; true")

    def _probe(self) -> dict[str, str]:
        """Ask the guest what's actually running.

        Returns a small dict (``yes``, ``sh``, ``loadavg`` — all strings as
        printed by the guest) that the caller stamps into the event so
        trainers can detect the "workload didn't fire" case from meta alone.
        On any failure returns ``{"probe_error": <message>}`` instead of
        raising.
        """
        try:
            # Count PIDs via the positional parameters instead of
            # `pgrep -c ... || echo 0`: `pgrep -c` prints "0" AND exits
            # non-zero when nothing matches, so the `||` fallback appended a
            # second "0" ("yes=0 0"); and a pgrep without `-c` support would
            # always fall through to 0, faking the never-ran fingerprint.
            out = self.s.run(
                "set -- $(pgrep yes 2>/dev/null); echo yes=$#; "
                "set -- $(pgrep sh 2>/dev/null); echo sh=$#; "
                "echo loadavg=$(awk '{print $1}' /proc/loadavg)"
            )
            stats: dict[str, str] = {}
            for raw in out.splitlines():
                key, sep, value = raw.strip().partition("=")
                if sep:  # skip echoes/prompts that are not key=value lines
                    stats[key.strip()] = value.strip()
            return stats
        except Exception as e:
            return {"probe_error": str(e)[:120]}

    def _emit_phase(self, event: str, phase: str, **extra) -> None:
        """Emit *event* with the standard phase/profile/sample fields plus *extra*."""
        self.emit(
            event,
            phase=phase,
            profile=self._profile_label(),
            sample=self._sample_name(),
            **extra,
        )
|