CIS490/tools/shipper.py
Elliott Kolden 842918556b Add automated campaign runner, shipper, and systemd units
Implements the unattended episode loop described in docs/deploy.md but not
yet built. run_campaign.py boots a fresh VM per episode, drives the full
phase schedule via the existing EpisodeRunner/VMLoadController stack, writes
campaign.json atomically after each episode, and signals completion with
campaign_done.marker. shipper.py watches data/episodes/ for done.marker
files, tar+zstd-compresses each, and PUTs them to the receiver with
exponential backoff on failure. Both support SIGTERM gracefully, finishing
the current episode/scan before exiting.

Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
2026-04-30 14:53:40 -06:00

243 lines
7.9 KiB
Python

"""Shipper — watches data/episodes/ and ships completed episodes to the receiver.
An episode is "complete" when its directory contains a done.marker file.
The shipper scans on every wake-up, so a crash mid-tar or mid-PUT is harmless
— the next pass picks up where it left off.
Ship flow per episode:
1. tar + zstd → data/outbox/<id>.tar.zst.partial
2. atomic rename → data/outbox/<id>.tar.zst
3. PUT to receiver with X-Content-SHA256 header
4. 200/201 → mv episodes/<id> shipped/<id>/; rm outbox/<id>.tar.zst
5. 409 → logged and left in place for manual triage
6. any other status / network error → exponential backoff (1s → 300s cap), retry
Usage:
uv run python tools/shipper.py \\
--data-root data \\
--receiver-url https://collector.wg \\
--host-id lab-host-1 \\
[--ca-bundle /etc/cis490/certs/wg-ca.pem] \\
[--client-cert /etc/cis490/certs/lab-host-1.pem] \\
[--client-key /etc/cis490/certs/lab-host-1.key]
"""
from __future__ import annotations
import argparse
import hashlib
import logging
import os
import shutil
import signal
import ssl
import subprocess
import sys
import threading
import time
import urllib.error
import urllib.request
from pathlib import Path
# Module-level logger; handlers/format are configured in main() via basicConfig.
log = logging.getLogger("cis490.shipper")
# Default pause between directory scans, in seconds (overridable with --scan-interval).
SCAN_INTERVAL_S = 30.0
# Delay before the first retry of a failed PUT, in seconds; doubled after each failure.
BACKOFF_INITIAL_S = 1.0
# Upper bound on the retry delay, in seconds.
BACKOFF_CAP_S = 300.0
# ---------------------------------------------------------------------------
# tar + zstd
# ---------------------------------------------------------------------------
def _compress(episode_dir: Path, outbox_partial: Path) -> None:
tar = subprocess.Popen(
["tar", "-c", "-C", str(episode_dir.parent), episode_dir.name],
stdout=subprocess.PIPE,
)
with outbox_partial.open("wb") as out_f:
zst = subprocess.Popen(["zstd", "-q"], stdin=tar.stdout, stdout=out_f)
tar.stdout.close() # type: ignore[union-attr]
if tar.wait() != 0:
zst.kill()
raise RuntimeError("tar exited non-zero")
if zst.wait() != 0:
raise RuntimeError("zstd exited non-zero")
def _sha256_file(path: Path) -> str:
h = hashlib.sha256()
with path.open("rb") as f:
for chunk in iter(lambda: f.read(1024 * 1024), b""):
h.update(chunk)
return h.hexdigest()
# ---------------------------------------------------------------------------
# HTTP PUT
# ---------------------------------------------------------------------------
def _build_ssl_ctx(ca_bundle: str | None, client_cert: str | None, client_key: str | None) -> ssl.SSLContext:
ctx = ssl.SSLContext(ssl.PROTOCOL_TLS_CLIENT)
if ca_bundle:
ctx.load_verify_locations(ca_bundle)
else:
ctx.load_default_certs()
if client_cert and client_key:
ctx.load_cert_chain(client_cert, client_key)
return ctx
def _put_episode(
receiver_url: str,
host_id: str,
episode_id: str,
tarball: Path,
sha256: str,
ssl_ctx: ssl.SSLContext,
) -> int:
url = f"{receiver_url.rstrip('/')}/v1/episodes/{host_id}/{episode_id}.tar.zst"
size = tarball.stat().st_size
with tarball.open("rb") as body:
req = urllib.request.Request(url, data=body, method="PUT")
req.add_header("Content-Type", "application/zstd")
req.add_header("Content-Length", str(size))
req.add_header("X-Content-SHA256", sha256)
req.add_header("X-Schema-Version", "1")
req.add_header("X-Lab-Host", host_id)
req.add_header("X-Episode-Id", episode_id)
try:
with urllib.request.urlopen(req, context=ssl_ctx, timeout=120) as resp:
return resp.status
except urllib.error.HTTPError as exc:
return exc.code
# ---------------------------------------------------------------------------
# per-episode ship logic
# ---------------------------------------------------------------------------
def _ship_episode(
    episode_dir: Path,
    outbox_dir: Path,
    shipped_dir: Path,
    receiver_url: str,
    host_id: str,
    ssl_ctx: ssl.SSLContext,
    stop: threading.Event | None = None,
) -> None:
    """Compress one completed episode (if needed) and PUT it until it is accepted.

    Args:
        episode_dir: Episode directory; its name is used as the episode id.
        outbox_dir: Where ``<id>.tar.zst`` (and its ``.partial``) are written.
        shipped_dir: Where the episode directory is moved after a 200/201.
        receiver_url: Base URL of the receiver.
        host_id: Lab host identifier sent with the upload.
        ssl_ctx: TLS context passed to the upload.
        stop: Optional shutdown event. When given, backoff waits on it and the
            retry loop returns as soon as it is set, leaving the tarball in the
            outbox. When None the loop sleeps unconditionally between attempts.

    Raises:
        Exception: whatever compression, the rename, hashing, or the final
            move raise. Failures of the PUT itself are retried, not raised.
    """
    episode_id = episode_dir.name
    tarball = outbox_dir / f"{episode_id}.tar.zst"
    partial = outbox_dir / f"{episode_id}.tar.zst.partial"

    # Step 1-2: compress (idempotent — skip if tarball already exists in outbox)
    if not tarball.exists():
        log.info("compressing %s", episode_id)
        try:
            _compress(episode_dir, partial)
        except Exception:
            partial.unlink(missing_ok=True)
            raise
        # Atomic rename: a tarball in the outbox is always complete.
        os.replace(partial, tarball)
    sha256 = _sha256_file(tarball)

    # Step 3-4: PUT with backoff
    backoff = BACKOFF_INITIAL_S
    while True:
        log.info("shipping %s (%.1f KiB)", episode_id, tarball.stat().st_size / 1024)
        try:
            status = _put_episode(receiver_url, host_id, episode_id, tarball, sha256, ssl_ctx)
        except Exception as exc:
            log.warning("network error shipping %s: %s — retry in %.0fs", episode_id, exc, backoff)
        else:
            if status in (200, 201):
                log.info("shipped %s (status=%d)", episode_id, status)
                shutil.move(str(episode_dir), str(shipped_dir / episode_id))
                tarball.unlink(missing_ok=True)
                return
            if status == 409:
                log.error(
                    "conflict on %s — receiver has different bytes. Leaving in place for manual triage.",
                    episode_id,
                )
                return
            # NOTE(review): every status other than 200/201/409 is retried,
            # including 4xx responses that may never succeed — confirm intended.
            log.warning("receiver returned %d for %s — retry in %.0fs", status, episode_id, backoff)

        # Back off before the next attempt. Waiting on the stop event (instead
        # of a plain sleep) lets a shutdown request end the retry loop.
        if stop is None:
            time.sleep(backoff)
        elif stop.wait(timeout=backoff):
            log.info("stop requested — leaving %s in outbox for the next run", episode_id)
            return
        backoff = min(backoff * 2, BACKOFF_CAP_S)
# ---------------------------------------------------------------------------
# main scan loop
# ---------------------------------------------------------------------------
def main() -> int:
    """Parse arguments, then scan the episodes directory until signalled to stop.

    Creates ``episodes/``, ``outbox/`` and ``shipped/`` under ``--data-root``,
    installs SIGTERM/SIGINT handlers that set a stop event, and on every scan
    ships each sub-directory that contains ``done.marker``. A failure on one
    episode is logged and does not stop the scan.

    Returns:
        0 once the stop event has been set and the loop has exited.
    """
    parser = argparse.ArgumentParser(prog="shipper")
    parser.add_argument("--data-root", default="data")
    parser.add_argument("--receiver-url", required=True)
    parser.add_argument("--host-id", required=True)
    parser.add_argument("--ca-bundle", default=None)
    parser.add_argument("--client-cert", default=None)
    parser.add_argument("--client-key", default=None)
    parser.add_argument(
        "--scan-interval", type=float, default=SCAN_INTERVAL_S,
        help="seconds between directory scans (default 30)",
    )
    args = parser.parse_args()

    logging.basicConfig(
        level=logging.INFO,
        format="%(asctime)s %(levelname)s %(name)s %(message)s",
    )

    data_root = Path(args.data_root)
    episodes_dir = data_root / "episodes"
    outbox_dir = data_root / "outbox"
    shipped_dir = data_root / "shipped"
    for d in (episodes_dir, outbox_dir, shipped_dir):
        d.mkdir(parents=True, exist_ok=True)

    ssl_ctx = _build_ssl_ctx(args.ca_bundle, args.client_cert, args.client_key)

    stop = threading.Event()

    def _handle_signal(sig, _frame):
        log.info("signal %d — stopping after current scan", sig)
        stop.set()

    signal.signal(signal.SIGTERM, _handle_signal)
    signal.signal(signal.SIGINT, _handle_signal)

    log.info("shipper started: watching %s, receiver=%s", episodes_dir, args.receiver_url)
    while not stop.is_set():
        # Sorted so episodes are processed in a stable, name-based order.
        for ep_dir in sorted(episodes_dir.iterdir()):
            if stop.is_set():
                break
            if not ep_dir.is_dir():
                continue
            if not (ep_dir / "done.marker").exists():
                continue
            try:
                _ship_episode(
                    episode_dir=ep_dir,
                    outbox_dir=outbox_dir,
                    shipped_dir=shipped_dir,
                    receiver_url=args.receiver_url,
                    host_id=args.host_id,
                    ssl_ctx=ssl_ctx,
                    # Lets a shutdown signal interrupt the upload retry backoff.
                    stop=stop,
                )
            except Exception:
                log.exception("failed to ship %s — will retry next scan", ep_dir.name)
        # Event.wait doubles as an interruptible sleep between scans.
        stop.wait(timeout=args.scan_interval)
    log.info("shipper stopped")
    return 0
# Script entry point: propagate main()'s return value as the process exit status.
if __name__ == "__main__":
    raise SystemExit(main())