robustness: gate falls back to local git, queue sweeps stale tarballs
Two follow-ups from the post-cutover diagnosis:

1. version_gate: forgejo → local git fallback. If the forgejo refresh returns empty AND a local repo path is configured, retry against `git log` from the local checkout. The receiver service runs on the same Pi as forgejo, so a simultaneous restart used to leave the gate's cache empty and reject every PUT with not-in-window. Receiver startup auto-detects /opt/cis490/.git when the operator hasn't set local_repo_path explicitly — that path is always present on a production receiver, and ProtectSystem=strict still allows reads. The refresh logs `source=git-fallback` so the fallback isn't silent.

2. shipper/queue: sweep orphaned outbox tarballs. The lifecycle invariant is `outbox/<id>.tar.zst exists ⇒ episodes/<id>/ exists` — broken historically by the now-fixed fatal-loop, by an operator `rm` of an episode dir, or by an OS crash between rename(2) and the post-ship cleanup. Without sweeping, dead bytes pile up forever. The new _sweep_outbox runs at the start of every scan and is bounded by the file count in outbox/.

Tests cover: the fallback fires when forgejo is unreachable and repo_path is set; there is no fallback when repo_path is None (the fallback is opt-in); an orphan tarball and an orphan partial are both swept on the next pass; live tarballs are left untouched.

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
This commit is contained in:
parent
f294e97875
commit
5cebe7096a
5 changed files with 138 additions and 3 deletions
|
|
@ -3,6 +3,7 @@ from __future__ import annotations
|
||||||
import argparse
|
import argparse
|
||||||
import logging
|
import logging
|
||||||
import os
|
import os
|
||||||
|
from pathlib import Path
|
||||||
|
|
||||||
import uvicorn
|
import uvicorn
|
||||||
|
|
||||||
|
|
@ -34,8 +35,16 @@ def main() -> None:
|
||||||
)
|
)
|
||||||
version_gate = None
|
version_gate = None
|
||||||
if cfg.version_gate_enabled:
|
if cfg.version_gate_enabled:
|
||||||
|
# Auto-detect /opt/cis490/.git as a fallback so a forgejo blip
|
||||||
|
# at startup doesn't reject every PUT with not-in-window. The
|
||||||
|
# receiver service has read access to /opt under
|
||||||
|
# ProtectSystem=strict, and that path is where the production
|
||||||
|
# install lands — so it's the natural local source of truth.
|
||||||
|
repo_path = cfg.version_gate_local_repo
|
||||||
|
if repo_path is None and Path("/opt/cis490/.git").is_dir():
|
||||||
|
repo_path = Path("/opt/cis490")
|
||||||
version_gate = VersionGate(
|
version_gate = VersionGate(
|
||||||
repo_path=cfg.version_gate_local_repo,
|
repo_path=repo_path,
|
||||||
window=cfg.version_gate_window,
|
window=cfg.version_gate_window,
|
||||||
forgejo_url=cfg.version_gate_forgejo_url,
|
forgejo_url=cfg.version_gate_forgejo_url,
|
||||||
repo_owner=cfg.version_gate_repo_owner,
|
repo_owner=cfg.version_gate_repo_owner,
|
||||||
|
|
|
||||||
|
|
@ -84,8 +84,22 @@ class VersionGate:
|
||||||
# ---- backend dispatch -----------------------------------------------
|
# ---- backend dispatch -----------------------------------------------
|
||||||
|
|
||||||
def _refresh(self) -> None:
|
def _refresh(self) -> None:
|
||||||
|
# Try forgejo first when configured. If it fails AND we also
|
||||||
|
# have a local repo path, fall back to git — this lets the
|
||||||
|
# receiver keep working through a forgejo blip (e.g. both
|
||||||
|
# services restarting on the same host) without flat-rejecting
|
||||||
|
# every PUT with not-in-window. The local repo on the
|
||||||
|
# receiver's own host is the closest thing to ground truth we
|
||||||
|
# have when the canonical source is unreachable.
|
||||||
|
source = "forgejo" if self.forgejo_url else "git"
|
||||||
if self.forgejo_url:
|
if self.forgejo_url:
|
||||||
hashes, head = self._refresh_forgejo()
|
hashes, head = self._refresh_forgejo()
|
||||||
|
if not hashes and self.repo_path is not None:
|
||||||
|
log.info("forgejo refresh empty; falling back to local git "
|
||||||
|
"at %s", self.repo_path)
|
||||||
|
hashes, head = self._refresh_git()
|
||||||
|
if hashes:
|
||||||
|
source = "git-fallback"
|
||||||
else:
|
else:
|
||||||
hashes, head = self._refresh_git()
|
hashes, head = self._refresh_git()
|
||||||
if not hashes:
|
if not hashes:
|
||||||
|
|
@ -98,8 +112,7 @@ class VersionGate:
|
||||||
self._cached_at = time.monotonic()
|
self._cached_at = time.monotonic()
|
||||||
self._head = head
|
self._head = head
|
||||||
log.info("version-gate refreshed: %d valid hashes, head=%s, source=%s",
|
log.info("version-gate refreshed: %d valid hashes, head=%s, source=%s",
|
||||||
len(hashes), head[:12] if head else "?",
|
len(hashes), head[:12] if head else "?", source)
|
||||||
"forgejo" if self.forgejo_url else "git")
|
|
||||||
|
|
||||||
def _refresh_forgejo(self) -> tuple[set[str], str | None]:
|
def _refresh_forgejo(self) -> tuple[set[str], str | None]:
|
||||||
"""GET /api/v1/repos/<owner>/<name>/commits?sha=<branch>&limit=<n>."""
|
"""GET /api/v1/repos/<owner>/<name>/commits?sha=<branch>&limit=<n>."""
|
||||||
|
|
|
||||||
|
|
@ -70,6 +70,13 @@ class ShipperQueue:
|
||||||
|
|
||||||
def run_once(self) -> PassResult:
|
def run_once(self) -> PassResult:
|
||||||
"""One scan pass. Returns counts for logging / tests."""
|
"""One scan pass. Returns counts for logging / tests."""
|
||||||
|
# Sweep stale outbox tarballs first. Normal lifecycle has the
|
||||||
|
# tarball deleted alongside retire / quarantine, but operator
|
||||||
|
# intervention (rm of an episode dir) or a crash between
|
||||||
|
# rename(2) and the post-ship cleanup can leave one orphaned.
|
||||||
|
# Bounded by the number of files in outbox/, so cheap to do
|
||||||
|
# every pass.
|
||||||
|
self._sweep_outbox()
|
||||||
ready = self._ready_episodes()
|
ready = self._ready_episodes()
|
||||||
scanned = len(ready)
|
scanned = len(ready)
|
||||||
shipped = 0
|
shipped = 0
|
||||||
|
|
@ -156,6 +163,34 @@ class ShipperQueue:
|
||||||
out.append(ep)
|
out.append(ep)
|
||||||
return out
|
return out
|
||||||
|
|
||||||
|
def _sweep_outbox(self) -> None:
|
||||||
|
"""Delete tarballs in outbox/ that have no matching episode dir.
|
||||||
|
|
||||||
|
The invariant the queue maintains: ``outbox/<id>.tar.zst``
|
||||||
|
only exists while ``episodes/<id>/`` is also present.
|
||||||
|
retire+quarantine both delete the tarball when they move the
|
||||||
|
episode out, and tar overwrites any prior ``.partial`` on
|
||||||
|
each pass. So a stray tarball means an external actor (or an
|
||||||
|
OS crash) broke the invariant — clean it up rather than
|
||||||
|
carrying dead bytes on disk forever."""
|
||||||
|
outbox = self.cfg.outbox_dir
|
||||||
|
if not outbox.exists():
|
||||||
|
return
|
||||||
|
for f in outbox.iterdir():
|
||||||
|
name = f.name
|
||||||
|
if name.endswith(".tar.zst"):
|
||||||
|
ep_id = name[: -len(".tar.zst")]
|
||||||
|
elif name.endswith(".tar.zst.partial"):
|
||||||
|
ep_id = name[: -len(".tar.zst.partial")]
|
||||||
|
else:
|
||||||
|
continue
|
||||||
|
if not (self.cfg.episodes_dir / ep_id).exists():
|
||||||
|
try:
|
||||||
|
f.unlink()
|
||||||
|
log.info("swept orphan tarball %s", name)
|
||||||
|
except OSError:
|
||||||
|
log.exception("failed to sweep orphan tarball %s", name)
|
||||||
|
|
||||||
def _read_episode_commit(self, ep_dir: Path) -> str | None:
|
def _read_episode_commit(self, ep_dir: Path) -> str | None:
|
||||||
"""Pull meta.json::code_version.commit so the shipper can send
|
"""Pull meta.json::code_version.commit so the shipper can send
|
||||||
it as X-Cis490-Code-Commit. Returns None if the file is
|
it as X-Cis490-Code-Commit. Returns None if the file is
|
||||||
|
|
|
||||||
|
|
@ -320,6 +320,38 @@ def test_run_once_handles_transient_when_receiver_is_down(tmp_path: Path) -> Non
|
||||||
assert (cfg.outbox_dir / "01DOWN.tar.zst").exists()
|
assert (cfg.outbox_dir / "01DOWN.tar.zst").exists()
|
||||||
|
|
||||||
|
|
||||||
|
def test_run_once_sweeps_orphaned_outbox_tarball(tmp_path: Path, receiver) -> None:
|
||||||
|
"""A tarball in outbox/ with no matching episode dir should get
|
||||||
|
cleaned up at the start of the next scan. The lifecycle invariant
|
||||||
|
is `outbox/<id>.tar.zst exists ⇒ episodes/<id>/ exists`; a
|
||||||
|
violation means external interference (operator rm-ed the
|
||||||
|
episode, OS crash, leftover from older buggy code) and we'd
|
||||||
|
otherwise carry dead bytes forever."""
|
||||||
|
url, _ = receiver
|
||||||
|
cfg, _, queue = _make_shipper(tmp_path, url)
|
||||||
|
|
||||||
|
# Stage an orphan: a tarball in outbox/ with no corresponding
|
||||||
|
# episodes/01ORPHAN/ directory.
|
||||||
|
cfg.outbox_dir.mkdir(parents=True, exist_ok=True)
|
||||||
|
orphan = cfg.outbox_dir / "01ORPHAN.tar.zst"
|
||||||
|
orphan.write_bytes(b"\x28\xb5\x2f\xfd") # zstd magic, not a real tarball
|
||||||
|
|
||||||
|
# Also a partial — same orphan rule applies.
|
||||||
|
partial = cfg.outbox_dir / "01PARTIAL.tar.zst.partial"
|
||||||
|
partial.write_bytes(b"x")
|
||||||
|
|
||||||
|
# And a non-orphan: tarball backed by an actual episode dir.
|
||||||
|
ep = _make_episode(cfg, "01LIVE")
|
||||||
|
|
||||||
|
queue.run_once()
|
||||||
|
|
||||||
|
assert not orphan.exists(), "orphan tarball must be swept"
|
||||||
|
assert not partial.exists(), "orphan partial must be swept"
|
||||||
|
# 01LIVE got shipped+retired in the same pass; both its tarball
|
||||||
|
# and its episode dir are gone (moved to shipped/).
|
||||||
|
assert (cfg.shipped_dir / "01LIVE").exists()
|
||||||
|
|
||||||
|
|
||||||
def test_run_once_quarantines_fatal_episode(tmp_path: Path) -> None:
|
def test_run_once_quarantines_fatal_episode(tmp_path: Path) -> None:
|
||||||
"""A 4xx-other-than-409 (e.g. 400 missing-commit) means re-shipping
|
"""A 4xx-other-than-409 (e.g. 400 missing-commit) means re-shipping
|
||||||
won't succeed. The shipper must move the episode out of the live
|
won't succeed. The shipper must move the episode out of the live
|
||||||
|
|
|
||||||
|
|
@ -248,6 +248,52 @@ def test_e2e_receiver_returns_400_when_commit_header_missing(repo: Path, tmp_pat
|
||||||
assert "missing" in r.json()["error"].lower()
|
assert "missing" in r.json()["error"].lower()
|
||||||
|
|
||||||
|
|
||||||
|
def test_forgejo_falls_back_to_local_git_when_unreachable(repo: Path) -> None:
|
||||||
|
"""If forgejo is unreachable AND a local repo path is configured,
|
||||||
|
the gate must fall back to git so the receiver doesn't reject
|
||||||
|
every PUT during a forgejo blip (e.g. the receiver and forgejo
|
||||||
|
restarting together on the same Pi). Without the fallback, the
|
||||||
|
cache stays empty and `check()` returns not-in-window for
|
||||||
|
everything — breaking all lab hosts simultaneously even though
|
||||||
|
their commits are perfectly valid."""
|
||||||
|
# Pick a port nothing's bound to. urllib will fail-fast.
|
||||||
|
import socket as _s
|
||||||
|
with _s.socket(_s.AF_INET, _s.SOCK_STREAM) as sock:
|
||||||
|
sock.bind(("127.0.0.1", 0))
|
||||||
|
dead_port = sock.getsockname()[1]
|
||||||
|
g = VersionGate(
|
||||||
|
repo_path=repo,
|
||||||
|
forgejo_url=f"http://127.0.0.1:{dead_port}",
|
||||||
|
repo_owner="spectral", repo_name="CIS490", branch="main",
|
||||||
|
window=50, cache_ttl_s=0,
|
||||||
|
)
|
||||||
|
head = _commits(repo)[0]
|
||||||
|
# Forgejo is unreachable, but the local repo has the commit.
|
||||||
|
# check() must still return ok.
|
||||||
|
ok, reason = g.check(head)
|
||||||
|
assert ok, f"fallback didn't trigger: reason={reason}, head={head}"
|
||||||
|
assert g.valid_count() >= 1
|
||||||
|
|
||||||
|
|
||||||
|
def test_forgejo_no_fallback_without_local_repo(repo: Path) -> None:
|
||||||
|
"""Without a local repo configured, a forgejo blip is fatal — the
|
||||||
|
gate has nothing to fall back to and returns not-in-window. This
|
||||||
|
confirms the fallback is opt-in via repo_path, not a free side
|
||||||
|
effect of the forgejo backend."""
|
||||||
|
import socket as _s
|
||||||
|
with _s.socket(_s.AF_INET, _s.SOCK_STREAM) as sock:
|
||||||
|
sock.bind(("127.0.0.1", 0))
|
||||||
|
dead_port = sock.getsockname()[1]
|
||||||
|
g = VersionGate(
|
||||||
|
forgejo_url=f"http://127.0.0.1:{dead_port}",
|
||||||
|
repo_owner="spectral", repo_name="CIS490", branch="main",
|
||||||
|
window=50, cache_ttl_s=0,
|
||||||
|
)
|
||||||
|
ok, reason = g.check("a" * 40)
|
||||||
|
assert not ok and reason == "not-in-window"
|
||||||
|
assert g.valid_count() == 0
|
||||||
|
|
||||||
|
|
||||||
def test_missing_repo_keeps_prior_cache(repo: Path) -> None:
|
def test_missing_repo_keeps_prior_cache(repo: Path) -> None:
|
||||||
"""If the maintainer's clone disappears (or git fails), the gate
|
"""If the maintainer's clone disappears (or git fails), the gate
|
||||||
keeps its last-known allow-list — better than locking out every
|
keeps its last-known allow-list — better than locking out every
|
||||||
|
|
|
||||||
Loading…
Add table
Reference in a new issue