diff --git a/etc/receiver.toml.example b/etc/receiver.toml.example index 5cd19db..e5c0d7f 100644 --- a/etc/receiver.toml.example +++ b/etc/receiver.toml.example @@ -20,6 +20,18 @@ max_episode_bytes = 268_435_456 # 256 MiB # `git pull` on the Pi makes new commits acceptable instantly). This # keeps episodes from out-of-date lab hosts out of the index. [version_gate] -enabled = true -repo_path = "/home/max/cis490" -window = 100 +enabled = true +window = 100 +# Production: hit the local Forgejo for the canonical commit list. The +# maintainer pushes to this repo; lab hosts pull from it. When the +# receiver checks each PUT it sees the same commits the lab hosts +# would see if they pulled at the same instant. +forgejo_url = "http://10.100.0.1:3000" +repo_owner = "spectral" +repo_name = "CIS490" +branch = "main" +# Optional Forgejo token for private repos; remove for public. +# auth_token = "..." +# +# Dev-only fallback (used iff forgejo_url is unset): +# local_repo_path = "/home/max/cis490" diff --git a/receiver/__main__.py b/receiver/__main__.py index b88a165..09a7d3d 100644 --- a/receiver/__main__.py +++ b/receiver/__main__.py @@ -35,8 +35,13 @@ def main() -> None: version_gate = None if cfg.version_gate_enabled: version_gate = VersionGate( - repo_path=cfg.version_gate_repo, + repo_path=cfg.version_gate_local_repo, window=cfg.version_gate_window, + forgejo_url=cfg.version_gate_forgejo_url, + repo_owner=cfg.version_gate_repo_owner, + repo_name=cfg.version_gate_repo_name, + branch=cfg.version_gate_branch, + auth_token=cfg.version_gate_auth_token, ) app = make_app( store=store, diff --git a/receiver/config.py b/receiver/config.py index 3994ca6..064997d 100644 --- a/receiver/config.py +++ b/receiver/config.py @@ -17,12 +17,17 @@ class ReceiverConfig: index_path: Path max_episode_bytes: int bearer_token: str | None - # Path to the maintainer's working clone — receiver consults its - # `git log` for the commit-allow-list. Default mirrors the - # canonical Pi setup. 
- version_gate_repo: Path - version_gate_window: int + # Code-version gate. Production source is the local Forgejo + # (canonical repo both lab hosts and the receiver pull from); + # local-git path is a dev-only fallback. version_gate_enabled: bool + version_gate_window: int + version_gate_forgejo_url: str | None + version_gate_repo_owner: str | None + version_gate_repo_name: str | None + version_gate_branch: str + version_gate_auth_token: str | None + version_gate_local_repo: Path | None @classmethod def load(cls, path: str | Path) -> "ReceiverConfig": @@ -32,6 +37,7 @@ class ReceiverConfig: listen_addr = data.get("listen_addr", "127.0.0.1:8443") host, _, port = listen_addr.rpartition(":") version_gate = data.get("version_gate", {}) + local_repo = version_gate.get("local_repo_path") return cls( listen_host=host or "127.0.0.1", listen_port=int(port), @@ -42,9 +48,12 @@ class ReceiverConfig: data.get("limits", {}).get("max_episode_bytes", DEFAULT_MAX_EPISODE_BYTES) ), bearer_token=data.get("auth", {}).get("bearer_token"), - version_gate_repo=Path( - version_gate.get("repo_path", "/home/max/cis490") - ).resolve(), - version_gate_window=int(version_gate.get("window", 100)), version_gate_enabled=bool(version_gate.get("enabled", True)), + version_gate_window=int(version_gate.get("window", 100)), + version_gate_forgejo_url=version_gate.get("forgejo_url"), + version_gate_repo_owner=version_gate.get("repo_owner"), + version_gate_repo_name=version_gate.get("repo_name"), + version_gate_branch=version_gate.get("branch", "main"), + version_gate_auth_token=version_gate.get("auth_token"), + version_gate_local_repo=Path(local_repo).resolve() if local_repo else None, ) diff --git a/receiver/version_gate.py b/receiver/version_gate.py index 57f63b0..5ce8142 100644 --- a/receiver/version_gate.py +++ b/receiver/version_gate.py @@ -1,10 +1,23 @@ """Live commit allow-list for the receiver. 
The receiver only stores episodes whose `meta.json::code_version.commit` -matches a commit in the maintainer's local repository on the Pi. The -allow-list is "the last N commits to the active branch" — auto-refreshed -from `git log` so a `git pull` (or push) on the Pi makes the new commit -acceptable without a service restart. +matches a commit in the canonical repository's recent history. Two +backends are supported: + + forgejo: queries + GET /api/v1/repos/{owner}/{repo}/commits?sha={branch}&limit={window} + on a Forgejo instance the maintainer pushes to. PRODUCTION + DEFAULT — Forgejo is the authoritative source of truth that + both lab hosts and the receiver pull from, so when the + maintainer pushes new code the new commit becomes acceptable + automatically. + + git: runs `git log -n {window} --format=%H` against a local + checkout. Used by tests + dev-only setups where a Forgejo + instance isn't available. + +Cache TTL: 5s by default — push a commit, wait 5s, the new hash is +in the allow-list. No service restart. Episodes from older code (before a known bug fix) get rejected with HTTP 412 + a remediation block telling the lab-host operator to pull @@ -13,10 +26,13 @@ main and re-run the install. That keeps bad data out of the index. from __future__ import annotations +import json import logging import subprocess import threading import time +import urllib.parse +import urllib.request from pathlib import Path @@ -24,38 +40,97 @@ log = logging.getLogger("cis490.receiver.version_gate") class VersionGate: - """Maintains the set of acceptable commit hashes. + """Maintains the set of acceptable commit hashes via either a + Forgejo HTTP API call or a local `git log`. - Refresh strategy: lazy. Each call to ``check()`` consults the - cache; if the cache is older than ``cache_ttl_s``, re-runs - ``git log --format=%H -n `` from ``repo_path``. The - cache is process-shared (a single dict guarded by a lock), so - concurrent uvicorn workers don't all fork git on the same tick.
- - `repo_path` should point at the maintainer's working clone on the - Pi (the one that gets `git pull`'d when new code lands). Default - is ``/home/max/cis490``; overridable via the receiver config so a - different operator/path works without code changes.""" + Args: + forgejo_url: e.g. "http://10.100.0.1:3000". Setting this enables + the Forgejo backend; ``repo_owner``/``repo_name``/``branch`` + must also be set. ``auth_token`` is optional but recommended + (so a private Forgejo doesn't need to be world-readable). + repo_path: local checkout (fallback / test backend). Used iff + ``forgejo_url`` is None. + window: how many recent commits count as valid. + cache_ttl_s: how long to trust the cache before refreshing. + """ def __init__( self, - repo_path: Path, + repo_path: Path | None = None, *, window: int = 100, cache_ttl_s: float = 5.0, + forgejo_url: str | None = None, + repo_owner: str | None = None, + repo_name: str | None = None, + branch: str = "main", + auth_token: str | None = None, ) -> None: - self.repo_path = Path(repo_path) + self.repo_path = Path(repo_path) if repo_path else None self.window = int(window) self.cache_ttl_s = float(cache_ttl_s) + self.forgejo_url = forgejo_url.rstrip("/") if forgejo_url else None + self.repo_owner = repo_owner + self.repo_name = repo_name + self.branch = branch + self.auth_token = auth_token + if not self.forgejo_url and not self.repo_path: + raise ValueError("VersionGate needs forgejo_url or repo_path") self._lock = threading.Lock() self._cached_hashes: frozenset[str] = frozenset() self._cached_at: float = 0.0 self._head: str | None = None + # ---- backend dispatch ----------------------------------------------- + def _refresh(self) -> None: - """Re-read git log. 
Falls back to whatever's cached on - subprocess error so a transient git issue doesn't lock out - every shipper at once.""" + if self.forgejo_url: + hashes, head = self._refresh_forgejo() + else: + hashes, head = self._refresh_git() + if not hashes: + log.warning("version-gate refresh empty; keeping prior cache " + "of %d hashes", len(self._cached_hashes)) + self._cached_at = time.monotonic() + return + with self._lock: + self._cached_hashes = frozenset(hashes) + self._cached_at = time.monotonic() + self._head = head + log.info("version-gate refreshed: %d valid hashes, head=%s, source=%s", + len(hashes), head[:12] if head else "?", + "forgejo" if self.forgejo_url else "git") + + def _refresh_forgejo(self) -> tuple[set[str], str | None]: + """GET /api/v1/repos/{owner}/{repo}/commits?sha={branch}&limit={window}.""" + url = ( + f"{self.forgejo_url}/api/v1/repos/{self.repo_owner}/" + f"{self.repo_name}/commits" + f"?sha={urllib.parse.quote(self.branch)}&limit={self.window}" + ) + req = urllib.request.Request(url) + if self.auth_token: + req.add_header("Authorization", f"token {self.auth_token}") + try: + with urllib.request.urlopen(req, timeout=3) as r: + rows = json.loads(r.read().decode("utf-8")) + except (urllib.request.HTTPError, urllib.request.URLError, + json.JSONDecodeError, OSError) as e: + log.warning("forgejo refresh failed (%s); keeping prior cache", e) + return set(), self._head + hashes: set[str] = set() + head: str | None = None + for i, row in enumerate(rows or []): + sha = row.get("sha") + if isinstance(sha, str) and len(sha) == 40: + sha = sha.lower() + hashes.add(sha) + if i == 0: + head = sha + return hashes, head + + def _refresh_git(self) -> tuple[set[str], str | None]: + """`git log -n {window} --format=%H` from `repo_path`.""" try: out = subprocess.run( ["git", "-C", str(self.repo_path), @@ -63,24 +138,11 @@ class VersionGate: check=True, capture_output=True, text=True, timeout=3, ).stdout except (subprocess.SubprocessError, FileNotFoundError, OSError) as e: -
log.warning("version-gate git refresh failed (%s); keeping prior cache " - "of %d hashes", e, len(self._cached_hashes)) - self._cached_at = time.monotonic() - return - hashes = {h.strip().lower() for h in out.splitlines() if h.strip()} - if not hashes: - log.warning("version-gate: git log returned empty; keeping prior cache") - self._cached_at = time.monotonic() - return - head = next(iter(hashes)) if len(hashes) == 1 else None - # First hash from `git log` IS the HEAD commit. - head = out.splitlines()[0].strip().lower() if out.splitlines() else None - with self._lock: - self._cached_hashes = frozenset(hashes) - self._cached_at = time.monotonic() - self._head = head - log.info("version-gate refreshed: %d valid hashes, head=%s", - len(hashes), head[:12] if head else "?") + log.warning("git refresh failed (%s); keeping prior cache", e) + return set(), self._head + lines = [h.strip().lower() for h in out.splitlines() if h.strip()] + head = lines[0] if lines else None + return set(lines), head def _maybe_refresh(self) -> None: if (time.monotonic() - self._cached_at) > self.cache_ttl_s: diff --git a/tests/test_version_gate.py b/tests/test_version_gate.py index 67cd74e..83dabc7 100644 --- a/tests/test_version_gate.py +++ b/tests/test_version_gate.py @@ -39,6 +39,47 @@ def _commits(repo: Path) -> list[str]: return _git(repo, "log", "--format=%H").splitlines() +def test_forgejo_backend_accepts_returned_commits(tmp_path: Path) -> None: + """Forgejo-backed gate hits a canned HTTP server returning a + commits list; the parser pulls sha + first-row-is-head.""" + import json as _json, threading as _t + from http.server import BaseHTTPRequestHandler, HTTPServer + HEAD = "abcdef0123456789" * 2 + "0" * 8 # 40 hex + OLD = "1234" * 10 + canned = _json.dumps([{"sha": HEAD}, {"sha": OLD}]).encode() + + class H(BaseHTTPRequestHandler): + def log_message(self, *a): pass + def do_GET(self): + assert "/api/v1/repos/spectral/CIS490/commits" in self.path + self.send_response(200) + 
self.send_header("Content-Type", "application/json") + self.send_header("Content-Length", str(len(canned))) + self.end_headers() + self.wfile.write(canned) + + srv = HTTPServer(("127.0.0.1", 0), H) + port = srv.server_address[1] + th = _t.Thread(target=srv.serve_forever, daemon=True) + th.start() + try: + g = VersionGate( + forgejo_url=f"http://127.0.0.1:{port}", + repo_owner="spectral", repo_name="CIS490", branch="main", + window=50, cache_ttl_s=0, + ) + ok, _ = g.check(HEAD) + assert ok + assert g.head() == HEAD + ok, _ = g.check(OLD) + assert ok + ok, reason = g.check("0" * 40) + assert not ok and reason == "not-in-window" + finally: + srv.shutdown() + th.join(timeout=1) + + def test_check_accepts_head_commit(repo: Path) -> None: g = VersionGate(repo, window=10, cache_ttl_s=0) head = _commits(repo)[0]