CIS490/tools/run_campaign.py
Elliott Kolden 842918556b Add automated campaign runner, shipper, and systemd units
Implements the unattended episode loop that docs/deploy.md describes but
that had not been built yet. run_campaign.py boots a fresh VM per episode,
drives the full phase schedule via the existing EpisodeRunner/VMLoadController
stack, writes campaign.json atomically after each episode, and signals
completion with campaign_done.marker. shipper.py watches data/episodes/ for
done.marker files, tar+zstd-compresses each finished episode, and PUTs the
archive to the receiver with exponential backoff on failure. Both handle
SIGTERM gracefully, finishing the current episode/scan before exiting.

Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
2026-04-30 14:53:40 -06:00

277 lines
8.3 KiB
Python

"""Campaign runner — drives N back-to-back VM episodes unattended.
Each iteration boots a fresh VM from the baseline snapshot, runs the standard
phase schedule, shuts the VM down, and updates campaign.json. On reaching
the target, campaign_done.marker is written and the process exits 0.
Safe to interrupt with SIGTERM/SIGINT: state is flushed atomically after
every episode, so re-running with the same --data-root resumes where the
previous run left off.
Usage:
uv run python tools/run_campaign.py --target 50 --data-root data
"""
from __future__ import annotations
import argparse
import json
import logging
import os
import shutil
import signal
import socket as _sk
import subprocess
import sys
import threading
import time
from datetime import datetime, timezone
from pathlib import Path
sys.path.insert(0, str(Path(__file__).resolve().parent.parent))
sys.path.insert(0, str(Path(__file__).resolve().parent))
from orchestrator.episode import EpisodeConfig, EpisodeRunner # noqa: E402
from vm_load_controller import VMLoadController # noqa: E402
from vm_serial import SerialClient # noqa: E402
# Ordered (phase name, duration in seconds) steps driven in every episode.
# Phase names may repeat; the episode's total duration_s is the sum of all
# step durations (85 s with the values below).
PHASE_SCHEDULE = [
    ("clean", 10.0),
    ("armed", 2.0),
    ("infecting", 3.0),
    ("infected_running", 25.0),
    ("dormant", 15.0),
    ("infected_running", 20.0),
    ("dormant", 5.0),
    ("clean", 5.0),
]

# Version number stamped into newly created campaign.json files.
SCHEMA_VERSION = 1

# Seconds to pause before retrying an episode that raised an exception.
RETRY_DELAY_S = 5.0

log = logging.getLogger("cis490.campaign")
# ---------------------------------------------------------------------------
# campaign.json helpers
# ---------------------------------------------------------------------------
def _load_state(path: Path, target: int) -> dict:
    """Load campaign state from *path*, or build a fresh state if it is absent.

    Args:
        path: Location of campaign.json.
        target: Requested total number of episodes for this invocation.

    Returns:
        The state dict. When resuming, the stored target is raised to *target*
        if *target* is larger, but it is never lowered. If raising the target
        reopens a campaign that was previously marked done, ``done`` is reset
        to False and ``finished_at`` to None so the caller keeps collecting
        instead of reporting the campaign as already complete.
    """
    if path.exists():
        state = json.loads(path.read_text(encoding="utf-8"))
        # Allow raising the target on resume; never lower it.
        if target > state["target"]:
            state["target"] = target
            # Without this, a finished campaign would keep done=True and the
            # caller would exit immediately, silently ignoring the new target.
            if state.get("done") and state["completed"] < target:
                state["done"] = False
                state["finished_at"] = None
        return state
    return {
        "schema_version": SCHEMA_VERSION,
        "target": target,
        "completed": 0,
        "done": False,
        "started_at": datetime.now(timezone.utc).isoformat(),
        "finished_at": None,
        "last_episode_id": None,
        "last_completed_at": None,
    }
def _save_state(path: Path, state: dict) -> None:
    """Atomically and durably write *state* to *path* as sorted, indented JSON.

    The JSON goes to a sibling ``*.json.partial`` file, which is flushed and
    fsync'd and then moved over *path* with ``os.replace``. Readers therefore
    see either the previous file or the complete new one. The parent
    directory is then fsync'd (best effort) so the rename itself is persisted.
    """
    tmp = path.with_suffix(".json.partial")
    payload = json.dumps(state, indent=2, sort_keys=True) + "\n"
    with open(tmp, "w", encoding="utf-8") as fh:
        fh.write(payload)
        fh.flush()
        # Without an fsync before the rename, a crash or power loss can leave
        # campaign.json empty on filesystems that delay data writes.
        os.fsync(fh.fileno())
    os.replace(tmp, path)
    try:
        dir_fd = os.open(path.parent, os.O_RDONLY)
    except OSError:
        return  # directory fsync is best effort only
    try:
        os.fsync(dir_fd)
    except OSError:
        pass  # not every platform/filesystem supports fsync on a directory
    finally:
        os.close(dir_fd)
# ---------------------------------------------------------------------------
# single-episode VM boot → run → shutdown
# ---------------------------------------------------------------------------
def _wait_for_socket(path: Path, timeout_s: float) -> None:
    """Block until a Unix-domain stream socket at *path* accepts a connection.

    Polls every 0.2 s while the deadline has not passed. Each probe uses a
    0.5 s connect timeout, and its socket is always closed, so repeated failed
    probes do not leak file descriptors.

    Raises:
        TimeoutError: if no probe connects within *timeout_s* seconds.
    """
    deadline = time.monotonic() + timeout_s
    while time.monotonic() < deadline:
        if path.exists():
            try:
                # Context manager closes the probe on success *and* failure.
                with _sk.socket(_sk.AF_UNIX, _sk.SOCK_STREAM) as probe:
                    probe.settimeout(0.5)
                    probe.connect(str(path))
                return
            except OSError:
                pass  # socket file present but not accepting yet; retry
        time.sleep(0.2)
    raise TimeoutError(f"socket {path} not ready after {timeout_s}s")
def _wait_for_pid_file(pid_file: Path, timeout_s: float) -> int:
    """Poll *pid_file* until it holds a non-empty PID and return it as an int.

    Raises:
        TimeoutError: if the file is still missing or empty after *timeout_s*.
    """
    deadline = time.monotonic() + timeout_s
    while time.monotonic() < deadline:
        try:
            text = pid_file.read_text().strip()
        except FileNotFoundError:
            text = ""
        if text:
            return int(text)
        time.sleep(0.2)
    raise TimeoutError(f"pid file {pid_file} not written after {timeout_s}s")


def _stop_process_group(proc: subprocess.Popen, grace_s: float = 5.0) -> None:
    """SIGTERM *proc*'s process group, escalating to SIGKILL after *grace_s*.

    *proc* must have been started with ``start_new_session=True`` so its PID
    is also its process-group ID; that lets the group be signalled even if the
    group leader itself has already exited. The leader is reaped afterwards
    so no zombie is left behind.
    """
    try:
        os.killpg(proc.pid, signal.SIGTERM)
    except ProcessLookupError:
        pass
    try:
        proc.wait(timeout=grace_s)
        return
    except subprocess.TimeoutExpired:
        pass
    try:
        os.killpg(proc.pid, signal.SIGKILL)
    except ProcessLookupError:
        pass
    try:
        proc.wait(timeout=grace_s)
    except subprocess.TimeoutExpired:
        # Should not happen after SIGKILL; don't hang the campaign over it.
        log.warning("launcher pid=%d did not exit after SIGKILL", proc.pid)


def _run_one_episode(
    repo_root: Path,
    data_root: Path,
    run_dir: Path,
    interval_ms: int,
    boot_timeout_s: float,
) -> str:
    """Boot a fresh VM, run one full PHASE_SCHEDULE episode, then shut it down.

    Starts ``vm/launch_demo.sh`` (with RUN_DIR set) in its own session. It then
    waits for the serial socket and qemu pid file, pauses 35 s, logs in over
    serial, and drives EpisodeRunner with VMLoadController.set_phase as the
    phase callback.

    Args:
        repo_root: Repository root; the launcher is resolved relative to it.
        data_root: Passed to EpisodeConfig as the episode output root.
        run_dir: Scratch directory for the serial socket and pid file; it is
            deleted and recreated on every call.
        interval_ms: Sampling interval forwarded to EpisodeConfig.
        boot_timeout_s: Login timeout forwarded to SerialClient.login.

    Returns:
        The episode_id of the completed episode.

    Raises:
        TimeoutError: if the serial socket or pid file does not appear within
            15 s. Errors from SerialClient, VMLoadController or EpisodeRunner
            propagate unchanged. In every case the serial client is closed
            (once connected) and the launcher's process group is terminated.
    """
    launcher = repo_root / "vm" / "launch_demo.sh"
    # Start from an empty run dir so files left behind by a previous episode
    # are never picked up as this VM's socket or pid file.
    if run_dir.exists():
        shutil.rmtree(run_dir)
    run_dir.mkdir(parents=True, exist_ok=True)
    serial_sock = run_dir / "serial.sock"
    pid_file = run_dir / "qemu.pid"

    env = os.environ.copy()
    env["RUN_DIR"] = str(run_dir)
    qemu = subprocess.Popen(
        [str(launcher)],
        cwd=str(repo_root),
        env=env,
        stdout=subprocess.DEVNULL,
        stderr=subprocess.DEVNULL,
        # New session: the launcher's PID doubles as its process-group ID,
        # so the whole process tree can be stopped with a single killpg().
        start_new_session=True,
    )
    try:
        _wait_for_socket(serial_sock, timeout_s=15.0)
        qemu_pid = _wait_for_pid_file(pid_file, timeout_s=15.0)
        log.info("qemu pid=%d", qemu_pid)
        log.info("waiting 35s for cloud-init to settle...")
        time.sleep(35.0)

        serial = SerialClient(str(serial_sock))
        serial.connect()
        try:
            serial.login(boot_timeout_s=boot_timeout_s)
            controller = VMLoadController(serial)
            controller.setup()
            cfg = EpisodeConfig(
                target_pid=qemu_pid,
                duration_s=sum(d for _, d in PHASE_SCHEDULE),
                interval_ms=interval_ms,
                data_root=data_root,
                phase_schedule=PHASE_SCHEDULE,
                image_name="alpine-3.21-cloudinit",
                snapshot_name="baseline-v1",
            )
            result = EpisodeRunner(cfg, on_phase=controller.set_phase).run()
            controller.teardown()
        finally:
            # Close even when login/setup/run fails; otherwise every retried
            # episode leaks a socket in this long-running process.
            serial.close()
        return result.episode_id
    finally:
        _stop_process_group(qemu)
# ---------------------------------------------------------------------------
# main
# ---------------------------------------------------------------------------
def main() -> int:
    """Command-line entry point: collect episodes until the campaign target.

    Loads (or creates) ``<data-root>/campaign.json``, then runs one VM episode
    at a time via _run_one_episode and saves state after every success. A
    failed episode is logged and retried after RETRY_DELAY_S seconds. With
    ``--max-consecutive-failures N`` (N > 0), the run gives up after N
    failures in a row; the default of 0 keeps retrying forever. SIGTERM/SIGINT
    set a stop flag that is checked between episodes, so the in-flight episode
    finishes first.

    Returns:
        0 when the campaign is complete or was paused by a signal, 1 when the
        consecutive-failure limit was reached.
    """
    parser = argparse.ArgumentParser(prog="run_campaign")
    parser.add_argument(
        "--target", type=int, required=True,
        help="total number of episodes to collect",
    )
    parser.add_argument("--data-root", default="data")
    parser.add_argument("--interval-ms", type=int, default=100)
    parser.add_argument("--run-dir", default="/tmp/cis490-vm")
    parser.add_argument("--boot-timeout", type=float, default=120.0)
    parser.add_argument(
        "--max-consecutive-failures", type=int, default=0,
        help="exit with status 1 after this many failed episodes in a row "
        "(0 = retry forever)",
    )
    args = parser.parse_args()

    logging.basicConfig(
        level=logging.INFO,
        format="%(asctime)s %(levelname)s %(name)s %(message)s",
    )

    data_root = Path(args.data_root)
    data_root.mkdir(parents=True, exist_ok=True)
    campaign_path = data_root / "campaign.json"
    done_marker = data_root / "campaign_done.marker"

    state = _load_state(campaign_path, args.target)
    if state["done"]:
        # Idempotent re-touch: if a previous run died between saving
        # done=True and creating the marker, this is the only place that
        # would ever write it.
        done_marker.touch()
        log.info(
            "campaign already complete (%d/%d). Remove %s to start a new one.",
            state["completed"], state["target"], campaign_path,
        )
        return 0
    # The campaign is not finished (new, resumed, or reopened with a larger
    # target), so a marker left over from an earlier run would be stale.
    done_marker.unlink(missing_ok=True)

    repo_root = Path(__file__).resolve().parent.parent
    run_dir = Path(args.run_dir)

    stop = threading.Event()

    def _handle_signal(sig, _frame):
        log.info("signal %d — finishing current episode then stopping", sig)
        stop.set()

    signal.signal(signal.SIGTERM, _handle_signal)
    signal.signal(signal.SIGINT, _handle_signal)

    log.info(
        "campaign started: %d/%d episodes already collected",
        state["completed"], state["target"],
    )
    consecutive_failures = 0
    while state["completed"] < state["target"] and not stop.is_set():
        ep_num = state["completed"] + 1
        log.info("episode %d/%d — booting VM", ep_num, state["target"])
        try:
            episode_id = _run_one_episode(
                repo_root=repo_root,
                data_root=data_root,
                run_dir=run_dir,
                interval_ms=args.interval_ms,
                boot_timeout_s=args.boot_timeout,
            )
        except Exception:
            consecutive_failures += 1
            limit = args.max_consecutive_failures
            if limit > 0 and consecutive_failures >= limit:
                log.exception(
                    "episode %d failed %d times in a row — giving up",
                    ep_num, consecutive_failures,
                )
                return 1
            log.exception("episode %d failed — retrying in %.0fs", ep_num, RETRY_DELAY_S)
            time.sleep(RETRY_DELAY_S)
            continue
        consecutive_failures = 0
        state["completed"] += 1
        state["last_episode_id"] = episode_id
        state["last_completed_at"] = datetime.now(timezone.utc).isoformat()
        _save_state(campaign_path, state)
        log.info(
            "episode %d/%d complete (id=%s)",
            state["completed"], state["target"], episode_id,
        )

    if state["completed"] >= state["target"]:
        state["done"] = True
        state["finished_at"] = datetime.now(timezone.utc).isoformat()
        _save_state(campaign_path, state)
        done_marker.touch()
        log.info(
            "CAMPAIGN COMPLETE — %d episodes in %s",
            state["completed"], data_root,
        )
        return 0

    log.info(
        "campaign paused at %d/%d — re-run to continue",
        state["completed"], state["target"],
    )
    return 0
if __name__ == "__main__":
    # Use main()'s return value as the process exit status.
    raise SystemExit(main())