Two correctness fixes that the §4.5 event-driven labeller surfaced:
1. tools/run_real_vm_demo.py was hardcoding a Tier-3-shaped schedule
(clean → armed → infecting → infected_running → ...) for episodes
with no exploit firing. Pre-§4.5 those episodes wrote dishonest
`infected_running` labels from the schedule clock — exactly the §3
evidence pattern. Post-§4.5 they write `failed` at the infecting
transition (the justifying exploit_fire never arrives), which is
honest about what happened but useless for training.
The honest fix: Tier-2 episodes have a clean-only schedule. All
telemetry tagged `clean` because nothing infected anything. The
total duration matches the canonical Tier-3 schedule so episode
lengths are comparable across tiers — no length-bias in the
dataset (§10).
Helper `tier2_schedule_from(schedule)` in orchestrator/manifest.py
derives `[("clean", total_seconds)]` from the canonical schedule.
`tier3_schedule_from(schedule)` renders the legacy
`[(name, seconds)]` shape EpisodeConfig still expects.
Tier-2 demo (run_real_vm_demo.py) now calls tier2_schedule_from.
Tier-3 demo (run_tier3_demo.py) now calls tier3_schedule_from.
Drops the hardcoded DEFAULT_SCHEDULE constants from both — the
canonical manifest is the single source of truth (§4.1).
2. .gitignore now excludes /VERSION. The install-lab-host.sh stamp
writes /opt/cis490/VERSION so episodes can record code provenance
without /opt/cis490 carrying a .git directory. But /opt/cis490 IS
typically a git checkout on lab hosts (auto-update.sh pulls into
it), so writing VERSION leaves the working tree dirty. Every
episode's meta.code_version.dirty=true. PIPELINE.md §4.6 acceptance
gate's rule 4 would then reject every episode without
CIS490_ALLOW_DIRTY=1 set — which would break the data flow.
Now VERSION is .gitignored: install-lab-host.sh stamps it, git
status doesn't see it, dirty=false, gate rule 4 passes naturally.
These two changes together keep the data flowing AND honest. Tier-2
episodes pass with `phases=[clean]` + every collector emitting real
rows. Tier-3 episodes (none today, empty catalog) walk the full
event-driven schedule when a verified module gets re-admitted.
286 tests passing.
Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
275 lines
10 KiB
Python
275 lines
10 KiB
Python
"""Tier-2: real VM, real workload, labeled phases.
|
|
|
|
Boots the Alpine cidata VM, logs in over the serial console, drives the
|
|
guest through an XMRig-shaped phase schedule (clean → armed → infecting →
|
|
infected_running → dormant → re-entry), and lets the orchestrator's host
|
|
/proc collector sample the qemu-system pid throughout.
|
|
|
|
Compared to ``run_envelope_demo.py``: same phase schedule, same labels,
|
|
same telemetry shape — but the load is now generated by ``yes`` and
|
|
``dd`` running *inside* a real Alpine guest, not by a Python program on
|
|
the host. Tier-3 replaces the controller with an MSF-driven exploit
|
|
fire.
|
|
"""
|
|
|
|
from __future__ import annotations
|
|
|
|
import argparse
|
|
import logging
|
|
import os
|
|
import signal
|
|
import subprocess
|
|
import sys
|
|
import time
|
|
from pathlib import Path
|
|
|
|
# Allow running as a script.
|
|
sys.path.insert(0, str(Path(__file__).resolve().parent.parent))
|
|
sys.path.insert(0, str(Path(__file__).resolve().parent))
|
|
|
|
from collectors import qmp # noqa: E402
|
|
from orchestrator.episode import EpisodeConfig, EpisodeRunner # noqa: E402
|
|
from orchestrator.manifest import ( # noqa: E402
|
|
ManifestError, load_canonical, tier2_schedule_from,
|
|
)
|
|
from samples.manifest import SampleManifest # noqa: E402
|
|
from vm_load_controller import VMLoadController # noqa: E402
|
|
from vm_serial import SerialClient # noqa: E402
|
|
|
|
|
|
# Tier-2 episodes have no exploit firing — their schedule is derived
|
|
# from the canonical Tier-3 schedule total duration (PIPELINE.md §4.1
|
|
# canonical manifest, §4.5 event-driven labeller, §10 honest labels).
|
|
# `tier2_schedule_from(experiment.schedule)` produces a single `clean`
|
|
# phase for the same wall-clock as a Tier-3 walk; that keeps episode
|
|
# lengths comparable across tiers without minting `infected_running`
|
|
# labels for episodes where nothing infected anything.
|
|
|
|
|
|
def _wait_for_socket(path: Path, timeout_s: float) -> None:
|
|
import socket as _sk
|
|
deadline = time.monotonic() + timeout_s
|
|
while time.monotonic() < deadline:
|
|
if path.exists():
|
|
try:
|
|
# Verify it's actually live, not a leftover from a dead VM.
|
|
t = _sk.socket(_sk.AF_UNIX, _sk.SOCK_STREAM)
|
|
t.settimeout(0.5)
|
|
t.connect(str(path))
|
|
t.close()
|
|
return
|
|
except OSError:
|
|
pass
|
|
time.sleep(0.2)
|
|
raise TimeoutError(f"socket {path} never came alive within {timeout_s}s")
|
|
|
|
|
|
def main() -> int:
|
|
parser = argparse.ArgumentParser(prog="run_real_vm_demo")
|
|
parser.add_argument("--data-root", default="data")
|
|
parser.add_argument("--interval-ms", type=int, default=100)
|
|
parser.add_argument(
|
|
"--run-dir",
|
|
# Per-slot defaults so the fleet runner's parallel calls don't
|
|
# collide on the same /tmp dir (which would have rmtree'd each
|
|
# other's pidfiles mid-boot — see CIS490 history). Resolution
|
|
# order:
|
|
# 1) explicit --run-dir CLI flag
|
|
# 2) RUN_DIR env (set by the fleet runner)
|
|
# 3) /tmp/cis490-vm-<SLOT> (SLOT defaults to 0)
|
|
default=(
|
|
os.environ.get("RUN_DIR")
|
|
or f"/tmp/cis490-vm-{os.environ.get('SLOT', '0')}"
|
|
),
|
|
help="QEMU run dir (sockets + pidfile go here)",
|
|
)
|
|
parser.add_argument(
|
|
"--keep-vm",
|
|
action="store_true",
|
|
help="leave the VM running after the episode finishes",
|
|
)
|
|
parser.add_argument(
|
|
"--boot-timeout",
|
|
type=float,
|
|
default=120.0,
|
|
help="how long to wait for serial login prompt",
|
|
)
|
|
parser.add_argument(
|
|
"--sample",
|
|
default=os.environ.get("SAMPLE_NAME"),
|
|
help="Pick a workload profile from the samples manifest by name. "
|
|
"Fleet runner passes this via SAMPLE_NAME env. If unset, runs the "
|
|
"v1 yes-loop.",
|
|
)
|
|
args = parser.parse_args()
|
|
|
|
logging.basicConfig(
|
|
level=logging.INFO,
|
|
format="%(asctime)s %(levelname)s %(name)s %(message)s",
|
|
)
|
|
log = logging.getLogger("cis490.run_real_vm_demo")
|
|
|
|
repo_root = Path(__file__).resolve().parent.parent
|
|
launcher = repo_root / "vm" / "launch_demo.sh"
|
|
|
|
# Canonical experiment manifest (PIPELINE.md §4.1). The samples-manifest
|
|
# path comes from here too; no per-call override.
|
|
try:
|
|
experiment = load_canonical(repo_root)
|
|
except ManifestError as e:
|
|
log.error("canonical manifest failed to load: %s", e)
|
|
return 78
|
|
samples_path = repo_root / experiment.samples_manifest_path
|
|
|
|
# Resolve sample if requested.
|
|
sample = None
|
|
if args.sample:
|
|
samples = SampleManifest.load(samples_path)
|
|
sample = next((s for s in samples.samples if s.name == args.sample), None)
|
|
if sample is None:
|
|
log.error("sample %r not in samples manifest %s",
|
|
args.sample, samples_path)
|
|
return 2
|
|
log.info("using sample=%s profile=%s kind=%s",
|
|
sample.name, sample.profile, sample.kind)
|
|
run_dir = Path(args.run_dir)
|
|
# Wipe any stale sockets/pidfile from a previous run.
|
|
if run_dir.exists():
|
|
import shutil
|
|
shutil.rmtree(run_dir)
|
|
run_dir.mkdir(parents=True, exist_ok=True)
|
|
serial_sock = run_dir / "serial.sock"
|
|
pid_file = run_dir / "qemu.pid"
|
|
|
|
log.info("booting VM via %s (RUN_DIR=%s)", launcher, run_dir)
|
|
env = os.environ.copy()
|
|
env["RUN_DIR"] = str(run_dir)
|
|
qemu = subprocess.Popen(
|
|
[str(launcher)],
|
|
cwd=str(repo_root),
|
|
env=env,
|
|
stdout=subprocess.DEVNULL,
|
|
stderr=subprocess.DEVNULL,
|
|
start_new_session=True,
|
|
)
|
|
|
|
try:
|
|
_wait_for_socket(serial_sock, timeout_s=15.0)
|
|
# Wait for the pid file to be non-empty.
|
|
deadline = time.monotonic() + 15.0
|
|
while time.monotonic() < deadline:
|
|
if pid_file.exists() and pid_file.read_text().strip():
|
|
break
|
|
time.sleep(0.2)
|
|
qemu_pid = int(pid_file.read_text().strip())
|
|
log.info("qemu pid = %d", qemu_pid)
|
|
|
|
# Cloud-init's runcmd (password setup, sshd hardening) needs some
|
|
# time after boot. Wait long enough that the credentials we'll
|
|
# send are actually valid.
|
|
log.info("waiting 35s for cloud-init runcmd to settle...")
|
|
time.sleep(35.0)
|
|
|
|
log.info("connecting serial + logging in (boot timeout %.0fs)",
|
|
args.boot_timeout)
|
|
serial = SerialClient(str(serial_sock))
|
|
serial.connect()
|
|
serial.login(boot_timeout_s=args.boot_timeout)
|
|
|
|
# Take a savevm AFTER the guest is fully up but BEFORE we
|
|
# start any workload. EpisodeConfig.revert_at_{start,end} use
|
|
# this snapshot for inter-episode reverts (the snapshot lives
|
|
# in the qcow2's per-VM-process overlay since launch_demo.sh
|
|
# runs with snapshot=on, so it's discarded with the VM).
|
|
# Without this step, loadvm would target a snapshot that
|
|
# doesn't exist and silently emit snapshot_revert_failed.
|
|
qmp_sock = run_dir / "qmp.sock"
|
|
if qmp_sock.exists():
|
|
try:
|
|
_qmp = qmp.QMPClient(qmp_sock)
|
|
_qmp.connect()
|
|
try:
|
|
out = _qmp.savevm("baseline-v1")
|
|
log.info("savevm baseline-v1 OK: %s", out.strip()[:160])
|
|
finally:
|
|
_qmp.close()
|
|
except Exception as e:
|
|
log.warning("savevm failed; revert_at_start unusable: %s", e)
|
|
|
|
# Bind the controller to the runner's event log so workload
|
|
# success/failure shows up alongside phase_transition events.
|
|
# Sample also goes into EpisodeConfig below so meta.sample
|
|
# records what was supposed to run.
|
|
runner_for_emit = {"runner": None}
|
|
controller = VMLoadController(
|
|
serial,
|
|
sample=sample,
|
|
emit_event=lambda ev, **kw: (
|
|
runner_for_emit["runner"].emit_event(ev, **kw)
|
|
if runner_for_emit["runner"] else None
|
|
),
|
|
)
|
|
controller.setup()
|
|
|
|
agent_sock = run_dir / "agent.sock"
|
|
schedule = tier2_schedule_from(experiment.schedule)
|
|
cfg = EpisodeConfig(
|
|
target_pid=qemu_pid,
|
|
duration_s=sum(d for _, d in schedule),
|
|
interval_ms=args.interval_ms,
|
|
data_root=Path(args.data_root),
|
|
phase_schedule=schedule,
|
|
image_name="alpine-3.21-cloudinit",
|
|
snapshot_name="baseline-v1",
|
|
qmp_socket=qmp_sock if qmp_sock.exists() else None,
|
|
guest_agent_socket=agent_sock if agent_sock.exists() else None,
|
|
bridge_iface=os.environ.get("BRIDGE") or None,
|
|
experiment_meta=experiment.to_meta(),
|
|
# Source 3 (oracle): perf-stat sampling of the qemu PID.
|
|
# Now that the stdout/stderr + event-name parser bugs are
|
|
# fixed, perf produces real rows. Per PIPELINE.md §4.4
|
|
# collectors that emit zero rows shouldn't be in the active
|
|
# set silently — keeping it default-off was effectively
|
|
# silencing the source. On x86_64 lab hosts with
|
|
# perf_event_paranoid <= 2 (cis490 user owns qemu PID), the
|
|
# collector reads cycles/instructions/page-faults; on hosts
|
|
# without perf the collector logs a warning and returns 0
|
|
# rows (visible, not silent).
|
|
enable_perf=True,
|
|
sample=sample,
|
|
)
|
|
|
|
runner = EpisodeRunner(cfg, on_phase=controller.set_phase)
|
|
# Connect the controller's event sink to the runner now that
|
|
# both exist. (Forward-reference closure pattern keeps the
|
|
# constructor argument order natural.)
|
|
runner_for_emit["runner"] = runner
|
|
result = runner.run()
|
|
|
|
controller.teardown()
|
|
serial.close()
|
|
|
|
print()
|
|
print(f"episode_id = {result.episode_id}")
|
|
print(f"path = {result.episode_dir}")
|
|
print(f"rows_proc = {result.rows_proc}")
|
|
print(f"phases = {result.phases_observed}")
|
|
print()
|
|
print("To plot:")
|
|
print(f" uv run python tools/plot_envelope.py {result.episode_dir}")
|
|
return 0
|
|
finally:
|
|
if not args.keep_vm:
|
|
log.info("shutting down VM (pid=%d)", qemu.pid)
|
|
try:
|
|
os.killpg(os.getpgid(qemu.pid), signal.SIGTERM)
|
|
except ProcessLookupError:
|
|
pass
|
|
try:
|
|
qemu.wait(timeout=5)
|
|
except subprocess.TimeoutExpired:
|
|
os.killpg(os.getpgid(qemu.pid), signal.SIGKILL)
|
|
|
|
|
|
if __name__ == "__main__":
|
|
sys.exit(main())
|