from __future__ import annotations import hashlib import json from pathlib import Path import httpx import pytest from receiver.app import make_app from receiver.store import EpisodeStore def _sha(b: bytes) -> str: return hashlib.sha256(b).hexdigest() @pytest.fixture def store(tmp_path: Path) -> EpisodeStore: return EpisodeStore( store_root=tmp_path / "episodes", incoming_root=tmp_path / "incoming", index_path=tmp_path / "index.jsonl", ) @pytest.fixture async def client(store: EpisodeStore): app = make_app(store=store, max_episode_bytes=1_000_000, bearer_token=None) transport = httpx.ASGITransport(app=app) async with httpx.AsyncClient(transport=transport, base_url="http://test") as c: yield c async def test_health(client: httpx.AsyncClient) -> None: r = await client.get("/v1/health") assert r.status_code == 200 assert r.json() == {"status": "ok"} async def test_put_new_episode(client: httpx.AsyncClient, store: EpisodeStore) -> None: body = b"hello episode bytes" sha = _sha(body) r = await client.put( "/v1/episodes/lab1/01HW.tar.zst", content=body, headers={"X-Content-SHA256": sha}, ) assert r.status_code == 201, r.text assert r.json()["sha256"] == sha assert r.json()["size_bytes"] == len(body) final = store.final_path("lab1", "01HW") assert final.exists() assert final.read_bytes() == body rows = [json.loads(l) for l in store.index_path.read_text().splitlines()] assert len(rows) == 1 assert rows[0]["host_id"] == "lab1" assert rows[0]["episode_id"] == "01HW" assert rows[0]["sha256"] == sha assert rows[0]["size_bytes"] == len(body) assert rows[0]["schema_version"] == 1 async def test_put_with_explicit_schema_version(client: httpx.AsyncClient, store: EpisodeStore) -> None: body = b"v2 body" sha = _sha(body) r = await client.put( "/v1/episodes/lab1/v2.tar.zst", content=body, headers={"X-Content-SHA256": sha, "X-Schema-Version": "2"}, ) assert r.status_code == 201 rows = [json.loads(l) for l in store.index_path.read_text().splitlines()] assert rows[0]["schema_version"] == 2 async def test_put_idempotent_replay(client: httpx.AsyncClient, store: EpisodeStore) -> None: body = b"abc" sha = _sha(body) headers = {"X-Content-SHA256": sha} r1 = await client.put("/v1/episodes/lab1/x.tar.zst", content=body, headers=headers) assert r1.status_code == 201 r2 = await client.put("/v1/episodes/lab1/x.tar.zst", content=body, headers=headers) assert r2.status_code == 200 assert r2.json()["status"] == "already-present" # Index gets only one row, not two. assert len(store.index_path.read_text().splitlines()) == 1 async def test_put_conflict_different_body(client: httpx.AsyncClient, store: EpisodeStore) -> None: b1, b2 = b"original", b"different" r1 = await client.put( "/v1/episodes/lab1/x.tar.zst", content=b1, headers={"X-Content-SHA256": _sha(b1)}, ) assert r1.status_code == 201 r2 = await client.put( "/v1/episodes/lab1/x.tar.zst", content=b2, headers={"X-Content-SHA256": _sha(b2)}, ) assert r2.status_code == 409 assert r2.json()["status"] == "conflict" # Original survives. assert store.final_path("lab1", "x").read_bytes() == b1 async def test_put_sha_mismatch_cleans_up(client: httpx.AsyncClient, store: EpisodeStore) -> None: bad_sha = "0" * 64 r = await client.put( "/v1/episodes/lab1/x.tar.zst", content=b"abc", headers={"X-Content-SHA256": bad_sha}, ) assert r.status_code == 400 assert r.json()["status"] == "sha-mismatch" assert not store.final_path("lab1", "x").exists() # No incoming partial left behind. incoming = store.incoming_root / "lab1" assert not list(incoming.glob("*.partial")) # No index entry written. assert store.index_path.read_text() == "" async def test_put_missing_sha_header(client: httpx.AsyncClient) -> None: r = await client.put("/v1/episodes/lab1/x.tar.zst", content=b"abc") assert r.status_code == 400 async def test_put_short_sha_header(client: httpx.AsyncClient) -> None: r = await client.put( "/v1/episodes/lab1/x.tar.zst", content=b"abc", headers={"X-Content-SHA256": "short"}, ) assert r.status_code == 400 async def test_put_bad_host_id(client: httpx.AsyncClient) -> None: r = await client.put( "/v1/episodes/bad host/x.tar.zst", content=b"x", headers={"X-Content-SHA256": _sha(b"x")}, ) # Starlette will route-mismatch on the space — accept either 404 or 400. assert r.status_code in (400, 404) async def test_put_bad_episode_id(client: httpx.AsyncClient) -> None: r = await client.put( "/v1/episodes/lab1/bad@id.tar.zst", content=b"x", headers={"X-Content-SHA256": _sha(b"x")}, ) assert r.status_code == 400 async def test_put_bad_suffix(client: httpx.AsyncClient) -> None: r = await client.put( "/v1/episodes/lab1/x.txt", content=b"x", headers={"X-Content-SHA256": _sha(b"x")}, ) assert r.status_code == 400 async def test_put_too_large_via_content_length(store: EpisodeStore) -> None: app = make_app(store=store, max_episode_bytes=10, bearer_token=None) transport = httpx.ASGITransport(app=app) async with httpx.AsyncClient(transport=transport, base_url="http://test") as c: body = b"x" * 100 r = await c.put( "/v1/episodes/lab1/x.tar.zst", content=body, headers={"X-Content-SHA256": _sha(body)}, ) assert r.status_code == 413 async def test_bearer_token_enforcement(store: EpisodeStore) -> None: app = make_app(store=store, max_episode_bytes=1_000_000, bearer_token="s3cret") transport = httpx.ASGITransport(app=app) async with httpx.AsyncClient(transport=transport, base_url="http://test") as c: body = b"x" sha = _sha(body) # No header → 401. r = await c.put( "/v1/episodes/lab1/x.tar.zst", content=body, headers={"X-Content-SHA256": sha}, ) assert r.status_code == 401 # Wrong token → 401. r = await c.put( "/v1/episodes/lab1/x.tar.zst", content=body, headers={"X-Content-SHA256": sha, "Authorization": "Bearer wrong"}, ) assert r.status_code == 401 # Right token → 201. r = await c.put( "/v1/episodes/lab1/x.tar.zst", content=body, headers={"X-Content-SHA256": sha, "Authorization": "Bearer s3cret"}, ) assert r.status_code == 201