"""Shipper episode queue — scan, compress, ship, retire.

State machine, mirroring docs/transport.md::

    data/episodes/<id>/done.marker
        |
        v  tar+zstd
    data/outbox/<id>.tar.zst.partial
        |
        v  rename
    data/outbox/<id>.tar.zst
        |
        v  PUT to receiver
        |
        +-- 200/201     → mv data/episodes/<id> → data/shipped/<id>
        |                 rm data/outbox/<id>.tar.zst
        |
        +-- 409         → leave files in place (the local + remote tarball
        |                 differ; manual triage). The episode stays ready,
        |                 so every pass re-tars and re-PUTs it until an
        |                 operator intervenes.
        |
        +-- 5xx/net     → leave files in place; the next pass re-tars and
        |                 retries
        |
        +-- 4xx (fatal) → move the episode dir into the quarantine dir with
                          a quarantine_reason.json and drop the tarball
                          (caller-side bug, doesn't self-heal)

Idempotent on every pass. A crash mid-tar leaves only a ``.partial`` which
the next pass overwrites (a tar failure that is caught removes its own
``.partial``). A crash mid-PUT leaves the tarball in ``outbox/`` and the
next pass rebuilds and re-ships it; the receiver responds 200 on a matching
sha256, 409 on a divergent one. Tarballs whose episode dir has disappeared
are swept at the start of every pass.
"""

from __future__ import annotations

import json
import logging
import shutil
import subprocess
import tarfile
import tempfile
import time
from dataclasses import dataclass
from pathlib import Path

from .config import ShipperConfig
from .transport import ShipperTransport, ShipResult, hash_file

log = logging.getLogger("cis490.shipper.queue")

# Alphabet a normalised (lower-cased) 40-char git commit SHA-1 is drawn from.
_HEX_DIGITS = frozenset("0123456789abcdef")


@dataclass(frozen=True)
class PassResult:
    """Per-pass counters returned by ``ShipperQueue.run_once()``."""

    scanned: int  # ready episodes (dirs containing done.marker) this pass
    shipped: int  # "stored" / "already-present" results, retired to shipped/
    transient_failures: int  # tar failures + transport "transient" results
    conflicts: int  # transport "conflict" results (receiver copy diverges)
    fatal: int  # any other transport result; episode was quarantined


class ShipperQueue:
    """Scans the episodes dir and drives each ready episode through the
    tar → ship → retire/quarantine state machine described in the module
    docstring."""

    def __init__(self, cfg: ShipperConfig, transport: ShipperTransport) -> None:
        self.cfg = cfg
        self.transport = transport
        cfg.episodes_dir.mkdir(parents=True, exist_ok=True)
        cfg.outbox_dir.mkdir(parents=True, exist_ok=True)
        cfg.shipped_dir.mkdir(parents=True, exist_ok=True)
        cfg.quarantine_dir.mkdir(parents=True, exist_ok=True)
        # Last wall-clock time we walked quarantine/ for cleanup. Set
        # to 0.0 so the first pass always sweeps (covers daemon
        # startup after a long downtime).
        self._last_quarantine_cleanup_at: float = 0.0

    # ---- main entry point ---------------------------------------------

    def run_once(self) -> PassResult:
        """One scan pass. Returns counts for logging / tests.

        Order: sweep orphaned outbox tarballs, (maybe) prune old
        quarantine entries, then tar + ship every ready episode in sorted
        order. A tar failure is counted as transient and the pass moves
        on; exceptions from the other steps propagate to the caller
        (``run_forever`` logs them and sleeps).
        """
        # Sweep stale outbox tarballs first. Normal lifecycle has the
        # tarball deleted alongside retire / quarantine, but operator
        # intervention (rm of an episode dir) or a crash between
        # rename(2) and the post-ship cleanup can leave one orphaned.
        # Bounded by the number of files in outbox/, so cheap to do
        # every pass.
        self._sweep_outbox()
        # Drop quarantine entries older than keep_days — gated on
        # cfg.quarantine_cleanup_interval_s so the scan tick doesn't
        # statx() the whole quarantine tree on every pass.
        self._maybe_cleanup_quarantine()

        ready = self._ready_episodes()
        shipped = 0
        transient = 0
        conflicts = 0
        fatal = 0

        for ep_dir in ready:
            episode_id = ep_dir.name
            try:
                tarball, sha = self._tar_episode(ep_dir)
            except Exception:
                log.exception("tar failed for %s", episode_id)
                transient += 1
                continue

            commit = self._read_episode_commit(ep_dir)
            res = self.transport.ship_tarball(episode_id, tarball, sha, commit=commit)

            # Receiver returns 412 with a remediation block when the
            # commit isn't in its allow-list — surface the body so the
            # lab-host operator (or its on-device AI) sees what to run.
            # Only index into the body when it really is a dict, so an
            # unexpected body shape can't crash the whole pass.
            body = res.body if isinstance(res.body, dict) else None
            if res.status_code == 412 and body:
                log.error(
                    "ship %s -> 412 commit-rejected. your_commit=%s "
                    "head=%s. Remediation:\n%s",
                    episode_id,
                    body.get("your_commit"),
                    body.get("head_commit"),
                    body.get("remediation", ""),
                )
            else:
                log.info(
                    "ship %s -> %s (%d) %s",
                    episode_id, res.status, res.status_code, res.error or "",
                )

            if res.status in ("stored", "already-present"):
                self._retire(ep_dir, tarball)
                shipped += 1
            elif res.status == "conflict":
                conflicts += 1
                # Keep the tarball + episode dir in place. Operator must
                # decide whether to drop our copy or fix the remote one.
            elif res.status == "transient":
                transient += 1
            else:  # fatal
                # Move the episode out of the live queue so the next
                # scan doesn't re-tar/re-PUT the same dir forever. The
                # tarball gets deleted; the episode contents survive
                # in quarantine/ for operator triage.
                self._quarantine(ep_dir, tarball, res)
                fatal += 1

        return PassResult(
            scanned=len(ready),
            shipped=shipped,
            transient_failures=transient,
            conflicts=conflicts,
            fatal=fatal,
        )

    def run_forever(
        self,
        *,
        stop_check=lambda: False,
        heartbeat=lambda: None,
    ) -> None:
        """Long-running scan loop.

        Args:
            stop_check: returns True when the daemon should exit (SIGTERM
                handler). Checked between passes and inside the
                inter-pass sleep so SIGTERM isn't blocked for up to
                scan_interval_s.
            heartbeat: invoked once per completed pass. Wired to
                sd_notify("WATCHDOG=1") in production so systemd can
                kill+restart the daemon if a pass hangs longer than the
                unit's WatchdogSec — catches silent stalls (deadlock,
                blocked I/O past timeout) that Restart=on-failure misses.
        """
        while not stop_check():
            try:
                self.run_once()
            except Exception:
                log.exception("scan pass crashed; sleeping anyway")
            try:
                heartbeat()
            except Exception:
                # A heartbeat failure mustn't take down the daemon —
                # if the watchdog wire is broken, we want at least the
                # ship loop to keep running.
                log.exception("heartbeat callback failed")
            # Coarse sleep: we don't need precise scheduling and we
            # don't want a tight loop on errors.
            t0 = time.monotonic()
            while time.monotonic() - t0 < self.cfg.scan_interval_s:
                if stop_check():
                    return
                time.sleep(0.5)

    # ---- internals -----------------------------------------------------

    def _ready_episodes(self) -> list[Path]:
        """Return episode dirs that contain ``done.marker``, sorted by name."""
        out: list[Path] = []
        if not self.cfg.episodes_dir.exists():
            return out
        for ep in sorted(self.cfg.episodes_dir.iterdir()):
            if ep.is_dir() and (ep / "done.marker").exists():
                out.append(ep)
        return out

    def _maybe_cleanup_quarantine(self) -> None:
        """Walk quarantine/ and remove episodes older than keep_days.

        Gated on ``cfg.quarantine_cleanup_interval_s`` so the scan tick
        doesn't statx() the whole tree on every pass. On the first pass
        after startup, the 0.0 sentinel means we always sweep. That
        catches a daemon that was offline while a backlog built up. A
        ``quarantine_keep_days`` <= 0 disables cleanup entirely.
        """
        keep_days = self.cfg.quarantine_keep_days
        if keep_days <= 0:
            return
        now = time.time()
        elapsed = now - self._last_quarantine_cleanup_at
        # A negative elapsed means the wall clock stepped backwards (e.g.
        # an NTP correction). Sweep now rather than staying gated until
        # the clock climbs back past the previous timestamp.
        if 0 <= elapsed < self.cfg.quarantine_cleanup_interval_s:
            return
        self._last_quarantine_cleanup_at = now

        qdir = self.cfg.quarantine_dir
        if not qdir.exists():
            return
        cutoff = now - (keep_days * 86400)
        removed = 0
        for ep in qdir.iterdir():
            if not ep.is_dir():
                continue
            try:
                # Use mtime. _quarantine writes quarantine_reason.json into
                # the dir when it quarantines the episode, which bumps the
                # dir's mtime. So mtime approximates quarantine age rather
                # than the original episode age.
                too_old = ep.stat().st_mtime < cutoff
            except OSError:
                log.exception("quarantine cleanup failed for %s", ep.name)
                continue
            if not too_old:
                continue
            shutil.rmtree(ep, ignore_errors=True)
            # rmtree(ignore_errors=True) never raises, so confirm the
            # entry is actually gone before counting it as removed.
            if ep.exists():
                log.warning("quarantine cleanup could not fully remove %s", ep.name)
            else:
                removed += 1
        if removed:
            log.info("quarantine cleanup: removed %d episode(s) older than %d days",
                     removed, keep_days)

    def _sweep_outbox(self) -> None:
        """Delete tarballs in outbox/ that have no matching episode dir.

        The invariant the queue maintains: ``outbox/<id>.tar.zst`` only
        exists while ``episodes/<id>/`` is also present. retire and
        quarantine both delete the tarball when they move the episode
        out. Each tar pass overwrites any prior ``.partial``. So a stray
        tarball means an external actor (or an OS crash) broke the
        invariant. Clean it up rather than carrying dead bytes on disk
        forever.
        """
        outbox = self.cfg.outbox_dir
        if not outbox.exists():
            return
        for f in outbox.iterdir():
            name = f.name
            if name.endswith(".tar.zst"):
                ep_id = name[: -len(".tar.zst")]
            elif name.endswith(".tar.zst.partial"):
                ep_id = name[: -len(".tar.zst.partial")]
            else:
                continue
            if not (self.cfg.episodes_dir / ep_id).exists():
                try:
                    f.unlink()
                    log.info("swept orphan tarball %s", name)
                except OSError:
                    log.exception("failed to sweep orphan tarball %s", name)

    def _read_episode_commit(self, ep_dir: Path) -> str | None:
        """Pull meta.json::code_version.commit for the X-Cis490-Code-Commit
        header.

        Returns the lower-cased 40-char hex commit. Returns None when the
        file is missing, unreadable, not UTF-8 or not valid JSON. Also
        returns None when it doesn't have the expected
        ``{"code_version": {"commit": "<40 hex>"}}`` shape. This method
        never raises, so one bad meta.json cannot abort the whole scan
        pass. The receiver is expected to reject a missing commit with a
        400, so the operator can see the lab host needs a re-install.
        """
        meta_path = ep_dir / "meta.json"
        try:
            meta = json.loads(meta_path.read_text(encoding="utf-8"))
        except (OSError, ValueError):
            # OSError: missing/unreadable file. ValueError covers both
            # json.JSONDecodeError and UnicodeDecodeError.
            return None
        if not isinstance(meta, dict):
            return None
        cv = meta.get("code_version")
        if not isinstance(cv, dict):
            return None
        commit = cv.get("commit")
        if not isinstance(commit, str) or len(commit) != 40:
            return None
        commit = commit.lower()
        # Reject non-hex values so garbage (including control chars)
        # never reaches an HTTP header.
        if not _HEX_DIGITS.issuperset(commit):
            return None
        return commit

    def _tar_episode(self, ep_dir: Path) -> tuple[Path, str]:
        """Tar+zstd the episode dir into outbox. Returns
        ``(tarball_path, sha256_hex)``.

        Idempotent: any prior ``.partial`` is overwritten. The tarball is
        built as ``<id>.tar.zst.partial`` and only renamed to
        ``<id>.tar.zst`` after zstd exits 0 and the hash is computed.

        Raises:
            RuntimeError: the ``zstd`` binary is not on PATH, or zstd
                exited non-zero.
            Exception: whatever tarfile/subprocess/hash_file raised.
                In every failure case the zstd child is reaped and the
                ``.partial`` is removed before the exception propagates.
        """
        episode_id = ep_dir.name
        outbox = self.cfg.outbox_dir
        partial = outbox / f"{episode_id}.tar.zst.partial"
        final = outbox / f"{episode_id}.tar.zst"
        partial.unlink(missing_ok=True)

        # We use the system `zstd` for streaming compression: pipe a tar
        # stream into `zstd -T0 -19` without buffering the whole tar in
        # memory or pulling in the python-zstandard dependency. There is
        # deliberately no in-process fallback (gzip would produce a
        # non-zstd tarball), so a missing binary is surfaced as an error.
        if not _which_zstd():
            raise RuntimeError(
                "the `zstd` binary is required on the lab host. "
                "Install it via your package manager."
            )

        proc = None
        try:
            with partial.open("wb") as zout:
                proc = subprocess.Popen(
                    ["zstd", "-q", "-T0", "-19", "--stdout"],
                    stdin=subprocess.PIPE,
                    stdout=zout,
                )
                assert proc.stdin is not None
                # tarfile does not close a caller-supplied fileobj, so
                # close stdin explicitly to send EOF to zstd.
                with tarfile.open(fileobj=proc.stdin, mode="w|") as tf:
                    tf.add(ep_dir, arcname=episode_id, recursive=True)
                proc.stdin.close()
                rc = proc.wait()
            if rc != 0:
                raise RuntimeError(f"zstd exited {rc}")
            sha = hash_file(partial)
            partial.replace(final)
        except BaseException:
            # Any failure (unreadable file, BrokenPipeError because zstd
            # died, KeyboardInterrupt, ...) must not leave a live zstd
            # child writing into the partial, nor a stale partial on disk.
            if proc is not None:
                if proc.poll() is None:
                    proc.kill()
                if proc.stdin is not None:
                    try:
                        proc.stdin.close()
                    except OSError:
                        # Flushing buffered tar bytes into a dead pipe.
                        pass
                proc.wait()
            partial.unlink(missing_ok=True)
            raise
        return final, sha

    def _retire(self, ep_dir: Path, tarball: Path) -> None:
        """Move episode dir → shipped/, drop the tarball."""
        target = self.cfg.shipped_dir / ep_dir.name
        if target.exists():
            # Belt-and-suspenders: re-shipping an already-retired
            # episode shouldn't happen (the dir was moved), but if it
            # does, prefer the existing copy and just clean up.
            shutil.rmtree(ep_dir, ignore_errors=True)
        else:
            ep_dir.replace(target)
        tarball.unlink(missing_ok=True)

    def _quarantine(self, ep_dir: Path, tarball: Path, res: ShipResult) -> None:
        """Move a permanently-rejected episode out of the live queue.

        Drops a small ``quarantine_reason.json`` next to the episode so
        an operator (or a future backfill tool) can see why without
        having to dig through journalctl. The tarball in outbox/ goes
        away: re-shipping won't help and it just burns disk.
        """
        target = self.cfg.quarantine_dir / ep_dir.name
        if target.exists():
            shutil.rmtree(ep_dir, ignore_errors=True)
        else:
            ep_dir.replace(target)
        reason = {
            "status_code": res.status_code,
            "error": res.error,
            "body": res.body,
            "quarantined_at_wall": time.strftime("%Y-%m-%dT%H:%M:%SZ", time.gmtime()),
        }
        try:
            (target / "quarantine_reason.json").write_text(json.dumps(reason))
        except (OSError, TypeError, ValueError):
            # TypeError/ValueError: a body json.dumps can't serialise.
            # The episode has already moved, so a failure here must not
            # abort the pass and strand the tarball below.
            log.exception("quarantine reason write failed for %s", ep_dir.name)
        tarball.unlink(missing_ok=True)


def _which_zstd() -> bool:
    """True when a ``zstd`` executable is on PATH."""
    return shutil.which("zstd") is not None