CIS490/tests/test_receiver.py
Maximus Gorog 83e111961d Add receiver: PUT /v1/episodes ingest with sha256 verify and idempotency
Implements docs/transport.md as a small Starlette app. The receiver streams
episode tarballs to disk, verifies sha256 against an X-Content-SHA256 header,
atomically renames into the store on success, and appends one row to a flat
index.jsonl. No DB. Idempotent re-PUTs return 200; conflicting bodies return
409. Optional bearer-token auth (mTLS terminates at Caddy in prod).

receiver/
  store.py        EpisodeStore: sha-verifying streaming ingest, atomic rename,
                  append-only index. No HTTP.
  app.py          make_app(): Starlette routes + bearer guard.
  config.py       ReceiverConfig.load(): TOML parser.
  __main__.py     uvicorn entrypoint, reads --config TOML.

tests/test_receiver.py — 13 tests via httpx.ASGITransport. Covers: 201 new,
200 idempotent replay, 409 conflict, 400 sha mismatch + cleanup, 400 missing/
short header, 400 bad id, 400 bad suffix, 413 too large, 401 bearer enforcement,
schema-version pass-through.

etc/cis490-receiver.service — systemd unit with hardening flags.
etc/receiver.toml.example — config template matching docs/deploy.md.

End-to-end smoke-tested with curl: 201 → 200 → 409 path verified, file
on disk, single index row.

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
2026-04-28 23:34:04 -06:00

209 lines
6.7 KiB
Python

from __future__ import annotations
import hashlib
import json
from pathlib import Path
import httpx
import pytest
from receiver.app import make_app
from receiver.store import EpisodeStore
def _sha(b: bytes) -> str:
    """Return the hex-encoded SHA-256 digest of ``b``.

    The tests in this module send this value in the ``X-Content-SHA256``
    request header and compare it against what the receiver reports back.
    """
    return hashlib.sha256(b).hexdigest()
@pytest.fixture
def store(tmp_path: Path) -> EpisodeStore:
    """Build an ``EpisodeStore`` whose three paths all sit under ``tmp_path``."""
    layout = {
        "store_root": tmp_path / "episodes",
        "incoming_root": tmp_path / "incoming",
        "index_path": tmp_path / "index.jsonl",
    }
    return EpisodeStore(**layout)
@pytest.fixture
async def client(store: EpisodeStore):
    """Yield an ``httpx.AsyncClient`` that talks to a receiver app over ASGI.

    The app is created with ``max_episode_bytes=1_000_000`` and no bearer
    token configured; the client is closed when the test finishes.
    """
    asgi_app = make_app(store=store, max_episode_bytes=1_000_000, bearer_token=None)
    async with httpx.AsyncClient(
        transport=httpx.ASGITransport(app=asgi_app),
        base_url="http://test",
    ) as http:
        yield http
async def test_health(client: httpx.AsyncClient) -> None:
    """GET /v1/health must answer 200 with the body ``{"status": "ok"}``."""
    resp = await client.get("/v1/health")
    assert resp.status_code == 200
    payload = resp.json()
    assert payload == {"status": "ok"}
async def test_put_new_episode(
    client: httpx.AsyncClient,
    store: EpisodeStore,
) -> None:
    """A first-time PUT returns 201, lands on disk, and adds one index row."""
    payload = b"hello episode bytes"
    digest = _sha(payload)

    resp = await client.put(
        "/v1/episodes/lab1/01HW.tar.zst",
        content=payload,
        headers={"X-Content-SHA256": digest},
    )
    assert resp.status_code == 201, resp.text
    reply = resp.json()
    assert reply["sha256"] == digest
    assert reply["size_bytes"] == len(payload)

    # The uploaded bytes must be readable, unchanged, at the final path.
    stored = store.final_path("lab1", "01HW")
    assert stored.exists()
    assert stored.read_bytes() == payload

    # The index is JSON Lines: one JSON object per line.
    index_lines = store.index_path.read_text().splitlines()
    rows = [json.loads(line) for line in index_lines]
    assert len(rows) == 1
    row = rows[0]
    assert row["host_id"] == "lab1"
    assert row["episode_id"] == "01HW"
    assert row["sha256"] == digest
    assert row["size_bytes"] == len(payload)
    # No X-Schema-Version header was sent, and the row is expected to say 1.
    assert row["schema_version"] == 1
async def test_put_with_explicit_schema_version(
    client: httpx.AsyncClient,
    store: EpisodeStore,
) -> None:
    """An ``X-Schema-Version: 2`` header shows up as integer 2 in the index row."""
    payload = b"v2 body"
    request_headers = {
        "X-Content-SHA256": _sha(payload),
        "X-Schema-Version": "2",
    }
    resp = await client.put(
        "/v1/episodes/lab1/v2.tar.zst",
        content=payload,
        headers=request_headers,
    )
    assert resp.status_code == 201

    index_text = store.index_path.read_text()
    rows = [json.loads(line) for line in index_text.splitlines()]
    assert rows[0]["schema_version"] == 2
async def test_put_idempotent_replay(
    client: httpx.AsyncClient,
    store: EpisodeStore,
) -> None:
    """Re-sending an identical PUT yields 200 "already-present" and no new row."""
    url = "/v1/episodes/lab1/x.tar.zst"
    payload = b"abc"
    request = {
        "content": payload,
        "headers": {"X-Content-SHA256": _sha(payload)},
    }

    first = await client.put(url, **request)
    assert first.status_code == 201

    second = await client.put(url, **request)
    assert second.status_code == 200
    assert second.json()["status"] == "already-present"

    # The replay must not append a second index row.
    index_lines = store.index_path.read_text().splitlines()
    assert len(index_lines) == 1
async def test_put_conflict_different_body(
    client: httpx.AsyncClient,
    store: EpisodeStore,
) -> None:
    """A second PUT to the same episode with other bytes is rejected with 409."""
    url = "/v1/episodes/lab1/x.tar.zst"
    original = b"original"
    replacement = b"different"

    first = await client.put(
        url,
        content=original,
        headers={"X-Content-SHA256": _sha(original)},
    )
    assert first.status_code == 201

    second = await client.put(
        url,
        content=replacement,
        headers={"X-Content-SHA256": _sha(replacement)},
    )
    assert second.status_code == 409
    assert second.json()["status"] == "conflict"

    # The stored episode must still hold the bytes from the first upload.
    stored = store.final_path("lab1", "x")
    assert stored.read_bytes() == original
async def test_put_sha_mismatch_cleans_up(
    client: httpx.AsyncClient,
    store: EpisodeStore,
) -> None:
    """A body that does not match its declared SHA-256 is rejected and leaves no trace.

    Checks that the response is 400 "sha-mismatch", that no final file exists,
    that no ``*.partial`` file remains under the host's incoming directory,
    and that the index holds no entry.
    """
    bad_sha = "0" * 64  # well-formed (64 hex chars) but not the digest of b"abc"
    r = await client.put(
        "/v1/episodes/lab1/x.tar.zst",
        content=b"abc",
        headers={"X-Content-SHA256": bad_sha},
    )
    assert r.status_code == 400
    assert r.json()["status"] == "sha-mismatch"
    assert not store.final_path("lab1", "x").exists()

    # No incoming partial left behind.
    incoming = store.incoming_root / "lab1"
    assert not list(incoming.glob("*.partial"))

    # No index entry written. A store that creates the index lazily will have
    # no file at all, which satisfies the same requirement; reading it
    # unconditionally would raise FileNotFoundError instead of passing.
    index = store.index_path
    assert not index.exists() or index.read_text() == ""
async def test_put_missing_sha_header(client: httpx.AsyncClient) -> None:
    """A PUT sent without ``X-Content-SHA256`` is answered with 400."""
    resp = await client.put(
        "/v1/episodes/lab1/x.tar.zst",
        content=b"abc",
    )
    assert resp.status_code == 400
async def test_put_short_sha_header(client: httpx.AsyncClient) -> None:
    """An ``X-Content-SHA256`` value that is too short is answered with 400."""
    malformed = {"X-Content-SHA256": "short"}
    resp = await client.put(
        "/v1/episodes/lab1/x.tar.zst",
        content=b"abc",
        headers=malformed,
    )
    assert resp.status_code == 400
async def test_put_bad_host_id(client: httpx.AsyncClient) -> None:
    """A host id containing a space is refused with either 400 or 404."""
    payload = b"x"
    resp = await client.put(
        "/v1/episodes/bad host/x.tar.zst",
        content=payload,
        headers={"X-Content-SHA256": _sha(payload)},
    )
    # The rejection may come from routing (404) or from id validation (400);
    # this test accepts both rather than pinning which layer catches the space.
    acceptable = (400, 404)
    assert resp.status_code in acceptable
async def test_put_bad_episode_id(client: httpx.AsyncClient) -> None:
    """An episode id containing ``@`` is answered with 400."""
    payload = b"x"
    checksum_header = {"X-Content-SHA256": _sha(payload)}
    resp = await client.put(
        "/v1/episodes/lab1/bad@id.tar.zst",
        content=payload,
        headers=checksum_header,
    )
    assert resp.status_code == 400
async def test_put_bad_suffix(client: httpx.AsyncClient) -> None:
    """A target name ending in ``.txt`` instead of ``.tar.zst`` is answered with 400."""
    payload = b"x"
    checksum_header = {"X-Content-SHA256": _sha(payload)}
    resp = await client.put(
        "/v1/episodes/lab1/x.txt",
        content=payload,
        headers=checksum_header,
    )
    assert resp.status_code == 400
async def test_put_too_large_via_content_length(store: EpisodeStore) -> None:
    """A 100-byte upload to an app built with ``max_episode_bytes=10`` gets 413."""
    # This test needs its own app because the shared ``client`` fixture uses a
    # much larger size limit.
    small_app = make_app(store=store, max_episode_bytes=10, bearer_token=None)
    async with httpx.AsyncClient(
        transport=httpx.ASGITransport(app=small_app),
        base_url="http://test",
    ) as http:
        oversized = b"x" * 100
        resp = await http.put(
            "/v1/episodes/lab1/x.tar.zst",
            content=oversized,
            headers={"X-Content-SHA256": _sha(oversized)},
        )
        assert resp.status_code == 413
async def test_bearer_token_enforcement(store: EpisodeStore) -> None:
    """With a bearer token configured, only the matching token may upload.

    The same PUT is sent three times, in order: without an Authorization
    header, with a wrong token, and with the right token.
    """
    guarded_app = make_app(
        store=store,
        max_episode_bytes=1_000_000,
        bearer_token="s3cret",
    )
    payload = b"x"
    digest = _sha(payload)

    # (extra request headers, expected status). Order matters: the accepted
    # upload comes last, so it is still a first-time PUT and yields 201.
    attempts = [
        ({}, 401),  # No header → 401.
        ({"Authorization": "Bearer wrong"}, 401),  # Wrong token → 401.
        ({"Authorization": "Bearer s3cret"}, 201),  # Right token → 201.
    ]

    async with httpx.AsyncClient(
        transport=httpx.ASGITransport(app=guarded_app),
        base_url="http://test",
    ) as http:
        for extra_headers, expected_status in attempts:
            resp = await http.put(
                "/v1/episodes/lab1/x.tar.zst",
                content=payload,
                headers={"X-Content-SHA256": digest, **extra_headers},
            )
            assert resp.status_code == expected_status