Two follow-ups from the post-cutover diagnosis:

1. version_gate: forgejo → local git fallback. If forgejo refresh returns empty AND a local repo path is configured, retry against `git log` from the local checkout. The receiver service runs on the same Pi as forgejo, so a simultaneous restart used to leave the gate's cache empty and reject every PUT with not-in-window. Auto-detects /opt/cis490/.git when the operator hasn't set local_repo_path explicitly — that path is always present on a production receiver and ProtectSystem=strict still allows reads. Logs `source=git-fallback` so this isn't silent.

2. shipper/queue: sweep orphaned outbox tarballs. The lifecycle invariant is `outbox/<id>.tar.zst exists ⇒ episodes/<id>/ exists` — broken historically by the now-fixed fatal-loop, by operator `rm` of an episode dir, or by an OS crash between rename(2) and the post-ship cleanup. Without sweeping, dead bytes pile up forever. New _sweep_outbox runs at the start of every scan, bounded by the file count in outbox/.

Tests cover: fallback fires when forgejo unreachable + repo_path set; no fallback when repo_path None (opt-in); orphan tarball + partial get swept on the next pass; live tarballs untouched.

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
309 lines
11 KiB
Python
309 lines
11 KiB
Python
"""Tests for the receiver's commit-allow-list gate.

The gate refreshes the allow-list from `git log` of a configured
repo path. Tests use real git operations on a temp repo so we
exercise the same subprocess code paths the receiver does in
production.
"""
|
|
|
|
from __future__ import annotations
|
|
|
|
import subprocess
|
|
from pathlib import Path
|
|
|
|
import pytest
|
|
|
|
from receiver.version_gate import VersionGate
|
|
|
|
|
|
def _git(cwd: Path, *args: str) -> str:
    """Run ``git`` against the directory *cwd* and return its stripped stdout.

    A fixed author identity (``t`` / ``t@t``) is passed via ``-c`` on every
    call — presumably so commits succeed on machines with no git identity
    configured. ``subprocess.check_output`` raises ``CalledProcessError``
    when git exits non-zero.
    """
    identity = ["-c", "user.email=t@t", "-c", "user.name=t"]
    command = ["git", *identity, "-C", str(cwd), *args]
    output = subprocess.check_output(command, text=True)
    return output.strip()
|
|
|
|
|
|
@pytest.fixture
def repo(tmp_path: Path) -> Path:
    """Create a git repository with a single commit under *tmp_path*.

    The repository is initialised on branch ``main`` and contains one
    tracked file ``f`` (content ``v1``) committed with message ``v1``.
    Returns the repository root.
    """
    root = tmp_path / "repo"
    root.mkdir()
    _git(root, "init", "--initial-branch=main")

    tracked = root / "f"
    tracked.write_text("v1")

    # Stage, then commit, the single tracked file.
    for git_args in (("add", "f"), ("commit", "-m", "v1")):
        _git(root, *git_args)
    return root
|
|
|
|
|
|
def _commits(repo: Path) -> list[str]:
    """Return the full commit hashes printed by ``git log`` in *repo*.

    Hashes come back in ``git log`` order, so index 0 is the current HEAD.
    """
    log_output = _git(repo, "log", "--format=%H")
    return log_output.splitlines()
|
|
|
|
|
|
def test_forgejo_backend_accepts_returned_commits(tmp_path: Path) -> None:
    """Forgejo-backed gate hits a canned HTTP server returning a
    commits list; the parser pulls sha + first-row-is-head.

    The server is a throwaway ``HTTPServer`` bound to an OS-assigned port
    on localhost and served from a daemon thread. It is shut down AND its
    listening socket is closed when the test finishes, so no socket leaks
    into later tests.
    """
    import json as _json
    import threading as _t
    from http.server import BaseHTTPRequestHandler, HTTPServer

    HEAD = "abcdef0123456789" * 2 + "0" * 8  # 40 hex
    OLD = "1234" * 10
    canned = _json.dumps([{"sha": HEAD}, {"sha": OLD}]).encode()

    class H(BaseHTTPRequestHandler):
        def log_message(self, *a):
            # Silence the handler's per-request logging.
            pass

        def do_GET(self):
            # The gate is expected to request the commits endpoint of the
            # configured owner/repo.
            assert "/api/v1/repos/spectral/CIS490/commits" in self.path
            self.send_response(200)
            self.send_header("Content-Type", "application/json")
            self.send_header("Content-Length", str(len(canned)))
            self.end_headers()
            self.wfile.write(canned)

    srv = HTTPServer(("127.0.0.1", 0), H)
    port = srv.server_address[1]
    th = _t.Thread(target=srv.serve_forever, daemon=True)
    th.start()
    try:
        g = VersionGate(
            forgejo_url=f"http://127.0.0.1:{port}",
            repo_owner="spectral", repo_name="CIS490", branch="main",
            window=50, cache_ttl_s=0,
        )
        # First row of the canned response is both valid and the head.
        ok, _ = g.check(HEAD)
        assert ok
        assert g.head() == HEAD
        # Later rows are valid too.
        ok, _ = g.check(OLD)
        assert ok
        # A well-formed hash the server never returned is rejected.
        ok, reason = g.check("0" * 40)
        assert not ok and reason == "not-in-window"
    finally:
        srv.shutdown()
        th.join(timeout=1)
        # shutdown() only stops serve_forever(); the listening socket
        # stays open until server_close() is called.
        srv.server_close()
|
|
|
|
|
|
def test_check_accepts_head_commit(repo: Path) -> None:
    """The repo's current HEAD passes the gate with no rejection reason,
    and the gate reports that same hash as its head."""
    gate = VersionGate(repo, window=10, cache_ttl_s=0)
    tip = _commits(repo)[0]
    accepted, why = gate.check(tip)
    assert accepted
    assert why is None
    assert gate.head() == tip
|
|
|
|
|
|
def test_check_rejects_unknown_commit(repo: Path) -> None:
    """A well-formed 40-character hash that is not in the repo's history
    is rejected with reason ``not-in-window``."""
    gate = VersionGate(repo, window=10, cache_ttl_s=0)
    unknown = "0" * 40
    accepted, why = gate.check(unknown)
    assert not accepted
    assert why == "not-in-window"
|
|
|
|
|
|
def test_check_rejects_missing_commit(repo: Path) -> None:
    """Both ``None`` and the empty string are rejected with reason
    ``missing``."""
    gate = VersionGate(repo, window=10, cache_ttl_s=0)
    for absent in (None, ""):
        accepted, why = gate.check(absent)
        assert not accepted
        assert why == "missing"
|
|
|
|
|
|
def test_check_rejects_bad_format(repo: Path) -> None:
    """Strings that are not shaped like a full commit hash are rejected
    with reason ``bad-format``."""
    gate = VersionGate(repo, window=10, cache_ttl_s=0)
    malformed = (
        "not-a-hash",
        "ABCDEF",  # too short, but valid hex
    )
    for candidate in malformed:
        accepted, why = gate.check(candidate)
        assert not accepted
        assert why == "bad-format"
|
|
|
|
|
|
def test_new_commit_after_pull_is_accepted_within_ttl(repo: Path) -> None:
    """The whole point: when the maintainer commits new code on the
    Pi, the receiver picks it up automatically without restart."""
    gate = VersionGate(repo, window=10, cache_ttl_s=0)

    # The new commit is created only AFTER the gate already exists.
    tracked = repo / "f"
    tracked.write_text("v2")
    _git(repo, "commit", "-am", "v2")
    fresh_tip = _commits(repo)[0]

    # With cache_ttl_s=0 the next check() is expected to re-read the repo.
    accepted, _ = gate.check(fresh_tip)
    assert accepted
    assert gate.head() == fresh_tip
|
|
|
|
|
|
def test_window_limits_history(repo: Path) -> None:
    """Old commits past the window should drop out of the allow-list."""
    # Grow the history from the fixture's single commit to six.
    tracked = repo / "f"
    for i in range(2, 7):
        tracked.write_text(f"v{i}")
        _git(repo, "commit", "-am", f"v{i}")
    history = _commits(repo)
    assert len(history) == 6

    gate = VersionGate(repo, window=3, cache_ttl_s=0)
    inside_window, outside_window = history[:3], history[3:]

    # The three newest commits are accepted.
    for c in inside_window:
        accepted, _ = gate.check(c)
        assert accepted, f"{c[:8]} should be in window"

    # The three oldest have fallen out of the window.
    for c in outside_window:
        accepted, why = gate.check(c)
        assert not accepted
        assert why == "not-in-window"
|
|
|
|
|
|
def test_e2e_receiver_returns_412_for_unknown_commit(
    repo: Path, tmp_path: Path
) -> None:
    """End-to-end: PUT with an out-of-window commit returns 412 with
    the remediation block, and nothing is recorded in the index.

    The same payload is then PUT again stamped with the repo's HEAD and
    must be accepted (201), adding exactly one index row carrying that
    commit.
    """
    import hashlib as _h
    import io as _io
    import json as _json
    import tarfile as _tar

    from starlette.testclient import TestClient

    from receiver.app import make_app
    from receiver.store import EpisodeStore

    head = _commits(repo)[0]
    rcv_root = tmp_path / "rcv"
    store = EpisodeStore(
        store_root=rcv_root / "ep",
        incoming_root=rcv_root / "in",
        index_path=rcv_root / "index.jsonl",
    )
    gate = VersionGate(repo, window=10, cache_ttl_s=0)
    app = make_app(store=store, max_episode_bytes=10_000_000,
                   bearer_token=None, version_gate=gate)

    # Build a tiny valid tarball.
    # NOTE(review): this is an uncompressed tar even though the URL ends
    # in .tar.zst — presumably the receiver does not decompress on PUT;
    # confirm against receiver.app.
    raw = _io.BytesIO()
    with _tar.open(fileobj=raw, mode="w") as t:
        info = _tar.TarInfo("01TEST/meta.json")
        meta = b"{}"
        info.size = len(meta)
        t.addfile(info, _io.BytesIO(meta))
    payload = raw.getvalue()
    sha = _h.sha256(payload).hexdigest()
    url = "/v1/episodes/lab1/01TEST.tar.zst"

    with TestClient(app) as client:
        # Wrong commit: rejected with 412 + remediation in body.
        bad = "0" * 40
        r = client.put(
            url,
            content=payload,
            headers={
                "X-Content-SHA256": sha,
                "X-Lab-Host": "lab1",
                "X-Cis490-Code-Commit": bad,
            },
        )
        assert r.status_code == 412
        rejection = r.json()
        assert "remediation" in rejection
        assert rejection["your_commit"] == bad
        assert rejection["head_commit"] == head
        # Index must NOT have grown.
        assert store.index_path.read_text() == ""

        # Right commit: accepted (201).
        r = client.put(
            url,
            content=payload,
            headers={
                "X-Content-SHA256": sha,
                "X-Lab-Host": "lab1",
                "X-Cis490-Code-Commit": head,
            },
        )
        assert r.status_code == 201, r.text
        # Index gained one row stamped with the commit.
        index_lines = store.index_path.read_text().splitlines()
        rows = [_json.loads(line) for line in index_lines if line.strip()]
        assert len(rows) == 1
        assert rows[0]["commit"] == head
|
|
|
|
|
|
def test_e2e_receiver_returns_400_when_commit_header_missing(
    repo: Path, tmp_path: Path
) -> None:
    """Missing header is a client bug (lab host pre-stamp-update);
    receiver returns 400 and the error text mentions "missing"."""
    import hashlib as _h
    import io as _io
    import tarfile as _tar

    from starlette.testclient import TestClient

    from receiver.app import make_app
    from receiver.store import EpisodeStore

    rcv_root = tmp_path / "rcv"
    store = EpisodeStore(
        store_root=rcv_root / "ep",
        incoming_root=rcv_root / "in",
        index_path=rcv_root / "index.jsonl",
    )
    gate = VersionGate(repo, window=10, cache_ttl_s=0)
    app = make_app(store=store, max_episode_bytes=10_000_000,
                   bearer_token=None, version_gate=gate)

    # Build a tiny tarball; the member size is derived from the body
    # rather than hard-coded so the two can never drift apart.
    raw = _io.BytesIO()
    with _tar.open(fileobj=raw, mode="w") as t:
        info = _tar.TarInfo("01TEST/meta.json")
        meta = b"{}"
        info.size = len(meta)
        t.addfile(info, _io.BytesIO(meta))
    payload = raw.getvalue()
    sha = _h.sha256(payload).hexdigest()

    with TestClient(app) as client:
        r = client.put(
            "/v1/episodes/lab1/01TEST.tar.zst",
            content=payload,
            headers={
                "X-Content-SHA256": sha,
                "X-Lab-Host": "lab1",
                # no X-Cis490-Code-Commit
            },
        )
        assert r.status_code == 400
        assert "missing" in r.json()["error"].lower()
|
|
|
|
|
|
def test_forgejo_falls_back_to_local_git_when_unreachable(repo: Path) -> None:
    """If forgejo is unreachable AND a local repo path is configured,
    the gate must fall back to git so the receiver doesn't reject
    every PUT during a forgejo blip (e.g. the receiver and forgejo
    restarting together on the same Pi). Without the fallback, the
    cache stays empty and `check()` returns not-in-window for
    everything — breaking all lab hosts simultaneously even though
    their commits are perfectly valid."""
    import socket as _s

    # Let the OS hand out a free port, then release it again so that
    # nothing is listening there; the forgejo request should fail fast.
    probe = _s.socket(_s.AF_INET, _s.SOCK_STREAM)
    with probe:
        probe.bind(("127.0.0.1", 0))
        dead_port = probe.getsockname()[1]

    unreachable_url = f"http://127.0.0.1:{dead_port}"
    gate = VersionGate(
        repo_path=repo,
        forgejo_url=unreachable_url,
        repo_owner="spectral",
        repo_name="CIS490",
        branch="main",
        window=50,
        cache_ttl_s=0,
    )
    head = _commits(repo)[0]

    # Forgejo cannot be reached, yet the local checkout knows the commit,
    # so check() must still succeed.
    ok, reason = gate.check(head)
    assert ok, f"fallback didn't trigger: reason={reason}, head={head}"
    assert gate.valid_count() >= 1
|
|
|
|
|
|
def test_forgejo_no_fallback_without_local_repo(repo: Path) -> None:
    """Without a local repo configured, a forgejo blip is fatal — the
    gate has nothing to fall back to and returns not-in-window. This
    confirms the fallback is opt-in via repo_path, not a free side
    effect of the forgejo backend."""
    # NOTE(review): the `repo` fixture is requested but never handed to
    # the gate — presumably on purpose, so a repo exists on disk yet is
    # not configured; confirm.
    import socket as _s

    # Grab an OS-assigned port and release it so nothing listens on it.
    probe = _s.socket(_s.AF_INET, _s.SOCK_STREAM)
    with probe:
        probe.bind(("127.0.0.1", 0))
        dead_port = probe.getsockname()[1]

    unreachable_url = f"http://127.0.0.1:{dead_port}"
    gate = VersionGate(
        forgejo_url=unreachable_url,
        repo_owner="spectral",
        repo_name="CIS490",
        branch="main",
        window=50,
        cache_ttl_s=0,
    )
    accepted, why = gate.check("a" * 40)
    assert not accepted
    assert why == "not-in-window"
    assert gate.valid_count() == 0
|
|
|
|
|
|
def test_missing_repo_keeps_prior_cache(repo: Path) -> None:
    """If the maintainer's clone disappears (or git fails), the gate
    keeps its last-known allow-list — better than locking out every
    shipper at once."""
    gate = VersionGate(repo, window=10, cache_ttl_s=0)
    tip = _commits(repo)[0]

    # Prime the gate while the repository is still reachable.
    primed, _ = gate.check(tip)
    assert primed

    # Point the gate at a path that does not exist.
    gate.repo_path = repo / "does-not-exist"

    # The previously-known head must still be served from the cache.
    still_accepted, _ = gate.check(tip)
    assert still_accepted
|