"""Tier-2: real VM, real workload, labeled phases.

Boots the Alpine cidata VM, logs in over the serial console, starts a
workload *inside* the guest, and lets the orchestrator's host /proc
collector sample the qemu-system pid throughout. Because Tier-2 fires no
exploit, the episode is labeled with a single ``clean`` phase lasting as
long as a walk of the canonical Tier-3 schedule (the XMRig-shaped
clean → armed → infecting → infected_running → dormant → re-entry walk).
Episode lengths stay comparable across tiers without minting infection
labels for episodes where nothing was infected.

Compared to ``run_envelope_demo.py``: same telemetry shape — but the load
is now generated by ``yes`` and ``dd`` (or the selected sample's workload
profile) running *inside* a real Alpine guest, not by a Python program on
the host.

Tier-3 replaces the controller with an MSF-driven exploit fire.
"""
from __future__ import annotations

import argparse
import logging
import os
import signal
import subprocess
import sys
import time
from pathlib import Path

# Allow running as a script.
sys.path.insert(0, str(Path(__file__).resolve().parent.parent))
sys.path.insert(0, str(Path(__file__).resolve().parent))

from collectors import qmp  # noqa: E402
from orchestrator.episode import EpisodeConfig, EpisodeRunner  # noqa: E402
from orchestrator.manifest import (  # noqa: E402
    ManifestError,
    load_canonical,
    tier2_schedule_from,
)
from samples.manifest import SampleManifest  # noqa: E402
from vm_load_controller import VMLoadController  # noqa: E402
from vm_serial import SerialClient  # noqa: E402

# Tier-2 episodes have no exploit firing — their schedule is derived
# from the canonical Tier-3 schedule total duration (PIPELINE.md §4.1
# canonical manifest, §4.5 event-driven labeller, §10 honest labels).
# `tier2_schedule_from(experiment.schedule)` produces a single `clean`
# phase for the same wall-clock as a Tier-3 walk; that keeps episode
# lengths comparable across tiers without minting `infected_running`
# labels for episodes where nothing infected anything.
# Name of the in-VM snapshot taken after login. The same name is used for
# `savevm` and handed to EpisodeConfig for reverts, so the two cannot drift.
_SNAPSHOT_NAME = "baseline-v1"


def _wait_for_socket(path: Path, timeout_s: float) -> None:
    """Block until a listener accepts connections on the UNIX socket *path*.

    A socket file that exists but refuses connections (e.g. left over from
    a dead VM) does not count. Sleeps 0.2 s between polls.

    Raises:
        TimeoutError: if no live listener appears within ``timeout_s`` seconds.
    """
    import socket as _sk

    deadline = time.monotonic() + timeout_s
    while time.monotonic() < deadline:
        if path.exists():
            try:
                # Verify it's actually live, not a leftover from a dead VM.
                # `with` closes the probe on success AND on a refused connect,
                # so polling a stale socket file does not leak a descriptor
                # per iteration.
                with _sk.socket(_sk.AF_UNIX, _sk.SOCK_STREAM) as probe:
                    probe.settimeout(0.5)
                    probe.connect(str(path))
                return
            except OSError:
                pass
        time.sleep(0.2)
    raise TimeoutError(f"socket {path} never came alive within {timeout_s}s")


def _wait_for_pidfile(path: Path, timeout_s: float) -> int:
    """Poll *path* until it contains a PID and return it.

    Sleeps 0.2 s between polls.

    Raises:
        TimeoutError: if the file is still missing or empty after ``timeout_s``.
        ValueError: if the file's stripped content is not an integer.
    """
    deadline = time.monotonic() + timeout_s
    while True:
        try:
            text = path.read_text().strip()
        except FileNotFoundError:
            text = ""
        if text:
            return int(text)
        if time.monotonic() >= deadline:
            raise TimeoutError(
                f"pidfile {path} never populated within {timeout_s}s"
            )
        time.sleep(0.2)


def _parse_args(argv: "list[str] | None" = None) -> argparse.Namespace:
    """Parse the CLI; *argv* defaults to ``sys.argv[1:]`` when None."""
    parser = argparse.ArgumentParser(prog="run_real_vm_demo")
    parser.add_argument("--data-root", default="data")
    parser.add_argument("--interval-ms", type=int, default=100)
    parser.add_argument(
        "--run-dir",
        # Per-slot defaults so the fleet runner's parallel calls don't
        # collide on the same /tmp dir (which would have rmtree'd each
        # other's pidfiles mid-boot — see CIS490 history). Resolution
        # order:
        #   1) explicit --run-dir CLI flag
        #   2) RUN_DIR env (set by the fleet runner)
        #   3) /tmp/cis490-vm-<SLOT> (SLOT env defaults to 0)
        default=(
            os.environ.get("RUN_DIR")
            or f"/tmp/cis490-vm-{os.environ.get('SLOT', '0')}"
        ),
        help="QEMU run dir (sockets + pidfile go here)",
    )
    parser.add_argument(
        "--keep-vm",
        action="store_true",
        help="leave the VM running after the episode finishes",
    )
    parser.add_argument(
        "--boot-timeout",
        type=float,
        default=120.0,
        help="how long to wait for serial login prompt",
    )
    parser.add_argument(
        "--settle-s",
        type=float,
        default=35.0,
        help="seconds to wait after boot for cloud-init runcmd to finish "
        "before logging in over serial",
    )
    parser.add_argument(
        "--sample",
        default=os.environ.get("SAMPLE_NAME"),
        help="Pick a workload profile from the samples manifest by name. "
        "Fleet runner passes this via SAMPLE_NAME env. If unset, runs the "
        "v1 yes-loop.",
    )
    return parser.parse_args(argv)


def _reset_run_dir(run_dir: Path) -> None:
    """Delete *run_dir* (stale sockets/pidfile from a previous run) and recreate it empty."""
    if run_dir.exists():
        import shutil

        shutil.rmtree(run_dir)
    run_dir.mkdir(parents=True, exist_ok=True)


def _savevm_baseline(qmp_sock: Path, log: logging.Logger) -> None:
    """Best-effort ``savevm`` of the baseline snapshot over QMP.

    Does nothing when *qmp_sock* does not exist. Any failure is logged as a
    warning and swallowed, because the episode can still run without
    reverts.
    """
    if not qmp_sock.exists():
        return
    try:
        client = qmp.QMPClient(qmp_sock)
        client.connect()
        try:
            out = client.savevm(_SNAPSHOT_NAME)
            log.info("savevm %s OK: %s", _SNAPSHOT_NAME, out.strip()[:160])
        finally:
            client.close()
    except Exception as e:
        log.warning("savevm failed; revert_at_start unusable: %s", e)


def _stop_vm(proc: subprocess.Popen, log: logging.Logger) -> None:
    """Stop the launcher's process group: SIGTERM, then SIGKILL after 5 s.

    Assumes *proc* was started with ``start_new_session=True``. That makes it
    a session and process-group leader, so its pgid equals its pid. Signalling
    the group by that id still reaches any children left in the group after
    the launcher itself has exited. It also avoids ``os.getpgid()``, which can
    fail, or resolve a recycled pid, once the launcher has been reaped.
    """
    log.info("shutting down VM (pid=%d)", proc.pid)
    try:
        os.killpg(proc.pid, signal.SIGTERM)
    except ProcessLookupError:
        pass
    try:
        proc.wait(timeout=5)
    except subprocess.TimeoutExpired:
        try:
            os.killpg(proc.pid, signal.SIGKILL)
        except ProcessLookupError:
            pass
        try:
            # Reap the launcher so it does not linger as a zombie.
            proc.wait(timeout=5)
        except subprocess.TimeoutExpired:
            log.warning("launcher pid=%d did not exit after SIGKILL", proc.pid)


def main() -> int:
    """Boot the VM, run one labeled Tier-2 episode against it, and report.

    Returns a process exit code:
    - 0 on success;
    - 2 when ``--sample`` names a sample that is not in the samples manifest;
    - 78 when the canonical experiment manifest fails to load.

    Other failures propagate as exceptions. Once the launcher has been
    started, its process group is stopped on every exit path unless
    ``--keep-vm`` is given.
    """
    args = _parse_args()

    logging.basicConfig(
        level=logging.INFO,
        format="%(asctime)s %(levelname)s %(name)s %(message)s",
    )
    log = logging.getLogger("cis490.run_real_vm_demo")

    repo_root = Path(__file__).resolve().parent.parent
    launcher = repo_root / "vm" / "launch_demo.sh"

    # Canonical experiment manifest (PIPELINE.md §4.1). The samples-manifest
    # path comes from here too; no per-call override.
    try:
        experiment = load_canonical(repo_root)
    except ManifestError as e:
        log.error("canonical manifest failed to load: %s", e)
        return 78
    samples_path = repo_root / experiment.samples_manifest_path

    # Resolve sample if requested.
    sample = None
    if args.sample:
        samples = SampleManifest.load(samples_path)
        sample = next((s for s in samples.samples if s.name == args.sample), None)
        if sample is None:
            log.error("sample %r not in samples manifest %s", args.sample, samples_path)
            return 2
        log.info("using sample=%s profile=%s kind=%s", sample.name, sample.profile, sample.kind)

    run_dir = Path(args.run_dir)
    # Wipe any stale sockets/pidfile from a previous run.
    _reset_run_dir(run_dir)
    serial_sock = run_dir / "serial.sock"
    pid_file = run_dir / "qemu.pid"
    qmp_sock = run_dir / "qmp.sock"
    agent_sock = run_dir / "agent.sock"

    log.info("booting VM via %s (RUN_DIR=%s)", launcher, run_dir)
    env = os.environ.copy()
    env["RUN_DIR"] = str(run_dir)
    qemu = subprocess.Popen(
        [str(launcher)],
        cwd=str(repo_root),
        env=env,
        stdout=subprocess.DEVNULL,
        stderr=subprocess.DEVNULL,
        start_new_session=True,
    )
    try:
        _wait_for_socket(serial_sock, timeout_s=15.0)
        qemu_pid = _wait_for_pidfile(pid_file, timeout_s=15.0)
        log.info("qemu pid = %d", qemu_pid)

        # Cloud-init's runcmd (password setup, sshd hardening) needs some
        # time after boot. Wait long enough that the credentials we'll
        # send are actually valid.
        settle_s = max(0.0, args.settle_s)
        log.info("waiting %.0fs for cloud-init runcmd to settle...", settle_s)
        time.sleep(settle_s)

        log.info("connecting serial + logging in (boot timeout %.0fs)", args.boot_timeout)
        serial = SerialClient(str(serial_sock))
        serial.connect()
        try:
            serial.login(boot_timeout_s=args.boot_timeout)

            # Take a savevm AFTER the guest is fully up but BEFORE we
            # start any workload. EpisodeConfig.revert_at_{start,end} use
            # this snapshot for inter-episode reverts (the snapshot lives
            # in the qcow2's per-VM-process overlay since launch_demo.sh
            # runs with snapshot=on, so it's discarded with the VM).
            # Without this step, loadvm would target a snapshot that
            # doesn't exist and silently emit snapshot_revert_failed.
            _savevm_baseline(qmp_sock, log)

            # Bind the controller to the runner's event log so workload
            # success/failure shows up alongside phase_transition events.
            # The runner does not exist yet, so events are forwarded
            # through a holder that is filled in once it does; events
            # emitted before that are dropped. Sample also goes into
            # EpisodeConfig below so meta.sample records what was
            # supposed to run.
            runner_for_emit = {"runner": None}

            def _emit(ev, **kw):
                bound = runner_for_emit["runner"]
                return bound.emit_event(ev, **kw) if bound is not None else None

            controller = VMLoadController(serial, sample=sample, emit_event=_emit)
            controller.setup()
            try:
                # Tier-2: single `clean` phase spanning a Tier-3 walk's
                # wall-clock (see tier2_schedule_from).
                schedule = tier2_schedule_from(experiment.schedule)
                cfg = EpisodeConfig(
                    target_pid=qemu_pid,
                    duration_s=sum(d for _, d in schedule),
                    interval_ms=args.interval_ms,
                    data_root=Path(args.data_root),
                    phase_schedule=schedule,
                    image_name="alpine-3.21-cloudinit",
                    snapshot_name=_SNAPSHOT_NAME,
                    qmp_socket=qmp_sock if qmp_sock.exists() else None,
                    guest_agent_socket=agent_sock if agent_sock.exists() else None,
                    bridge_iface=os.environ.get("BRIDGE") or None,
                    experiment_meta=experiment.to_meta(),
                    # Source 3 (oracle): perf-stat sampling of the qemu PID.
                    # Now that the stdout/stderr + event-name parser bugs are
                    # fixed, perf produces real rows. Per PIPELINE.md §4.4
                    # collectors that emit zero rows shouldn't be in the active
                    # set silently — keeping it default-off was effectively
                    # silencing the source. On x86_64 lab hosts with
                    # perf_event_paranoid <= 2 (cis490 user owns qemu PID), the
                    # collector reads cycles/instructions/page-faults; on hosts
                    # without perf the collector logs a warning and returns 0
                    # rows (visible, not silent).
                    enable_perf=True,
                    sample=sample,
                )
                runner = EpisodeRunner(cfg, on_phase=controller.set_phase)
                # Connect the controller's event sink to the runner now that
                # both exist.
                runner_for_emit["runner"] = runner
                result = runner.run()
            finally:
                # Stop the guest workload even when the episode fails, so a
                # --keep-vm VM is not left running it.
                controller.teardown()
        finally:
            serial.close()

        print()
        print(f"episode_id = {result.episode_id}")
        print(f"path       = {result.episode_dir}")
        print(f"rows_proc  = {result.rows_proc}")
        print(f"phases     = {result.phases_observed}")
        print()
        print("To plot:")
        print(f"  uv run python tools/plot_envelope.py {result.episode_dir}")
        return 0
    finally:
        if not args.keep_vm:
            _stop_vm(qemu, log)


if __name__ == "__main__":
    sys.exit(main())