CIS490/tools/run_real_vm_demo.py
Max Gorog 3d4f282e9c Tier-2 episodes use clean-only schedule; .gitignore VERSION
Two correctness fixes that the §4.5 event-driven labeller surfaced:

1. tools/run_real_vm_demo.py was hardcoding a Tier-3-shaped schedule
   (clean → armed → infecting → infected_running → ...) for episodes
   with no exploit firing. Pre-§4.5 those episodes wrote dishonest
   `infected_running` labels from the schedule clock — exactly the §3
   evidence pattern. Post-§4.5 they write `failed` at the infecting
   transition (the justifying exploit_fire never arrives), which is
   honest about what happened but useless for training.

   The honest fix: Tier-2 episodes have a clean-only schedule. All
   telemetry tagged `clean` because nothing infected anything. The
   total duration matches the canonical Tier-3 schedule so episode
   lengths are comparable across tiers — no length-bias in the
   dataset (§10).

   Helper `tier2_schedule_from(schedule)` in orchestrator/manifest.py
   derives `[("clean", total_seconds)]` from the canonical schedule.
   `tier3_schedule_from(schedule)` renders the legacy
   `[(name, seconds)]` shape EpisodeConfig still expects.

   Tier-2 demo (run_real_vm_demo.py) now calls tier2_schedule_from.
   Tier-3 demo (run_tier3_demo.py) now calls tier3_schedule_from.
   Drops the hardcoded DEFAULT_SCHEDULE constants from both — the
   canonical manifest is the single source of truth (§4.1).

2. .gitignore now excludes /VERSION. The install-lab-host.sh stamp
   writes /opt/cis490/VERSION so episodes can record code provenance
   without /opt/cis490 carrying a .git directory. But /opt/cis490 IS
   typically a git checkout on lab hosts (auto-update.sh pulls into
   it), so writing VERSION leaves the working tree dirty and every
   episode records meta.code_version.dirty=true. Rule 4 of the
   PIPELINE.md §4.6 acceptance gate would then reject every episode
   unless CIS490_ALLOW_DIRTY=1 is set, which would break the data flow.

   Now VERSION is .gitignored: install-lab-host.sh stamps it, git
   status doesn't see it, dirty=false, gate rule 4 passes naturally.
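
As a rough illustration of the two helpers from item 1, the sketch below assumes the canonical schedule is a list of phase records carrying a name and a duration in seconds. The `Phase` dataclass, its field names, and the durations are hypothetical; the real shape lives in orchestrator/manifest.py and may differ.

```python
from __future__ import annotations

from dataclasses import dataclass


@dataclass(frozen=True)
class Phase:
    """Hypothetical canonical-schedule entry (field names are assumptions)."""
    name: str
    seconds: float


def tier3_schedule_from(schedule: list[Phase]) -> list[tuple[str, float]]:
    """Render the legacy [(name, seconds)] shape, one tuple per phase."""
    return [(p.name, p.seconds) for p in schedule]


def tier2_schedule_from(schedule: list[Phase]) -> list[tuple[str, float]]:
    """Collapse the schedule into one `clean` phase of equal total length."""
    total_seconds = sum(p.seconds for p in schedule)
    return [("clean", total_seconds)]


# Illustrative durations only; not taken from the canonical manifest.
canonical = [
    Phase("clean", 30),
    Phase("armed", 10),
    Phase("infecting", 5),
    Phase("infected_running", 60),
]
tier2 = tier2_schedule_from(canonical)
tier3 = tier3_schedule_from(canonical)
```

Because both helpers read the same input, the Tier-2 total always equals the sum of the Tier-3 phase durations, which is what keeps episode lengths comparable across tiers.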

These two changes together keep the data flowing AND honest. Tier-2
episodes pass with `phases=[clean]` + every collector emitting real
rows. Tier-3 episodes (none today, empty catalog) walk the full
event-driven schedule when a verified module gets re-admitted.
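
The interaction described in item 2 can be sketched as follows, assuming the dirty flag is derived from `git status --porcelain` output (which lists untracked files as `?? path` and omits ignored ones) and that rule 4 consults `CIS490_ALLOW_DIRTY`. Both function names are hypothetical; the actual gate code is not part of this commit.

```python
from __future__ import annotations


def is_dirty(porcelain_output: str) -> bool:
    """Treat any non-blank line of `git status --porcelain` as dirty."""
    return any(line.strip() for line in porcelain_output.splitlines())


def gate_rule4_accepts(dirty: bool, env: dict[str, str]) -> bool:
    """Accept clean trees; accept dirty trees only with the override set."""
    return (not dirty) or env.get("CIS490_ALLOW_DIRTY") == "1"


# Before the change: VERSION is untracked, so porcelain reports it.
before = is_dirty("?? VERSION\n")
# After the change: VERSION is ignored, so porcelain prints nothing for it.
after = is_dirty("")
```

With VERSION ignored, the gate passes without the override, so `CIS490_ALLOW_DIRTY` stays reserved for trees that are actually modified.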

286 tests passing.

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
2026-05-04 01:55:37 -05:00

"""Tier-2: real VM, real workload, labeled phases.
Boots the Alpine cidata VM, logs in over the serial console, runs the
guest through a single ``clean`` phase whose duration equals the total of
the canonical Tier-3 schedule, and lets the orchestrator's host /proc
collector sample the qemu-system pid throughout.
Compared to ``run_envelope_demo.py``: same phase schedule, same labels,
same telemetry shape — but the load is now generated by ``yes`` and
``dd`` running *inside* a real Alpine guest, not by a Python program on
the host. Tier-3 replaces the controller with an MSF-driven exploit
fire.
"""
from __future__ import annotations

import argparse
import logging
import os
import signal
import subprocess
import sys
import time
from pathlib import Path

# Allow running as a script.
sys.path.insert(0, str(Path(__file__).resolve().parent.parent))
sys.path.insert(0, str(Path(__file__).resolve().parent))

from collectors import qmp  # noqa: E402
from orchestrator.episode import EpisodeConfig, EpisodeRunner  # noqa: E402
from orchestrator.manifest import (  # noqa: E402
    ManifestError, load_canonical, tier2_schedule_from,
)
from samples.manifest import SampleManifest  # noqa: E402
from vm_load_controller import VMLoadController  # noqa: E402
from vm_serial import SerialClient  # noqa: E402

# Tier-2 episodes have no exploit firing; their schedule is derived
# from the canonical Tier-3 schedule total duration (PIPELINE.md §4.1
# canonical manifest, §4.5 event-driven labeller, §10 honest labels).
# `tier2_schedule_from(experiment.schedule)` produces a single `clean`
# phase for the same wall-clock as a Tier-3 walk; that keeps episode
# lengths comparable across tiers without minting `infected_running`
# labels for episodes where nothing infected anything.


def _wait_for_socket(path: Path, timeout_s: float) -> None:
    import socket as _sk

    deadline = time.monotonic() + timeout_s
    while time.monotonic() < deadline:
        if path.exists():
            try:
                # Verify it's actually live, not a leftover from a dead VM.
                t = _sk.socket(_sk.AF_UNIX, _sk.SOCK_STREAM)
                t.settimeout(0.5)
                t.connect(str(path))
                t.close()
                return
            except OSError:
                pass
        time.sleep(0.2)
    raise TimeoutError(f"socket {path} never came alive within {timeout_s}s")


def main() -> int:
    parser = argparse.ArgumentParser(prog="run_real_vm_demo")
    parser.add_argument("--data-root", default="data")
    parser.add_argument("--interval-ms", type=int, default=100)
    parser.add_argument(
        "--run-dir",
        # Per-slot defaults so the fleet runner's parallel calls don't
        # collide on the same /tmp dir (which would have rmtree'd each
        # other's pidfiles mid-boot; see CIS490 history). Resolution
        # order:
        #   1) explicit --run-dir CLI flag
        #   2) RUN_DIR env (set by the fleet runner)
        #   3) /tmp/cis490-vm-<SLOT> (SLOT defaults to 0)
        default=(
            os.environ.get("RUN_DIR")
            or f"/tmp/cis490-vm-{os.environ.get('SLOT', '0')}"
        ),
        help="QEMU run dir (sockets + pidfile go here)",
    )
    parser.add_argument(
        "--keep-vm",
        action="store_true",
        help="leave the VM running after the episode finishes",
    )
    parser.add_argument(
        "--boot-timeout",
        type=float,
        default=120.0,
        help="how long to wait for serial login prompt",
    )
    parser.add_argument(
        "--sample",
        default=os.environ.get("SAMPLE_NAME"),
        help="Pick a workload profile from the samples manifest by name. "
             "Fleet runner passes this via SAMPLE_NAME env. If unset, runs the "
             "v1 yes-loop.",
    )
    args = parser.parse_args()

    logging.basicConfig(
        level=logging.INFO,
        format="%(asctime)s %(levelname)s %(name)s %(message)s",
    )
    log = logging.getLogger("cis490.run_real_vm_demo")

    repo_root = Path(__file__).resolve().parent.parent
    launcher = repo_root / "vm" / "launch_demo.sh"

    # Canonical experiment manifest (PIPELINE.md §4.1). The samples-manifest
    # path comes from here too; no per-call override.
    try:
        experiment = load_canonical(repo_root)
    except ManifestError as e:
        log.error("canonical manifest failed to load: %s", e)
        return 78
    samples_path = repo_root / experiment.samples_manifest_path

    # Resolve sample if requested.
    sample = None
    if args.sample:
        samples = SampleManifest.load(samples_path)
        sample = next((s for s in samples.samples if s.name == args.sample), None)
        if sample is None:
            log.error("sample %r not in samples manifest %s",
                      args.sample, samples_path)
            return 2
        log.info("using sample=%s profile=%s kind=%s",
                 sample.name, sample.profile, sample.kind)

    run_dir = Path(args.run_dir)
    # Wipe any stale sockets/pidfile from a previous run.
    if run_dir.exists():
        import shutil
        shutil.rmtree(run_dir)
    run_dir.mkdir(parents=True, exist_ok=True)
    serial_sock = run_dir / "serial.sock"
    pid_file = run_dir / "qemu.pid"

    log.info("booting VM via %s (RUN_DIR=%s)", launcher, run_dir)
    env = os.environ.copy()
    env["RUN_DIR"] = str(run_dir)
    qemu = subprocess.Popen(
        [str(launcher)],
        cwd=str(repo_root),
        env=env,
        stdout=subprocess.DEVNULL,
        stderr=subprocess.DEVNULL,
        start_new_session=True,
    )

    try:
        _wait_for_socket(serial_sock, timeout_s=15.0)
        # Wait for the pid file to be non-empty.
        deadline = time.monotonic() + 15.0
        while time.monotonic() < deadline:
            if pid_file.exists() and pid_file.read_text().strip():
                break
            time.sleep(0.2)
        qemu_pid = int(pid_file.read_text().strip())
        log.info("qemu pid = %d", qemu_pid)

        # Cloud-init's runcmd (password setup, sshd hardening) needs some
        # time after boot. Wait long enough that the credentials we'll
        # send are actually valid.
        log.info("waiting 35s for cloud-init runcmd to settle...")
        time.sleep(35.0)

        log.info("connecting serial + logging in (boot timeout %.0fs)",
                 args.boot_timeout)
        serial = SerialClient(str(serial_sock))
        serial.connect()
        serial.login(boot_timeout_s=args.boot_timeout)

        # Take a savevm AFTER the guest is fully up but BEFORE we
        # start any workload. EpisodeConfig.revert_at_{start,end} use
        # this snapshot for inter-episode reverts (the snapshot lives
        # in the qcow2's per-VM-process overlay since launch_demo.sh
        # runs with snapshot=on, so it's discarded with the VM).
        # Without this step, loadvm would target a snapshot that
        # doesn't exist and silently emit snapshot_revert_failed.
        qmp_sock = run_dir / "qmp.sock"
        if qmp_sock.exists():
            try:
                _qmp = qmp.QMPClient(qmp_sock)
                _qmp.connect()
                try:
                    out = _qmp.savevm("baseline-v1")
                    log.info("savevm baseline-v1 OK: %s", out.strip()[:160])
                finally:
                    _qmp.close()
            except Exception as e:
                log.warning("savevm failed; revert_at_start unusable: %s", e)

        # Bind the controller to the runner's event log so workload
        # success/failure shows up alongside phase_transition events.
        # Sample also goes into EpisodeConfig below so meta.sample
        # records what was supposed to run.
        runner_for_emit = {"runner": None}
        controller = VMLoadController(
            serial,
            sample=sample,
            emit_event=lambda ev, **kw: (
                runner_for_emit["runner"].emit_event(ev, **kw)
                if runner_for_emit["runner"] else None
            ),
        )
        controller.setup()

        agent_sock = run_dir / "agent.sock"
        schedule = tier2_schedule_from(experiment.schedule)
        cfg = EpisodeConfig(
            target_pid=qemu_pid,
            duration_s=sum(d for _, d in schedule),
            interval_ms=args.interval_ms,
            data_root=Path(args.data_root),
            phase_schedule=schedule,
            image_name="alpine-3.21-cloudinit",
            snapshot_name="baseline-v1",
            qmp_socket=qmp_sock if qmp_sock.exists() else None,
            guest_agent_socket=agent_sock if agent_sock.exists() else None,
            bridge_iface=os.environ.get("BRIDGE") or None,
            experiment_meta=experiment.to_meta(),
            # Source 3 (oracle): perf-stat sampling of the qemu PID.
            # Now that the stdout/stderr + event-name parser bugs are
            # fixed, perf produces real rows. Per PIPELINE.md §4.4
            # collectors that emit zero rows shouldn't be in the active
            # set silently; keeping it default-off was effectively
            # silencing the source. On x86_64 lab hosts with
            # perf_event_paranoid <= 2 (cis490 user owns qemu PID), the
            # collector reads cycles/instructions/page-faults; on hosts
            # without perf the collector logs a warning and returns 0
            # rows (visible, not silent).
            enable_perf=True,
            sample=sample,
        )
        runner = EpisodeRunner(cfg, on_phase=controller.set_phase)
        # Connect the controller's event sink to the runner now that
        # both exist. (Forward-reference closure pattern keeps the
        # constructor argument order natural.)
        runner_for_emit["runner"] = runner

        result = runner.run()
        controller.teardown()
        serial.close()

        print()
        print(f"episode_id = {result.episode_id}")
        print(f"path = {result.episode_dir}")
        print(f"rows_proc = {result.rows_proc}")
        print(f"phases = {result.phases_observed}")
        print()
        print("To plot:")
        print(f" uv run python tools/plot_envelope.py {result.episode_dir}")
        return 0
    finally:
        if not args.keep_vm:
            log.info("shutting down VM (pid=%d)", qemu.pid)
            try:
                os.killpg(os.getpgid(qemu.pid), signal.SIGTERM)
            except ProcessLookupError:
                pass
            try:
                qemu.wait(timeout=5)
            except subprocess.TimeoutExpired:
                os.killpg(os.getpgid(qemu.pid), signal.SIGKILL)


if __name__ == "__main__":
    sys.exit(main())
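
The forward-reference closure used in `main()` (the `runner_for_emit` holder) can be reduced to the standalone sketch below. `Runner` and `Controller` here are stand-ins invented for illustration, not the project's `EpisodeRunner` or `VMLoadController`, and the event name is made up.

```python
from __future__ import annotations

from typing import Any, Callable, Optional


class Runner:
    """Stand-in that records emitted events in memory."""

    def __init__(self) -> None:
        self.events: list[tuple[str, dict[str, Any]]] = []

    def emit_event(self, name: str, **fields: Any) -> None:
        self.events.append((name, fields))


class Controller:
    """Stand-in that reports through an injected callback."""

    def __init__(self, emit_event: Callable[..., None]) -> None:
        self._emit = emit_event

    def start(self, phase: str) -> None:
        self._emit("workload_started", phase=phase)


# Mutable holder: the closure reads it at call time, not at definition time.
holder: dict[str, Optional[Runner]] = {"runner": None}


def emit(name: str, **fields: Any) -> None:
    runner = holder["runner"]
    if runner is not None:  # events sent before binding are dropped
        runner.emit_event(name, **fields)


controller = Controller(emit)
controller.start("clean")    # dropped: no runner bound yet
holder["runner"] = Runner()
controller.start("clean")    # delivered to the bound runner
```

One consequence of this pattern is that anything emitted before the holder is filled is discarded. In `main()` that window includes `controller.setup()`, which runs before `runner_for_emit["runner"]` is assigned.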