"""Shipper — watches data/episodes/ and ships completed episodes to the receiver. An episode is "complete" when its directory contains a done.marker file. The shipper scans on every wake-up, so a crash mid-tar or mid-PUT is harmless — the next pass picks up where it left off. Ship flow per episode: 1. tar + zstd → data/outbox/.tar.zst.partial 2. atomic rename → data/outbox/.tar.zst 3. PUT to receiver with X-Content-SHA256 header 4. 200/201 → mv episodes/ shipped//; rm outbox/.tar.zst 5. 5xx/network → exponential backoff (1s → 300s cap), retry Usage: uv run python tools/shipper.py \\ --data-root data \\ --receiver-url https://collector.wg \\ --host-id lab-host-1 \\ [--ca-bundle /etc/cis490/certs/wg-ca.pem] \\ [--client-cert /etc/cis490/certs/lab-host-1.pem] \\ [--client-key /etc/cis490/certs/lab-host-1.key] """ from __future__ import annotations import argparse import hashlib import logging import os import shutil import signal import ssl import subprocess import sys import threading import time import urllib.error import urllib.request from pathlib import Path log = logging.getLogger("cis490.shipper") SCAN_INTERVAL_S = 30.0 BACKOFF_INITIAL_S = 1.0 BACKOFF_CAP_S = 300.0 # --------------------------------------------------------------------------- # tar + zstd # --------------------------------------------------------------------------- def _compress(episode_dir: Path, outbox_partial: Path) -> None: tar = subprocess.Popen( ["tar", "-c", "-C", str(episode_dir.parent), episode_dir.name], stdout=subprocess.PIPE, ) with outbox_partial.open("wb") as out_f: zst = subprocess.Popen(["zstd", "-q"], stdin=tar.stdout, stdout=out_f) tar.stdout.close() # type: ignore[union-attr] if tar.wait() != 0: zst.kill() raise RuntimeError("tar exited non-zero") if zst.wait() != 0: raise RuntimeError("zstd exited non-zero") def _sha256_file(path: Path) -> str: h = hashlib.sha256() with path.open("rb") as f: for chunk in iter(lambda: f.read(1024 * 1024), b""): h.update(chunk) return h.hexdigest() # --------------------------------------------------------------------------- # HTTP PUT # --------------------------------------------------------------------------- def _build_ssl_ctx(ca_bundle: str | None, client_cert: str | None, client_key: str | None) -> ssl.SSLContext: ctx = ssl.SSLContext(ssl.PROTOCOL_TLS_CLIENT) if ca_bundle: ctx.load_verify_locations(ca_bundle) else: ctx.load_default_certs() if client_cert and client_key: ctx.load_cert_chain(client_cert, client_key) return ctx def _put_episode( receiver_url: str, host_id: str, episode_id: str, tarball: Path, sha256: str, ssl_ctx: ssl.SSLContext, ) -> int: url = f"{receiver_url.rstrip('/')}/v1/episodes/{host_id}/{episode_id}.tar.zst" size = tarball.stat().st_size with tarball.open("rb") as body: req = urllib.request.Request(url, data=body, method="PUT") req.add_header("Content-Type", "application/zstd") req.add_header("Content-Length", str(size)) req.add_header("X-Content-SHA256", sha256) req.add_header("X-Schema-Version", "1") req.add_header("X-Lab-Host", host_id) req.add_header("X-Episode-Id", episode_id) try: with urllib.request.urlopen(req, context=ssl_ctx, timeout=120) as resp: return resp.status except urllib.error.HTTPError as exc: return exc.code # --------------------------------------------------------------------------- # per-episode ship logic # --------------------------------------------------------------------------- def _ship_episode( episode_dir: Path, outbox_dir: Path, shipped_dir: Path, receiver_url: str, host_id: str, ssl_ctx: ssl.SSLContext, ) -> None: episode_id = episode_dir.name tarball = outbox_dir / f"{episode_id}.tar.zst" partial = outbox_dir / f"{episode_id}.tar.zst.partial" # Step 1-2: compress (idempotent — skip if tarball already exists in outbox) if not tarball.exists(): log.info("compressing %s", episode_id) try: _compress(episode_dir, partial) except Exception: partial.unlink(missing_ok=True) raise os.replace(partial, tarball) sha256 = _sha256_file(tarball) # Step 3-4: PUT with backoff backoff = BACKOFF_INITIAL_S while True: log.info("shipping %s (%.1f KiB)", episode_id, tarball.stat().st_size / 1024) try: status = _put_episode(receiver_url, host_id, episode_id, tarball, sha256, ssl_ctx) except Exception as exc: log.warning("network error shipping %s: %s — retry in %.0fs", episode_id, exc, backoff) time.sleep(backoff) backoff = min(backoff * 2, BACKOFF_CAP_S) continue if status in (200, 201): log.info("shipped %s (status=%d)", episode_id, status) shutil.move(str(episode_dir), str(shipped_dir / episode_id)) tarball.unlink(missing_ok=True) return if status == 409: log.error( "conflict on %s — receiver has different bytes. Leaving in place for manual triage.", episode_id, ) return log.warning("receiver returned %d for %s — retry in %.0fs", status, episode_id, backoff) time.sleep(backoff) backoff = min(backoff * 2, BACKOFF_CAP_S) # --------------------------------------------------------------------------- # main scan loop # --------------------------------------------------------------------------- def main() -> int: parser = argparse.ArgumentParser(prog="shipper") parser.add_argument("--data-root", default="data") parser.add_argument("--receiver-url", required=True) parser.add_argument("--host-id", required=True) parser.add_argument("--ca-bundle", default=None) parser.add_argument("--client-cert", default=None) parser.add_argument("--client-key", default=None) parser.add_argument( "--scan-interval", type=float, default=SCAN_INTERVAL_S, help="seconds between directory scans (default 30)", ) args = parser.parse_args() logging.basicConfig( level=logging.INFO, format="%(asctime)s %(levelname)s %(name)s %(message)s", ) data_root = Path(args.data_root) episodes_dir = data_root / "episodes" outbox_dir = data_root / "outbox" shipped_dir = data_root / "shipped" for d in (episodes_dir, outbox_dir, shipped_dir): d.mkdir(parents=True, exist_ok=True) ssl_ctx = _build_ssl_ctx(args.ca_bundle, args.client_cert, args.client_key) stop = threading.Event() def _handle_signal(sig, _frame): log.info("signal %d — stopping after current scan", sig) stop.set() signal.signal(signal.SIGTERM, _handle_signal) signal.signal(signal.SIGINT, _handle_signal) log.info("shipper started: watching %s, receiver=%s", episodes_dir, args.receiver_url) while not stop.is_set(): for ep_dir in sorted(episodes_dir.iterdir()): if stop.is_set(): break if not ep_dir.is_dir(): continue if not (ep_dir / "done.marker").exists(): continue try: _ship_episode( episode_dir=ep_dir, outbox_dir=outbox_dir, shipped_dir=shipped_dir, receiver_url=args.receiver_url, host_id=args.host_id, ssl_ctx=ssl_ctx, ) except Exception: log.exception("failed to ship %s — will retry next scan", ep_dir.name) stop.wait(timeout=args.scan_interval) log.info("shipper stopped") return 0 if __name__ == "__main__": sys.exit(main())