"""``cis490-index-backfill`` — rebuild missing index.jsonl rows from tarballs already on disk. Use case: a stretch where the receiver wrote tarballs to ``episodes//`` but failed to append the matching index row (permissions, disk full, crash mid-write). The shipper retries see ``already-present`` and never re-PUT, so the gap is permanent until something on the receiver-side fills it. This tool walks ``episodes//.tar.zst`` and, for any episode not already in the index, computes sha256 + size and appends a row matching the receiver's schema. Existing rows are left alone. Run on the receiver host as the same user the receiver runs under (``cis490``) so the appended rows match in ownership and the live receiver can keep writing afterward: sudo -u cis490 /opt/cis490/.venv/bin/python \\ /opt/cis490/tools/index_backfill.py \\ --episodes-root /var/lib/cis490/episodes \\ --index /var/lib/cis490/index.jsonl """ from __future__ import annotations import argparse import hashlib import json import sys from datetime import datetime, timezone from pathlib import Path SCHEMA_VERSION = 1 def _sha256_of(path: Path) -> str: h = hashlib.sha256() with path.open("rb") as f: for chunk in iter(lambda: f.read(1024 * 1024), b""): h.update(chunk) return h.hexdigest() def _commit_from_tarball(tar_path: Path) -> str | None: """Extract meta.json::code_version.commit from a tar.zst without leaving anything on disk. Returns None on any failure — callers write the row without a commit field.""" import io as _io, subprocess as _sp, tarfile as _tar try: z = _sp.check_output( ["zstd", "-q", "-d", "--stdout", str(tar_path)], stderr=_sp.DEVNULL, ) except (_sp.CalledProcessError, FileNotFoundError, OSError): return None try: with _tar.open(fileobj=_io.BytesIO(z)) as t: for m in t.getmembers(): if m.name.endswith("meta.json") and m.isfile(): f = t.extractfile(m) if f is None: return None meta = json.loads(f.read().decode("utf-8")) cv = meta.get("code_version") or {} c = cv.get("commit") if isinstance(c, str) and len(c) == 40: return c.lower() return None except Exception: return None return None def _existing_episode_ids(index_path: Path) -> set[str]: if not index_path.exists(): return set() seen: set[str] = set() for line in index_path.read_text().splitlines(): if not line.strip(): continue try: row = json.loads(line) except json.JSONDecodeError: continue ep = row.get("episode_id") if isinstance(ep, str): seen.add(ep) return seen def main(argv: list[str] | None = None) -> int: p = argparse.ArgumentParser(prog="cis490-index-backfill") p.add_argument("--episodes-root", type=Path, default=Path("/var/lib/cis490/episodes")) p.add_argument("--index", type=Path, default=Path("/var/lib/cis490/index.jsonl")) p.add_argument("--host", help="Only backfill this host_id") p.add_argument("--dry-run", action="store_true", help="Print what would be appended; don't write") args = p.parse_args(argv) if not args.episodes_root.exists(): print(f"no episodes dir at {args.episodes_root}", file=sys.stderr) return 2 seen = _existing_episode_ids(args.index) rows_to_write: list[dict] = [] scanned = 0 for host_dir in sorted(args.episodes_root.iterdir()): if not host_dir.is_dir(): continue if args.host and host_dir.name != args.host: continue for tar in sorted(host_dir.glob("*.tar.zst")): scanned += 1 episode_id = tar.stem.removesuffix(".tar") if episode_id in seen: continue sha = _sha256_of(tar) size = tar.stat().st_size row: dict = { "received_at_wall": datetime.now(timezone.utc).isoformat(), "host_id": host_dir.name, "episode_id": episode_id, "sha256": sha, "size_bytes": size, "schema_version": SCHEMA_VERSION, "backfilled": True, } commit = _commit_from_tarball(tar) if commit: row["commit"] = commit rows_to_write.append(row) print(f"scanned: {scanned} already-indexed: {scanned - len(rows_to_write)} " f"to-backfill: {len(rows_to_write)}") if not rows_to_write: return 0 if args.dry_run: for r in rows_to_write[:5]: print(json.dumps(r, sort_keys=True)) if len(rows_to_write) > 5: print(f"... ({len(rows_to_write) - 5} more)") return 0 # Append each row exactly the way the receiver does (single-line # write, append-mode open). This keeps writes atomic on POSIX for # rows < PIPE_BUF and never replaces the file, so ownership is # preserved automatically. with args.index.open("a") as f: for row in rows_to_write: f.write(json.dumps(row, sort_keys=True) + "\n") print(f"backfilled {len(rows_to_write)} rows into {args.index}") return 0 if __name__ == "__main__": sys.exit(main())