CIS490/tools/run_real_vm_demo.py
Max Gorog ac7b85ff8d PIPELINE §5 step 1 follow-up: enable perf in production launchers
The §5 step 1 fixes correct the perf collector's stdout/stderr +
event-name parser bugs, but the launchers
(run_real_vm_demo / run_tier3_demo) never set enable_perf=True, so
production episodes still ship with rows_perf=0 — silently disabled
collector, which is exactly the §1 / §4.4 pattern.

Turn it on in both launchers. Failure modes (perf binary missing,
paranoid level too high) are logged as warnings + return 0 rows
visibly, not silently.

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
2026-05-03 17:40:37 -05:00

267 lines
9.9 KiB
Python

"""Tier-2: real VM, real workload, labeled phases.
Boots the Alpine cidata VM, logs in over the serial console, drives the
guest through an XMRig-shaped phase schedule (clean → armed → infecting →
infected_running → dormant → re-entry), and lets the orchestrator's host
/proc collector sample the qemu-system pid throughout.
Compared to ``run_envelope_demo.py``: same phase schedule, same labels,
same telemetry shape — but the load is now generated by ``yes`` and
``dd`` running *inside* a real Alpine guest, not by a Python program on
the host. Tier-3 replaces the controller with an MSF-driven exploit
fire.
"""
from __future__ import annotations
import argparse
import logging
import os
import signal
import subprocess
import sys
import time
from pathlib import Path
# Allow running as a script.
sys.path.insert(0, str(Path(__file__).resolve().parent.parent))
sys.path.insert(0, str(Path(__file__).resolve().parent))
from collectors import qmp # noqa: E402
from orchestrator.episode import EpisodeConfig, EpisodeRunner # noqa: E402
from samples.manifest import SampleManifest # noqa: E402
from vm_load_controller import VMLoadController # noqa: E402
from vm_serial import SerialClient # noqa: E402
# Phase timeline as (phase label, seconds) pairs; the durations add up to
# 85 s, which main() passes to the episode as its total duration. Kept
# identical in shape to run_envelope_demo so the resulting plots line up.
DEFAULT_SCHEDULE: list[tuple[str, float]] = [
    ("clean", 10.0),
    ("armed", 2.0),
    ("infecting", 3.0),
    ("infected_running", 25.0),
    ("dormant", 15.0),
    ("infected_running", 20.0),  # re-entry after the first dormant window
    ("dormant", 5.0),
    ("clean", 5.0),
]
def _wait_for_socket(path: Path, timeout_s: float) -> None:
    """Block until a UNIX stream socket at *path* accepts a connection.

    The path is polled every 0.2 s. A socket file that exists but refuses
    the connection (for example a leftover from a dead VM) does not count
    as ready; polling simply continues.

    Args:
        path: Filesystem location of the UNIX socket to probe.
        timeout_s: Maximum number of seconds to keep polling.

    Raises:
        TimeoutError: if no connection succeeded before the deadline.
    """
    import socket as _sk

    deadline = time.monotonic() + timeout_s
    while time.monotonic() < deadline:
        if path.exists():
            try:
                # Verify it's actually live, not a leftover from a dead VM.
                # The context manager closes the probe even when connect()
                # fails, so repeated failed polls do not leak descriptors.
                with _sk.socket(_sk.AF_UNIX, _sk.SOCK_STREAM) as probe:
                    probe.settimeout(0.5)
                    probe.connect(str(path))
                    return
            except OSError:
                # Not listening yet (or stale file): retry after the sleep.
                pass
        time.sleep(0.2)
    raise TimeoutError(f"socket {path} never came alive within {timeout_s}s")
def main() -> int:
    """Boot the demo VM, run one labeled episode against it, and print a summary.

    Parses the CLI flags, optionally resolves a workload sample from the
    manifest, launches ``vm/launch_demo.sh`` in its own session, logs into
    the guest over the serial socket, takes a ``baseline-v1`` savevm when a
    QMP socket is present, and then runs an ``EpisodeRunner`` over
    ``DEFAULT_SCHEDULE`` with a ``VMLoadController`` reacting to each phase
    change. Unless ``--keep-vm`` is given, the launcher's process group is
    terminated on the way out, whether or not the episode succeeded.

    Returns:
        0 after a completed episode; 2 when ``--sample`` names a sample
        that is not present in the manifest.

    Raises:
        TimeoutError: if the serial socket or the QEMU pid file does not
            become usable within 15 seconds of launching the VM.
    """
    parser = argparse.ArgumentParser(prog="run_real_vm_demo")
    parser.add_argument("--data-root", default="data")
    parser.add_argument("--interval-ms", type=int, default=100)
    parser.add_argument(
        "--run-dir",
        # Per-slot defaults so the fleet runner's parallel calls don't
        # collide on the same /tmp dir (which would have rmtree'd each
        # other's pidfiles mid-boot — see CIS490 history). Resolution
        # order:
        #   1) explicit --run-dir CLI flag
        #   2) RUN_DIR env (set by the fleet runner)
        #   3) /tmp/cis490-vm-<SLOT> (SLOT defaults to 0)
        default=(
            os.environ.get("RUN_DIR")
            or f"/tmp/cis490-vm-{os.environ.get('SLOT', '0')}"
        ),
        help="QEMU run dir (sockets + pidfile go here)",
    )
    parser.add_argument(
        "--keep-vm",
        action="store_true",
        help="leave the VM running after the episode finishes",
    )
    parser.add_argument(
        "--boot-timeout",
        type=float,
        default=120.0,
        help="how long to wait for serial login prompt",
    )
    parser.add_argument(
        "--sample",
        default=os.environ.get("SAMPLE_NAME"),
        help="Pick a workload profile from the manifest by name. Fleet runner "
        "passes this via SAMPLE_NAME env. If unset, runs the v1 yes-loop.",
    )
    parser.add_argument(
        "--manifest",
        default=str(Path(__file__).resolve().parent.parent / "samples" / "manifest.toml"),
    )
    args = parser.parse_args()

    logging.basicConfig(
        level=logging.INFO,
        format="%(asctime)s %(levelname)s %(name)s %(message)s",
    )
    log = logging.getLogger("cis490.run_real_vm_demo")

    repo_root = Path(__file__).resolve().parent.parent
    launcher = repo_root / "vm" / "launch_demo.sh"

    # Resolve sample if requested.
    sample = None
    if args.sample:
        manifest = SampleManifest.load(args.manifest)
        sample = next((s for s in manifest.samples if s.name == args.sample), None)
        if sample is None:
            log.error("sample %r not in manifest %s", args.sample, args.manifest)
            return 2
        log.info("using sample=%s profile=%s kind=%s",
                 sample.name, sample.profile, sample.kind)

    run_dir = Path(args.run_dir)
    # Wipe any stale sockets/pidfile from a previous run.
    if run_dir.exists():
        import shutil
        shutil.rmtree(run_dir)
    run_dir.mkdir(parents=True, exist_ok=True)

    serial_sock = run_dir / "serial.sock"
    pid_file = run_dir / "qemu.pid"

    log.info("booting VM via %s (RUN_DIR=%s)", launcher, run_dir)
    env = os.environ.copy()
    env["RUN_DIR"] = str(run_dir)
    # start_new_session gives the launcher its own process group so the
    # shutdown path below can signal the whole group at once.
    qemu = subprocess.Popen(
        [str(launcher)],
        cwd=str(repo_root),
        env=env,
        stdout=subprocess.DEVNULL,
        stderr=subprocess.DEVNULL,
        start_new_session=True,
    )

    try:
        _wait_for_socket(serial_sock, timeout_s=15.0)

        # Wait for the pid file to be non-empty. The text is read once per
        # poll and reused, and an explicit TimeoutError replaces the
        # FileNotFoundError / ValueError that used to surface when the
        # file never filled in.
        deadline = time.monotonic() + 15.0
        while time.monotonic() < deadline:
            try:
                pid_text = pid_file.read_text().strip()
            except FileNotFoundError:
                pid_text = ""
            if pid_text:
                break
            time.sleep(0.2)
        else:
            raise TimeoutError(f"pid file {pid_file} still empty after 15s")
        qemu_pid = int(pid_text)
        log.info("qemu pid = %d", qemu_pid)

        # Cloud-init's runcmd (password setup, sshd hardening) needs some
        # time after boot. Wait long enough that the credentials we'll
        # send are actually valid.
        log.info("waiting 35s for cloud-init runcmd to settle...")
        time.sleep(35.0)

        log.info("connecting serial + logging in (boot timeout %.0fs)",
                 args.boot_timeout)
        serial = SerialClient(str(serial_sock))
        serial.connect()
        try:
            serial.login(boot_timeout_s=args.boot_timeout)

            # Take a savevm AFTER the guest is fully up but BEFORE we
            # start any workload. EpisodeConfig.revert_at_{start,end} use
            # this snapshot for inter-episode reverts (the snapshot lives
            # in the qcow2's per-VM-process overlay since launch_demo.sh
            # runs with snapshot=on, so it's discarded with the VM).
            # Without this step, loadvm would target a snapshot that
            # doesn't exist and silently emit snapshot_revert_failed.
            qmp_sock = run_dir / "qmp.sock"
            if qmp_sock.exists():
                try:
                    _qmp = qmp.QMPClient(qmp_sock)
                    _qmp.connect()
                    try:
                        out = _qmp.savevm("baseline-v1")
                        log.info("savevm baseline-v1 OK: %s", out.strip()[:160])
                    finally:
                        _qmp.close()
                except Exception as e:
                    # Deliberately best-effort: the episode still runs,
                    # only the snapshot revert is unavailable.
                    log.warning("savevm failed; revert_at_start unusable: %s", e)

            # Bind the controller to the runner's event log so workload
            # success/failure shows up alongside phase_transition events.
            # Sample also goes into EpisodeConfig below so meta.sample
            # records what was supposed to run.
            runner_for_emit = {"runner": None}
            controller = VMLoadController(
                serial,
                sample=sample,
                emit_event=lambda ev, **kw: (
                    runner_for_emit["runner"].emit_event(ev, **kw)
                    if runner_for_emit["runner"] else None
                ),
            )
            controller.setup()
            try:
                agent_sock = run_dir / "agent.sock"
                cfg = EpisodeConfig(
                    target_pid=qemu_pid,
                    duration_s=sum(d for _, d in DEFAULT_SCHEDULE),
                    interval_ms=args.interval_ms,
                    data_root=Path(args.data_root),
                    phase_schedule=DEFAULT_SCHEDULE,
                    image_name="alpine-3.21-cloudinit",
                    snapshot_name="baseline-v1",
                    qmp_socket=qmp_sock if qmp_sock.exists() else None,
                    guest_agent_socket=agent_sock if agent_sock.exists() else None,
                    bridge_iface=os.environ.get("BRIDGE") or None,
                    # Source 3 (oracle): perf-stat sampling of the qemu PID.
                    # Now that the stdout/stderr + event-name parser bugs are
                    # fixed, perf produces real rows. Per PIPELINE.md §4.4
                    # collectors that emit zero rows shouldn't be in the active
                    # set silently — keeping it default-off was effectively
                    # silencing the source. On x86_64 lab hosts with
                    # perf_event_paranoid <= 2 (cis490 user owns qemu PID), the
                    # collector reads cycles/instructions/page-faults; on hosts
                    # without perf the collector logs a warning and returns 0
                    # rows (visible, not silent).
                    enable_perf=True,
                    sample=sample,
                )
                runner = EpisodeRunner(cfg, on_phase=controller.set_phase)
                # Connect the controller's event sink to the runner now that
                # both exist. (Forward-reference closure pattern keeps the
                # constructor argument order natural.)
                runner_for_emit["runner"] = runner

                result = runner.run()
            finally:
                # Runs on failure too, so a VM kept alive with --keep-vm is
                # not left with the controller still set up after a crash.
                controller.teardown()
        finally:
            serial.close()

        print()
        print(f"episode_id = {result.episode_id}")
        print(f"path = {result.episode_dir}")
        print(f"rows_proc = {result.rows_proc}")
        print(f"phases = {result.phases_observed}")
        print()
        print("To plot:")
        print(f" uv run python tools/plot_envelope.py {result.episode_dir}")
        return 0
    finally:
        if not args.keep_vm:
            log.info("shutting down VM (pid=%d)", qemu.pid)
            try:
                os.killpg(os.getpgid(qemu.pid), signal.SIGTERM)
            except ProcessLookupError:
                pass
            try:
                qemu.wait(timeout=5)
            except subprocess.TimeoutExpired:
                # Escalate; the group may disappear between the timeout
                # and this call, which is fine.
                try:
                    os.killpg(os.getpgid(qemu.pid), signal.SIGKILL)
                except ProcessLookupError:
                    pass
# Script entry point: main()'s return value becomes the process exit status.
if __name__ == "__main__":
    raise SystemExit(main())