From f8ad02b2d7b2dbd947e48ad659bd2555deb08cbf Mon Sep 17 00:00:00 2001 From: max Date: Fri, 1 May 2026 01:38:50 -0500 Subject: [PATCH] Receiver enforces X-Cis490-Code-Commit allow-list (live, auto-refreshed) MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Stops out-of-date lab hosts from polluting the dataset with episodes generated by buggy code. The valid-commits set mirrors the maintainer's working clone on the Pi automatically — when the maintainer pulls or pushes a new commit, the receiver picks it up within the 5-second cache TTL with no service restart. Receiver changes: - receiver/version_gate.py (new): VersionGate(repo_path, window). Each check() consults a frozenset of the last `window` commit hashes from `git -C log --format=%H -n `, refreshed every 5s under a lock. Resilient to transient git failure (keeps prior cache so a flaky `git` doesn't lock out every shipper). - receiver/app.py: PUT extracts X-Cis490-Code-Commit; gate.check() before ingest. Rejects with: 400 + remediation if header missing or malformed 412 + remediation + your_commit + head_commit if not in window Remediation block is verbatim copy-pasteable into the lab-host shell: cd /opt/cis490 && sudo -u cis490 git pull origin main sudo /opt/cis490/scripts/install-lab-host.sh sudo systemctl restart cis490-orchestrator - receiver/store.py: ingest_stream takes commit kwarg, stamps it on the index.jsonl row (new optional field). Backfilled rows from index_backfill.py also pull commit out of meta.json. - receiver/config.py + etc/receiver.toml.example: new [version_gate] section. enabled=true, repo_path=/home/max/cis490, window=100 by default. Enabled toggle exists for emergency disable-and-collect. Shipper changes: - shipper/transport.py: ship_tarball() takes commit kwarg, sends X-Cis490-Code-Commit header. 412 maps to status='fatal' so the queue doesn't infinite-retry — operator must pull and reinstall before the next ship will succeed. - shipper/queue.py: reads meta.json::code_version.commit per episode, passes through. On 412, logs the receiver's full remediation block at ERROR level so journalctl on the lab host shows exactly what to run. Tests: 9 in test_version_gate (including 2 end-to-end via starlette.testclient), 2 cover the boundary where new commits land mid-cache and where missing-repo gracefully keeps prior cache. 157/157 total. Index schema: existing rows stay valid (commit field is optional on read). New rows from receiver-direct AND from index_backfill.py include commit. --- etc/receiver.toml.example | 10 ++ receiver/__main__.py | 8 ++ receiver/app.py | 53 +++++++++ receiver/config.py | 12 ++ receiver/store.py | 22 ++-- receiver/version_gate.py | 113 +++++++++++++++++++ shipper/queue.py | 43 ++++++- shipper/transport.py | 17 +++ tests/test_version_gate.py | 222 +++++++++++++++++++++++++++++++++++++ tools/index_backfill.py | 38 ++++++- 10 files changed, 521 insertions(+), 17 deletions(-) create mode 100644 receiver/version_gate.py create mode 100644 tests/test_version_gate.py diff --git a/etc/receiver.toml.example b/etc/receiver.toml.example index 6f71277..5cd19db 100644 --- a/etc/receiver.toml.example +++ b/etc/receiver.toml.example @@ -13,3 +13,13 @@ max_episode_bytes = 268_435_456 # 256 MiB # is for dev testing or as a belt-and-suspenders alongside mTLS. # [auth] # bearer_token = "REPLACE_ME_WITH_SECRET" + +# Code-version gate. Every PUT must carry X-Cis490-Code-Commit and that +# commit must be in the receiver's allow-list. The allow-list is the +# last `window` commits of `repo_path` (auto-refreshed every 5s, so a +# `git pull` on the Pi makes new commits acceptable instantly). This +# keeps episodes from out-of-date lab hosts out of the index. +[version_gate] +enabled = true +repo_path = "/home/max/cis490" +window = 100 diff --git a/receiver/__main__.py b/receiver/__main__.py index 6e23882..b88a165 100644 --- a/receiver/__main__.py +++ b/receiver/__main__.py @@ -9,6 +9,7 @@ import uvicorn from .app import make_app from .config import ReceiverConfig from .store import EpisodeStore +from .version_gate import VersionGate def main() -> None: @@ -31,10 +32,17 @@ def main() -> None: incoming_root=cfg.incoming_root, index_path=cfg.index_path, ) + version_gate = None + if cfg.version_gate_enabled: + version_gate = VersionGate( + repo_path=cfg.version_gate_repo, + window=cfg.version_gate_window, + ) app = make_app( store=store, max_episode_bytes=cfg.max_episode_bytes, bearer_token=cfg.bearer_token, + version_gate=version_gate, ) uvicorn.run( app, diff --git a/receiver/app.py b/receiver/app.py index 763817e..6147a18 100644 --- a/receiver/app.py +++ b/receiver/app.py @@ -12,6 +12,7 @@ from starlette.responses import JSONResponse, Response from starlette.routing import Route from .store import EpisodeStore, is_valid_id +from .version_gate import VersionGate log = logging.getLogger("cis490.receiver") @@ -38,6 +39,7 @@ def make_app( store: EpisodeStore, max_episode_bytes: int, bearer_token: str | None = None, + version_gate: VersionGate | None = None, ) -> Starlette: async def health(request: Request) -> JSONResponse: return JSONResponse({"status": "ok"}) @@ -89,6 +91,56 @@ def make_app( except ValueError: return JSONResponse({"error": "bad X-Schema-Version"}, status_code=400) + # Code-version gate. Every PUT must carry the orchestrator's + # commit hash and that hash must be in the receiver's current + # allow-list (last N commits on the maintainer's working clone). + # Missing → 400 (client bug); not-in-window → 412 (out-of-date + # lab host, must pull main). + commit = request.headers.get("x-cis490-code-commit", "").strip().lower() + if version_gate is not None: + ok, reason = version_gate.check(commit) if commit else (False, "missing") + if not ok: + head = version_gate.head() + if reason == "missing": + body = { + "error": "missing X-Cis490-Code-Commit header", + "remediation": ( + "Lab-host is shipping with no code_version stamp. " + "Pull origin/main and re-run install-lab-host.sh " + "so the orchestrator emits meta.json.code_version " + "and the shipper forwards X-Cis490-Code-Commit." + ), + } + return JSONResponse(body, status_code=400) + if reason == "bad-format": + return JSONResponse( + {"error": "X-Cis490-Code-Commit must be 40 lowercase hex"}, + status_code=400, + ) + # not-in-window: out-of-date lab host + body = { + "error": "code commit rejected: not in receiver's allow-list", + "your_commit": commit, + "valid_window_size": version_gate.valid_count(), + "head_commit": head, + "remediation": ( + "Pull origin/main on this lab host and rebuild before " + "shipping further:\n" + " cd /opt/cis490 && sudo -u cis490 git pull origin main\n" + " sudo /opt/cis490/scripts/install-lab-host.sh\n" + " sudo systemctl restart cis490-orchestrator\n" + "Episodes from old code stay queued; the next ship will " + "succeed once the lab-host's HEAD is in the receiver's " + "allow-list. Do NOT bypass this check — it exists to " + "keep buggy pre-fix data out of the training set." + ), + } + log.warning( + "rejected episode host=%s id=%s commit=%s reason=%s", + host_id, episode_id, commit[:12], reason, + ) + return JSONResponse(body, status_code=412) + cl = request.headers.get("content-length") if cl is not None: try: @@ -104,6 +156,7 @@ def make_app( episode_id=episode_id, expected_sha256=expected_sha, schema_version=schema_version, + commit=commit or None, body=request.stream(), max_bytes=max_episode_bytes, ) diff --git a/receiver/config.py b/receiver/config.py index 7c5ee1b..3994ca6 100644 --- a/receiver/config.py +++ b/receiver/config.py @@ -17,6 +17,12 @@ class ReceiverConfig: index_path: Path max_episode_bytes: int bearer_token: str | None + # Path to the maintainer's working clone — receiver consults its + # `git log` for the commit-allow-list. Default mirrors the + # canonical Pi setup. + version_gate_repo: Path + version_gate_window: int + version_gate_enabled: bool @classmethod def load(cls, path: str | Path) -> "ReceiverConfig": @@ -25,6 +31,7 @@ class ReceiverConfig: listen_addr = data.get("listen_addr", "127.0.0.1:8443") host, _, port = listen_addr.rpartition(":") + version_gate = data.get("version_gate", {}) return cls( listen_host=host or "127.0.0.1", listen_port=int(port), @@ -35,4 +42,9 @@ class ReceiverConfig: data.get("limits", {}).get("max_episode_bytes", DEFAULT_MAX_EPISODE_BYTES) ), bearer_token=data.get("auth", {}).get("bearer_token"), + version_gate_repo=Path( + version_gate.get("repo_path", "/home/max/cis490") + ).resolve(), + version_gate_window=int(version_gate.get("window", 100)), + version_gate_enabled=bool(version_gate.get("enabled", True)), ) diff --git a/receiver/store.py b/receiver/store.py index a2884f3..82eff1b 100644 --- a/receiver/store.py +++ b/receiver/store.py @@ -59,6 +59,7 @@ class EpisodeStore: schema_version: int, body: AsyncIterator[bytes], max_bytes: int, + commit: str | None = None, ) -> StoreResult: final = self.final_path(host_id, episode_id) if final.exists(): @@ -109,16 +110,17 @@ class EpisodeStore: final.parent.mkdir(parents=True, exist_ok=True) os.replace(partial, final) - self._append_index( - { - "received_at_wall": datetime.now(timezone.utc).isoformat(), - "host_id": host_id, - "episode_id": episode_id, - "sha256": actual, - "size_bytes": bytes_written, - "schema_version": schema_version, - } - ) + row = { + "received_at_wall": datetime.now(timezone.utc).isoformat(), + "host_id": host_id, + "episode_id": episode_id, + "sha256": actual, + "size_bytes": bytes_written, + "schema_version": schema_version, + } + if commit: + row["commit"] = commit + self._append_index(row) return StoreResult(status="stored", sha256=actual, size_bytes=bytes_written) except BaseException: partial.unlink(missing_ok=True) diff --git a/receiver/version_gate.py b/receiver/version_gate.py new file mode 100644 index 0000000..57f63b0 --- /dev/null +++ b/receiver/version_gate.py @@ -0,0 +1,113 @@ +"""Live commit allow-list for the receiver. + +The receiver only stores episodes whose `meta.json::code_version.commit` +matches a commit in the maintainer's local repository on the Pi. The +allow-list is "the last N commits to the active branch" — auto-refreshed +from `git log` so a `git pull` (or push) on the Pi makes the new commit +acceptable without a service restart. + +Episodes from older code (before a known bug fix) get rejected with +HTTP 412 + a remediation block telling the lab-host operator to pull +main and re-run the install. That keeps bad data out of the index. +""" + +from __future__ import annotations + +import logging +import subprocess +import threading +import time +from pathlib import Path + + +log = logging.getLogger("cis490.receiver.version_gate") + + +class VersionGate: + """Maintains the set of acceptable commit hashes. + + Refresh strategy: lazy. Each call to ``check()`` consults the + cache; if the cache is older than ``cache_ttl_s``, re-runs + ``git log --format=%H -n `` from ``repo_path``. The + cache is process-shared (a single dict guarded by a lock), so + concurrent uvicorn workers don't all fork git on the same tick. + + `repo_path` should point at the maintainer's working clone on the + Pi (the one that gets `git pull`'d when new code lands). Default + is ``/home/max/cis490``; overridable via the receiver config so a + different operator/path works without code changes.""" + + def __init__( + self, + repo_path: Path, + *, + window: int = 100, + cache_ttl_s: float = 5.0, + ) -> None: + self.repo_path = Path(repo_path) + self.window = int(window) + self.cache_ttl_s = float(cache_ttl_s) + self._lock = threading.Lock() + self._cached_hashes: frozenset[str] = frozenset() + self._cached_at: float = 0.0 + self._head: str | None = None + + def _refresh(self) -> None: + """Re-read git log. Falls back to whatever's cached on + subprocess error so a transient git issue doesn't lock out + every shipper at once.""" + try: + out = subprocess.run( + ["git", "-C", str(self.repo_path), + "log", f"-n{self.window}", "--format=%H"], + check=True, capture_output=True, text=True, timeout=3, + ).stdout + except (subprocess.SubprocessError, FileNotFoundError, OSError) as e: + log.warning("version-gate git refresh failed (%s); keeping prior cache " + "of %d hashes", e, len(self._cached_hashes)) + self._cached_at = time.monotonic() + return + hashes = {h.strip().lower() for h in out.splitlines() if h.strip()} + if not hashes: + log.warning("version-gate: git log returned empty; keeping prior cache") + self._cached_at = time.monotonic() + return + head = next(iter(hashes)) if len(hashes) == 1 else None + # First hash from `git log` IS the HEAD commit. + head = out.splitlines()[0].strip().lower() if out.splitlines() else None + with self._lock: + self._cached_hashes = frozenset(hashes) + self._cached_at = time.monotonic() + self._head = head + log.info("version-gate refreshed: %d valid hashes, head=%s", + len(hashes), head[:12] if head else "?") + + def _maybe_refresh(self) -> None: + if (time.monotonic() - self._cached_at) > self.cache_ttl_s: + self._refresh() + + def head(self) -> str | None: + """Return the most recent valid commit (HEAD of the branch + the receiver is mirroring). Used by the 412 response so the + client knows what to pull to.""" + self._maybe_refresh() + return self._head + + def valid_count(self) -> int: + self._maybe_refresh() + return len(self._cached_hashes) + + def check(self, commit: str | None) -> tuple[bool, str | None]: + """Return (ok, reason). ``reason`` is None on success, a + short string identifying the failure mode otherwise.""" + if not commit: + return False, "missing" + c = commit.strip().lower() + if len(c) != 40 or not all(ch in "0123456789abcdef" for ch in c): + return False, "bad-format" + self._maybe_refresh() + with self._lock: + allowed = self._cached_hashes + if c in allowed: + return True, None + return False, "not-in-window" diff --git a/shipper/queue.py b/shipper/queue.py index 76efb01..1302ebc 100644 --- a/shipper/queue.py +++ b/shipper/queue.py @@ -31,6 +31,7 @@ on a matching sha256, 409 on a divergent one. from __future__ import annotations +import json import logging import shutil import subprocess @@ -84,11 +85,25 @@ class ShipperQueue: transient += 1 continue - res = self.transport.ship_tarball(episode_id, tarball, sha) - log.info( - "ship %s -> %s (%d) %s", - episode_id, res.status, res.status_code, res.error or "", - ) + commit = self._read_episode_commit(ep_dir) + res = self.transport.ship_tarball(episode_id, tarball, sha, + commit=commit) + # Receiver returns 412 with a remediation block when the + # commit isn't in its allow-list — surface the body so the + # lab-host operator (or its on-device AI) sees what to run. + if res.status_code == 412 and res.body: + log.error( + "ship %s -> 412 commit-rejected. your_commit=%s " + "head=%s. Remediation:\n%s", + episode_id, (res.body or {}).get("your_commit"), + (res.body or {}).get("head_commit"), + (res.body or {}).get("remediation", ""), + ) + else: + log.info( + "ship %s -> %s (%d) %s", + episode_id, res.status, res.status_code, res.error or "", + ) if res.status in ("stored", "already-present"): self._retire(ep_dir, tarball) @@ -135,6 +150,24 @@ class ShipperQueue: out.append(ep) return out + def _read_episode_commit(self, ep_dir: Path) -> str | None: + """Pull meta.json::code_version.commit so the shipper can send + it as X-Cis490-Code-Commit. Returns None if the file is + missing/malformed; the receiver will reject with a 400 in that + case and the operator can see the lab host needs a re-install.""" + meta_path = ep_dir / "meta.json" + if not meta_path.exists(): + return None + try: + meta = json.loads(meta_path.read_text()) + except (json.JSONDecodeError, OSError): + return None + cv = meta.get("code_version") or {} + commit = cv.get("commit") + if isinstance(commit, str) and len(commit) == 40: + return commit.lower() + return None + def _tar_episode(self, ep_dir: Path) -> tuple[Path, str]: """Tar+zstd the episode dir into outbox. Idempotent — overwrites any prior partial. Returns ``(tarball_path, sha256_hex)``.""" diff --git a/shipper/transport.py b/shipper/transport.py index cd32edd..64993b1 100644 --- a/shipper/transport.py +++ b/shipper/transport.py @@ -162,6 +162,7 @@ class ShipperTransport: episode_id: str, tarball_path: Path, sha256_hex: str, + commit: str | None = None, ) -> ShipResult: if not self._try_build_verify(): return ShipResult( @@ -180,6 +181,10 @@ class ShipperTransport: "X-Content-SHA256": sha256_hex, "X-Episode-Id": episode_id, } + if commit: + # Receiver enforces this against its commit-allow-list and + # rejects with 412 if not in window. See receiver/version_gate.py. + headers["X-Cis490-Code-Commit"] = commit try: with httpx.Client(verify=self._verify, timeout=self.cfg.request_timeout_s) as c, \ @@ -225,6 +230,18 @@ class ShipperTransport: body=body_json, error="receiver already has a different sha256 for this id", ) + if r.status_code == 412: + # Code-commit not in receiver's allow-list. The operator + # of THIS lab host needs to pull main + reinstall; + # retrying without that won't help. Treat as fatal so + # queue.run_once() doesn't loop on it. + return ShipResult( + status="fatal", + status_code=412, + sha256=None, + body=body_json, + error="code commit rejected — pull origin/main and reinstall", + ) if 500 <= r.status_code < 600: return ShipResult( status="transient", diff --git a/tests/test_version_gate.py b/tests/test_version_gate.py new file mode 100644 index 0000000..67cd74e --- /dev/null +++ b/tests/test_version_gate.py @@ -0,0 +1,222 @@ +"""Tests for the receiver's commit-allow-list gate. + +The gate refreshes the allow-list from `git log` of a configured +repo path. Tests use real git operations on a temp repo so we +exercise the same subprocess code paths the receiver does in +production. +""" + +from __future__ import annotations + +import subprocess +from pathlib import Path + +import pytest + +from receiver.version_gate import VersionGate + + +def _git(cwd: Path, *args: str) -> str: + return subprocess.check_output( + ["git", "-c", "user.email=t@t", "-c", "user.name=t", + "-C", str(cwd), *args], + text=True, + ).strip() + + +@pytest.fixture +def repo(tmp_path: Path) -> Path: + r = tmp_path / "repo" + r.mkdir() + _git(r, "init", "--initial-branch=main") + (r / "f").write_text("v1") + _git(r, "add", "f") + _git(r, "commit", "-m", "v1") + return r + + +def _commits(repo: Path) -> list[str]: + return _git(repo, "log", "--format=%H").splitlines() + + +def test_check_accepts_head_commit(repo: Path) -> None: + g = VersionGate(repo, window=10, cache_ttl_s=0) + head = _commits(repo)[0] + ok, reason = g.check(head) + assert ok and reason is None + assert g.head() == head + + +def test_check_rejects_unknown_commit(repo: Path) -> None: + g = VersionGate(repo, window=10, cache_ttl_s=0) + ok, reason = g.check("0" * 40) + assert not ok and reason == "not-in-window" + + +def test_check_rejects_missing_commit(repo: Path) -> None: + g = VersionGate(repo, window=10, cache_ttl_s=0) + ok, reason = g.check(None) + assert not ok and reason == "missing" + ok, reason = g.check("") + assert not ok and reason == "missing" + + +def test_check_rejects_bad_format(repo: Path) -> None: + g = VersionGate(repo, window=10, cache_ttl_s=0) + ok, reason = g.check("not-a-hash") + assert not ok and reason == "bad-format" + ok, reason = g.check("ABCDEF") # too short, but valid hex + assert not ok and reason == "bad-format" + + +def test_new_commit_after_pull_is_accepted_within_ttl(repo: Path) -> None: + """The whole point: when the maintainer commits new code on the + Pi, the receiver picks it up automatically without restart.""" + g = VersionGate(repo, window=10, cache_ttl_s=0) + # Add a new commit AFTER gate is constructed. + (repo / "f").write_text("v2") + _git(repo, "commit", "-am", "v2") + new_head = _commits(repo)[0] + # cache_ttl_s=0 forces refresh on next check. + ok, _ = g.check(new_head) + assert ok + assert g.head() == new_head + + +def test_window_limits_history(repo: Path) -> None: + """Old commits past the window should drop out of the allow-list.""" + # Add 5 more commits. + for i in range(2, 7): + (repo / "f").write_text(f"v{i}") + _git(repo, "commit", "-am", f"v{i}") + all_commits = _commits(repo) + assert len(all_commits) == 6 + g = VersionGate(repo, window=3, cache_ttl_s=0) + # Top 3 are valid. + for c in all_commits[:3]: + ok, _ = g.check(c) + assert ok, f"{c[:8]} should be in window" + # Older 3 are not. + for c in all_commits[3:]: + ok, reason = g.check(c) + assert not ok and reason == "not-in-window" + + +def test_e2e_receiver_returns_412_for_unknown_commit(repo: Path, tmp_path: Path) -> None: + """End-to-end: PUT with an out-of-window commit returns 412 with + the remediation block, and the tarball does NOT land on disk.""" + import io as _io, json as _json, tarfile as _tar, hashlib as _h + from starlette.testclient import TestClient + from receiver.app import make_app + from receiver.store import EpisodeStore + + head = _commits(repo)[0] + rcv_root = tmp_path / "rcv" + store = EpisodeStore( + store_root=rcv_root / "ep", + incoming_root=rcv_root / "in", + index_path=rcv_root / "index.jsonl", + ) + gate = VersionGate(repo, window=10, cache_ttl_s=0) + app = make_app(store=store, max_episode_bytes=10_000_000, + bearer_token=None, version_gate=gate) + + # Build a tiny valid tarball. + raw = _io.BytesIO() + with _tar.open(fileobj=raw, mode="w") as t: + info = _tar.TarInfo("01TEST/meta.json") + body = b"{}" + info.size = len(body) + t.addfile(info, _io.BytesIO(body)) + payload = raw.getvalue() + sha = _h.sha256(payload).hexdigest() + + with TestClient(app) as client: + # Wrong commit: rejected with 412 + remediation in body. + bad = "0" * 40 + r = client.put( + f"/v1/episodes/lab1/01TEST.tar.zst", + content=payload, + headers={ + "X-Content-SHA256": sha, + "X-Lab-Host": "lab1", + "X-Cis490-Code-Commit": bad, + }, + ) + assert r.status_code == 412 + body = r.json() + assert "remediation" in body + assert body["your_commit"] == bad + assert body["head_commit"] == head + # Index must NOT have grown. + assert store.index_path.read_text() == "" + + # Right commit: accepted (201). + r = client.put( + f"/v1/episodes/lab1/01TEST.tar.zst", + content=payload, + headers={ + "X-Content-SHA256": sha, + "X-Lab-Host": "lab1", + "X-Cis490-Code-Commit": head, + }, + ) + assert r.status_code == 201, r.text + # Index gained one row stamped with the commit. + rows = [_json.loads(l) for l in store.index_path.read_text().splitlines() if l.strip()] + assert len(rows) == 1 + assert rows[0]["commit"] == head + + +def test_e2e_receiver_returns_400_when_commit_header_missing(repo: Path, tmp_path: Path) -> None: + """Missing header is a client bug (lab host pre-stamp-update); + receiver returns 400 with remediation.""" + import io as _io, tarfile as _tar, hashlib as _h + from starlette.testclient import TestClient + from receiver.app import make_app + from receiver.store import EpisodeStore + + rcv_root = tmp_path / "rcv" + store = EpisodeStore( + store_root=rcv_root / "ep", + incoming_root=rcv_root / "in", + index_path=rcv_root / "index.jsonl", + ) + gate = VersionGate(repo, window=10, cache_ttl_s=0) + app = make_app(store=store, max_episode_bytes=10_000_000, + bearer_token=None, version_gate=gate) + + raw = _io.BytesIO() + with _tar.open(fileobj=raw, mode="w") as t: + info = _tar.TarInfo("01TEST/meta.json") + info.size = 2 + t.addfile(info, _io.BytesIO(b"{}")) + payload = raw.getvalue() + sha = _h.sha256(payload).hexdigest() + with TestClient(app) as client: + r = client.put( + f"/v1/episodes/lab1/01TEST.tar.zst", + content=payload, + headers={ + "X-Content-SHA256": sha, + "X-Lab-Host": "lab1", + # no X-Cis490-Code-Commit + }, + ) + assert r.status_code == 400 + assert "missing" in r.json()["error"].lower() + + +def test_missing_repo_keeps_prior_cache(repo: Path) -> None: + """If the maintainer's clone disappears (or git fails), the gate + keeps its last-known allow-list — better than locking out every + shipper at once.""" + g = VersionGate(repo, window=10, cache_ttl_s=0) + head = _commits(repo)[0] + ok, _ = g.check(head) + assert ok + # Now break the repo path. + g.repo_path = repo / "does-not-exist" + # Cache should still serve the previously-known head. + ok, _ = g.check(head) + assert ok diff --git a/tools/index_backfill.py b/tools/index_backfill.py index 18a43cc..a0d3a57 100644 --- a/tools/index_backfill.py +++ b/tools/index_backfill.py @@ -42,6 +42,36 @@ def _sha256_of(path: Path) -> str: return h.hexdigest() +def _commit_from_tarball(tar_path: Path) -> str | None: + """Extract meta.json::code_version.commit from a tar.zst without + leaving anything on disk. Returns None on any failure — callers + write the row without a commit field.""" + import io as _io, subprocess as _sp, tarfile as _tar + try: + z = _sp.check_output( + ["zstd", "-q", "-d", "--stdout", str(tar_path)], + stderr=_sp.DEVNULL, + ) + except (_sp.CalledProcessError, FileNotFoundError, OSError): + return None + try: + with _tar.open(fileobj=_io.BytesIO(z)) as t: + for m in t.getmembers(): + if m.name.endswith("meta.json") and m.isfile(): + f = t.extractfile(m) + if f is None: + return None + meta = json.loads(f.read().decode("utf-8")) + cv = meta.get("code_version") or {} + c = cv.get("commit") + if isinstance(c, str) and len(c) == 40: + return c.lower() + return None + except Exception: + return None + return None + + def _existing_episode_ids(index_path: Path) -> set[str]: if not index_path.exists(): return set() @@ -89,7 +119,7 @@ def main(argv: list[str] | None = None) -> int: continue sha = _sha256_of(tar) size = tar.stat().st_size - rows_to_write.append({ + row: dict = { "received_at_wall": datetime.now(timezone.utc).isoformat(), "host_id": host_dir.name, "episode_id": episode_id, @@ -97,7 +127,11 @@ def main(argv: list[str] | None = None) -> int: "size_bytes": size, "schema_version": SCHEMA_VERSION, "backfilled": True, - }) + } + commit = _commit_from_tarball(tar) + if commit: + row["commit"] = commit + rows_to_write.append(row) print(f"scanned: {scanned} already-indexed: {scanned - len(rows_to_write)} " f"to-backfill: {len(rows_to_write)}")