First-boot bring-up enables cis490-shipper before the Pi has issued the mTLS leaf, so ssl.create_default_context(cafile=...) used to raise FileNotFoundError out of __init__ and systemd crash-looped the unit every RestartSec=5. Now the transport pre-flights the configured ca_bundle / client_cert / client_key paths and raises a recoverable _CertNotReadyError, and ping/ship_tarball retry the build on each request — so the daemon self-heals, without a restart, once the cert lands. Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
349 lines
11 KiB
Python
349 lines
11 KiB
Python
"""End-to-end shipper tests.
|
|
|
|
These run a real Uvicorn server bound to 127.0.0.1 on a free port,
|
|
hosting the actual receiver Starlette app over an EpisodeStore on a
|
|
temp dir. The shipper then talks to that server with its real
|
|
`httpx.Client` — same code path as production. This catches things
|
|
the receiver-side ASGI tests can't (HTTP framing, header handling,
|
|
sync httpx behaviour, content-length quirks).
|
|
"""
|
|
|
|
from __future__ import annotations
|
|
|
|
import json
|
|
import socket
|
|
import threading
|
|
import time
|
|
from pathlib import Path
|
|
|
|
import httpx
|
|
import pytest
|
|
import uvicorn
|
|
|
|
from receiver.app import make_app
|
|
from receiver.store import EpisodeStore
|
|
from shipper.config import ReceiverEndpoint, ShipperConfig
|
|
from shipper.queue import ShipperQueue
|
|
from shipper.transport import ShipperTransport
|
|
|
|
|
|
# ---------------------------------------------------------------------------
|
|
# Live-receiver fixture
|
|
# ---------------------------------------------------------------------------
|
|
|
|
|
|
def _free_port() -> int:
|
|
with socket.socket(socket.AF_INET, socket.SOCK_STREAM) as s:
|
|
s.bind(("127.0.0.1", 0))
|
|
return s.getsockname()[1]
|
|
|
|
|
|
class _ServerThread(threading.Thread):
|
|
def __init__(self, app, port: int) -> None:
|
|
super().__init__(daemon=True)
|
|
cfg = uvicorn.Config(
|
|
app,
|
|
host="127.0.0.1",
|
|
port=port,
|
|
log_level="error",
|
|
lifespan="off",
|
|
access_log=False,
|
|
)
|
|
self.server = uvicorn.Server(cfg)
|
|
|
|
def run(self) -> None:
|
|
self.server.run()
|
|
|
|
def stop(self) -> None:
|
|
self.server.should_exit = True
|
|
|
|
|
|
def _wait_for_port(port: int, timeout_s: float = 5.0) -> None:
|
|
deadline = time.monotonic() + timeout_s
|
|
while time.monotonic() < deadline:
|
|
try:
|
|
with httpx.Client(timeout=0.5) as c:
|
|
r = c.get(f"http://127.0.0.1:{port}/v1/health")
|
|
if r.status_code == 200:
|
|
return
|
|
except httpx.HTTPError:
|
|
pass
|
|
time.sleep(0.05)
|
|
raise TimeoutError(f"receiver on 127.0.0.1:{port} did not come up")
|
|
|
|
|
|
@pytest.fixture
def store(tmp_path: Path) -> EpisodeStore:
    """Receiver-side EpisodeStore rooted entirely under pytest's ``tmp_path``.

    The episode store, the incoming dir (presumably upload staging — not
    visible here), and the JSONL index each get their own ``rcv-*`` path.
    That keeps them apart from the shipper's ``lab-data`` tree, which
    ``_make_shipper`` places in the same temp dir.
    """
    root = tmp_path
    return EpisodeStore(
        index_path=root / "rcv-index.jsonl",
        incoming_root=root / "rcv-incoming",
        store_root=root / "rcv-episodes",
    )
|
|
|
|
|
|
def _serve_receiver(store: EpisodeStore, *, bearer_token: str | None):
    """Run the receiver app over *store* on a free loopback port.

    Generator shared by the receiver fixtures. It yields
    ``(base_url, store)`` once ``/v1/health`` answers 200. On teardown it
    always signals the server to exit and joins its thread, waiting at most
    2 s. Teardown also runs when startup times out in ``_wait_for_port``.
    """
    app = make_app(store=store, max_episode_bytes=10_000_000, bearer_token=bearer_token)
    port = _free_port()
    server = _ServerThread(app, port)
    server.start()
    try:
        _wait_for_port(port)
        yield f"http://127.0.0.1:{port}", store
    finally:
        server.stop()
        server.join(timeout=2)


@pytest.fixture
def receiver(store: EpisodeStore):
    """Live receiver with bearer auth disabled; yields ``(base_url, store)``."""
    # `yield from` forwards pytest's teardown `next()` into the helper, so
    # its `finally` still stops the server after the test.
    yield from _serve_receiver(store, bearer_token=None)


@pytest.fixture
def receiver_with_bearer(store: EpisodeStore):
    """Live receiver requiring bearer token ``"s3cret"``; yields ``(base_url, store)``."""
    yield from _serve_receiver(store, bearer_token="s3cret")
|
|
|
|
|
|
def _make_shipper(
    tmp_path: Path,
    receiver_url: str,
    *,
    host_id: str = "lab1",
    bearer: str | None = None,
) -> tuple[ShipperConfig, ShipperTransport, ShipperQueue]:
    """Build a real shipper stack pointed at *receiver_url*.

    The lab data root is ``tmp_path / "lab-data"`` and the scan interval is
    cut to 50 ms. Returns ``(config, transport, queue)``; the queue is
    wired to that same transport instance.
    """
    endpoint = ReceiverEndpoint(url=receiver_url, bearer_token=bearer)
    cfg = ShipperConfig(
        host_id=host_id,
        data_root=tmp_path / "lab-data",
        receiver=endpoint,
        scan_interval_s=0.05,
    )
    transport = ShipperTransport(cfg)
    return cfg, transport, ShipperQueue(cfg, transport)
|
|
|
|
|
|
def _make_episode(cfg: ShipperConfig, episode_id: str, *, content: bytes = b"data") -> Path:
|
|
ep = cfg.episodes_dir / episode_id
|
|
ep.mkdir(parents=True, exist_ok=True)
|
|
(ep / "meta.json").write_bytes(content)
|
|
(ep / "events.jsonl").write_text("{}\n")
|
|
(ep / "labels.jsonl").write_text("{}\n")
|
|
(ep / "telemetry-proc.jsonl").write_text("{}\n")
|
|
(ep / "done.marker").touch()
|
|
return ep
|
|
|
|
|
|
# ---------------------------------------------------------------------------
|
|
# Ping
|
|
# ---------------------------------------------------------------------------
|
|
|
|
|
|
def test_ping_returns_ok_against_running_receiver(tmp_path: Path, receiver) -> None:
    """A healthy receiver answers ping with 200 and reports the shipper's host_id."""
    base_url, _store = receiver
    transport = _make_shipper(tmp_path, base_url)[1]

    result = transport.ping()
    assert result.ok is True
    assert result.status_code == 200

    body = result.body
    assert body is not None
    assert body["ok"] is True
    assert body["host_id"] == "lab1"
    assert body["schema_version"] == 1
|
|
|
|
|
|
def test_ping_writes_nothing_to_index(tmp_path: Path, receiver) -> None:
    """Pings are read-only: several in a row leave the receiver index empty."""
    base_url, rcv_store = receiver
    transport = _make_shipper(tmp_path, base_url)[1]

    for _ in range(3):
        transport.ping()

    assert rcv_store.index_path.read_text() == ""
|
|
|
|
|
|
def test_ping_fails_with_wrong_bearer(tmp_path: Path, receiver_with_bearer) -> None:
    """A bearer token that doesn't match the receiver's is rejected with 401."""
    base_url, _store = receiver_with_bearer
    _cfg, transport, _queue = _make_shipper(tmp_path, base_url, bearer="WRONG")

    result = transport.ping()
    assert result.ok is False
    assert result.status_code == 401
|
|
|
|
|
|
def test_ping_succeeds_with_right_bearer(tmp_path: Path, receiver_with_bearer) -> None:
    """The receiver's own bearer token gets a 200 back."""
    base_url, _store = receiver_with_bearer
    _cfg, transport, _queue = _make_shipper(tmp_path, base_url, bearer="s3cret")

    result = transport.ping()
    assert result.ok is True
    assert result.status_code == 200
|
|
|
|
|
|
def test_ping_fails_when_receiver_unreachable(tmp_path: Path) -> None:
    """With nothing listening, ping reports failure with status 0 and an error."""
    # _free_port() releases the port before returning and nothing here binds
    # it again, so the connect attempt is expected to fail.
    dead_url = f"http://127.0.0.1:{_free_port()}"
    transport = _make_shipper(tmp_path, dead_url)[1]

    result = transport.ping()
    assert result.ok is False
    assert result.status_code == 0
    assert result.error is not None
|
|
|
|
|
|
def test_transport_defers_when_ca_bundle_missing(tmp_path: Path) -> None:
    """Issue #11: a missing CA bundle must not crash transport construction.

    First-boot bring-up can start the shipper before the Pi has issued the
    mTLS leaf. Building the transport has to succeed anyway, and ping has to
    report an "mTLS material" error instead of raising.
    """
    # Neither the file nor its parent directory is ever created.
    ca_path = tmp_path / "not-yet" / "wg-ca.pem"
    endpoint = ReceiverEndpoint(url="https://collector.wg", ca_bundle=ca_path)
    cfg = ShipperConfig(host_id="lab1", data_root=tmp_path / "lab-data", receiver=endpoint)

    # Regression guard: this constructor used to let FileNotFoundError
    # escape from ssl.create_default_context, crash-looping the systemd unit.
    transport = ShipperTransport(cfg)

    result = transport.ping()
    assert result.ok is False
    assert result.error is not None
    assert "mTLS material" in result.error
|
|
|
|
|
|
# ---------------------------------------------------------------------------
|
|
# Tar + ship
|
|
# ---------------------------------------------------------------------------
|
|
|
|
|
|
def test_run_once_ships_one_done_episode(tmp_path: Path, receiver) -> None:
    """One finished episode is shipped, retired locally, and indexed remotely."""
    base_url, rcv_store = receiver
    cfg, _transport, queue = _make_shipper(tmp_path, base_url)
    _make_episode(cfg, "01EPISODE")

    outcome = queue.run_once()
    assert outcome.scanned == 1
    assert outcome.shipped == 1
    assert outcome.transient_failures == 0

    # Local side: the episode dir now lives under shipped/ ...
    assert not (cfg.episodes_dir / "01EPISODE").exists()
    assert (cfg.shipped_dir / "01EPISODE").exists()
    # ... and the staged tarball was cleared from the outbox.
    assert list(cfg.outbox_dir.iterdir()) == []

    # Receiver side: exactly one stored and indexed copy.
    assert rcv_store.final_path("lab1", "01EPISODE").exists()
    index_lines = rcv_store.index_path.read_text().splitlines()
    index_rows = [json.loads(line) for line in index_lines]
    assert len(index_rows) == 1
    assert index_rows[0]["host_id"] == "lab1"
    assert index_rows[0]["episode_id"] == "01EPISODE"
|
|
|
|
|
|
def test_run_once_skips_episodes_without_done_marker(tmp_path: Path, receiver) -> None:
    """An episode dir without done.marker is unfinished and must be left alone."""
    base_url, rcv_store = receiver
    cfg, _transport, queue = _make_shipper(tmp_path, base_url)

    partial = cfg.episodes_dir / "01PARTIAL"
    partial.mkdir(parents=True)
    (partial / "meta.json").write_text("{}")
    # Deliberately no done.marker.

    outcome = queue.run_once()
    assert outcome.scanned == 0
    assert outcome.shipped == 0
    assert partial.exists()  # not moved or deleted
    assert rcv_store.index_path.read_text() == ""
|
|
|
|
|
|
def test_run_once_idempotent_re_ship_returns_already_present(tmp_path: Path, receiver) -> None:
    """Re-shipping identical bytes after a crash is a success, not a conflict.

    This simulates a run that uploaded an episode but died before retiring
    it. The next pass must re-ship the same bytes successfully and retire
    the dir without adding a second index row.
    """
    base_url, rcv_store = receiver
    cfg, _transport, queue = _make_shipper(tmp_path, base_url)
    _make_episode(cfg, "01REPLAY", content=b"same-bytes")
    live_dir = cfg.episodes_dir / "01REPLAY"
    retired_dir = cfg.shipped_dir / "01REPLAY"

    queue.run_once()
    assert retired_dir.exists()

    # Undo the retirement, as if the process crashed right after upload.
    retired_dir.rename(live_dir)

    outcome = queue.run_once()
    assert outcome.scanned == 1
    assert outcome.shipped == 1
    assert retired_dir.exists()

    # The receiver indexed the episode only once.
    assert len(rcv_store.index_path.read_text().splitlines()) == 1
|
|
|
|
|
|
def test_run_once_handles_409_conflict(tmp_path: Path, receiver) -> None:
    """Different bytes under an already-shipped id are a conflict, not a retire.

    The receiver answers 409, and the shipper must keep the local dir in
    place so an operator can triage it.
    """
    base_url, _store = receiver
    cfg, _transport, queue = _make_shipper(tmp_path, base_url)
    _make_episode(cfg, "01CONFLICT", content=b"first")
    live_dir = cfg.episodes_dir / "01CONFLICT"
    retired_dir = cfg.shipped_dir / "01CONFLICT"

    assert queue.run_once().shipped == 1

    # Reuse the id with altered content, e.g. a botched re-run on the lab host.
    retired_dir.rename(live_dir)
    (live_dir / "meta.json").write_bytes(b"tampered")

    outcome = queue.run_once()
    assert outcome.scanned == 1
    assert outcome.shipped == 0
    assert outcome.conflicts == 1
    # The local dir survives for operator triage.
    assert live_dir.exists()
|
|
|
|
|
|
def test_run_once_handles_transient_when_receiver_is_down(tmp_path: Path) -> None:
    """With no receiver listening, the ship fails transiently and nothing is discarded."""
    dead_url = f"http://127.0.0.1:{_free_port()}"
    cfg, _transport, queue = _make_shipper(tmp_path, dead_url)
    _make_episode(cfg, "01DOWN")

    outcome = queue.run_once()
    assert outcome.scanned == 1
    assert outcome.shipped == 0
    assert outcome.transient_failures == 1

    # The episode dir and its staged tarball both stay put for the next pass.
    assert (cfg.episodes_dir / "01DOWN").exists()
    assert (cfg.outbox_dir / "01DOWN.tar.zst").exists()
|
|
|
|
|
|
def test_tarball_round_trips_episode_dir(tmp_path: Path, receiver) -> None:
    """The tarball the receiver stores must extract back to the original
    episode dir layout (file order aside), with file contents intact.
    Verifies the tar+zstd pipe end to end; needs the ``zstd`` CLI on PATH."""
    import subprocess
    import tarfile

    url, _ = receiver
    cfg, _, queue = _make_shipper(tmp_path, url)
    ep = _make_episode(cfg, "01ROUND", content=b"meta-bytes")
    # Snapshot the layout now: a successful ship moves `ep` into shipped/.
    expected_files = sorted(p.name for p in ep.iterdir())

    # Fail here, with a readable count, if the ship didn't happen. Otherwise
    # the glob below would surface as a bare StopIteration.
    result = queue.run_once()
    assert result.shipped == 1

    # The receiver stored it; pull the bytes back, decompress, and untar.
    rcv_path = next((tmp_path / "rcv-episodes" / "lab1").glob("01ROUND.tar.zst"))
    decompressed = tmp_path / "01ROUND.tar"
    subprocess.check_call(
        ["zstd", "-q", "-d", "-o", str(decompressed), str(rcv_path)],
    )

    extract_dir = tmp_path / "extracted"
    extract_dir.mkdir()
    with tarfile.open(decompressed) as tf:
        # Use the PEP 706 "data" filter where this Python has it. It rejects
        # absolute or escaping member paths and avoids the 3.12+
        # DeprecationWarning for unfiltered extractall().
        if hasattr(tarfile, "data_filter"):
            tf.extractall(extract_dir, filter="data")
        else:
            tf.extractall(extract_dir)

    got_dir = extract_dir / "01ROUND"
    got_files = sorted(p.name for p in got_dir.iterdir())
    assert got_files == expected_files
    assert (got_dir / "meta.json").read_bytes() == b"meta-bytes"
|