Implements the unattended episode loop described in docs/deploy.md but not yet built. run_campaign.py boots a fresh VM per episode, drives the full phase schedule via the existing EpisodeRunner/VMLoadController stack, writes campaign.json atomically after each episode, and signals completion with campaign_done.marker. shipper.py watches data/episodes/ for done.marker files, tar+zstd-compresses each, and PUTs them to the receiver with exponential backoff on failure. Both support SIGTERM gracefully, finishing the current episode/scan before exiting. Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
243 lines
7.9 KiB
Python
243 lines
7.9 KiB
Python
"""Shipper — watches data/episodes/ and ships completed episodes to the receiver.

An episode is "complete" when its directory contains a done.marker file.
The shipper scans on every wake-up, so a crash mid-tar or mid-PUT is harmless
— the next pass picks up where it left off.

Ship flow per episode:
  1. tar + zstd → data/outbox/<id>.tar.zst.partial
  2. atomic rename → data/outbox/<id>.tar.zst
     (skipped when the tarball is already present in the outbox)
  3. PUT to receiver with X-Content-SHA256 header
  4. 200/201 → mv episodes/<id> shipped/<id>/; rm outbox/<id>.tar.zst
  5. 409 → logged as a conflict; episode and tarball are left in place for
     manual triage
  6. any other status, or a network error → exponential backoff
     (1s → 300s cap), retry

Usage:
    uv run python tools/shipper.py \\
        --data-root data \\
        --receiver-url https://collector.wg \\
        --host-id lab-host-1 \\
        [--ca-bundle /etc/cis490/certs/wg-ca.pem] \\
        [--client-cert /etc/cis490/certs/lab-host-1.pem] \\
        [--client-key /etc/cis490/certs/lab-host-1.key]
"""
|
|
|
|
from __future__ import annotations
|
|
|
|
import argparse
|
|
import hashlib
|
|
import logging
|
|
import os
|
|
import shutil
|
|
import signal
|
|
import ssl
|
|
import subprocess
|
|
import sys
|
|
import threading
|
|
import time
|
|
import urllib.error
|
|
import urllib.request
|
|
from pathlib import Path
|
|
|
|
# Module-wide logger used by every function in this file.
log = logging.getLogger("cis490.shipper")

# Default pause between two scans of the episodes directory, in seconds.
SCAN_INTERVAL_S = 30.0

# Retry delay after the first failed PUT, and the ceiling the doubling
# delay is clamped to (both in seconds).
BACKOFF_INITIAL_S = 1.0
BACKOFF_CAP_S = 300.0
|
|
|
|
|
|
# ---------------------------------------------------------------------------
|
|
# tar + zstd
|
|
# ---------------------------------------------------------------------------
|
|
|
|
def _compress(episode_dir: Path, outbox_partial: Path) -> None:
    """Stream ``tar -c`` of *episode_dir* through ``zstd -q`` into *outbox_partial*.

    tar runs with ``-C`` set to the episode directory's parent, so the archive
    is rooted at the directory's own name. Both child processes are always
    terminated and reaped before this function returns, whether it succeeds
    or raises.

    Args:
        episode_dir: directory to archive.
        outbox_partial: file to write the compressed stream to (truncated if
            it already exists).

    Raises:
        RuntimeError: if tar or zstd exits with a non-zero status.
        OSError: if either program cannot be started or the output file
            cannot be opened.
    """
    tar = subprocess.Popen(
        ["tar", "-c", "-C", str(episode_dir.parent), episode_dir.name],
        stdout=subprocess.PIPE,
    )
    zst = None
    try:
        with outbox_partial.open("wb") as out_f:
            zst = subprocess.Popen(["zstd", "-q"], stdin=tar.stdout, stdout=out_f)
            # Drop the parent's copy of the pipe's read end so that zstd is
            # the only reader; if zstd dies, tar's writes fail instead of
            # blocking forever.
            tar.stdout.close()  # type: ignore[union-attr]
            if tar.wait() != 0:
                raise RuntimeError("tar exited non-zero")
            if zst.wait() != 0:
                raise RuntimeError("zstd exited non-zero")
    finally:
        # Reached on success and on every failure path (including zstd
        # failing to start): close the pipe if it is still open, then kill
        # and reap whatever is still running so no zombie or orphaned child
        # is left behind.
        if tar.stdout is not None and not tar.stdout.closed:
            tar.stdout.close()
        for proc in (tar, zst):
            if proc is None:
                continue
            if proc.poll() is None:
                proc.kill()
            proc.wait()
|
|
|
|
|
|
def _sha256_file(path: Path) -> str:
    """Return the hex SHA-256 digest of the file at *path*.

    The file is read in 1 MiB pieces so it is never held in memory whole.
    """
    block_size = 1024 * 1024
    digest = hashlib.sha256()
    with path.open("rb") as stream:
        while piece := stream.read(block_size):
            digest.update(piece)
    return digest.hexdigest()
|
|
|
|
|
|
# ---------------------------------------------------------------------------
|
|
# HTTP PUT
|
|
# ---------------------------------------------------------------------------
|
|
|
|
def _build_ssl_ctx(ca_bundle: str | None, client_cert: str | None, client_key: str | None) -> ssl.SSLContext:
    """Build the client-side TLS context used for uploads.

    Args:
        ca_bundle: CA file to verify the receiver against; when empty or
            None the default certificates are loaded instead.
        client_cert: client certificate file for mutual TLS.
        client_key: private key file matching *client_cert*.

    Returns:
        An ``ssl.SSLContext`` created with ``PROTOCOL_TLS_CLIENT``. The client
        certificate is loaded only when both *client_cert* and *client_key*
        are given.
    """
    context = ssl.SSLContext(ssl.PROTOCOL_TLS_CLIENT)

    if not ca_bundle:
        context.load_default_certs()
    else:
        context.load_verify_locations(ca_bundle)

    have_client_identity = bool(client_cert) and bool(client_key)
    if have_client_identity:
        context.load_cert_chain(client_cert, client_key)

    return context
|
|
|
|
|
|
def _put_episode(
    receiver_url: str,
    host_id: str,
    episode_id: str,
    tarball: Path,
    sha256: str,
    ssl_ctx: ssl.SSLContext,
) -> int:
    """PUT one compressed episode to the receiver and return the HTTP status.

    The request goes to ``<receiver_url>/v1/episodes/<host_id>/<episode_id>.tar.zst``
    with the tarball streamed from disk as the body.

    Args:
        receiver_url: base URL of the receiver; a trailing slash is ignored.
        host_id: sent in the URL path and the ``X-Lab-Host`` header.
        episode_id: sent in the URL path and the ``X-Episode-Id`` header.
        tarball: file whose bytes form the request body.
        sha256: value for the ``X-Content-SHA256`` header.
        ssl_ctx: TLS context handed to ``urlopen``.

    Returns:
        The response status code. Error statuses that urllib reports as
        ``HTTPError`` are returned as their code rather than raised.

    Raises:
        Any non-HTTPError failure from opening the file or from ``urlopen``
        (for example ``urllib.error.URLError`` or a timeout) propagates.
    """
    url = f"{receiver_url.rstrip('/')}/v1/episodes/{host_id}/{episode_id}.tar.zst"
    size = tarball.stat().st_size
    with tarball.open("rb") as body:
        req = urllib.request.Request(url, data=body, method="PUT")
        req.add_header("Content-Type", "application/zstd")
        # An explicit Content-Length is set because the body is a file
        # object rather than bytes.
        req.add_header("Content-Length", str(size))
        req.add_header("X-Content-SHA256", sha256)
        req.add_header("X-Schema-Version", "1")
        req.add_header("X-Lab-Host", host_id)
        req.add_header("X-Episode-Id", episode_id)
        try:
            with urllib.request.urlopen(req, context=ssl_ctx, timeout=120) as resp:
                return resp.status
        except urllib.error.HTTPError as exc:
            # HTTPError wraps the still-open response; close it so repeated
            # failed attempts do not pile up open connections.
            try:
                return exc.code
            finally:
                exc.close()
|
|
|
|
|
|
# ---------------------------------------------------------------------------
|
|
# per-episode ship logic
|
|
# ---------------------------------------------------------------------------
|
|
|
|
def _wait_or_stop(delay_s: float, stop: threading.Event | None) -> bool:
    """Pause for *delay_s* seconds; return True if a stop was requested meanwhile."""
    if stop is None:
        time.sleep(delay_s)
        return False
    return stop.wait(timeout=delay_s)


def _ship_episode(
    episode_dir: Path,
    outbox_dir: Path,
    shipped_dir: Path,
    receiver_url: str,
    host_id: str,
    ssl_ctx: ssl.SSLContext,
    stop: threading.Event | None = None,
) -> None:
    """Compress one completed episode and upload it, retrying until it lands.

    Args:
        episode_dir: the episode's directory; its name is the episode id.
        outbox_dir: where the ``<id>.tar.zst`` tarball is staged.
        shipped_dir: where the episode directory is moved after a 200/201.
        receiver_url: base URL of the receiver.
        host_id: identifier of this host, forwarded to the receiver.
        ssl_ctx: TLS context for the upload.
        stop: optional shutdown flag. When given, it is consulted during
            every backoff pause; once set, retrying is abandoned and the
            tarball stays in the outbox for a later run. When omitted, the
            function retries indefinitely.

    Raises:
        Whatever ``_compress``, the atomic rename or ``_sha256_file`` raise.
        Failures of the PUT itself are retried rather than raised.
    """
    episode_id = episode_dir.name
    tarball = outbox_dir / f"{episode_id}.tar.zst"
    partial = outbox_dir / f"{episode_id}.tar.zst.partial"

    # Step 1-2: compress (idempotent — skip if tarball already exists in outbox)
    if not tarball.exists():
        log.info("compressing %s", episode_id)
        try:
            _compress(episode_dir, partial)
        except Exception:
            partial.unlink(missing_ok=True)
            raise
        # The tarball only appears under its final name once it is complete.
        os.replace(partial, tarball)

    sha256 = _sha256_file(tarball)

    # Step 3-4: PUT with backoff
    backoff = BACKOFF_INITIAL_S
    while True:
        log.info("shipping %s (%.1f KiB)", episode_id, tarball.stat().st_size / 1024)
        try:
            status = _put_episode(receiver_url, host_id, episode_id, tarball, sha256, ssl_ctx)
        except Exception as exc:
            log.warning("network error shipping %s: %s — retry in %.0fs", episode_id, exc, backoff)
        else:
            if status in (200, 201):
                log.info("shipped %s (status=%d)", episode_id, status)
                shutil.move(str(episode_dir), str(shipped_dir / episode_id))
                tarball.unlink(missing_ok=True)
                return

            if status == 409:
                # Nothing is moved or deleted here, so the scan loop in
                # main() will offer this episode again on its next pass.
                log.error(
                    "conflict on %s — receiver has different bytes. Leaving in place for manual triage.",
                    episode_id,
                )
                return

            log.warning("receiver returned %d for %s — retry in %.0fs", status, episode_id, backoff)

        # A plain time.sleep() resumes after the signal handler returns, so
        # with the receiver down a SIGTERM would never end this loop. Waiting
        # on the stop event lets the pause end as soon as shutdown is asked for.
        if _wait_or_stop(backoff, stop):
            log.info("stop requested — leaving %s in outbox for the next run", episode_id)
            return
        backoff = min(backoff * 2, BACKOFF_CAP_S)


# ---------------------------------------------------------------------------
# main scan loop
# ---------------------------------------------------------------------------

def main() -> int:
    """Parse arguments and run the scan-and-ship loop until SIGTERM/SIGINT.

    Each pass walks ``<data-root>/episodes`` in sorted order and ships every
    directory that contains ``done.marker``. A failure on one episode is
    logged and does not stop the pass. The stop flag set by the signal
    handler is checked between episodes, during the pause between passes and
    during upload backoff.

    Returns:
        0 once the loop has stopped.
    """
    parser = argparse.ArgumentParser(prog="shipper")
    parser.add_argument("--data-root", default="data")
    parser.add_argument("--receiver-url", required=True)
    parser.add_argument("--host-id", required=True)
    parser.add_argument("--ca-bundle", default=None)
    parser.add_argument("--client-cert", default=None)
    parser.add_argument("--client-key", default=None)
    parser.add_argument(
        "--scan-interval", type=float, default=SCAN_INTERVAL_S,
        help="seconds between directory scans (default 30)",
    )
    args = parser.parse_args()

    logging.basicConfig(
        level=logging.INFO,
        format="%(asctime)s %(levelname)s %(name)s %(message)s",
    )

    data_root = Path(args.data_root)
    episodes_dir = data_root / "episodes"
    outbox_dir = data_root / "outbox"
    shipped_dir = data_root / "shipped"
    for d in (episodes_dir, outbox_dir, shipped_dir):
        d.mkdir(parents=True, exist_ok=True)

    ssl_ctx = _build_ssl_ctx(args.ca_bundle, args.client_cert, args.client_key)

    stop = threading.Event()

    def _handle_signal(sig, _frame):
        log.info("signal %d — stopping after current scan", sig)
        stop.set()

    signal.signal(signal.SIGTERM, _handle_signal)
    signal.signal(signal.SIGINT, _handle_signal)

    log.info("shipper started: watching %s, receiver=%s", episodes_dir, args.receiver_url)

    while not stop.is_set():
        for ep_dir in sorted(episodes_dir.iterdir()):
            if stop.is_set():
                break
            if not ep_dir.is_dir():
                continue
            if not (ep_dir / "done.marker").exists():
                continue
            try:
                _ship_episode(
                    episode_dir=ep_dir,
                    outbox_dir=outbox_dir,
                    shipped_dir=shipped_dir,
                    receiver_url=args.receiver_url,
                    host_id=args.host_id,
                    ssl_ctx=ssl_ctx,
                    # Lets a shutdown request cut an upload backoff short.
                    stop=stop,
                )
            except Exception:
                log.exception("failed to ship %s — will retry next scan", ep_dir.name)

        # Doubles as the inter-scan sleep and wakes early on shutdown.
        stop.wait(timeout=args.scan_interval)

    log.info("shipper stopped")
    return 0
|
|
|
|
|
|
# Script entry point: run main() and use its return value as the exit status.
if __name__ == "__main__":
    raise SystemExit(main())
|