version_gate: Forgejo as canonical commit source (no fs perms needed)

Initial git-log-based gate ran into a permission wall: the cis490
service user can't read /home/max/cis490/.git (ProtectHome=true +
home-dir mode). Switching the production source to the local Forgejo
HTTP API (already accessible to all WG peers, single source of truth
both lab hosts and the receiver pull from). When the maintainer
pushes new code to spectral/CIS490, the next 5-second cache refresh
sees the new commit and lab hosts can immediately ship under it.

VersionGate now takes either:
  - forgejo_url + repo_owner + repo_name + branch (+ optional
    auth_token for private repos): hits
    /api/v1/repos/<owner>/<name>/commits?sha=<branch>&limit=<n>
  - repo_path: dev-only fallback, runs `git log` locally

Local-git path retained for tests + the dev-only case.

receiver.toml.example gains forgejo_url/repo_owner/repo_name/branch
with auth_token commented; live-deployed receiver.toml on the Pi has
the spectral org + token.

Live state on the Pi: 41 valid hashes loaded, head=f8ad02b. Verified
end-to-end:
  bogus commit → 412 + remediation
  HEAD commit  → clears gate (fails downstream at sha-mismatch as
                 expected for the empty-body verify probe)

Test added: test_forgejo_backend_accepts_returned_commits stands up
a tiny canned-response HTTPServer in-process, exercises the parser
without depending on a live Forgejo instance. Brings test_version_gate
to 10 cases; total 158/158.
This commit is contained in:
max 2026-05-01 01:42:45 -05:00
parent f8ad02b2d7
commit cc0c96953e
5 changed files with 180 additions and 51 deletions

View file

@ -20,6 +20,18 @@ max_episode_bytes = 268_435_456 # 256 MiB
# `git pull` on the Pi makes new commits acceptable instantly). This
# keeps episodes from out-of-date lab hosts out of the index.
[version_gate]
enabled = true
repo_path = "/home/max/cis490"
window = 100
enabled = true
window = 100
# Production: hit the local Forgejo for the canonical commit list. The
# maintainer pushes to this repo; lab hosts pull from it. When the
# receiver checks each PUT it sees the same commits the lab hosts
# would see if they pulled at the same instant.
forgejo_url = "http://10.100.0.1:3000"
repo_owner = "spectral"
repo_name = "CIS490"
branch = "main"
# Optional Forgejo token for private repos; remove for public.
# auth_token = "..."
#
# Dev-only fallback (used iff forgejo_url is unset):
# local_repo_path = "/home/max/cis490"

View file

@ -35,8 +35,13 @@ def main() -> None:
version_gate = None
if cfg.version_gate_enabled:
version_gate = VersionGate(
repo_path=cfg.version_gate_repo,
repo_path=cfg.version_gate_local_repo,
window=cfg.version_gate_window,
forgejo_url=cfg.version_gate_forgejo_url,
repo_owner=cfg.version_gate_repo_owner,
repo_name=cfg.version_gate_repo_name,
branch=cfg.version_gate_branch,
auth_token=cfg.version_gate_auth_token,
)
app = make_app(
store=store,

View file

@ -17,12 +17,17 @@ class ReceiverConfig:
index_path: Path
max_episode_bytes: int
bearer_token: str | None
# Path to the maintainer's working clone — receiver consults its
# `git log` for the commit-allow-list. Default mirrors the
# canonical Pi setup.
version_gate_repo: Path
version_gate_window: int
# Code-version gate. Production source is the local Forgejo
# (canonical repo both lab hosts and the receiver pull from);
# local-git path is a dev-only fallback.
version_gate_enabled: bool
version_gate_window: int
version_gate_forgejo_url: str | None
version_gate_repo_owner: str | None
version_gate_repo_name: str | None
version_gate_branch: str
version_gate_auth_token: str | None
version_gate_local_repo: Path | None
@classmethod
def load(cls, path: str | Path) -> "ReceiverConfig":
@ -32,6 +37,7 @@ class ReceiverConfig:
listen_addr = data.get("listen_addr", "127.0.0.1:8443")
host, _, port = listen_addr.rpartition(":")
version_gate = data.get("version_gate", {})
local_repo = version_gate.get("local_repo_path")
return cls(
listen_host=host or "127.0.0.1",
listen_port=int(port),
@ -42,9 +48,12 @@ class ReceiverConfig:
data.get("limits", {}).get("max_episode_bytes", DEFAULT_MAX_EPISODE_BYTES)
),
bearer_token=data.get("auth", {}).get("bearer_token"),
version_gate_repo=Path(
version_gate.get("repo_path", "/home/max/cis490")
).resolve(),
version_gate_window=int(version_gate.get("window", 100)),
version_gate_enabled=bool(version_gate.get("enabled", True)),
version_gate_window=int(version_gate.get("window", 100)),
version_gate_forgejo_url=version_gate.get("forgejo_url"),
version_gate_repo_owner=version_gate.get("repo_owner"),
version_gate_repo_name=version_gate.get("repo_name"),
version_gate_branch=version_gate.get("branch", "main"),
version_gate_auth_token=version_gate.get("auth_token"),
version_gate_local_repo=Path(local_repo).resolve() if local_repo else None,
)

View file

@ -1,10 +1,23 @@
"""Live commit allow-list for the receiver.
The receiver only stores episodes whose `meta.json::code_version.commit`
matches a commit in the maintainer's local repository on the Pi. The
allow-list is "the last N commits to the active branch" auto-refreshed
from `git log` so a `git pull` (or push) on the Pi makes the new commit
acceptable without a service restart.
matches a commit in the canonical repository's recent history. Two
backends are supported:
forgejo: queries
GET /api/v1/repos/<owner>/<name>/commits?sha=<branch>&limit=<n>
on a Forgejo instance the maintainer pushes to. PRODUCTION
DEFAULT Forgejo is the authoritative source of truth that
both lab hosts and the receiver pull from, so when the
maintainer pushes new code the new commit becomes acceptable
automatically.
git: runs `git log -n <window> --format=%H` against a local
checkout. Used by tests + dev-only setups where a Forgejo
instance isn't available.
Cache TTL: 5s by default push a commit, wait 5s, the new hash is
in the allow-list. No service restart.
Episodes from older code (before a known bug fix) get rejected with
HTTP 412 + a remediation block telling the lab-host operator to pull
@ -13,10 +26,13 @@ main and re-run the install. That keeps bad data out of the index.
from __future__ import annotations
import json
import logging
import subprocess
import threading
import time
import urllib.parse
import urllib.request
from pathlib import Path
@ -24,38 +40,97 @@ log = logging.getLogger("cis490.receiver.version_gate")
class VersionGate:
"""Maintains the set of acceptable commit hashes.
"""Maintains the set of acceptable commit hashes via either a
Forgejo HTTP API call or a local `git log`.
Refresh strategy: lazy. Each call to ``check()`` consults the
cache; if the cache is older than ``cache_ttl_s``, re-runs
``git log --format=%H -n <window>`` from ``repo_path``. The
cache is process-shared (a single dict guarded by a lock), so
concurrent uvicorn workers don't all fork git on the same tick.
`repo_path` should point at the maintainer's working clone on the
Pi (the one that gets `git pull`'d when new code lands). Default
is ``/home/max/cis490``; overridable via the receiver config so a
different operator/path works without code changes."""
Args:
forgejo_url: e.g. "http://10.100.0.1:3000". Setting this enables
the Forgejo backend; ``repo_owner``/``repo_name``/``branch``
must also be set. ``auth_token`` is optional but recommended
(so a private Forgejo doesn't need to be world-readable).
repo_path: local checkout (fallback / test backend). Used iff
``forgejo_url`` is None.
window: how many recent commits count as valid.
cache_ttl_s: how long to trust the cache before refreshing.
"""
def __init__(
self,
repo_path: Path,
repo_path: Path | None = None,
*,
window: int = 100,
cache_ttl_s: float = 5.0,
forgejo_url: str | None = None,
repo_owner: str | None = None,
repo_name: str | None = None,
branch: str = "main",
auth_token: str | None = None,
) -> None:
self.repo_path = Path(repo_path)
self.repo_path = Path(repo_path) if repo_path else None
self.window = int(window)
self.cache_ttl_s = float(cache_ttl_s)
self.forgejo_url = forgejo_url.rstrip("/") if forgejo_url else None
self.repo_owner = repo_owner
self.repo_name = repo_name
self.branch = branch
self.auth_token = auth_token
if not self.forgejo_url and not self.repo_path:
raise ValueError("VersionGate needs forgejo_url or repo_path")
self._lock = threading.Lock()
self._cached_hashes: frozenset[str] = frozenset()
self._cached_at: float = 0.0
self._head: str | None = None
# ---- backend dispatch -----------------------------------------------
def _refresh(self) -> None:
"""Re-read git log. Falls back to whatever's cached on
subprocess error so a transient git issue doesn't lock out
every shipper at once."""
if self.forgejo_url:
hashes, head = self._refresh_forgejo()
else:
hashes, head = self._refresh_git()
if not hashes:
log.warning("version-gate refresh empty; keeping prior cache "
"of %d hashes", len(self._cached_hashes))
self._cached_at = time.monotonic()
return
with self._lock:
self._cached_hashes = frozenset(hashes)
self._cached_at = time.monotonic()
self._head = head
log.info("version-gate refreshed: %d valid hashes, head=%s, source=%s",
len(hashes), head[:12] if head else "?",
"forgejo" if self.forgejo_url else "git")
def _refresh_forgejo(self) -> tuple[set[str], str | None]:
"""GET /api/v1/repos/<owner>/<name>/commits?sha=<branch>&limit=<n>."""
url = (
f"{self.forgejo_url}/api/v1/repos/{self.repo_owner}/"
f"{self.repo_name}/commits"
f"?sha={urllib.parse.quote(self.branch)}&limit={self.window}"
)
req = urllib.request.Request(url)
if self.auth_token:
req.add_header("Authorization", f"token {self.auth_token}")
try:
with urllib.request.urlopen(req, timeout=3) as r:
rows = json.loads(r.read().decode("utf-8"))
except (urllib.request.HTTPError, urllib.request.URLError,
json.JSONDecodeError, OSError) as e:
log.warning("forgejo refresh failed (%s); keeping prior cache", e)
return set(), self._head
hashes: set[str] = set()
head: str | None = None
for i, row in enumerate(rows or []):
sha = row.get("sha")
if isinstance(sha, str) and len(sha) == 40:
sha = sha.lower()
hashes.add(sha)
if i == 0:
head = sha
return hashes, head
def _refresh_git(self) -> tuple[set[str], str | None]:
"""`git log -n <window> --format=%H` from `repo_path`."""
try:
out = subprocess.run(
["git", "-C", str(self.repo_path),
@ -63,24 +138,11 @@ class VersionGate:
check=True, capture_output=True, text=True, timeout=3,
).stdout
except (subprocess.SubprocessError, FileNotFoundError, OSError) as e:
log.warning("version-gate git refresh failed (%s); keeping prior cache "
"of %d hashes", e, len(self._cached_hashes))
self._cached_at = time.monotonic()
return
hashes = {h.strip().lower() for h in out.splitlines() if h.strip()}
if not hashes:
log.warning("version-gate: git log returned empty; keeping prior cache")
self._cached_at = time.monotonic()
return
head = next(iter(hashes)) if len(hashes) == 1 else None
# First hash from `git log` IS the HEAD commit.
head = out.splitlines()[0].strip().lower() if out.splitlines() else None
with self._lock:
self._cached_hashes = frozenset(hashes)
self._cached_at = time.monotonic()
self._head = head
log.info("version-gate refreshed: %d valid hashes, head=%s",
len(hashes), head[:12] if head else "?")
log.warning("git refresh failed (%s); keeping prior cache", e)
return set(), self._head
lines = [h.strip().lower() for h in out.splitlines() if h.strip()]
head = lines[0] if lines else None
return set(lines), head
def _maybe_refresh(self) -> None:
if (time.monotonic() - self._cached_at) > self.cache_ttl_s:

View file

@ -39,6 +39,47 @@ def _commits(repo: Path) -> list[str]:
return _git(repo, "log", "--format=%H").splitlines()
def test_forgejo_backend_accepts_returned_commits(tmp_path: Path) -> None:
"""Forgejo-backed gate hits a canned HTTP server returning a
commits list; the parser pulls sha + first-row-is-head."""
import json as _json, threading as _t
from http.server import BaseHTTPRequestHandler, HTTPServer
HEAD = "abcdef0123456789" * 2 + "0" * 8 # 40 hex
OLD = "1234" * 10
canned = _json.dumps([{"sha": HEAD}, {"sha": OLD}]).encode()
class H(BaseHTTPRequestHandler):
def log_message(self, *a): pass
def do_GET(self):
assert "/api/v1/repos/spectral/CIS490/commits" in self.path
self.send_response(200)
self.send_header("Content-Type", "application/json")
self.send_header("Content-Length", str(len(canned)))
self.end_headers()
self.wfile.write(canned)
srv = HTTPServer(("127.0.0.1", 0), H)
port = srv.server_address[1]
th = _t.Thread(target=srv.serve_forever, daemon=True)
th.start()
try:
g = VersionGate(
forgejo_url=f"http://127.0.0.1:{port}",
repo_owner="spectral", repo_name="CIS490", branch="main",
window=50, cache_ttl_s=0,
)
ok, _ = g.check(HEAD)
assert ok
assert g.head() == HEAD
ok, _ = g.check(OLD)
assert ok
ok, reason = g.check("0" * 40)
assert not ok and reason == "not-in-window"
finally:
srv.shutdown()
th.join(timeout=1)
def test_check_accepts_head_commit(repo: Path) -> None:
g = VersionGate(repo, window=10, cache_ttl_s=0)
head = _commits(repo)[0]