CIS490/receiver/version_gate.py
max 5cebe7096a robustness: gate falls back to local git, queue sweeps stale tarballs
Two follow-ups from the post-cutover diagnosis:

1. version_gate: forgejo → local git fallback. If forgejo refresh
   returns empty AND a local repo path is configured, retry against
   `git log` from the local checkout. The receiver service runs on
   the same Pi as forgejo, so a simultaneous restart used to leave
   the gate's cache empty and reject every PUT with not-in-window.
   Auto-detects /opt/cis490/.git when the operator hasn't set
   local_repo_path explicitly — that path is always present on a
   production receiver and ProtectSystem=strict still allows reads.
   Logs `source=git-fallback` so this isn't silent.

2. shipper/queue: sweep orphaned outbox tarballs. The lifecycle
   invariant is `outbox/<id>.tar.zst exists ⇒ episodes/<id>/ exists`
   — broken historically by the now-fixed fatal-loop, by operator
   `rm` of an episode dir, or by an OS crash between rename(2) and
   the post-ship cleanup. Without sweeping, dead bytes pile up
   forever. New _sweep_outbox runs at the start of every scan,
   bounded by the file count in outbox/.

Tests cover: fallback fires when forgejo unreachable + repo_path set;
no fallback when repo_path None (opt-in); orphan tarball + partial
get swept on the next pass; live tarballs untouched.

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
2026-05-01 11:49:38 -05:00

188 lines
7.4 KiB
Python

"""Live commit allow-list for the receiver.
The receiver only stores episodes whose `meta.json::code_version.commit`
matches a commit in the canonical repository's recent history. Two
backends are supported:
forgejo: queries
GET /api/v1/repos/<owner>/<name>/commits?sha=<branch>&limit=<n>
on a Forgejo instance the maintainer pushes to. PRODUCTION
DEFAULT — Forgejo is the authoritative source of truth that
both lab hosts and the receiver pull from, so when the
maintainer pushes new code the new commit becomes acceptable
automatically.
git: runs `git log -n <window> --format=%H` against a local
checkout. Used by tests + dev-only setups where a Forgejo
instance isn't available, and as a fallback when a Forgejo
refresh comes back empty and a local repo path is configured.
Cache TTL: 5s by default — push a commit, wait 5s, the new hash is
in the allow-list. No service restart.
Episodes from older code (before a known bug fix) get rejected with
HTTP 412 + a remediation block telling the lab-host operator to pull
main and re-run the install. That keeps bad data out of the index.
"""
from __future__ import annotations
import json
import logging
import subprocess
import threading
import time
import urllib.parse
import urllib.request
from pathlib import Path
log = logging.getLogger("cis490.receiver.version_gate")
class VersionGate:
"""Maintains the set of acceptable commit hashes via either a
Forgejo HTTP API call or a local `git log`.
Args:
forgejo_url: e.g. "http://10.100.0.1:3000". Setting this enables
the Forgejo backend; ``repo_owner``/``repo_name``/``branch``
must also be set. ``auth_token`` is optional but recommended
(so a private Forgejo doesn't need to be world-readable).
repo_path: local checkout (fallback / test backend). Used iff
``forgejo_url`` is None.
window: how many recent commits count as valid.
cache_ttl_s: how long to trust the cache before refreshing.
"""
def __init__(
self,
repo_path: Path | None = None,
*,
window: int = 100,
cache_ttl_s: float = 5.0,
forgejo_url: str | None = None,
repo_owner: str | None = None,
repo_name: str | None = None,
branch: str = "main",
auth_token: str | None = None,
) -> None:
self.repo_path = Path(repo_path) if repo_path else None
self.window = int(window)
self.cache_ttl_s = float(cache_ttl_s)
self.forgejo_url = forgejo_url.rstrip("/") if forgejo_url else None
self.repo_owner = repo_owner
self.repo_name = repo_name
self.branch = branch
self.auth_token = auth_token
if not self.forgejo_url and not self.repo_path:
raise ValueError("VersionGate needs forgejo_url or repo_path")
self._lock = threading.Lock()
self._cached_hashes: frozenset[str] = frozenset()
self._cached_at: float = 0.0
self._head: str | None = None
# ---- backend dispatch -----------------------------------------------
def _refresh(self) -> None:
# Try forgejo first when configured. If it fails AND we also
# have a local repo path, fall back to git — this lets the
# receiver keep working through a forgejo blip (e.g. both
# services restarting on the same host) without flat-rejecting
# every PUT with not-in-window. The local repo on the
# receiver's own host is the closest thing to ground truth we
# have when the canonical source is unreachable.
source = "forgejo" if self.forgejo_url else "git"
if self.forgejo_url:
hashes, head = self._refresh_forgejo()
if not hashes and self.repo_path is not None:
log.info("forgejo refresh empty; falling back to local git "
"at %s", self.repo_path)
hashes, head = self._refresh_git()
if hashes:
source = "git-fallback"
else:
hashes, head = self._refresh_git()
if not hashes:
log.warning("version-gate refresh empty; keeping prior cache "
"of %d hashes", len(self._cached_hashes))
self._cached_at = time.monotonic()
return
with self._lock:
self._cached_hashes = frozenset(hashes)
self._cached_at = time.monotonic()
self._head = head
log.info("version-gate refreshed: %d valid hashes, head=%s, source=%s",
len(hashes), head[:12] if head else "?", source)
def _refresh_forgejo(self) -> tuple[set[str], str | None]:
"""GET /api/v1/repos/<owner>/<name>/commits?sha=<branch>&limit=<n>."""
url = (
f"{self.forgejo_url}/api/v1/repos/{self.repo_owner}/"
f"{self.repo_name}/commits"
f"?sha={urllib.parse.quote(self.branch)}&limit={self.window}"
)
req = urllib.request.Request(url)
if self.auth_token:
req.add_header("Authorization", f"token {self.auth_token}")
try:
with urllib.request.urlopen(req, timeout=3) as r:
rows = json.loads(r.read().decode("utf-8"))
except (urllib.request.HTTPError, urllib.request.URLError,
json.JSONDecodeError, OSError) as e:
log.warning("forgejo refresh failed (%s); keeping prior cache", e)
return set(), self._head
hashes: set[str] = set()
head: str | None = None
for i, row in enumerate(rows or []):
sha = row.get("sha")
if isinstance(sha, str) and len(sha) == 40:
sha = sha.lower()
hashes.add(sha)
if i == 0:
head = sha
return hashes, head
def _refresh_git(self) -> tuple[set[str], str | None]:
"""`git log -n <window> --format=%H` from `repo_path`."""
try:
out = subprocess.run(
["git", "-C", str(self.repo_path),
"log", f"-n{self.window}", "--format=%H"],
check=True, capture_output=True, text=True, timeout=3,
).stdout
except (subprocess.SubprocessError, FileNotFoundError, OSError) as e:
log.warning("git refresh failed (%s); keeping prior cache", e)
return set(), self._head
lines = [h.strip().lower() for h in out.splitlines() if h.strip()]
head = lines[0] if lines else None
return set(lines), head
def _maybe_refresh(self) -> None:
if (time.monotonic() - self._cached_at) > self.cache_ttl_s:
self._refresh()
def head(self) -> str | None:
"""Return the most recent valid commit (HEAD of the branch
the receiver is mirroring). Used by the 412 response so the
client knows what to pull to."""
self._maybe_refresh()
return self._head
def valid_count(self) -> int:
self._maybe_refresh()
return len(self._cached_hashes)
def check(self, commit: str | None) -> tuple[bool, str | None]:
"""Return (ok, reason). ``reason`` is None on success, a
short string identifying the failure mode otherwise."""
if not commit:
return False, "missing"
c = commit.strip().lower()
if len(c) != 40 or not all(ch in "0123456789abcdef" for ch in c):
return False, "bad-format"
self._maybe_refresh()
with self._lock:
allowed = self._cached_hashes
if c in allowed:
return True, None
return False, "not-in-window"