CIS490/receiver/store.py
max f8ad02b2d7 Receiver enforces X-Cis490-Code-Commit allow-list (live, auto-refreshed)
Stops out-of-date lab hosts from polluting the dataset with episodes
generated by buggy code. The valid-commits set mirrors the maintainer's
working clone on the Pi automatically — when the maintainer pulls or
pushes a new commit, the receiver picks it up within the 5-second
cache TTL with no service restart.

Receiver changes:

- receiver/version_gate.py (new): VersionGate(repo_path, window).
  Each check() consults a frozenset of the last `window` commit
  hashes from `git -C <repo> log --format=%H -n <window>`, refreshed
  every 5s under a lock. Resilient to transient git failure (keeps
  prior cache so a flaky `git` doesn't lock out every shipper).

- receiver/app.py: PUT extracts X-Cis490-Code-Commit; gate.check()
  before ingest. Rejects with:
    400 + remediation if header missing or malformed
    412 + remediation + your_commit + head_commit if not in window
  Remediation block is verbatim copy-pasteable into the lab-host
  shell:
    cd /opt/cis490 && sudo -u cis490 git pull origin main
    sudo /opt/cis490/scripts/install-lab-host.sh
    sudo systemctl restart cis490-orchestrator

- receiver/store.py: ingest_stream takes commit kwarg, stamps it on
  the index.jsonl row (new optional field). Backfilled rows from
  index_backfill.py also pull commit out of meta.json.

- receiver/config.py + etc/receiver.toml.example: new [version_gate]
  section. enabled=true, repo_path=/home/max/cis490, window=100 by
  default. Enabled toggle exists for emergency disable-and-collect.
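
A minimal sketch of the cache-and-refresh behaviour described in the receiver/version_gate.py bullet above; it is not the actual module. Only `VersionGate(repo_path, window)`, `check()`, the `git log` invocation, and the 5-second TTL come from this commit message. The `ttl_s`, `fetch`, and `clock` parameters, the `git_log_hashes` helper, and the choice to wait one TTL before retrying after a failed refresh are assumptions, added so the idea can be exercised without a real repository.

```python
import subprocess
import threading
import time
from typing import Callable


def git_log_hashes(repo_path: str, window: int) -> frozenset[str]:
    """Return the last `window` commit hashes of the clone at `repo_path`."""
    out = subprocess.run(
        ["git", "-C", repo_path, "log", "--format=%H", "-n", str(window)],
        check=True, capture_output=True, text=True, timeout=10,
    ).stdout
    return frozenset(out.split())


class VersionGate:
    def __init__(
        self,
        repo_path: str,
        window: int,
        ttl_s: float = 5.0,                       # assumed name for the 5 s TTL
        fetch: Callable[[str, int], frozenset[str]] = git_log_hashes,
        clock: Callable[[], float] = time.monotonic,
    ) -> None:
        self.repo_path = repo_path
        self.window = window
        self._ttl_s = ttl_s
        self._fetch = fetch
        self._clock = clock
        self._valid: frozenset[str] = frozenset()
        self._refreshed_at: float | None = None
        self._lock = threading.Lock()

    def _refresh_if_stale(self) -> None:
        now = self._clock()
        if self._refreshed_at is not None and now - self._refreshed_at < self._ttl_s:
            return
        # Record the attempt first so a failing git is retried once per TTL.
        self._refreshed_at = now
        try:
            self._valid = self._fetch(self.repo_path, self.window)
        except (OSError, subprocess.SubprocessError):
            pass  # transient git failure: keep the prior cache

    def check(self, commit: str) -> bool:
        """True if `commit` is among the cached recent hashes."""
        with self._lock:
            self._refresh_if_stale()
            return commit in self._valid
```

Injecting `fetch` and `clock` is only a testing convenience in this sketch; a production gate could call `git` directly.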

Shipper changes:

- shipper/transport.py: ship_tarball() takes commit kwarg, sends
  X-Cis490-Code-Commit header. 412 maps to status='fatal' so the
  queue doesn't infinite-retry — operator must pull and reinstall
  before the next ship will succeed.

- shipper/queue.py: reads meta.json::code_version.commit per
  episode, passes through. On 412, logs the receiver's full
  remediation block at ERROR level so journalctl on the lab host
  shows exactly what to run.
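
The 412 handling described in the shipper bullets above could be sketched roughly as follows. Only the 412 to `'fatal'` mapping and the ERROR-level logging of the remediation text come from this commit message. The function names, the logger name, and the `'ok'` and `'retry'` status strings are hypothetical, and the real transport may treat other status codes differently.

```python
import logging

log = logging.getLogger("shipper.queue")  # hypothetical logger name


def classify_status(status_code: int) -> str:
    """Map an HTTP status code to a queue status (illustrative mapping)."""
    if 200 <= status_code < 300:
        return "ok"      # assumed name for success
    if status_code == 412:
        return "fatal"   # from the commit message: do not retry
    return "retry"       # assumed default for everything else


def handle_response(status_code: int, body_text: str) -> str:
    """Classify a response and log the receiver's remediation text on 412."""
    status = classify_status(status_code)
    if status_code == 412:
        # Log the body verbatim so journalctl shows the commands to run.
        log.error("receiver rejected code commit; remediation:\n%s", body_text)
    return status
```

Marking 412 as fatal keeps the queue from retrying an upload that cannot succeed until the lab host is updated.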

Tests: 9 in test_version_gate (including 2 end-to-end via
starlette.testclient); 2 cover the boundary cases where a new commit
lands mid-cache and where a missing repo gracefully keeps the prior
cache. 157/157 total.

Index schema: existing rows stay valid (commit field is optional
on read). New rows from receiver-direct AND from index_backfill.py
include commit.
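
Because the commit field is optional on read, a consumer of index.jsonl has to tolerate rows without it. A small sketch of such a reader, assuming one JSON object per line as written by `_append_index`; the `commits_in_index` helper is hypothetical and not part of the repository:

```python
import json
from collections import Counter
from pathlib import Path


def commits_in_index(index_path: Path) -> Counter:
    """Count index rows per commit; rows without a commit are counted under None."""
    counts: Counter = Counter()
    with index_path.open() as f:
        for line in f:
            line = line.strip()
            if not line:
                continue
            row = json.loads(line)
            # .get() keeps rows written before this change readable.
            counts[row.get("commit")] += 1
    return counts
```

Calling `commits_in_index(Path("index.jsonl"))` returns a `Counter` whose `None` key holds the number of rows written without a commit.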
2026-05-01 01:38:50 -05:00

134 lines
4.4 KiB
Python

from __future__ import annotations

import hashlib
import json
import os
import re
from dataclasses import dataclass
from datetime import datetime, timezone
from pathlib import Path
from typing import AsyncIterator

# ULID/UUID/host-id alphabet plus separators; strict on purpose.
_ID_RE = re.compile(r"^[A-Za-z0-9_.-]{1,64}$")


def is_valid_id(s: str) -> bool:
    # fullmatch(), because `$` under match() also accepts a trailing newline.
    return bool(_ID_RE.fullmatch(s))


@dataclass(frozen=True)
class StoreResult:
    # "stored" | "already-present" | "conflict" | "sha-mismatch" | "too-large"
    status: str
    sha256: str | None
    size_bytes: int | None
    existing_sha256: str | None = None


class EpisodeStore:
    def __init__(
        self,
        store_root: Path,
        incoming_root: Path,
        index_path: Path,
    ) -> None:
        self.store_root = store_root
        self.incoming_root = incoming_root
        self.index_path = index_path
        self.store_root.mkdir(parents=True, exist_ok=True)
        self.incoming_root.mkdir(parents=True, exist_ok=True)
        self.index_path.parent.mkdir(parents=True, exist_ok=True)
        self.index_path.touch(exist_ok=True)

    def final_path(self, host_id: str, episode_id: str) -> Path:
        return self.store_root / host_id / f"{episode_id}.tar.zst"

    def _hash_file(self, path: Path) -> str:
        h = hashlib.sha256()
        with path.open("rb") as f:
            for chunk in iter(lambda: f.read(1024 * 1024), b""):
                h.update(chunk)
        return h.hexdigest()

    async def ingest_stream(
        self,
        host_id: str,
        episode_id: str,
        expected_sha256: str,
        schema_version: int,
        body: AsyncIterator[bytes],
        max_bytes: int,
        commit: str | None = None,
    ) -> StoreResult:
        final = self.final_path(host_id, episode_id)
        if final.exists():
            existing = self._hash_file(final)
            # Drain the body so the client doesn't see a write error.
            async for _ in body:
                pass
            if existing == expected_sha256:
                return StoreResult(
                    status="already-present",
                    sha256=existing,
                    size_bytes=final.stat().st_size,
                )
            return StoreResult(
                status="conflict",
                sha256=None,
                size_bytes=None,
                existing_sha256=existing,
            )

        incoming_dir = self.incoming_root / host_id
        incoming_dir.mkdir(parents=True, exist_ok=True)
        partial = incoming_dir / f"{episode_id}.tar.zst.partial"
        h = hashlib.sha256()
        bytes_written = 0
        try:
            # Stream to a .partial file, hashing and size-checking as we go.
            with partial.open("wb") as out:
                async for chunk in body:
                    bytes_written += len(chunk)
                    if bytes_written > max_bytes:
                        partial.unlink(missing_ok=True)
                        return StoreResult(
                            status="too-large",
                            sha256=None,
                            size_bytes=bytes_written,
                        )
                    h.update(chunk)
                    out.write(chunk)
            actual = h.hexdigest()
            if actual != expected_sha256:
                partial.unlink(missing_ok=True)
                return StoreResult(
                    status="sha-mismatch",
                    sha256=actual,
                    size_bytes=bytes_written,
                )
            # Move the verified upload into place, then record it in the index.
            final.parent.mkdir(parents=True, exist_ok=True)
            os.replace(partial, final)
            row = {
                "received_at_wall": datetime.now(timezone.utc).isoformat(),
                "host_id": host_id,
                "episode_id": episode_id,
                "sha256": actual,
                "size_bytes": bytes_written,
                "schema_version": schema_version,
            }
            if commit:
                row["commit"] = commit
            self._append_index(row)
            return StoreResult(status="stored", sha256=actual, size_bytes=bytes_written)
        except BaseException:
            # Includes cancellation: never leave a stale .partial behind.
            partial.unlink(missing_ok=True)
            raise
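
    def _append_index(self, row: dict) -> None:
        line = json.dumps(row, sort_keys=True) + "\n"
        # Append mode (O_APPEND) positions every write at the end of the file.
        # Each row is one short line, normally flushed in a single write, so
        # concurrent appends are not expected to interleave on a local
        # filesystem. (PIPE_BUF itself only bounds atomic writes to pipes.)
        with self.index_path.open("a") as f:
            f.write(line)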
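
The write path in `ingest_stream` follows a stream, verify, then rename pattern. The standalone sketch below isolates that pattern so it can be run without the rest of the receiver. The `write_verified`, `chunks`, and `demo` names are hypothetical, and the size limit, duplicate detection, and index bookkeeping of the real method are deliberately left out.

```python
import asyncio
import hashlib
import os
import tempfile
from pathlib import Path
from typing import AsyncIterator


async def write_verified(
    body: AsyncIterator[bytes], partial: Path, final: Path, expected_sha256: str
) -> bool:
    """Stream `body` to `partial`; rename to `final` only if the hash matches."""
    h = hashlib.sha256()
    try:
        with partial.open("wb") as out:
            async for chunk in body:
                h.update(chunk)
                out.write(chunk)
        if h.hexdigest() != expected_sha256:
            partial.unlink(missing_ok=True)
            return False
        os.replace(partial, final)  # atomic when both paths share a filesystem
        return True
    except BaseException:
        partial.unlink(missing_ok=True)
        raise


async def chunks(*parts: bytes) -> AsyncIterator[bytes]:
    """Tiny stand-in for a request body stream."""
    for part in parts:
        yield part


def demo() -> tuple[bool, bool]:
    """Return (good upload was stored, bad upload left anything behind)."""
    with tempfile.TemporaryDirectory() as d:
        root = Path(d)
        good = hashlib.sha256(b"helloworld").hexdigest()
        ok = asyncio.run(
            write_verified(chunks(b"hello", b"world"), root / "a.partial", root / "a", good)
        )
        bad = asyncio.run(
            write_verified(chunks(b"hello"), root / "b.partial", root / "b", good)
        )
        return ok and (root / "a").exists(), bad or (root / "b").exists()
```

Readers of the final directory never see a half-written file, because the file only appears under its final name after the hash check passes.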