CIS490/tests/test_version_gate.py
max cc0c96953e version_gate: Forgejo as canonical commit source (no fs perms needed)
Initial git-log-based gate ran into a permission wall: the cis490
service user can't read /home/max/cis490/.git (ProtectHome=true +
home-dir mode). Switching the production source to the local Forgejo
HTTP API (already accessible to all WG peers, single source of truth
both lab hosts and the receiver pull from). When the maintainer
pushes new code to spectral/CIS490, the next 5-second cache refresh
sees the new commit and lab hosts can immediately ship under it.

VersionGate now takes either:
  - forgejo_url + repo_owner + repo_name + branch (+ optional
    auth_token for private repos): hits
    /api/v1/repos/<owner>/<name>/commits?sha=<branch>&limit=<n>
  - repo_path: dev-only fallback, runs `git log` locally

Local-git path retained for tests + the dev-only case.

receiver.toml.example gains forgejo_url/repo_owner/repo_name/branch
with auth_token commented; live-deployed receiver.toml on the Pi has
the spectral org + token.

Live state on the Pi: 41 valid hashes loaded, head=f8ad02b. Verified
end-to-end:
  bogus commit → 412 + remediation
  HEAD commit  → clears gate (fails downstream at sha-mismatch as
                 expected for the empty-body verify probe)

Test added: test_forgejo_backend_accepts_returned_commits stands up
a tiny canned-response HTTPServer in-process, exercises the parser
without depending on a live Forgejo instance. Brings test_version_gate
to 10 cases; total 158/158.
2026-05-01 01:42:45 -05:00

263 lines
8.7 KiB
Python

"""Tests for the receiver's commit-allow-list gate.
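The gate refreshes its allow-list either from the Forgejo commits API
or, as a fallback, from `git log` of a configured repo path. Most tests
here use real git operations on a temp repo so they exercise the same
subprocess code path as the local-git backend; the Forgejo backend is
tested against an in-process HTTP server returning a canned response.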
"""
from __future__ import annotations
import subprocess
from pathlib import Path
import pytest
from receiver.version_gate import VersionGate
def _git(cwd: Path, *args: str) -> str:
    """Run ``git`` inside *cwd* and return its stdout, whitespace-stripped.

    A throwaway author identity is injected with ``-c`` so that commits
    work without relying on any user-level git configuration. A non-zero
    exit status raises ``subprocess.CalledProcessError``.
    """
    command = [
        "git",
        "-c", "user.email=t@t",
        "-c", "user.name=t",
        "-C", str(cwd),
    ]
    command.extend(args)
    output = subprocess.check_output(command, text=True)
    return output.strip()
@pytest.fixture
def repo(tmp_path: Path) -> Path:
    """Create a throwaway git repository holding one commit on ``main``.

    The repository lives under pytest's ``tmp_path`` and tracks a single
    file, ``f``, whose initial content is ``v1``.
    """
    root = tmp_path / "repo"
    root.mkdir()
    _git(root, "init", "--initial-branch=main")
    tracked_file = root / "f"
    tracked_file.write_text("v1")
    # Stage the file, then record the first commit.
    for git_args in (("add", "f"), ("commit", "-m", "v1")):
        _git(root, *git_args)
    return root
def _commits(repo: Path) -> list[str]:
    """Return the full commit hashes of *repo* in ``git log`` order (HEAD first)."""
    log_output = _git(repo, "log", "--format=%H")
    return log_output.splitlines()
def test_forgejo_backend_accepts_returned_commits(tmp_path: Path) -> None:
    """Forgejo-backed gate hits a canned HTTP server returning a
    commits list; the parser pulls sha + first-row-is-head.

    The server runs in-process on an ephemeral port, so no live Forgejo
    instance is needed.
    """
    import json as _json
    import threading as _t
    from http.server import BaseHTTPRequestHandler, HTTPServer

    HEAD = "abcdef0123456789" * 2 + "0" * 8  # 40 hex
    OLD = "1234" * 10
    canned = _json.dumps([{"sha": HEAD}, {"sha": OLD}]).encode()
    expected_path = "/api/v1/repos/spectral/CIS490/commits"
    # Paths requested by the gate, recorded by the handler thread and
    # inspected from the test thread.
    requested_paths: list[str] = []

    class H(BaseHTTPRequestHandler):
        def log_message(self, *a):
            pass  # keep the test output quiet

        def do_GET(self):
            # Record rather than assert here: an AssertionError raised in
            # the server thread is swallowed by the server's error handling
            # and would never reach pytest.
            requested_paths.append(self.path)
            if expected_path not in self.path:
                self.send_error(404)
                return
            self.send_response(200)
            self.send_header("Content-Type", "application/json")
            self.send_header("Content-Length", str(len(canned)))
            self.end_headers()
            self.wfile.write(canned)

    srv = HTTPServer(("127.0.0.1", 0), H)  # port 0: let the OS pick a free port
    port = srv.server_address[1]
    th = _t.Thread(target=srv.serve_forever, daemon=True)
    th.start()
    try:
        g = VersionGate(
            forgejo_url=f"http://127.0.0.1:{port}",
            repo_owner="spectral", repo_name="CIS490", branch="main",
            window=50, cache_ttl_s=0,
        )
        ok, _ = g.check(HEAD)
        # Surface a wrong URL with a readable message from the test thread.
        assert any(expected_path in p for p in requested_paths), requested_paths
        assert ok
        assert g.head() == HEAD
        ok, _ = g.check(OLD)
        assert ok
        ok, reason = g.check("0" * 40)
        assert not ok and reason == "not-in-window"
    finally:
        srv.shutdown()
        # shutdown() only stops serve_forever(); the listening socket must
        # be closed explicitly or it leaks until garbage collection.
        srv.server_close()
        th.join(timeout=1)
def test_check_accepts_head_commit(repo: Path) -> None:
    """The repo's current HEAD passes the gate and is reported by ``head()``."""
    gate = VersionGate(repo, window=10, cache_ttl_s=0)
    head_sha = _commits(repo)[0]
    accepted, why = gate.check(head_sha)
    assert accepted
    assert why is None
    assert gate.head() == head_sha
def test_check_rejects_unknown_commit(repo: Path) -> None:
    """A well-formed hash that is not in the repo is rejected as ``not-in-window``."""
    gate = VersionGate(repo, window=10, cache_ttl_s=0)
    unknown_sha = "0" * 40
    accepted, why = gate.check(unknown_sha)
    assert not accepted
    assert why == "not-in-window"
def test_check_rejects_missing_commit(repo: Path) -> None:
    """Both ``None`` and the empty string are rejected as ``missing``."""
    gate = VersionGate(repo, window=10, cache_ttl_s=0)
    for absent in (None, ""):
        accepted, why = gate.check(absent)
        assert not accepted
        assert why == "missing"
def test_check_rejects_bad_format(repo: Path) -> None:
    """Strings that are not full-length hex hashes are rejected as ``bad-format``."""
    gate = VersionGate(repo, window=10, cache_ttl_s=0)
    malformed = (
        "not-a-hash",
        "ABCDEF",  # too short, but valid hex
    )
    for candidate in malformed:
        accepted, why = gate.check(candidate)
        assert not accepted
        assert why == "bad-format"
def test_new_commit_after_pull_is_accepted_within_ttl(repo: Path) -> None:
    """A commit created after the gate was constructed is accepted on the
    next check, without building a new gate."""
    gate = VersionGate(repo, window=10, cache_ttl_s=0)

    # The new commit lands only AFTER the gate already exists.
    tracked_file = repo / "f"
    tracked_file.write_text("v2")
    _git(repo, "commit", "-am", "v2")
    latest_sha = _commits(repo)[0]

    # With cache_ttl_s=0 the next check has to refresh the allow-list.
    accepted, _ = gate.check(latest_sha)
    assert accepted
    assert gate.head() == latest_sha
def test_window_limits_history(repo: Path) -> None:
    """Only the newest ``window`` commits are allowed; older ones are
    rejected as ``not-in-window``."""
    # Stack five more commits on top of the fixture's initial one.
    tracked_file = repo / "f"
    for version in range(2, 7):
        tracked_file.write_text(f"v{version}")
        _git(repo, "commit", "-am", f"v{version}")

    history = _commits(repo)
    assert len(history) == 6

    gate = VersionGate(repo, window=3, cache_ttl_s=0)
    recent, stale = history[:3], history[3:]

    # The three newest commits fall inside the window.
    for sha in recent:
        accepted, _ = gate.check(sha)
        assert accepted, f"{sha[:8]} should be in window"

    # The three oldest have dropped out of it.
    for sha in stale:
        accepted, why = gate.check(sha)
        assert not accepted
        assert why == "not-in-window"
def test_e2e_receiver_returns_412_for_unknown_commit(repo: Path, tmp_path: Path) -> None:
    """End-to-end: a PUT stamped with an out-of-window commit gets a 412
    carrying the remediation block and leaves the index empty; the same
    PUT stamped with HEAD is then accepted with 201 and indexed."""
    import hashlib as _h
    import io as _io
    import json as _json
    import tarfile as _tar

    from starlette.testclient import TestClient
    from receiver.app import make_app
    from receiver.store import EpisodeStore

    head_sha = _commits(repo)[0]
    receiver_root = tmp_path / "rcv"
    store = EpisodeStore(
        store_root=receiver_root / "ep",
        incoming_root=receiver_root / "in",
        index_path=receiver_root / "index.jsonl",
    )
    app = make_app(
        store=store,
        max_episode_bytes=10_000_000,
        bearer_token=None,
        version_gate=VersionGate(repo, window=10, cache_ttl_s=0),
    )

    # Smallest useful upload: a tar archive with one meta.json member.
    meta_bytes = b"{}"
    member = _tar.TarInfo("01TEST/meta.json")
    member.size = len(meta_bytes)
    buffer = _io.BytesIO()
    with _tar.open(fileobj=buffer, mode="w") as archive:
        archive.addfile(member, _io.BytesIO(meta_bytes))
    payload = buffer.getvalue()
    digest = _h.sha256(payload).hexdigest()

    def put_episode(client, commit):
        # Same upload every time; only the commit stamp varies.
        return client.put(
            "/v1/episodes/lab1/01TEST.tar.zst",
            content=payload,
            headers={
                "X-Content-SHA256": digest,
                "X-Lab-Host": "lab1",
                "X-Cis490-Code-Commit": commit,
            },
        )

    with TestClient(app) as client:
        # Unknown commit: 412 with the remediation details in the body.
        bogus_sha = "0" * 40
        rejected = put_episode(client, bogus_sha)
        assert rejected.status_code == 412
        details = rejected.json()
        assert "remediation" in details
        assert details["your_commit"] == bogus_sha
        assert details["head_commit"] == head_sha
        # The rejected upload must not have added an index row.
        assert store.index_path.read_text() == ""

        # HEAD commit: the upload is accepted (201).
        accepted = put_episode(client, head_sha)
        assert accepted.status_code == 201, accepted.text

    # Exactly one index row exists, stamped with the accepted commit.
    index_lines = store.index_path.read_text().splitlines()
    rows = [_json.loads(line) for line in index_lines if line.strip()]
    assert len(rows) == 1
    assert rows[0]["commit"] == head_sha
def test_e2e_receiver_returns_400_when_commit_header_missing(repo: Path, tmp_path: Path) -> None:
    """End-to-end: a PUT without the commit header is answered with 400
    and an error message that mentions the header is missing."""
    import hashlib as _h
    import io as _io
    import tarfile as _tar

    from starlette.testclient import TestClient
    from receiver.app import make_app
    from receiver.store import EpisodeStore

    receiver_root = tmp_path / "rcv"
    store = EpisodeStore(
        store_root=receiver_root / "ep",
        incoming_root=receiver_root / "in",
        index_path=receiver_root / "index.jsonl",
    )
    app = make_app(
        store=store,
        max_episode_bytes=10_000_000,
        bearer_token=None,
        version_gate=VersionGate(repo, window=10, cache_ttl_s=0),
    )

    # One-member tar archive used as the upload body.
    member = _tar.TarInfo("01TEST/meta.json")
    member.size = 2
    buffer = _io.BytesIO()
    with _tar.open(fileobj=buffer, mode="w") as archive:
        archive.addfile(member, _io.BytesIO(b"{}"))
    payload = buffer.getvalue()
    digest = _h.sha256(payload).hexdigest()

    request_headers = {
        "X-Content-SHA256": digest,
        "X-Lab-Host": "lab1",
        # no X-Cis490-Code-Commit
    }
    with TestClient(app) as client:
        response = client.put(
            "/v1/episodes/lab1/01TEST.tar.zst",
            content=payload,
            headers=request_headers,
        )

    assert response.status_code == 400
    error_text = response.json()["error"]
    assert "missing" in error_text.lower()
def test_missing_repo_keeps_prior_cache(repo: Path) -> None:
    """When the repo path stops resolving, the gate keeps serving its
    last-known allow-list instead of rejecting every shipper at once."""
    gate = VersionGate(repo, window=10, cache_ttl_s=0)
    head_sha = _commits(repo)[0]

    # Prime the gate while the repo is still reachable.
    accepted, _ = gate.check(head_sha)
    assert accepted

    # Point the gate at a path that does not exist.
    gate.repo_path = repo / "does-not-exist"

    # The previously loaded head must still be accepted.
    accepted, _ = gate.check(head_sha)
    assert accepted