#!/usr/bin/env python3 """One-shot drain for pre-stamp episodes stuck in a lab-host's queue. Scans /var/lib/cis490/data/episodes/ and moves any episode whose meta.json lacks a 40-char-hex code_version.commit (or has no meta.json at all) into data/quarantine//, dropping a quarantine_reason.json beside it. Why this exists: the receiver's commit-allow-list went live on 2026-05-01; everything generated by the lab host before that has no ``code_version`` field, so every PUT 400s. The shipper's normal fatal-quarantine path (queue.py::_quarantine) covers new episodes that get rejected from here on, but a host with a few thousand pre-stamp episodes already in episodes/ is going to spend hours just clearing those before any new (stamped) episode gets shipped. Run this once per lab host to drain that backlog instantly. Idempotent. Safe to run while cis490-shipper is active — episodes are moved with rename(2), so the shipper either sees the dir before or after the move, never partway. If a name collision in quarantine/ does happen (e.g. a previous run quarantined the same id), the existing quarantine entry wins and the live copy is removed. Usage: sudo -u cis490 /opt/cis490/.venv/bin/python \\ /opt/cis490/tools/quarantine_unstamped.py \\ --data-root /var/lib/cis490/data """ from __future__ import annotations import argparse import json import shutil import sys import time from dataclasses import dataclass from pathlib import Path HEX40 = set("0123456789abcdef") def _looks_stamped(meta_path: Path) -> bool: """True iff meta.json carries a plausible 40-char-hex commit.""" try: meta = json.loads(meta_path.read_text()) except (OSError, json.JSONDecodeError): return False cv = meta.get("code_version") or {} commit = cv.get("commit") if not isinstance(commit, str) or len(commit) != 40: return False return all(c in HEX40 for c in commit.lower()) @dataclass class Result: scanned: int quarantined: int skipped_no_marker: int kept_stamped: int errors: int def drain(data_root: Path, *, dry_run: bool = False) -> Result: episodes_dir = data_root / "episodes" quarantine_dir = data_root / "quarantine" quarantine_dir.mkdir(parents=True, exist_ok=True) res = Result(0, 0, 0, 0, 0) if not episodes_dir.exists(): return res for ep in sorted(episodes_dir.iterdir()): if not ep.is_dir(): continue res.scanned += 1 # Only touch episodes the orchestrator finished writing — an # in-progress dir without done.marker should be left alone so # the orchestrator can finish it normally. if not (ep / "done.marker").exists(): res.skipped_no_marker += 1 continue meta = ep / "meta.json" if _looks_stamped(meta): res.kept_stamped += 1 continue target = quarantine_dir / ep.name try: if dry_run: print(f"would-quarantine {ep.name}") else: if target.exists(): shutil.rmtree(ep, ignore_errors=True) else: ep.replace(target) reason = { "status_code": 400, "error": "pre-stamp episode (no code_version) drained by quarantine_unstamped.py", "body": None, "quarantined_at_wall": time.strftime( "%Y-%m-%dT%H:%M:%SZ", time.gmtime() ), } try: (target / "quarantine_reason.json").write_text( json.dumps(reason) ) except OSError: pass res.quarantined += 1 except OSError as e: print(f"error: failed to quarantine {ep.name}: {e}", file=sys.stderr) res.errors += 1 return res def main() -> int: p = argparse.ArgumentParser(description=__doc__) p.add_argument( "--data-root", default="/var/lib/cis490/data", type=Path, help="Lab-host data root (contains episodes/, outbox/, etc.).", ) p.add_argument( "--dry-run", action="store_true", help="Print what would be moved without moving anything.", ) args = p.parse_args() res = drain(args.data_root, dry_run=args.dry_run) print( f"scanned={res.scanned} quarantined={res.quarantined} " f"kept_stamped={res.kept_stamped} skipped_no_marker={res.skipped_no_marker} " f"errors={res.errors}" ) return 1 if res.errors else 0 if __name__ == "__main__": sys.exit(main())