Stops out-of-date lab hosts from polluting the dataset with episodes
generated by buggy code. The valid-commits set mirrors the maintainer's
working clone on the Pi automatically — when the maintainer pulls or
pushes a new commit, the receiver picks it up within the 5-second
cache TTL with no service restart.
Receiver changes:
- receiver/version_gate.py (new): VersionGate(repo_path, window).
Each check() consults a frozenset of the last `window` commit
hashes from `git -C <repo> log --format=%H -n <window>`, refreshed
every 5s under a lock. Resilient to transient git failure (keeps
prior cache so a flaky `git` doesn't lock out every shipper).
- receiver/app.py: PUT extracts X-Cis490-Code-Commit; gate.check()
before ingest. Rejects with:
400 + remediation if header missing or malformed
412 + remediation + your_commit + head_commit if not in window
Remediation block is verbatim copy-pasteable into the lab-host
shell:
cd /opt/cis490 && sudo -u cis490 git pull origin main
sudo /opt/cis490/scripts/install-lab-host.sh
sudo systemctl restart cis490-orchestrator
- receiver/store.py: ingest_stream takes commit kwarg, stamps it on
the index.jsonl row (new optional field). Backfilled rows from
index_backfill.py also pull commit out of meta.json.
- receiver/config.py + etc/receiver.toml.example: new [version_gate]
section. enabled=true, repo_path=/home/max/cis490, window=100 by
default. Enabled toggle exists for emergency disable-and-collect.
Shipper changes:
- shipper/transport.py: ship_tarball() takes commit kwarg, sends
X-Cis490-Code-Commit header. 412 maps to status='fatal' so the
queue doesn't infinite-retry — operator must pull and reinstall
before the next ship will succeed.
- shipper/queue.py: reads meta.json::code_version.commit per
episode, passes through. On 412, logs the receiver's full
remediation block at ERROR level so journalctl on the lab host
shows exactly what to run.
Tests: 9 in test_version_gate (including 2 end-to-end via
starlette.testclient); 2 cover the boundary cases where new commits
land mid-cache and where a missing repo gracefully keeps the prior
cache. 157/157 total.
Index schema: existing rows stay valid (commit field is optional
on read). New rows from receiver-direct AND from index_backfill.py
include commit.
159 lines
5.4 KiB
Python
159 lines
5.4 KiB
Python
"""``cis490-index-backfill`` — rebuild missing index.jsonl rows from
|
|
tarballs already on disk.
|
|
|
|
Use case: a stretch where the receiver wrote tarballs to
|
|
``episodes/<host>/`` but failed to append the matching index row
|
|
(permissions, disk full, crash mid-write). The shipper retries see
|
|
``already-present`` and never re-PUT, so the gap is permanent until
|
|
something on the receiver-side fills it.
|
|
|
|
This tool walks ``episodes/<host>/<id>.tar.zst`` and, for any episode
|
|
not already in the index, computes sha256 + size and appends a row
|
|
matching the receiver's schema. Existing rows are left alone.
|
|
|
|
Run on the receiver host as the same user the receiver runs under
|
|
(``cis490``) so the appended rows match in ownership and the live
|
|
receiver can keep writing afterward:
|
|
|
|
sudo -u cis490 /opt/cis490/.venv/bin/python \\
|
|
/opt/cis490/tools/index_backfill.py \\
|
|
--episodes-root /var/lib/cis490/episodes \\
|
|
--index /var/lib/cis490/index.jsonl
|
|
"""
|
|
|
|
from __future__ import annotations
|
|
|
|
import argparse
|
|
import hashlib
|
|
import json
|
|
import sys
|
|
from datetime import datetime, timezone
|
|
from pathlib import Path
|
|
|
|
|
|
# Stamped into the ``schema_version`` field of every backfilled row.
# NOTE(review): presumably has to track the receiver's own index schema
# version — confirm against the receiver when bumping.
SCHEMA_VERSION = 1
|
|
|
|
|
|
def _sha256_of(path: Path) -> str:
    """Return the hex SHA-256 digest of the file at *path*.

    The file is consumed in 1 MiB pieces so its full contents are never
    held in memory at once.
    """
    piece_size = 1024 * 1024
    digest = hashlib.sha256()
    with path.open("rb") as stream:
        while True:
            piece = stream.read(piece_size)
            if not piece:
                break
            digest.update(piece)
    return digest.hexdigest()
|
|
|
|
|
|
def _commit_from_tarball(tar_path: Path) -> str | None:
    """Return ``meta.json::code_version.commit`` from a ``.tar.zst`` archive.

    The archive is decompressed in memory by the external ``zstd``
    binary and parsed with ``tarfile``; nothing is written to disk. The
    first regular file whose basename is exactly ``meta.json`` decides
    the result.

    Args:
        tar_path: Path to the zstd-compressed tarball.

    Returns:
        The commit as a lower-cased 40-character hex string, or ``None``
        on any failure (``zstd`` missing or failing, unreadable archive,
        no ``meta.json``, malformed JSON, missing or malformed commit).
        Callers write the row without a commit field in that case.
    """
    import io
    import posixpath
    import string
    import subprocess
    import tarfile

    try:
        raw_tar = subprocess.check_output(
            ["zstd", "-q", "-d", "--stdout", str(tar_path)],
            stderr=subprocess.DEVNULL,
        )
    except (subprocess.CalledProcessError, OSError):
        # OSError also covers FileNotFoundError (no ``zstd`` on PATH).
        return None

    try:
        with tarfile.open(fileobj=io.BytesIO(raw_tar)) as archive:
            for member in archive.getmembers():
                # Compare the basename exactly: a suffix test would also
                # accept siblings such as ``._meta.json`` or
                # ``x_meta.json`` and let them decide the result.
                if not member.isfile():
                    continue
                if posixpath.basename(member.name) != "meta.json":
                    continue
                handle = archive.extractfile(member)
                if handle is None:
                    return None
                meta = json.loads(handle.read().decode("utf-8"))
                code_version = meta.get("code_version") or {}
                commit = code_version.get("commit")
                if (
                    isinstance(commit, str)
                    and len(commit) == 40
                    and all(ch in string.hexdigits for ch in commit)
                ):
                    return commit.lower()
                return None
    except Exception:
        # Deliberately best-effort: a row without a commit is better
        # than aborting the whole backfill over one odd tarball.
        return None
    return None
|
|
|
|
|
|
def _existing_episode_ids(index_path: Path) -> set[str]:
    """Collect the ``episode_id`` of every usable row already in the index.

    The index is read as JSON Lines. Anything that cannot be a valid
    row is skipped rather than treated as fatal: blank lines, lines that
    are not valid JSON, JSON values that are not objects, and rows whose
    ``episode_id`` is missing or not a string.

    Args:
        index_path: Path to ``index.jsonl``; it may not exist yet.

    Returns:
        The set of episode ids found, empty when the file is absent.
    """
    if not index_path.exists():
        return set()

    seen: set[str] = set()
    # errors="replace": a torn write can leave bytes that are not valid
    # UTF-8; such a line then simply fails JSON parsing and is skipped
    # instead of aborting the whole scan with UnicodeDecodeError.
    with index_path.open("r", encoding="utf-8", errors="replace") as f:
        for line in f:
            if not line.strip():
                continue
            try:
                row = json.loads(line)
            except json.JSONDecodeError:
                continue
            # Valid JSON that is not an object (``[]``, ``42``, ``null``)
            # has no ``.get``; treat it like any other unusable line.
            if not isinstance(row, dict):
                continue
            ep = row.get("episode_id")
            if isinstance(ep, str):
                seen.add(ep)
    return seen
|
|
|
|
|
|
def _lacks_trailing_newline(index_path: Path) -> bool:
    """Return True when the index exists, is non-empty, and its last byte is not a newline."""
    import os

    try:
        with index_path.open("rb") as f:
            if f.seek(0, os.SEEK_END) == 0:
                return False
            f.seek(-1, os.SEEK_END)
            return f.read(1) != b"\n"
    except FileNotFoundError:
        return False


def main(argv: list[str] | None = None) -> int:
    """Append index rows for on-disk tarballs that the index is missing.

    Walks ``<episodes-root>/<host>/*.tar.zst``, skips every episode id
    already present in the index, and appends one JSON row per remaining
    tarball (sha256, size, host, optional commit, ``backfilled: true``).
    With ``--dry-run`` nothing is written; at most five sample rows are
    printed instead.

    Args:
        argv: Command-line arguments; ``None`` lets argparse read
            ``sys.argv``.

    Returns:
        ``2`` when ``--episodes-root`` is not an existing directory,
        otherwise ``0``.
    """
    p = argparse.ArgumentParser(prog="cis490-index-backfill")
    p.add_argument("--episodes-root", type=Path,
                   default=Path("/var/lib/cis490/episodes"))
    p.add_argument("--index", type=Path,
                   default=Path("/var/lib/cis490/index.jsonl"))
    p.add_argument("--host", help="Only backfill this host_id")
    p.add_argument("--dry-run", action="store_true",
                   help="Print what would be appended; don't write")
    args = p.parse_args(argv)

    # is_dir() rather than exists(): a plain file at this path would
    # otherwise get past the check and crash in iterdir() below.
    if not args.episodes_root.is_dir():
        print(f"no episodes dir at {args.episodes_root}", file=sys.stderr)
        return 2

    # NOTE(review): membership is by episode_id alone, not by
    # (host_id, episode_id) — confirm ids are unique across hosts.
    seen = _existing_episode_ids(args.index)
    rows_to_write: list[dict] = []
    scanned = 0
    for host_dir in sorted(args.episodes_root.iterdir()):
        if not host_dir.is_dir():
            continue
        if args.host and host_dir.name != args.host:
            continue
        for tar in sorted(host_dir.glob("*.tar.zst")):
            scanned += 1
            # "<id>.tar.zst" -> stem "<id>.tar" -> "<id>"
            episode_id = tar.stem.removesuffix(".tar")
            if episode_id in seen:
                continue
            sha = _sha256_of(tar)
            size = tar.stat().st_size
            row: dict = {
                "received_at_wall": datetime.now(timezone.utc).isoformat(),
                "host_id": host_dir.name,
                "episode_id": episode_id,
                "sha256": sha,
                "size_bytes": size,
                "schema_version": SCHEMA_VERSION,
                "backfilled": True,
            }
            # The commit is optional: rows are written without it when
            # it cannot be recovered from the tarball.
            commit = _commit_from_tarball(tar)
            if commit:
                row["commit"] = commit
            rows_to_write.append(row)

    print(f"scanned: {scanned} already-indexed: {scanned - len(rows_to_write)} "
          f"to-backfill: {len(rows_to_write)}")
    if not rows_to_write:
        return 0
    if args.dry_run:
        for r in rows_to_write[:5]:
            print(json.dumps(r, sort_keys=True))
        if len(rows_to_write) > 5:
            print(f"... ({len(rows_to_write) - 5} more)")
        return 0

    # Append-mode open never replaces the file, so ownership is
    # preserved. Each row is flushed on its own so it reaches the OS as
    # one small append rather than being split wherever Python's buffer
    # happens to fill, which could let an append from the live receiver
    # land in the middle of a row.
    needs_newline = _lacks_trailing_newline(args.index)
    with args.index.open("a") as f:
        if needs_newline:
            # A previous crash mid-write can leave a partial last line;
            # terminate it so the first new row is not glued onto it.
            f.write("\n")
            f.flush()
        for row in rows_to_write:
            f.write(json.dumps(row, sort_keys=True) + "\n")
            f.flush()
    print(f"backfilled {len(rows_to_write)} rows into {args.index}")
    return 0
|
|
|
|
|
|
# Script entry point: main()'s return value becomes the process exit status.
if __name__ == "__main__":
    raise SystemExit(main())
|