CIS490/tools/vm_load_controller.py
max 2707709299 Fix workload-silent false-positive on Alpine busybox guests (closes #15)
On-device agent (k-gamingcom) ran the diagnostic probe sequence and
proved the workload IS running on Alpine — yes saturating the vCPU,
loadavg=1.05, three yes PIDs visible — but two busybox incompatibilities
made every episode look silent:

1. _probe() used `pgrep -c yes`. The -c flag is procps-ng/util-linux,
   not busybox. busybox pgrep exits 1 with a usage banner; the
   `|| echo 0` fallback then reported yes=0 every time. Switched to
   `pgrep yes | wc -l` which both pgrep variants support.

2. _wrap_loop appended `disown` after the nohup-backgrounded script.
   busybox sh / ash have no disown builtin, so each infected_running
   phase printed `sh: disown: not found` into run()'s captured output.
   The script kept running (nohup gives SIGHUP immunity, which is
   what disown was for), but the spurious error is now gone.

Cross-validation in the classifier:
- prune_episodes.py: workload-silent now requires the probe AND
  host-side /proc CPU envelope (flat-cpu) to AGREE. A probe-only zero
  is treated as the busybox false-positive and dropped. This means
  the 244 already-on-disk episodes from elliott-thinkpad and
  k-gamingcom are correctly classified without re-collecting.

Test coverage:
- test_workload_silent_flag updated to require both signals
- test_workload_silent_suppressed_when_host_cpu_real new regression
  for the busybox false-positive

AGENTS.md gains a "Don't trust the in-guest probe alone" section with
the busybox-vs-procps gotcha + a list of busybox-incompatible patterns
to avoid in any new in-guest diagnostic.
2026-04-30 17:28:48 -05:00

180 lines
7.3 KiB
Python

"""In-guest load controller for tier-2 episodes.
Drives a real Alpine VM through the same phase schedule the orchestrator
follows, but the load this time is generated *inside* the guest by busybox
``yes`` / ``dd`` / a small marker file. The host /proc collector still
samples the qemu-system process from outside — what's "real" here is the
workload itself, not the orchestrator's view of it.
Phase commands (all run via the SerialClient):
clean — kill any running load, idle.
armed — small disk write (handshake-shape).
infecting — disk burst: 512 KiB urandom write to /tmp/payload.
infected_running — background ``yes > /dev/null`` for sustained CPU.
dormant — kill background load (back to idle).
Designed to mimic the envelope of an XMRig-class compromise without
running real malware. Tier-3 will replace this with msf-driven exploit
fire and a real sample.
"""
from __future__ import annotations
import logging
import sys
from pathlib import Path
from typing import Callable
from vm_serial import SerialClient
# Allow running as a script (sibling of tools/).
sys.path.insert(0, str(Path(__file__).resolve().parent.parent))
from exploits.workloads import Workload, workload_for # noqa: E402
from samples.manifest import Sample # noqa: E402
log = logging.getLogger("cis490.vm_load_controller")
EmitEvent = Callable[..., None]
class VMLoadController:
"""Drives a real Alpine guest through the phase schedule for
Tier 2 (no exploit). Workload is chosen by ``sample.profile`` —
same profile catalog as the Tier-3 driver so a fleet wave
produces matched envelopes whether or not an exploit fires.
Without a sample, falls back to the original cpu-saturate yes-loop
(the original Tier-2 demo behaviour).
Every set_phase call emits an event into the runner's events.jsonl
so we can audit (a) whether the workload command actually got
sent, (b) whether the guest acknowledged it, and (c) whether the
expected process is running afterwards. Without those events,
silent failures (login partial, command swallowed by tty) produce
well-labeled but information-less episodes — see CIS490 history
where every phase median'd 20% CPU on elliott-lab."""
def __init__(
self,
serial: SerialClient,
sample: Sample | None = None,
emit_event: EmitEvent | None = None,
) -> None:
self.s = serial
self.sample = sample
self.workload: Workload | None = workload_for(sample)
# No-op default so callers don't have to thread an emitter.
self.emit: EmitEvent = emit_event or (lambda *a, **kw: None)
def setup(self) -> None:
# Kill any pre-existing load and clear scratch space.
self._kill_load()
self.s.run("rm -f /tmp/payload /tmp/armed.log; echo setup-ok")
self.emit(
"workload_setup",
profile=self.workload.profile if self.workload else "v1-yes",
sample=self.sample.name if self.sample else None,
)
def teardown(self) -> None:
self._kill_load()
# ---- phases ---------------------------------------------------------
def set_phase(self, phase: str) -> None:
log.info("vm phase -> %s (profile=%s)",
phase, self.workload.profile if self.workload else "v1")
try:
if phase == "clean":
self._kill_load()
self._emit_phase("workload_killed", phase)
elif phase == "armed":
self.s.run("echo armed-handshake-$(date +%s) > /tmp/armed.log")
self._emit_phase("workload_armed", phase)
elif phase == "infecting":
self.s.run(
"dd if=/dev/urandom of=/tmp/payload bs=4k count=128 2>/dev/null && "
"chmod +x /tmp/payload"
)
self._emit_phase("workload_infecting", phase)
elif phase == "infected_running":
self._kill_load()
if self.workload is not None:
self.s.run(self.workload.start_cmd)
else:
self.s.run(
"nohup sh -c 'yes > /dev/null' </dev/null >/dev/null 2>&1 & disown"
)
self._emit_phase("workload_started", phase)
elif phase == "dormant":
# Probe BEFORE we kill so we see whether the workload
# was actually running. If the probe says nothing was
# running, the previous infected_running was a no-op
# and the trainer should filter this episode.
probe = self._probe()
self._kill_load()
self._emit_phase("workload_killed", phase, pre_kill_probe=probe)
else:
log.warning("unknown phase: %s", phase)
except Exception as e:
# Don't propagate — the runner already swallows on_phase
# exceptions. But DO record so the episode is filterable.
log.exception("set_phase(%s) failed", phase)
self.emit(
"workload_failed",
phase=phase,
error=str(e)[:200],
profile=self.workload.profile if self.workload else "v1-yes",
)
# ---- internals ------------------------------------------------------
def _kill_load(self) -> None:
if self.workload is not None:
self.s.run(self.workload.stop_cmd)
# Always sweep the v1 leftover commands too, in case we just
# switched profiles mid-fleet-run.
self.s.run("pkill yes 2>/dev/null; pkill stress-ng 2>/dev/null; true")
def _probe(self) -> dict:
"""Ask the guest what's actually running. Returns a small dict
the caller stamps into the event so trainers can detect the
"workload didn't fire" case from meta alone.
Counts processes via ``pgrep <name> | wc -l`` rather than
``pgrep -c <name>``: the latter is a procps-ng/util-linux flag
and is NOT supported by busybox's pgrep (Alpine guests). On
busybox, ``pgrep -c`` exits 1 with a usage banner, the
``|| echo 0`` fallback always fires, and the probe reports
false zeros. See spectral/CIS490#15 — this caused 244 episodes
from elliott-thinkpad and k-gamingcom to be incorrectly
labelled workload-silent even when the workload was running."""
try:
out = self.s.run(
"echo yes=$(pgrep yes 2>/dev/null | wc -l); "
"echo sh=$(pgrep sh 2>/dev/null | wc -l); "
"echo loadavg=$(awk '{print $1}' /proc/loadavg)"
)
stats: dict = {}
for line in out.splitlines():
line = line.strip()
if "=" not in line:
continue
k, _, v = line.partition("=")
stats[k.strip()] = v.strip()
return stats
except Exception as e:
return {"probe_error": str(e)[:120]}
def _emit_phase(self, event: str, phase: str, **extra) -> None:
self.emit(
event,
phase=phase,
profile=self.workload.profile if self.workload else "v1-yes",
sample=self.sample.name if self.sample else None,
**extra,
)