Implements docs/transport.md as a small Starlette app. The receiver streams
episode tarballs to disk, verifies sha256 against an X-Content-SHA256 header,
atomically renames into the store on success, and appends one row to a flat
index.jsonl. No DB. Idempotent re-PUTs return 200; conflicting bodies return
409. Optional bearer-token auth (mTLS terminates at Caddy in prod).
receiver/
  store.py     EpisodeStore: sha-verifying streaming ingest, atomic rename,
               append-only index. No HTTP.
  app.py       make_app(): Starlette routes + bearer guard.
  config.py    ReceiverConfig.load(): TOML parser.
  __main__.py  uvicorn entrypoint, reads --config TOML.
tests/test_receiver.py — 13 tests via httpx.ASGITransport. Covers: 201 new,
200 idempotent replay, 409 conflict, 400 sha mismatch + cleanup,
400 missing/short header, 400 bad id, 400 bad suffix, 413 too large,
401 bearer enforcement, schema-version pass-through.
etc/cis490-receiver.service — systemd unit with hardening flags.
etc/receiver.toml.example — config template matching docs/deploy.md.
End-to-end smoke-tested with curl: 201 → 200 → 409 path verified, file
on disk, single index row.
Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
209 lines
6.7 KiB
Python
209 lines
6.7 KiB
Python
from __future__ import annotations
|
|
|
|
import hashlib
|
|
import json
|
|
from pathlib import Path
|
|
|
|
import httpx
|
|
import pytest
|
|
|
|
from receiver.app import make_app
|
|
from receiver.store import EpisodeStore
|
|
|
|
|
|
def _sha(b: bytes) -> str:
|
|
return hashlib.sha256(b).hexdigest()
|
|
|
|
|
|
@pytest.fixture
def store(tmp_path: Path) -> EpisodeStore:
    """Build an EpisodeStore whose three locations all live under ``tmp_path``.

    Each test gets its own pytest temporary directory, so the episode tree,
    the incoming area and the index file are isolated per test.
    """
    layout = {
        "store_root": tmp_path / "episodes",
        "incoming_root": tmp_path / "incoming",
        "index_path": tmp_path / "index.jsonl",
    }
    return EpisodeStore(**layout)
|
|
|
|
|
|
@pytest.fixture
async def client(store: EpisodeStore):
    """Yield an ``httpx.AsyncClient`` bound to a receiver app built on ``store``.

    The app is created with a 1,000,000-byte episode limit and no bearer
    token; requests are routed to it through ``httpx.ASGITransport``.
    """
    asgi_app = make_app(store=store, max_episode_bytes=1_000_000, bearer_token=None)
    async with httpx.AsyncClient(
        transport=httpx.ASGITransport(app=asgi_app),
        base_url="http://test",
    ) as http:
        yield http
|
|
|
|
|
|
async def test_health(client: httpx.AsyncClient) -> None:
    """GET /v1/health answers 200 with the JSON body ``{"status": "ok"}``."""
    response = await client.get("/v1/health")
    assert response.status_code == 200
    payload = response.json()
    assert payload == {"status": "ok"}
|
|
|
|
|
|
async def test_put_new_episode(client: httpx.AsyncClient, store: EpisodeStore) -> None:
    """A first PUT returns 201, stores the exact bytes, and adds one index row."""
    payload = b"hello episode bytes"
    digest = _sha(payload)
    response = await client.put(
        "/v1/episodes/lab1/01HW.tar.zst",
        content=payload,
        headers={"X-Content-SHA256": digest},
    )
    # Surface the response body on failure to show why the upload was refused.
    assert response.status_code == 201, response.text
    reply = response.json()
    assert reply["sha256"] == digest
    assert reply["size_bytes"] == len(payload)

    # The episode must exist at its final location with byte-identical content.
    stored = store.final_path("lab1", "01HW")
    assert stored.exists()
    assert stored.read_bytes() == payload

    # Exactly one JSONL row in the index, describing this upload. No schema
    # header was sent, and the row is expected to carry schema_version 1.
    index_lines = store.index_path.read_text().splitlines()
    rows = list(map(json.loads, index_lines))
    assert len(rows) == 1
    row = rows[0]
    expected = {
        "host_id": "lab1",
        "episode_id": "01HW",
        "sha256": digest,
        "size_bytes": len(payload),
        "schema_version": 1,
    }
    for field, want in expected.items():
        assert row[field] == want
|
|
|
|
|
|
async def test_put_with_explicit_schema_version(client: httpx.AsyncClient, store: EpisodeStore) -> None:
    """An ``X-Schema-Version`` header value is recorded in the index row.

    Uploads a new episode with ``X-Schema-Version: 2`` and checks that the
    single resulting index row has ``schema_version == 2`` (an integer).
    """
    body = b"v2 body"
    sha = _sha(body)
    r = await client.put(
        "/v1/episodes/lab1/v2.tar.zst",
        content=body,
        headers={"X-Content-SHA256": sha, "X-Schema-Version": "2"},
    )
    # Include the response body so a failure shows the receiver's reason.
    assert r.status_code == 201, r.text
    rows = [json.loads(line) for line in store.index_path.read_text().splitlines()]
    # The store fixture lives under this test's own tmp_path, so this upload
    # must be the only row; checking the count keeps rows[0] from masking an
    # empty or duplicated index behind an IndexError or a stale first row.
    assert len(rows) == 1
    assert rows[0]["schema_version"] == 2
|
|
|
|
|
|
async def test_put_idempotent_replay(client: httpx.AsyncClient, store: EpisodeStore) -> None:
    """Re-PUTting identical bytes yields 200 "already-present" and no extra index row."""
    url = "/v1/episodes/lab1/x.tar.zst"
    payload = b"abc"
    request = {"content": payload, "headers": {"X-Content-SHA256": _sha(payload)}}

    first = await client.put(url, **request)
    assert first.status_code == 201

    second = await client.put(url, **request)
    assert second.status_code == 200
    assert second.json()["status"] == "already-present"

    # The replay must not append a second row to the index.
    index_rows = store.index_path.read_text().splitlines()
    assert len(index_rows) == 1
|
|
|
|
|
|
async def test_put_conflict_different_body(client: httpx.AsyncClient, store: EpisodeStore) -> None:
    """A second PUT to the same episode with different bytes gets 409 and changes nothing."""
    url = "/v1/episodes/lab1/x.tar.zst"
    original, replacement = b"original", b"different"

    first = await client.put(
        url,
        content=original,
        headers={"X-Content-SHA256": _sha(original)},
    )
    assert first.status_code == 201

    # Each request carries the correct digest for its own body, so the
    # rejection is about the episode already existing, not a sha mismatch.
    second = await client.put(
        url,
        content=replacement,
        headers={"X-Content-SHA256": _sha(replacement)},
    )
    assert second.status_code == 409
    assert second.json()["status"] == "conflict"

    # The first upload's bytes must still be what is stored.
    kept = store.final_path("lab1", "x").read_bytes()
    assert kept == original
|
|
|
|
|
|
async def test_put_sha_mismatch_cleans_up(client: httpx.AsyncClient, store: EpisodeStore) -> None:
    """A body that does not match X-Content-SHA256 gets 400 and leaves no trace.

    Sends ``b"abc"`` with an all-zero digest and checks that the response is
    400 "sha-mismatch", that no final file exists, that no ``*.partial`` file
    remains under the host's incoming directory, and that the index has no
    entry.
    """
    bad_sha = "0" * 64
    r = await client.put(
        "/v1/episodes/lab1/x.tar.zst",
        content=b"abc",
        headers={"X-Content-SHA256": bad_sha},
    )
    # Include the response body so a failure shows the receiver's reason.
    assert r.status_code == 400, r.text
    assert r.json()["status"] == "sha-mismatch"
    assert not store.final_path("lab1", "x").exists()
    # No incoming partial left behind.
    incoming = store.incoming_root / "lab1"
    assert not list(incoming.glob("*.partial"))
    # No index entry written. A missing index file satisfies this just as an
    # empty one does; whether the store creates the file eagerly is not visible
    # from this module, so avoid a FileNotFoundError from read_text().
    index_path = store.index_path
    assert not index_path.exists() or index_path.read_text() == ""
|
|
|
|
|
|
async def test_put_missing_sha_header(client: httpx.AsyncClient) -> None:
    """A PUT that carries no X-Content-SHA256 header is rejected with 400."""
    response = await client.put(
        "/v1/episodes/lab1/x.tar.zst",
        content=b"abc",
    )
    assert response.status_code == 400
|
|
|
|
|
|
async def test_put_short_sha_header(client: httpx.AsyncClient) -> None:
    """An X-Content-SHA256 value that is too short to be a digest gets 400."""
    malformed = {"X-Content-SHA256": "short"}
    response = await client.put("/v1/episodes/lab1/x.tar.zst", content=b"abc", headers=malformed)
    assert response.status_code == 400
|
|
|
|
|
|
async def test_put_bad_host_id(client: httpx.AsyncClient) -> None:
    """A host id containing a space is refused with either 400 or 404."""
    payload = b"x"
    response = await client.put(
        "/v1/episodes/bad host/x.tar.zst",
        content=payload,
        headers={"X-Content-SHA256": _sha(payload)},
    )
    # The request may be refused at routing (404) or by id validation (400);
    # both outcomes are accepted here.
    acceptable = (400, 404)
    assert response.status_code in acceptable
|
|
|
|
|
|
async def test_put_bad_episode_id(client: httpx.AsyncClient) -> None:
    """An episode id containing ``@`` is rejected with 400."""
    payload = b"x"
    digest_header = {"X-Content-SHA256": _sha(payload)}
    response = await client.put(
        "/v1/episodes/lab1/bad@id.tar.zst",
        content=payload,
        headers=digest_header,
    )
    assert response.status_code == 400
|
|
|
|
|
|
async def test_put_bad_suffix(client: httpx.AsyncClient) -> None:
    """A target name ending in ``.txt`` instead of ``.tar.zst`` is rejected with 400."""
    payload = b"x"
    digest_header = {"X-Content-SHA256": _sha(payload)}
    response = await client.put(
        "/v1/episodes/lab1/x.txt",
        content=payload,
        headers=digest_header,
    )
    assert response.status_code == 400
|
|
|
|
|
|
async def test_put_too_large_via_content_length(store: EpisodeStore) -> None:
    """A 100-byte upload against a 10-byte limit is refused with 413.

    Builds its own app rather than using the ``client`` fixture because the
    limit under test (``max_episode_bytes=10``) differs from the fixture's.
    """
    oversized = b"x" * 100
    small_limit_app = make_app(store=store, max_episode_bytes=10, bearer_token=None)
    async with httpx.AsyncClient(
        transport=httpx.ASGITransport(app=small_limit_app),
        base_url="http://test",
    ) as http:
        # NOTE(review): per the test name the rejection is presumably driven by
        # the Content-Length header; this test only checks the status code.
        response = await http.put(
            "/v1/episodes/lab1/x.tar.zst",
            content=oversized,
            headers={"X-Content-SHA256": _sha(oversized)},
        )
        assert response.status_code == 413
|
|
|
|
|
|
async def test_bearer_token_enforcement(store: EpisodeStore) -> None:
    """With a bearer token configured, only the matching token may upload.

    The same PUT is issued three times, in order: without an Authorization
    header (401), with a wrong token (401), and with the configured token (201).
    """
    url = "/v1/episodes/lab1/x.tar.zst"
    payload = b"x"
    digest = _sha(payload)
    # (extra headers, expected status) — order matters: the successful upload
    # comes last so the two rejections are checked against an empty store.
    attempts = (
        ({}, 401),
        ({"Authorization": "Bearer wrong"}, 401),
        ({"Authorization": "Bearer s3cret"}, 201),
    )

    guarded_app = make_app(store=store, max_episode_bytes=1_000_000, bearer_token="s3cret")
    async with httpx.AsyncClient(
        transport=httpx.ASGITransport(app=guarded_app),
        base_url="http://test",
    ) as http:
        for auth_headers, expected_status in attempts:
            response = await http.put(
                url,
                content=payload,
                headers={"X-Content-SHA256": digest, **auth_headers},
            )
            assert response.status_code == expected_status
|