"""Campaign runner — drives N back-to-back VM episodes unattended. Each iteration boots a fresh VM from the baseline snapshot, runs the standard phase schedule, shuts the VM down, and updates campaign.json. On reaching the target, campaign_done.marker is written and the process exits 0. Safe to interrupt with SIGTERM/SIGINT: state is flushed atomically after every episode, so re-running with the same --data-root resumes where the previous run left off. Usage: uv run python tools/run_campaign.py --target 50 --data-root data """ from __future__ import annotations import argparse import json import logging import os import shutil import signal import socket as _sk import subprocess import sys import threading import time from datetime import datetime, timezone from pathlib import Path sys.path.insert(0, str(Path(__file__).resolve().parent.parent)) sys.path.insert(0, str(Path(__file__).resolve().parent)) from orchestrator.episode import EpisodeConfig, EpisodeRunner # noqa: E402 from vm_load_controller import VMLoadController # noqa: E402 from vm_serial import SerialClient # noqa: E402 PHASE_SCHEDULE = [ ("clean", 10.0), ("armed", 2.0), ("infecting", 3.0), ("infected_running", 25.0), ("dormant", 15.0), ("infected_running", 20.0), ("dormant", 5.0), ("clean", 5.0), ] SCHEMA_VERSION = 1 RETRY_DELAY_S = 5.0 log = logging.getLogger("cis490.campaign") # --------------------------------------------------------------------------- # campaign.json helpers # --------------------------------------------------------------------------- def _load_state(path: Path, target: int) -> dict: if path.exists(): state = json.loads(path.read_text()) # Allow raising the target on resume; never lower it. if target > state["target"]: state["target"] = target return state return { "schema_version": SCHEMA_VERSION, "target": target, "completed": 0, "done": False, "started_at": datetime.now(timezone.utc).isoformat(), "finished_at": None, "last_episode_id": None, "last_completed_at": None, } def _save_state(path: Path, state: dict) -> None: tmp = path.with_suffix(".json.partial") tmp.write_text(json.dumps(state, indent=2, sort_keys=True) + "\n") os.replace(tmp, path) # --------------------------------------------------------------------------- # single-episode VM boot → run → shutdown # --------------------------------------------------------------------------- def _wait_for_socket(path: Path, timeout_s: float) -> None: deadline = time.monotonic() + timeout_s while time.monotonic() < deadline: if path.exists(): try: t = _sk.socket(_sk.AF_UNIX, _sk.SOCK_STREAM) t.settimeout(0.5) t.connect(str(path)) t.close() return except OSError: pass time.sleep(0.2) raise TimeoutError(f"socket {path} not ready after {timeout_s}s") def _run_one_episode( repo_root: Path, data_root: Path, run_dir: Path, interval_ms: int, boot_timeout_s: float, ) -> str: launcher = repo_root / "vm" / "launch_demo.sh" if run_dir.exists(): shutil.rmtree(run_dir) run_dir.mkdir(parents=True, exist_ok=True) serial_sock = run_dir / "serial.sock" pid_file = run_dir / "qemu.pid" env = os.environ.copy() env["RUN_DIR"] = str(run_dir) qemu = subprocess.Popen( [str(launcher)], cwd=str(repo_root), env=env, stdout=subprocess.DEVNULL, stderr=subprocess.DEVNULL, start_new_session=True, ) try: _wait_for_socket(serial_sock, timeout_s=15.0) deadline = time.monotonic() + 15.0 while time.monotonic() < deadline: if pid_file.exists() and pid_file.read_text().strip(): break time.sleep(0.2) qemu_pid = int(pid_file.read_text().strip()) log.info("qemu pid=%d", qemu_pid) log.info("waiting 35s for cloud-init to settle...") time.sleep(35.0) serial = SerialClient(str(serial_sock)) serial.connect() serial.login(boot_timeout_s=boot_timeout_s) controller = VMLoadController(serial) controller.setup() cfg = EpisodeConfig( target_pid=qemu_pid, duration_s=sum(d for _, d in PHASE_SCHEDULE), interval_ms=interval_ms, data_root=data_root, phase_schedule=PHASE_SCHEDULE, image_name="alpine-3.21-cloudinit", snapshot_name="baseline-v1", ) result = EpisodeRunner(cfg, on_phase=controller.set_phase).run() controller.teardown() serial.close() return result.episode_id finally: try: os.killpg(os.getpgid(qemu.pid), signal.SIGTERM) except ProcessLookupError: pass try: qemu.wait(timeout=5) except subprocess.TimeoutExpired: try: os.killpg(os.getpgid(qemu.pid), signal.SIGKILL) except ProcessLookupError: pass # --------------------------------------------------------------------------- # main # --------------------------------------------------------------------------- def main() -> int: parser = argparse.ArgumentParser(prog="run_campaign") parser.add_argument( "--target", type=int, required=True, help="total number of episodes to collect", ) parser.add_argument("--data-root", default="data") parser.add_argument("--interval-ms", type=int, default=100) parser.add_argument("--run-dir", default="/tmp/cis490-vm") parser.add_argument("--boot-timeout", type=float, default=120.0) args = parser.parse_args() logging.basicConfig( level=logging.INFO, format="%(asctime)s %(levelname)s %(name)s %(message)s", ) data_root = Path(args.data_root) data_root.mkdir(parents=True, exist_ok=True) campaign_path = data_root / "campaign.json" done_marker = data_root / "campaign_done.marker" state = _load_state(campaign_path, args.target) if state["done"]: log.info( "campaign already complete (%d/%d). Remove %s to start a new one.", state["completed"], state["target"], campaign_path, ) return 0 repo_root = Path(__file__).resolve().parent.parent run_dir = Path(args.run_dir) stop = threading.Event() def _handle_signal(sig, _frame): log.info("signal %d — finishing current episode then stopping", sig) stop.set() signal.signal(signal.SIGTERM, _handle_signal) signal.signal(signal.SIGINT, _handle_signal) log.info( "campaign started: %d/%d episodes already collected", state["completed"], state["target"], ) while state["completed"] < state["target"] and not stop.is_set(): ep_num = state["completed"] + 1 log.info("episode %d/%d — booting VM", ep_num, state["target"]) try: episode_id = _run_one_episode( repo_root=repo_root, data_root=data_root, run_dir=run_dir, interval_ms=args.interval_ms, boot_timeout_s=args.boot_timeout, ) except Exception: log.exception("episode %d failed — retrying in %.0fs", ep_num, RETRY_DELAY_S) time.sleep(RETRY_DELAY_S) continue state["completed"] += 1 state["last_episode_id"] = episode_id state["last_completed_at"] = datetime.now(timezone.utc).isoformat() _save_state(campaign_path, state) log.info( "episode %d/%d complete (id=%s)", state["completed"], state["target"], episode_id, ) if state["completed"] >= state["target"]: state["done"] = True state["finished_at"] = datetime.now(timezone.utc).isoformat() _save_state(campaign_path, state) done_marker.touch() log.info( "CAMPAIGN COMPLETE — %d episodes in %s", state["completed"], data_root, ) return 0 log.info( "campaign paused at %d/%d — re-run to continue", state["completed"], state["target"], ) return 0 if __name__ == "__main__": sys.exit(main())