CIS490/shipper/queue.py
max f9b2e5c4e6 shipper: systemd watchdog, quarantine cleanup; doctor surfaces ship errors
Three robustness items off the future-work list:

1. Shipper sd_notify watchdog. Type=notify + WatchdogSec=180. The
   daemon sends READY=1 after queue construction and WATCHDOG=1 once
   per scan pass via a heartbeat callback wired into run_forever.
   Restart=on-failure only catches process death; silent stalls
   (deadlock, hung tar subprocess, blocked I/O past timeout) used to
   leave a live-but-stuck daemon running while the data backlog grew.
   Now systemd kills and restarts the daemon if no WATCHDOG=1 arrives
   within 180s.

   Verified end-to-end against systemd via `systemd-run --transient
   --property=Type=notify --property=WatchdogSec=10`: unit transitions
   to active on READY=1; SIGSTOP'ing the process triggers
   `Watchdog timeout (limit 10s)! Killing process N with SIGABRT` at
   exactly t+10s, then unit goes failed → restart cycle.

2. Quarantine cleanup. Without an upper bound, data/quarantine/ grew
   forever as fatal episodes piled up. New ShipperConfig fields:
     quarantine_keep_days = 30           # opt-out: 0 disables
     quarantine_cleanup_interval_s = 3600 # gate so 5s tick doesn't
                                          # statx() the whole tree
   Cleanup runs near the start of run_once() (right after the outbox
   sweep) but is gated to once per hour; the number of removed entries
   is logged.

3. Doctor surfaces shipping errors. Tails the last 10 minutes of the
   cis490-shipper journal and surfaces 412/400/transient patterns as
   red/yellow rows with the canonical fix command. An on-device agent
   running cis490_doctor.py now sees one line ("12 ship(s) rejected as
   out-of-window") instead of needing to grep the journal.
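Item 1's notify wiring, sketched under stated assumptions: `sd_notify` below is a hypothetical helper that speaks the documented `$NOTIFY_SOCKET` datagram protocol directly rather than going through a library such as python-systemd, and the usage comment only illustrates how READY=1 and WATCHDOG=1 could be hooked into `run_forever(heartbeat=...)`. It handles filesystem-path and Linux abstract-namespace socket addresses only.

```python
import os
import socket


def sd_notify(state: str) -> bool:
    """Send one notify message to systemd; return False when not under systemd.

    Hypothetical helper: implements the $NOTIFY_SOCKET datagram protocol
    by hand (one message per datagram) instead of depending on a library.
    """
    addr = os.environ.get("NOTIFY_SOCKET")
    if not addr:
        return False  # not started by systemd with Type=notify
    if addr.startswith("@"):
        # Linux abstract-namespace socket: the leading '@' stands for a NUL byte.
        addr = "\0" + addr[1:]
    with socket.socket(socket.AF_UNIX, socket.SOCK_DGRAM) as sock:
        sock.connect(addr)
        sock.send(state.encode())
    return True


# Hypothetical wiring, mirroring the description in item 1:
#   queue = ShipperQueue(cfg, transport)
#   sd_notify("READY=1")
#   queue.run_forever(stop_check=..., heartbeat=lambda: sd_notify("WATCHDOG=1"))
```

Returning False when `NOTIFY_SOCKET` is unset keeps the daemon runnable from a shell or a test harness, where there is no systemd to notify.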
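The quarantine gate from item 2 reduces to an opt-out check, a rate limit, and an mtime cutoff. The sketch below restates that logic as pure functions so it can be exercised without a filesystem; the function names are hypothetical, and the `>=` comparison mirrors the `< interval: return` early exit in `_maybe_cleanup_quarantine`.

```python
SECONDS_PER_DAY = 86400


def quarantine_sweep_due(now: float, last_sweep_at: float,
                         keep_days: int, interval_s: float) -> bool:
    """True when a quarantine sweep should run on this scan tick."""
    if keep_days <= 0:
        return False  # quarantine_keep_days = 0 opts out entirely
    # With wall-clock seconds, the 0.0 startup sentinel is always "long ago".
    return now - last_sweep_at >= interval_s


def quarantine_cutoff(now: float, keep_days: int) -> float:
    """Quarantine dirs whose mtime is older than this get removed."""
    return now - keep_days * SECONDS_PER_DAY


def simulate_sweeps(duration_s: int, tick_s: int, keep_days: int = 30,
                    interval_s: float = 3600, start: float = 1_000_000.0) -> int:
    """Count sweeps over a run of scan ticks, updating the gate like the daemon."""
    last = 0.0
    sweeps = 0
    for offset in range(0, duration_s, tick_s):
        now = start + offset
        if quarantine_sweep_due(now, last, keep_days, interval_s):
            last = now
            sweeps += 1
    return sweeps


print(simulate_sweeps(duration_s=7200, tick_s=5))               # → 2 (t=0, t=3600)
print(simulate_sweeps(duration_s=7200, tick_s=5, keep_days=0))  # → 0
```

Two hours of 5 s ticks cost two directory walks instead of 1440, which is the point of the gate.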

Tests: 200/200 (was 188). New coverage: heartbeat callback fires and
survives exceptions; quarantine cleanup respects keep_days, the gate,
and the opt-out; doctor parser correctly classifies 412/400/transient/
clean/empty/journalctl-denied; when 412 and 400 errors are both present,
the 412 row is prioritised (it is more actionable).
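A sketch of the kind of classification the doctor tests exercise, assuming the parser matches the two log shapes `run_once()` emits (`ship <id> -> 412 commit-rejected. ...` and `ship <id> -> <status> (<code>) <error>`). The helper names, the row wording other than the quoted 412 line, and the red/yellow/green mapping are illustrative assumptions, not the actual cis490_doctor.py code; the 412-before-400 ordering follows the priority rule described in the test coverage.

```python
import re
from collections import Counter

# The two shapes queue.py logs for a ship result:
#   "ship <id> -> 412 commit-rejected. your_commit=... head=..."
#   "ship <id> -> <status> (<code>) <error>"
SHIP_RE = re.compile(
    r"ship (?P<ep>\S+) -> "
    r"(?:(?P<rej>412) commit-rejected|(?P<status>\S+) \((?P<code>\d+)\))"
)


def classify_ship_lines(lines):
    """Count 412 / 400 / transient ship results in journal text lines."""
    counts = Counter()
    for line in lines:
        m = SHIP_RE.search(line)
        if m is None:
            continue  # not a ship-result line
        code = int(m.group("rej") or m.group("code"))
        if code == 412:
            counts["412"] += 1
        elif code == 400:
            counts["400"] += 1
        elif m.group("status") == "transient":
            counts["transient"] += 1
    return counts


def headline(counts):
    """Return (colour, message) for the most actionable finding; 412 wins."""
    if counts["412"]:
        return "red", f"{counts['412']} ship(s) rejected as out-of-window"
    if counts["400"]:
        return "red", f"{counts['400']} ship(s) rejected with 400 (missing/malformed commit)"
    if counts["transient"]:
        return "yellow", f"{counts['transient']} transient ship failure(s)"
    return "green", "no ship errors"


sample = [
    "May 01 12:00:01 lab cis490-shipper[77]: ship ep-a -> 412 commit-rejected. your_commit=abc head=def. Remediation:",
    "May 01 12:00:02 lab cis490-shipper[77]: ship ep-b -> fatal (400) missing commit",
    "May 01 12:00:03 lab cis490-shipper[77]: ship ep-c -> stored (201) ",
]
print(headline(classify_ship_lines(sample)))
# → ('red', '1 ship(s) rejected as out-of-window')
```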

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
2026-05-01 12:02:59 -05:00


"""Shipper episode queue — scan, compress, ship, retire.

State machine, mirroring docs/transport.md:

    data/episodes/<id>/done.marker
        |
        v
    tar+zstd → data/outbox/<id>.tar.zst.partial
        |
        v
    rename → data/outbox/<id>.tar.zst
        |
        v
    PUT to receiver
        |
        +-- 200/201   → mv data/episodes/<id> → data/shipped/<id>
        |               rm data/outbox/<id>.tar.zst
        |
        +-- 409       → leave files in place (the local + remote tarball
        |               differ; manual triage)
        |
        +-- 5xx/net   → leave outbox tarball; retry on next pass
        |
        +-- other 4xx → log + mv data/episodes/<id> → data/quarantine/<id>,
                        rm the outbox tarball (caller-side bug, doesn't
                        self-heal)

Idempotent on every pass. A crash mid-tar leaves only a ``.partial``
which the next pass overwrites. A crash mid-PUT leaves the tarball in
``outbox/`` and the next pass re-ships it; the receiver responds 200
on a matching sha256, 409 on a divergent one.
"""

from __future__ import annotations

import json
import logging
import shutil
import subprocess
import tarfile
import tempfile
import time
from dataclasses import dataclass
from pathlib import Path

from .config import ShipperConfig
from .transport import ShipperTransport, ShipResult, hash_file

log = logging.getLogger("cis490.shipper.queue")


@dataclass(frozen=True)
class PassResult:
    scanned: int
    shipped: int
    transient_failures: int
    conflicts: int
    fatal: int


class ShipperQueue:
    def __init__(self, cfg: ShipperConfig, transport: ShipperTransport) -> None:
        self.cfg = cfg
        self.transport = transport
        cfg.episodes_dir.mkdir(parents=True, exist_ok=True)
        cfg.outbox_dir.mkdir(parents=True, exist_ok=True)
        cfg.shipped_dir.mkdir(parents=True, exist_ok=True)
        cfg.quarantine_dir.mkdir(parents=True, exist_ok=True)
        # Last wall-clock time we walked quarantine/ for cleanup. Set
        # to 0.0 so the first pass always sweeps (covers daemon
        # startup after a long downtime).
        self._last_quarantine_cleanup_at: float = 0.0

    # ---- main entry point ---------------------------------------------

    def run_once(self) -> PassResult:
        """One scan pass. Returns counts for logging / tests."""
        # Sweep stale outbox tarballs first. Normal lifecycle has the
        # tarball deleted alongside retire / quarantine, but operator
        # intervention (rm of an episode dir) or a crash between
        # rename(2) and the post-ship cleanup can leave one orphaned.
        # Bounded by the number of files in outbox/, so cheap to do
        # every pass.
        self._sweep_outbox()
        # Drop quarantine entries older than keep_days — gated on a
        # once-per-hour check so the 5-second scan tick doesn't
        # statx() the whole quarantine tree on every pass.
        self._maybe_cleanup_quarantine()

        ready = self._ready_episodes()
        scanned = len(ready)
        shipped = 0
        transient = 0
        conflicts = 0
        fatal = 0

        for ep_dir in ready:
            episode_id = ep_dir.name
            try:
                tarball, sha = self._tar_episode(ep_dir)
            except Exception:
                log.exception("tar failed for %s", episode_id)
                transient += 1
                continue

            commit = self._read_episode_commit(ep_dir)
            res = self.transport.ship_tarball(episode_id, tarball, sha,
                                              commit=commit)
            # Receiver returns 412 with a remediation block when the
            # commit isn't in its allow-list — surface the body so the
            # lab-host operator (or its on-device AI) sees what to run.
            if res.status_code == 412 and res.body:
                log.error(
                    "ship %s -> 412 commit-rejected. your_commit=%s "
                    "head=%s. Remediation:\n%s",
                    episode_id, (res.body or {}).get("your_commit"),
                    (res.body or {}).get("head_commit"),
                    (res.body or {}).get("remediation", "<none>"),
                )
            else:
                log.info(
                    "ship %s -> %s (%d) %s",
                    episode_id, res.status, res.status_code, res.error or "",
                )

            if res.status in ("stored", "already-present"):
                self._retire(ep_dir, tarball)
                shipped += 1
            elif res.status == "conflict":
                conflicts += 1
                # Keep the tarball + episode dir in place. Operator must
                # decide whether to drop our copy or fix the remote one.
            elif res.status == "transient":
                transient += 1
            else:  # fatal
                # Move the episode out of the live queue so the next
                # scan doesn't re-tar/re-PUT the same dir forever. The
                # tarball gets deleted; meta.json + telemetry survive
                # in quarantine/ for operator triage.
                self._quarantine(ep_dir, tarball, res)
                fatal += 1

        return PassResult(
            scanned=scanned,
            shipped=shipped,
            transient_failures=transient,
            conflicts=conflicts,
            fatal=fatal,
        )

    def run_forever(
        self,
        *,
        stop_check=lambda: False,
        heartbeat=lambda: None,
    ) -> None:
        """Long-running scan loop.

        Args:
            stop_check: returns True when the daemon should exit (SIGTERM
                handler). Checked between passes and inside the inter-pass
                sleep so SIGTERM isn't blocked for up to scan_interval_s.
            heartbeat: invoked once per pass, including a pass that
                raised. Wired to sd_notify("WATCHDOG=1") in production so
                systemd can kill+restart the daemon if a pass hangs longer
                than the unit's WatchdogSec, which catches silent stalls
                (deadlock, blocked I/O past timeout) that
                Restart=on-failure misses.
        """
        while not stop_check():
            try:
                self.run_once()
            except Exception:
                log.exception("scan pass crashed; sleeping anyway")
            try:
                heartbeat()
            except Exception:
                # A heartbeat failure mustn't take down the daemon —
                # if the watchdog wire is broken, we want at least the
                # ship loop to keep running.
                log.exception("heartbeat callback failed")
            # Coarse sleep: we don't need precise scheduling and we
            # don't want a tight loop on errors.
            t0 = time.monotonic()
            while time.monotonic() - t0 < self.cfg.scan_interval_s:
                if stop_check():
                    return
                time.sleep(0.5)

    # ---- internals -----------------------------------------------------

    def _ready_episodes(self) -> list[Path]:
        out: list[Path] = []
        if not self.cfg.episodes_dir.exists():
            return out
        for ep in sorted(self.cfg.episodes_dir.iterdir()):
            if ep.is_dir() and (ep / "done.marker").exists():
                out.append(ep)
        return out

    def _maybe_cleanup_quarantine(self) -> None:
        """Walk quarantine/ and remove episodes older than keep_days.

        Cheap on a daemon that's been running a while because the
        once-per-hour gate prevents the scan tick from statx()-ing
        the whole tree every 5s. On the first pass after startup, the
        gate's 0.0 sentinel means we always sweep — that catches a
        daemon that was offline through a backlog accumulation."""
        keep_days = self.cfg.quarantine_keep_days
        if keep_days <= 0:
            return
        now = time.time()
        if (now - self._last_quarantine_cleanup_at
                < self.cfg.quarantine_cleanup_interval_s):
            return
        self._last_quarantine_cleanup_at = now
        cutoff = now - (keep_days * 86400)
        removed = 0
        if not self.cfg.quarantine_dir.exists():
            return
        for ep in self.cfg.quarantine_dir.iterdir():
            if not ep.is_dir():
                continue
            try:
                # Use mtime. The quarantine_reason.json written right
                # after the rename adds a directory entry, which bumps
                # the dir's mtime, so mtime tracks quarantine age rather
                # than the original episode's age.
                if ep.stat().st_mtime < cutoff:
                    shutil.rmtree(ep, ignore_errors=True)
                    removed += 1
            except OSError:
                log.exception("quarantine cleanup failed for %s", ep.name)
        if removed:
            log.info("quarantine cleanup: removed %d episode(s) older than %d days",
                     removed, keep_days)

    def _sweep_outbox(self) -> None:
        """Delete tarballs in outbox/ that have no matching episode dir.

        The invariant the queue maintains: ``outbox/<id>.tar.zst``
        only exists while ``episodes/<id>/`` is also present.
        retire+quarantine both delete the tarball when they move the
        episode out, and tar overwrites any prior ``.partial`` on
        each pass. So a stray tarball means an external actor (or an
        OS crash) broke the invariant — clean it up rather than
        carrying dead bytes on disk forever."""
        outbox = self.cfg.outbox_dir
        if not outbox.exists():
            return
        for f in outbox.iterdir():
            name = f.name
            if name.endswith(".tar.zst"):
                ep_id = name[: -len(".tar.zst")]
            elif name.endswith(".tar.zst.partial"):
                ep_id = name[: -len(".tar.zst.partial")]
            else:
                continue
            if not (self.cfg.episodes_dir / ep_id).exists():
                try:
                    f.unlink()
                    log.info("swept orphan tarball %s", name)
                except OSError:
                    log.exception("failed to sweep orphan tarball %s", name)

    def _read_episode_commit(self, ep_dir: Path) -> str | None:
        """Pull meta.json::code_version.commit so the shipper can send
        it as X-Cis490-Code-Commit. Returns None if the file is
        missing/malformed; the receiver will reject with a 400 in that
        case and the operator can see the lab host needs a re-install."""
        meta_path = ep_dir / "meta.json"
        if not meta_path.exists():
            return None
        try:
            meta = json.loads(meta_path.read_text())
        except (json.JSONDecodeError, OSError):
            return None
        cv = meta.get("code_version") or {}
        commit = cv.get("commit")
        if isinstance(commit, str) and len(commit) == 40:
            return commit.lower()
        return None

    def _tar_episode(self, ep_dir: Path) -> tuple[Path, str]:
        """Tar+zstd the episode dir into outbox. Idempotent — overwrites
        any prior partial. Returns ``(tarball_path, sha256_hex)``."""
        episode_id = ep_dir.name
        outbox = self.cfg.outbox_dir
        partial = outbox / f"{episode_id}.tar.zst.partial"
        final = outbox / f"{episode_id}.tar.zst"
        partial.unlink(missing_ok=True)
        # We use the system `zstd` for streaming compression: pipe a
        # tar stream into `zstd -T0 -19` to get a deterministic tarball
        # without buffering the whole tar in memory or pulling in the
        # python-zstandard dependency. There is no in-process fallback:
        # if the binary isn't on PATH we raise instead.
        if _which_zstd():
            with partial.open("wb") as zout:
                proc = subprocess.Popen(
                    ["zstd", "-q", "-T0", "-19", "--stdout"],
                    stdin=subprocess.PIPE, stdout=zout,
                )
                assert proc.stdin is not None
                try:
                    with tarfile.open(fileobj=proc.stdin, mode="w|") as tf:
                        tf.add(ep_dir, arcname=episode_id, recursive=True)
                    proc.stdin.close()
                except BaseException:
                    # tar failed (e.g. a file vanished mid-walk): reap the
                    # zstd child instead of leaving it blocked on an open
                    # stdin pipe, then let run_once() count the failure.
                    proc.kill()
                    proc.wait()
                    raise
                rc = proc.wait()
            if rc != 0:
                partial.unlink(missing_ok=True)
                raise RuntimeError(f"zstd exited {rc}")
        else:
            # No fallback: Python's built-in gzip/zlib would produce a
            # non-zstd tarball. Surface the missing binary instead of
            # silently shipping the wrong format.
            partial.unlink(missing_ok=True)
            raise RuntimeError(
                "the `zstd` binary is required on the lab host. "
                "Install it via your package manager."
            )
        sha = hash_file(partial)
        partial.replace(final)
        return final, sha

    def _retire(self, ep_dir: Path, tarball: Path) -> None:
        """Move episode dir → shipped/, drop the tarball."""
        target = self.cfg.shipped_dir / ep_dir.name
        if target.exists():
            # Belt-and-suspenders: re-shipping an already-retired
            # episode shouldn't happen (the dir was moved), but if it
            # does, prefer the existing copy and just clean up.
            shutil.rmtree(ep_dir, ignore_errors=True)
        else:
            ep_dir.replace(target)
        tarball.unlink(missing_ok=True)

    def _quarantine(self, ep_dir: Path, tarball: Path, res: ShipResult) -> None:
        """Move a permanently-rejected episode out of the live queue.

        Drops a small ``quarantine_reason.json`` inside the quarantined
        episode dir so an operator (or a future backfill tool) can see
        why without having to dig through journalctl. The tarball in
        outbox/ goes away — re-shipping won't help and it just burns
        disk."""
        target = self.cfg.quarantine_dir / ep_dir.name
        if target.exists():
            shutil.rmtree(ep_dir, ignore_errors=True)
        else:
            ep_dir.replace(target)
        reason = {
            "status_code": res.status_code,
            "error": res.error,
            "body": res.body,
            "quarantined_at_wall": time.strftime("%Y-%m-%dT%H:%M:%SZ", time.gmtime()),
        }
        try:
            (target / "quarantine_reason.json").write_text(json.dumps(reason))
        except OSError:
            log.exception("quarantine reason write failed for %s", ep_dir.name)
        tarball.unlink(missing_ok=True)


def _which_zstd() -> bool:
    return shutil.which("zstd") is not None