Add automated campaign runner, shipper, and systemd units

Implements the unattended episode loop described in docs/deploy.md but not
yet built. run_campaign.py boots a fresh VM per episode, drives the full
phase schedule via the existing EpisodeRunner/VMLoadController stack, writes
campaign.json atomically after each episode, and signals completion with
campaign_done.marker. shipper.py watches data/episodes/ for done.marker
files, tar+zstd-compresses each, and PUTs them to the receiver with
exponential backoff on failure. Both support SIGTERM gracefully, finishing
the current episode/scan before exiting.

Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
This commit is contained in:
Elliott Kolden 2026-04-29 13:53:00 -06:00
parent 7216ec09bd
commit 842918556b
4 changed files with 589 additions and 0 deletions

View file

@ -0,0 +1,33 @@
[Unit]
Description=CIS490 episode campaign runner
Documentation=https://maxgit.wg/spectral/CIS490
After=network-online.target
Wants=network-online.target
[Service]
Type=simple
User=cis490
Group=cis490
SupplementaryGroups=kvm
WorkingDirectory=/opt/cis490
ExecStart=/opt/cis490/.venv/bin/python tools/run_campaign.py \
--data-root /var/lib/cis490/data \
--target 100
Restart=on-failure
RestartSec=10
# Hardening
NoNewPrivileges=true
PrivateTmp=false
ProtectSystem=strict
ProtectHome=true
ReadWritePaths=/var/lib/cis490 /tmp/cis490-vm /dev/kvm
ProtectKernelTunables=true
ProtectKernelModules=true
ProtectControlGroups=true
LockPersonality=true
RestrictRealtime=true
SystemCallArchitectures=native
[Install]
WantedBy=multi-user.target

View file

@ -0,0 +1,36 @@
[Unit]
Description=CIS490 episode shipper
Documentation=https://maxgit.wg/spectral/CIS490
After=network-online.target cis490-orchestrator.service
Wants=network-online.target
[Service]
Type=simple
User=cis490
Group=cis490
WorkingDirectory=/opt/cis490
ExecStart=/opt/cis490/.venv/bin/python tools/shipper.py \
--data-root /var/lib/cis490/data \
--receiver-url https://collector.wg \
--host-id lab-host-1 \
--ca-bundle /etc/cis490/certs/wg-ca.pem \
--client-cert /etc/cis490/certs/lab-host-1.pem \
--client-key /etc/cis490/certs/lab-host-1.key
Restart=on-failure
RestartSec=10
# Hardening
NoNewPrivileges=true
PrivateTmp=true
ProtectSystem=strict
ProtectHome=true
ReadWritePaths=/var/lib/cis490
ProtectKernelTunables=true
ProtectKernelModules=true
ProtectControlGroups=true
LockPersonality=true
RestrictRealtime=true
SystemCallArchitectures=native
[Install]
WantedBy=multi-user.target

277
tools/run_campaign.py Normal file
View file

@ -0,0 +1,277 @@
"""Campaign runner — drives N back-to-back VM episodes unattended.
Each iteration boots a fresh VM from the baseline snapshot, runs the standard
phase schedule, shuts the VM down, and updates campaign.json. On reaching
the target, campaign_done.marker is written and the process exits 0.
Safe to interrupt with SIGTERM/SIGINT: state is flushed atomically after
every episode, so re-running with the same --data-root resumes where the
previous run left off.
Usage:
uv run python tools/run_campaign.py --target 50 --data-root data
"""
from __future__ import annotations
import argparse
import json
import logging
import os
import shutil
import signal
import socket as _sk
import subprocess
import sys
import threading
import time
from datetime import datetime, timezone
from pathlib import Path
sys.path.insert(0, str(Path(__file__).resolve().parent.parent))
sys.path.insert(0, str(Path(__file__).resolve().parent))
from orchestrator.episode import EpisodeConfig, EpisodeRunner # noqa: E402
from vm_load_controller import VMLoadController # noqa: E402
from vm_serial import SerialClient # noqa: E402
PHASE_SCHEDULE = [
("clean", 10.0),
("armed", 2.0),
("infecting", 3.0),
("infected_running", 25.0),
("dormant", 15.0),
("infected_running", 20.0),
("dormant", 5.0),
("clean", 5.0),
]
SCHEMA_VERSION = 1
RETRY_DELAY_S = 5.0
log = logging.getLogger("cis490.campaign")
# ---------------------------------------------------------------------------
# campaign.json helpers
# ---------------------------------------------------------------------------
def _load_state(path: Path, target: int) -> dict:
if path.exists():
state = json.loads(path.read_text())
# Allow raising the target on resume; never lower it.
if target > state["target"]:
state["target"] = target
return state
return {
"schema_version": SCHEMA_VERSION,
"target": target,
"completed": 0,
"done": False,
"started_at": datetime.now(timezone.utc).isoformat(),
"finished_at": None,
"last_episode_id": None,
"last_completed_at": None,
}
def _save_state(path: Path, state: dict) -> None:
tmp = path.with_suffix(".json.partial")
tmp.write_text(json.dumps(state, indent=2, sort_keys=True) + "\n")
os.replace(tmp, path)
# ---------------------------------------------------------------------------
# single-episode VM boot → run → shutdown
# ---------------------------------------------------------------------------
def _wait_for_socket(path: Path, timeout_s: float) -> None:
deadline = time.monotonic() + timeout_s
while time.monotonic() < deadline:
if path.exists():
try:
t = _sk.socket(_sk.AF_UNIX, _sk.SOCK_STREAM)
t.settimeout(0.5)
t.connect(str(path))
t.close()
return
except OSError:
pass
time.sleep(0.2)
raise TimeoutError(f"socket {path} not ready after {timeout_s}s")
def _run_one_episode(
repo_root: Path,
data_root: Path,
run_dir: Path,
interval_ms: int,
boot_timeout_s: float,
) -> str:
launcher = repo_root / "vm" / "launch_demo.sh"
if run_dir.exists():
shutil.rmtree(run_dir)
run_dir.mkdir(parents=True, exist_ok=True)
serial_sock = run_dir / "serial.sock"
pid_file = run_dir / "qemu.pid"
env = os.environ.copy()
env["RUN_DIR"] = str(run_dir)
qemu = subprocess.Popen(
[str(launcher)],
cwd=str(repo_root),
env=env,
stdout=subprocess.DEVNULL,
stderr=subprocess.DEVNULL,
start_new_session=True,
)
try:
_wait_for_socket(serial_sock, timeout_s=15.0)
deadline = time.monotonic() + 15.0
while time.monotonic() < deadline:
if pid_file.exists() and pid_file.read_text().strip():
break
time.sleep(0.2)
qemu_pid = int(pid_file.read_text().strip())
log.info("qemu pid=%d", qemu_pid)
log.info("waiting 35s for cloud-init to settle...")
time.sleep(35.0)
serial = SerialClient(str(serial_sock))
serial.connect()
serial.login(boot_timeout_s=boot_timeout_s)
controller = VMLoadController(serial)
controller.setup()
cfg = EpisodeConfig(
target_pid=qemu_pid,
duration_s=sum(d for _, d in PHASE_SCHEDULE),
interval_ms=interval_ms,
data_root=data_root,
phase_schedule=PHASE_SCHEDULE,
image_name="alpine-3.21-cloudinit",
snapshot_name="baseline-v1",
)
result = EpisodeRunner(cfg, on_phase=controller.set_phase).run()
controller.teardown()
serial.close()
return result.episode_id
finally:
try:
os.killpg(os.getpgid(qemu.pid), signal.SIGTERM)
except ProcessLookupError:
pass
try:
qemu.wait(timeout=5)
except subprocess.TimeoutExpired:
try:
os.killpg(os.getpgid(qemu.pid), signal.SIGKILL)
except ProcessLookupError:
pass
# ---------------------------------------------------------------------------
# main
# ---------------------------------------------------------------------------
def main() -> int:
parser = argparse.ArgumentParser(prog="run_campaign")
parser.add_argument(
"--target", type=int, required=True,
help="total number of episodes to collect",
)
parser.add_argument("--data-root", default="data")
parser.add_argument("--interval-ms", type=int, default=100)
parser.add_argument("--run-dir", default="/tmp/cis490-vm")
parser.add_argument("--boot-timeout", type=float, default=120.0)
args = parser.parse_args()
logging.basicConfig(
level=logging.INFO,
format="%(asctime)s %(levelname)s %(name)s %(message)s",
)
data_root = Path(args.data_root)
data_root.mkdir(parents=True, exist_ok=True)
campaign_path = data_root / "campaign.json"
done_marker = data_root / "campaign_done.marker"
state = _load_state(campaign_path, args.target)
if state["done"]:
log.info(
"campaign already complete (%d/%d). Remove %s to start a new one.",
state["completed"], state["target"], campaign_path,
)
return 0
repo_root = Path(__file__).resolve().parent.parent
run_dir = Path(args.run_dir)
stop = threading.Event()
def _handle_signal(sig, _frame):
log.info("signal %d — finishing current episode then stopping", sig)
stop.set()
signal.signal(signal.SIGTERM, _handle_signal)
signal.signal(signal.SIGINT, _handle_signal)
log.info(
"campaign started: %d/%d episodes already collected",
state["completed"], state["target"],
)
while state["completed"] < state["target"] and not stop.is_set():
ep_num = state["completed"] + 1
log.info("episode %d/%d — booting VM", ep_num, state["target"])
try:
episode_id = _run_one_episode(
repo_root=repo_root,
data_root=data_root,
run_dir=run_dir,
interval_ms=args.interval_ms,
boot_timeout_s=args.boot_timeout,
)
except Exception:
log.exception("episode %d failed — retrying in %.0fs", ep_num, RETRY_DELAY_S)
time.sleep(RETRY_DELAY_S)
continue
state["completed"] += 1
state["last_episode_id"] = episode_id
state["last_completed_at"] = datetime.now(timezone.utc).isoformat()
_save_state(campaign_path, state)
log.info(
"episode %d/%d complete (id=%s)",
state["completed"], state["target"], episode_id,
)
if state["completed"] >= state["target"]:
state["done"] = True
state["finished_at"] = datetime.now(timezone.utc).isoformat()
_save_state(campaign_path, state)
done_marker.touch()
log.info(
"CAMPAIGN COMPLETE — %d episodes in %s",
state["completed"], data_root,
)
return 0
log.info(
"campaign paused at %d/%d — re-run to continue",
state["completed"], state["target"],
)
return 0
if __name__ == "__main__":
sys.exit(main())

243
tools/shipper.py Normal file
View file

@ -0,0 +1,243 @@
"""Shipper — watches data/episodes/ and ships completed episodes to the receiver.
An episode is "complete" when its directory contains a done.marker file.
The shipper scans on every wake-up, so a crash mid-tar or mid-PUT is harmless
the next pass picks up where it left off.
Ship flow per episode:
1. tar + zstd data/outbox/<id>.tar.zst.partial
2. atomic rename data/outbox/<id>.tar.zst
3. PUT to receiver with X-Content-SHA256 header
4. 200/201 mv episodes/<id> shipped/<id>/; rm outbox/<id>.tar.zst
5. 5xx/network exponential backoff (1s 300s cap), retry
Usage:
uv run python tools/shipper.py \\
--data-root data \\
--receiver-url https://collector.wg \\
--host-id lab-host-1 \\
[--ca-bundle /etc/cis490/certs/wg-ca.pem] \\
[--client-cert /etc/cis490/certs/lab-host-1.pem] \\
[--client-key /etc/cis490/certs/lab-host-1.key]
"""
from __future__ import annotations
import argparse
import hashlib
import logging
import os
import shutil
import signal
import ssl
import subprocess
import sys
import threading
import time
import urllib.error
import urllib.request
from pathlib import Path
log = logging.getLogger("cis490.shipper")
SCAN_INTERVAL_S = 30.0
BACKOFF_INITIAL_S = 1.0
BACKOFF_CAP_S = 300.0
# ---------------------------------------------------------------------------
# tar + zstd
# ---------------------------------------------------------------------------
def _compress(episode_dir: Path, outbox_partial: Path) -> None:
tar = subprocess.Popen(
["tar", "-c", "-C", str(episode_dir.parent), episode_dir.name],
stdout=subprocess.PIPE,
)
with outbox_partial.open("wb") as out_f:
zst = subprocess.Popen(["zstd", "-q"], stdin=tar.stdout, stdout=out_f)
tar.stdout.close() # type: ignore[union-attr]
if tar.wait() != 0:
zst.kill()
raise RuntimeError("tar exited non-zero")
if zst.wait() != 0:
raise RuntimeError("zstd exited non-zero")
def _sha256_file(path: Path) -> str:
h = hashlib.sha256()
with path.open("rb") as f:
for chunk in iter(lambda: f.read(1024 * 1024), b""):
h.update(chunk)
return h.hexdigest()
# ---------------------------------------------------------------------------
# HTTP PUT
# ---------------------------------------------------------------------------
def _build_ssl_ctx(ca_bundle: str | None, client_cert: str | None, client_key: str | None) -> ssl.SSLContext:
ctx = ssl.SSLContext(ssl.PROTOCOL_TLS_CLIENT)
if ca_bundle:
ctx.load_verify_locations(ca_bundle)
else:
ctx.load_default_certs()
if client_cert and client_key:
ctx.load_cert_chain(client_cert, client_key)
return ctx
def _put_episode(
receiver_url: str,
host_id: str,
episode_id: str,
tarball: Path,
sha256: str,
ssl_ctx: ssl.SSLContext,
) -> int:
url = f"{receiver_url.rstrip('/')}/v1/episodes/{host_id}/{episode_id}.tar.zst"
size = tarball.stat().st_size
with tarball.open("rb") as body:
req = urllib.request.Request(url, data=body, method="PUT")
req.add_header("Content-Type", "application/zstd")
req.add_header("Content-Length", str(size))
req.add_header("X-Content-SHA256", sha256)
req.add_header("X-Schema-Version", "1")
req.add_header("X-Lab-Host", host_id)
req.add_header("X-Episode-Id", episode_id)
try:
with urllib.request.urlopen(req, context=ssl_ctx, timeout=120) as resp:
return resp.status
except urllib.error.HTTPError as exc:
return exc.code
# ---------------------------------------------------------------------------
# per-episode ship logic
# ---------------------------------------------------------------------------
def _ship_episode(
episode_dir: Path,
outbox_dir: Path,
shipped_dir: Path,
receiver_url: str,
host_id: str,
ssl_ctx: ssl.SSLContext,
) -> None:
episode_id = episode_dir.name
tarball = outbox_dir / f"{episode_id}.tar.zst"
partial = outbox_dir / f"{episode_id}.tar.zst.partial"
# Step 1-2: compress (idempotent — skip if tarball already exists in outbox)
if not tarball.exists():
log.info("compressing %s", episode_id)
try:
_compress(episode_dir, partial)
except Exception:
partial.unlink(missing_ok=True)
raise
os.replace(partial, tarball)
sha256 = _sha256_file(tarball)
# Step 3-4: PUT with backoff
backoff = BACKOFF_INITIAL_S
while True:
log.info("shipping %s (%.1f KiB)", episode_id, tarball.stat().st_size / 1024)
try:
status = _put_episode(receiver_url, host_id, episode_id, tarball, sha256, ssl_ctx)
except Exception as exc:
log.warning("network error shipping %s: %s — retry in %.0fs", episode_id, exc, backoff)
time.sleep(backoff)
backoff = min(backoff * 2, BACKOFF_CAP_S)
continue
if status in (200, 201):
log.info("shipped %s (status=%d)", episode_id, status)
shutil.move(str(episode_dir), str(shipped_dir / episode_id))
tarball.unlink(missing_ok=True)
return
if status == 409:
log.error(
"conflict on %s — receiver has different bytes. Leaving in place for manual triage.",
episode_id,
)
return
log.warning("receiver returned %d for %s — retry in %.0fs", status, episode_id, backoff)
time.sleep(backoff)
backoff = min(backoff * 2, BACKOFF_CAP_S)
# ---------------------------------------------------------------------------
# main scan loop
# ---------------------------------------------------------------------------
def main() -> int:
parser = argparse.ArgumentParser(prog="shipper")
parser.add_argument("--data-root", default="data")
parser.add_argument("--receiver-url", required=True)
parser.add_argument("--host-id", required=True)
parser.add_argument("--ca-bundle", default=None)
parser.add_argument("--client-cert", default=None)
parser.add_argument("--client-key", default=None)
parser.add_argument(
"--scan-interval", type=float, default=SCAN_INTERVAL_S,
help="seconds between directory scans (default 30)",
)
args = parser.parse_args()
logging.basicConfig(
level=logging.INFO,
format="%(asctime)s %(levelname)s %(name)s %(message)s",
)
data_root = Path(args.data_root)
episodes_dir = data_root / "episodes"
outbox_dir = data_root / "outbox"
shipped_dir = data_root / "shipped"
for d in (episodes_dir, outbox_dir, shipped_dir):
d.mkdir(parents=True, exist_ok=True)
ssl_ctx = _build_ssl_ctx(args.ca_bundle, args.client_cert, args.client_key)
stop = threading.Event()
def _handle_signal(sig, _frame):
log.info("signal %d — stopping after current scan", sig)
stop.set()
signal.signal(signal.SIGTERM, _handle_signal)
signal.signal(signal.SIGINT, _handle_signal)
log.info("shipper started: watching %s, receiver=%s", episodes_dir, args.receiver_url)
while not stop.is_set():
for ep_dir in sorted(episodes_dir.iterdir()):
if stop.is_set():
break
if not ep_dir.is_dir():
continue
if not (ep_dir / "done.marker").exists():
continue
try:
_ship_episode(
episode_dir=ep_dir,
outbox_dir=outbox_dir,
shipped_dir=shipped_dir,
receiver_url=args.receiver_url,
host_id=args.host_id,
ssl_ctx=ssl_ctx,
)
except Exception:
log.exception("failed to ship %s — will retry next scan", ep_dir.name)
stop.wait(timeout=args.scan_interval)
log.info("shipper stopped")
return 0
if __name__ == "__main__":
sys.exit(main())