CIS490/tests/test_shipper.py
max f9b2e5c4e6 shipper: systemd watchdog, quarantine cleanup; doctor surfaces ship errors
Three robustness items off the future-work list:

1. Shipper sd_notify watchdog. Type=notify + WatchdogSec=180. The
   daemon sends READY=1 after queue construction and WATCHDOG=1 once
   per scan pass via a heartbeat callback wired into run_forever.
   Restart=on-failure only catches process death — silent stalls
   (deadlock, hung tar subprocess, blocked I/O past timeout) used to
   leave a zombie running with the data backlog growing. Now systemd
   kills + restarts the daemon if no WATCHDOG=1 arrives within 180s.

   Verified end-to-end against systemd via `systemd-run --transient
   --property=Type=notify --property=WatchdogSec=10`: unit transitions
   to active on READY=1; SIGSTOP'ing the process triggers
   `Watchdog timeout (limit 10s)! Killing process N with SIGABRT` at
   exactly t+10s, then unit goes failed → restart cycle.

2. Quarantine cleanup. Without an upper bound, data/quarantine/ grew
   forever as fatal episodes piled up. New ShipperConfig fields:
     quarantine_keep_days = 30           # opt-out: 0 disables
     quarantine_cleanup_interval_s = 3600 # gate so 5s tick doesn't
                                          # statx() the whole tree
   Cleanup runs at the start of run_once() but is gated to once per
   hour. Removed entries are logged.

3. Doctor surfaces shipping errors. Tails 10 minutes of cis490-shipper
   journal and surfaces 412/400/transient patterns as red/yellow rows
   with the canonical fix command. An on-device agent running
   cis490_doctor.py now sees one line ("12 ship(s) rejected as
   out-of-window") instead of needing to grep the journal.

Tests: 200/200 (was 188). New coverage: heartbeat callback fires +
survives exceptions; quarantine cleanup respects keep_days, gate, and
opt-out; doctor parser correctly classifies 412/400/transient/clean/
empty/journalctl-denied; when both error classes are present, 412 is
prioritised (more actionable).

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
2026-05-01 12:02:59 -05:00

617 lines
20 KiB
Python

"""End-to-end shipper tests.
These run a real Uvicorn server bound to 127.0.0.1 on a free port,
hosting the actual receiver Starlette app over an EpisodeStore on a
temp dir. The shipper then talks to that server with its real
`httpx.Client` — same code path as production. This catches things
the receiver-side ASGI tests can't (HTTP framing, header handling,
sync httpx behaviour, content-length quirks).
"""
from __future__ import annotations
import json
import socket
import threading
import time
from pathlib import Path
import httpx
import pytest
import uvicorn
from receiver.app import make_app
from receiver.store import EpisodeStore
from shipper.config import ReceiverEndpoint, ShipperConfig
from shipper.queue import ShipperQueue
from shipper.transport import ShipperTransport
# ---------------------------------------------------------------------------
# Live-receiver fixture
# ---------------------------------------------------------------------------
def _free_port() -> int:
    """Ask the OS for an unused TCP port on 127.0.0.1 and return its number.

    The probe socket is closed before returning, so the port is only
    *probably* still free by the time the caller binds it.
    """
    probe = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
    with probe:
        # Port 0 tells the kernel to pick any free port for us.
        probe.bind(("127.0.0.1", 0))
        _host, port = probe.getsockname()
    return port
class _ServerThread(threading.Thread):
    """Daemon thread that runs a uvicorn server for ``app`` on 127.0.0.1.

    The uvicorn ``Server`` is exposed as ``self.server``; ``stop()`` asks
    it to exit by setting its ``should_exit`` flag.
    """

    def __init__(self, app, port: int) -> None:
        super().__init__(daemon=True)
        settings = {
            "host": "127.0.0.1",
            "port": port,
            "log_level": "error",
            "lifespan": "off",
            "access_log": False,
        }
        self.server = uvicorn.Server(uvicorn.Config(app, **settings))

    def run(self) -> None:
        """Thread body: serve until the server is told to exit."""
        self.server.run()

    def stop(self) -> None:
        """Request server shutdown; the caller is expected to join()."""
        self.server.should_exit = True
def _wait_for_port(port: int, timeout_s: float = 5.0) -> None:
    """Poll the receiver's health endpoint until it answers 200.

    Args:
        port: TCP port on 127.0.0.1 the receiver should be listening on.
        timeout_s: How long to keep polling before giving up.

    Raises:
        TimeoutError: no 200 response arrived within ``timeout_s``.
    """
    health_url = f"http://127.0.0.1:{port}/v1/health"
    give_up_at = time.monotonic() + timeout_s
    while time.monotonic() < give_up_at:
        try:
            with httpx.Client(timeout=0.5) as client:
                status = client.get(health_url).status_code
        except httpx.HTTPError:
            # Not up yet (refused / timed out) — treat as "no answer".
            status = None
        if status == 200:
            return
        time.sleep(0.05)
    raise TimeoutError(f"receiver on 127.0.0.1:{port} did not come up")
@pytest.fixture
def store(tmp_path: Path) -> EpisodeStore:
    """Provide an EpisodeStore whose paths all live under ``tmp_path``."""
    locations = {
        "store_root": tmp_path / "rcv-episodes",
        "incoming_root": tmp_path / "rcv-incoming",
        "index_path": tmp_path / "rcv-index.jsonl",
    }
    return EpisodeStore(**locations)
@pytest.fixture
def receiver(store: EpisodeStore):
    """Yield ``(base_url, store)`` for a live receiver with no bearer auth.

    The server thread is stopped and joined on teardown, including when
    the receiver never becomes healthy.
    """
    asgi_app = make_app(store=store, max_episode_bytes=10_000_000, bearer_token=None)
    listen_port = _free_port()
    thread = _ServerThread(asgi_app, listen_port)
    thread.start()
    try:
        _wait_for_port(listen_port)
        base_url = f"http://127.0.0.1:{listen_port}"
        yield base_url, store
    finally:
        thread.stop()
        thread.join(timeout=2)
@pytest.fixture
def receiver_with_bearer(store: EpisodeStore):
    """Yield ``(base_url, store)`` for a live receiver requiring bearer "s3cret".

    The server thread is stopped and joined on teardown, including when
    the receiver never becomes healthy.
    """
    asgi_app = make_app(store=store, max_episode_bytes=10_000_000, bearer_token="s3cret")
    listen_port = _free_port()
    thread = _ServerThread(asgi_app, listen_port)
    thread.start()
    try:
        _wait_for_port(listen_port)
        base_url = f"http://127.0.0.1:{listen_port}"
        yield base_url, store
    finally:
        thread.stop()
        thread.join(timeout=2)
def _make_shipper(
    tmp_path: Path,
    receiver_url: str,
    *,
    host_id: str = "lab1",
    bearer: str | None = None,
) -> tuple[ShipperConfig, ShipperTransport, ShipperQueue]:
    """Build a config/transport/queue trio pointed at ``receiver_url``.

    Args:
        tmp_path: Test temp dir; the shipper's data root is ``lab-data``
            beneath it.
        receiver_url: Base URL of the receiver to talk to.
        host_id: Host identity placed in the config.
        bearer: Optional bearer token for the receiver endpoint.

    Returns:
        ``(cfg, transport, queue)`` where the queue uses that transport.
    """
    endpoint = ReceiverEndpoint(url=receiver_url, bearer_token=bearer)
    cfg = ShipperConfig(
        host_id=host_id,
        data_root=tmp_path / "lab-data",
        receiver=endpoint,
        scan_interval_s=0.05,
    )
    transport = ShipperTransport(cfg)
    return cfg, transport, ShipperQueue(cfg, transport)
def _make_episode(cfg: ShipperConfig, episode_id: str, *, content: bytes = b"data") -> Path:
    """Create a finished episode directory under ``cfg.episodes_dir``.

    Writes ``meta.json`` with ``content``, three one-line JSONL files, and
    an empty ``done.marker``, then returns the episode directory.
    """
    episode_dir = cfg.episodes_dir / episode_id
    episode_dir.mkdir(parents=True, exist_ok=True)
    (episode_dir / "meta.json").write_bytes(content)
    for jsonl_name in ("events.jsonl", "labels.jsonl", "telemetry-proc.jsonl"):
        (episode_dir / jsonl_name).write_text("{}\n")
    # The marker is written last, after every payload file is in place.
    (episode_dir / "done.marker").touch()
    return episode_dir
# ---------------------------------------------------------------------------
# Ping
# ---------------------------------------------------------------------------
def test_ping_returns_ok_against_running_receiver(tmp_path: Path, receiver) -> None:
    """A ping against a live receiver succeeds and echoes identity fields."""
    base_url, _store = receiver
    transport = _make_shipper(tmp_path, base_url)[1]
    reply = transport.ping()
    assert reply.ok is True
    assert reply.status_code == 200
    payload = reply.body
    assert payload is not None
    assert payload["ok"] is True
    assert payload["host_id"] == "lab1"
    assert payload["schema_version"] == 1
def test_ping_writes_nothing_to_index(tmp_path: Path, receiver) -> None:
    """Repeated pings must leave the receiver's index file empty."""
    base_url, rcv_store = receiver
    transport = _make_shipper(tmp_path, base_url)[1]
    for _attempt in range(3):
        transport.ping()
    assert rcv_store.index_path.read_text() == ""
def test_ping_fails_with_wrong_bearer(tmp_path: Path, receiver_with_bearer) -> None:
    """A ping carrying the wrong bearer token is rejected with 401."""
    base_url, _store = receiver_with_bearer
    transport = _make_shipper(tmp_path, base_url, bearer="WRONG")[1]
    reply = transport.ping()
    assert reply.ok is False
    assert reply.status_code == 401
def test_ping_succeeds_with_right_bearer(tmp_path: Path, receiver_with_bearer) -> None:
    """A ping carrying the configured bearer token is accepted with 200."""
    base_url, _store = receiver_with_bearer
    transport = _make_shipper(tmp_path, base_url, bearer="s3cret")[1]
    reply = transport.ping()
    assert reply.ok is True
    assert reply.status_code == 200
def test_ping_fails_when_receiver_unreachable(tmp_path: Path) -> None:
    """With nothing listening, ping reports failure with status 0 and an error."""
    # Reserve a port number but never bind a server to it, so the
    # connection attempt has to fail.
    dead_url = f"http://127.0.0.1:{_free_port()}"
    transport = _make_shipper(tmp_path, dead_url)[1]
    reply = transport.ping()
    assert reply.ok is False
    assert reply.status_code == 0
    assert reply.error is not None
def test_transport_defers_when_ca_bundle_missing(tmp_path: Path) -> None:
    """Issue #11: the shipper may start before the mTLS leaf has been issued.

    First-boot bring-up enables the shipper before the Pi has issued the
    cert, so building the transport must not crash; ping should report an
    error until the cert lands.
    """
    absent_ca = tmp_path / "not-yet" / "wg-ca.pem"
    endpoint = ReceiverEndpoint(url="https://collector.wg", ca_bundle=absent_ca)
    cfg = ShipperConfig(
        host_id="lab1",
        data_root=tmp_path / "lab-data",
        receiver=endpoint,
    )
    # Regression guard: construction used to raise FileNotFoundError out
    # of ssl.create_default_context and crash the systemd unit. It must
    # now succeed even though the CA bundle does not exist.
    transport = ShipperTransport(cfg)
    reply = transport.ping()
    assert reply.ok is False
    assert reply.error is not None
    assert "mTLS material" in reply.error
# ---------------------------------------------------------------------------
# Tar + ship
# ---------------------------------------------------------------------------
def test_run_once_ships_one_done_episode(tmp_path: Path, receiver) -> None:
    """One finished episode is shipped, retired locally and indexed remotely."""
    base_url, rcv_store = receiver
    cfg, _transport, queue = _make_shipper(tmp_path, base_url)
    _make_episode(cfg, "01EPISODE")

    outcome = queue.run_once()
    assert outcome.scanned == 1
    assert outcome.shipped == 1
    assert outcome.transient_failures == 0

    # Local side: the episode dir moved to shipped/ and the outbox
    # tarball was cleaned up.
    assert not (cfg.episodes_dir / "01EPISODE").exists()
    assert (cfg.shipped_dir / "01EPISODE").exists()
    assert list(cfg.outbox_dir.iterdir()) == []

    # Receiver side: the upload is stored and has exactly one index row.
    assert rcv_store.final_path("lab1", "01EPISODE").exists()
    index_rows = [
        json.loads(line) for line in rcv_store.index_path.read_text().splitlines()
    ]
    assert len(index_rows) == 1
    only_row = index_rows[0]
    assert only_row["host_id"] == "lab1"
    assert only_row["episode_id"] == "01EPISODE"
def test_run_once_skips_episodes_without_done_marker(tmp_path: Path, receiver) -> None:
    """An episode dir lacking done.marker is neither scanned nor shipped."""
    base_url, rcv_store = receiver
    cfg, _transport, queue = _make_shipper(tmp_path, base_url)
    # Built by hand so that no done.marker is ever written.
    partial_dir = cfg.episodes_dir / "01PARTIAL"
    partial_dir.mkdir(parents=True)
    (partial_dir / "meta.json").write_text("{}")

    outcome = queue.run_once()
    assert outcome.scanned == 0
    assert outcome.shipped == 0
    assert partial_dir.exists()  # left untouched
    assert rcv_store.index_path.read_text() == ""
def test_run_once_idempotent_re_ship_returns_already_present(tmp_path: Path, receiver) -> None:
    """Re-shipping identical bytes after a crash succeeds without a conflict.

    If a prior run shipped an episode but crashed before retiring it, the
    next run must re-ship the same bytes successfully (200) and retire
    the dir rather than flag it as a conflict.
    """
    base_url, rcv_store = receiver
    cfg, _transport, queue = _make_shipper(tmp_path, base_url)
    _make_episode(cfg, "01REPLAY", content=b"same-bytes")
    queue.run_once()
    retired_dir = cfg.shipped_dir / "01REPLAY"
    assert retired_dir.exists()

    # Simulate the crash by putting the dir back as if retire never ran.
    retired_dir.rename(cfg.episodes_dir / "01REPLAY")
    replay = queue.run_once()
    assert replay.scanned == 1
    assert replay.shipped == 1
    assert (cfg.shipped_dir / "01REPLAY").exists()

    # The receiver index must still hold a single row.
    index_lines = rcv_store.index_path.read_text().splitlines()
    assert len(index_lines) == 1
def test_run_once_handles_409_conflict(tmp_path: Path, receiver) -> None:
    """A 409 from the receiver is counted as a conflict and nothing is retired.

    When the same episode_id was previously shipped with *different*
    bytes, the receiver returns 409 and the shipper must NOT retire the
    local dir — operator triage is required.
    """
    base_url, _store = receiver
    cfg, _transport, queue = _make_shipper(tmp_path, base_url)
    _make_episode(cfg, "01CONFLICT", content=b"first")
    first_pass = queue.run_once()
    assert first_pass.shipped == 1

    # Simulate a botched re-run on the lab host: same id, new content.
    live_dir = cfg.episodes_dir / "01CONFLICT"
    (cfg.shipped_dir / "01CONFLICT").rename(live_dir)
    (live_dir / "meta.json").write_bytes(b"tampered")

    second_pass = queue.run_once()
    assert second_pass.scanned == 1
    assert second_pass.shipped == 0
    assert second_pass.conflicts == 1
    # The local dir survives so the operator can decide what to do.
    assert (cfg.episodes_dir / "01CONFLICT").exists()
def test_run_once_handles_transient_when_receiver_is_down(tmp_path: Path) -> None:
    """An unreachable receiver yields a transient failure and keeps local state."""
    dead_url = f"http://127.0.0.1:{_free_port()}"
    cfg, _transport, queue = _make_shipper(tmp_path, dead_url)
    _make_episode(cfg, "01DOWN")

    outcome = queue.run_once()
    assert outcome.scanned == 1
    assert outcome.shipped == 0
    assert outcome.transient_failures == 1
    # Both the episode dir and its tarball stay put for the next pass.
    assert (cfg.episodes_dir / "01DOWN").exists()
    assert (cfg.outbox_dir / "01DOWN.tar.zst").exists()
def test_quarantine_cleanup_removes_old_entries(tmp_path: Path) -> None:
    """Quarantine entries older than keep_days are dropped; fresh ones stay.

    Without an upper bound, quarantine/ grows forever. The test sets
    cleanup_interval_s=0 so the gate fires on every pass, and uses
    ``os.utime`` to age one fixture entry past the cutoff instead of
    sleeping for real time.
    """
    import os

    cfg = ShipperConfig(
        host_id="lab1",
        data_root=tmp_path / "lab-data",
        receiver=ReceiverEndpoint(url="http://127.0.0.1:1"),
        scan_interval_s=0.05,
        quarantine_keep_days=7,
        quarantine_cleanup_interval_s=0.0,  # sweep on every pass
    )

    class _NoTransport:
        """Transport stub: cleanup must never try to ship anything."""

        def ship_tarball(self, *a, **kw):
            raise AssertionError("not used")

    queue = ShipperQueue(cfg, _NoTransport())

    stale = cfg.quarantine_dir / "01OLD"
    fresh = cfg.quarantine_dir / "01NEW"
    for entry in (stale, fresh):
        entry.mkdir()
        (entry / "meta.json").write_text("{}")

    # Backdate only the directory itself by 8 days; per the original
    # test's note, the directory's own mtime is what cleanup checks.
    backdated = time.time() - (8 * 86400)
    os.utime(stale, (backdated, backdated))

    queue._maybe_cleanup_quarantine()
    assert not stale.exists(), "8-day-old entry should be cleaned up"
    assert fresh.exists(), "fresh entry should survive"
def test_quarantine_cleanup_disabled_when_keep_days_zero(tmp_path: Path) -> None:
    """keep_days=0 opts out: even an 8-day-old entry must survive a sweep."""
    import os

    cfg = ShipperConfig(
        host_id="lab1",
        data_root=tmp_path / "lab-data",
        receiver=ReceiverEndpoint(url="http://127.0.0.1:1"),
        scan_interval_s=0.05,
        quarantine_keep_days=0,  # cleanup disabled
        quarantine_cleanup_interval_s=0.0,
    )

    class _NoTransport:
        """Transport stub: cleanup must never try to ship anything."""

        def ship_tarball(self, *a, **kw):
            raise AssertionError("not used")

    queue = ShipperQueue(cfg, _NoTransport())

    stale = cfg.quarantine_dir / "01OLD"
    stale.mkdir()
    backdated = time.time() - (8 * 86400)
    os.utime(stale, (backdated, backdated))

    queue._maybe_cleanup_quarantine()
    assert stale.exists(), "cleanup must be a no-op when keep_days=0"
def test_quarantine_cleanup_respects_interval_gate(tmp_path: Path) -> None:
    """A second sweep inside cleanup_interval_s is skipped.

    The gate exists so the 5s scan tick does not statx() the whole
    quarantine tree on every pass.
    """
    import os

    cfg = ShipperConfig(
        host_id="lab1",
        data_root=tmp_path / "lab-data",
        receiver=ReceiverEndpoint(url="http://127.0.0.1:1"),
        scan_interval_s=0.05,
        quarantine_keep_days=7,
        quarantine_cleanup_interval_s=3600.0,
    )

    class _NoTransport:
        """Transport stub: cleanup must never try to ship anything."""

        def ship_tarball(self, *a, **kw):
            raise AssertionError("not used")

    queue = ShipperQueue(cfg, _NoTransport())

    # The very first call sweeps (the gate starts from a 0.0 sentinel)
    # and records when it ran.
    queue._maybe_cleanup_quarantine()
    swept_at = queue._last_quarantine_cleanup_at
    assert swept_at > 0

    # An old entry staged AFTER that sweep must be left alone until
    # cleanup_interval_s has elapsed.
    stale = cfg.quarantine_dir / "01OLD"
    stale.mkdir()
    backdated = time.time() - 8 * 86400
    os.utime(stale, (backdated, backdated))

    queue._maybe_cleanup_quarantine()
    assert stale.exists(), "gate should defer the next sweep"
    assert queue._last_quarantine_cleanup_at == swept_at
def test_run_forever_calls_heartbeat(tmp_path: Path) -> None:
    """The heartbeat callback fires once per completed pass.

    In production the callback is wired to sd_notify(WATCHDOG=1) so
    systemd's WatchdogSec catches a hung scan loop. The loop is stopped
    after three beats; a wall-clock deadline also stops it so that a
    regression (heartbeat never invoked) fails the assertion instead of
    hanging the test run forever.
    """
    cfg = ShipperConfig(
        host_id="lab1",
        data_root=tmp_path / "lab-data",
        receiver=ReceiverEndpoint(url="http://127.0.0.1:1"),
        scan_interval_s=0.05,
    )

    class _NoTransport:
        """Transport stub: nothing is queued, so shipping is a bug."""

        def ship_tarball(self, *a, **kw):
            raise AssertionError("not used")

    queue = ShipperQueue(cfg, _NoTransport())
    beats = []
    stop = threading.Event()
    # Generous bound: three passes at scan_interval_s=0.05 need far less.
    deadline = time.monotonic() + 10.0

    def _heartbeat() -> None:
        beats.append(time.monotonic())
        if len(beats) >= 3:
            stop.set()

    def _should_stop() -> bool:
        return stop.is_set() or time.monotonic() >= deadline

    queue.run_forever(stop_check=_should_stop, heartbeat=_heartbeat)
    assert len(beats) >= 3, "heartbeat did not fire three times within 10s"
def test_run_forever_survives_heartbeat_exception(tmp_path: Path) -> None:
    """A raising heartbeat must not take down the scan loop.

    A broken heartbeat (e.g. NOTIFY_SOCKET vanished) is tolerable — losing
    the watchdog is acceptable, losing the ship loop is not. The loop is
    stopped after three heartbeat calls; a wall-clock deadline also stops
    it so that a regression (heartbeat never invoked) fails the assertion
    instead of hanging the test run forever.
    """
    cfg = ShipperConfig(
        host_id="lab1",
        data_root=tmp_path / "lab-data",
        receiver=ReceiverEndpoint(url="http://127.0.0.1:1"),
        scan_interval_s=0.05,
    )

    class _NoTransport:
        """Transport stub: nothing is queued, so shipping is a bug."""

        def ship_tarball(self, *a, **kw):
            raise AssertionError("not used")

    queue = ShipperQueue(cfg, _NoTransport())
    pass_count = [0]
    # Generous bound: three passes at scan_interval_s=0.05 need far less.
    deadline = time.monotonic() + 10.0

    def _stop() -> bool:
        return pass_count[0] >= 3 or time.monotonic() >= deadline

    def _broken_heartbeat() -> None:
        # Count first so the pass is recorded even though we then raise.
        pass_count[0] += 1
        raise RuntimeError("simulated NOTIFY_SOCKET failure")

    # Should NOT raise.
    queue.run_forever(stop_check=_stop, heartbeat=_broken_heartbeat)
    assert pass_count[0] >= 3, "loop did not complete three passes within 10s"
def test_run_once_sweeps_orphaned_outbox_tarball(tmp_path: Path, receiver) -> None:
    """Outbox files with no matching episode dir are removed on the next scan.

    The lifecycle invariant is
    `outbox/<id>.tar.zst exists ⇒ episodes/<id>/ exists`; a violation
    means external interference (operator rm-ed the episode, OS crash,
    leftover from older buggy code) and the dead bytes would otherwise be
    carried forever.
    """
    url, _ = receiver
    cfg, _, queue = _make_shipper(tmp_path, url)

    # Stage an orphan: a tarball in outbox/ with no corresponding
    # episodes/01ORPHAN/ directory.
    cfg.outbox_dir.mkdir(parents=True, exist_ok=True)
    orphan = cfg.outbox_dir / "01ORPHAN.tar.zst"
    orphan.write_bytes(b"\x28\xb5\x2f\xfd")  # zstd magic, not a real tarball
    # Also a partial — the same orphan rule applies.
    partial = cfg.outbox_dir / "01PARTIAL.tar.zst.partial"
    partial.write_bytes(b"x")
    # And a non-orphan: an episode dir that will get its own tarball.
    _make_episode(cfg, "01LIVE")

    queue.run_once()

    assert not orphan.exists(), "orphan tarball must be swept"
    assert not partial.exists(), "orphan partial must be swept"
    # 01LIVE got shipped+retired in the same pass: its episode dir moved
    # to shipped/ and its tarball is gone from the outbox.
    assert (cfg.shipped_dir / "01LIVE").exists()
    assert not (cfg.episodes_dir / "01LIVE").exists()
    assert not (cfg.outbox_dir / "01LIVE.tar.zst").exists()
def test_run_once_quarantines_fatal_episode(tmp_path: Path) -> None:
    """A fatal (4xx other than 409) result moves the episode to quarantine/.

    Such a response (e.g. 400 missing-commit) means re-shipping cannot
    succeed, so the episode must leave the live queue — otherwise every
    scan burns a PUT on the same dir — and its outbox tarball must be
    dropped so disk does not fill with stale .zst files.

    Regression: pre-fix queue.py left fatal episodes in episodes/ on
    every pass, so 4465+ pre-stamp episodes on k-gamingcom kept
    fatal-looping at ~1 PUT/sec for 5+ hours after the receiver gate
    went live.
    """

    class _Always400Transport:
        """Stub transport whose every ship attempt is a fatal 400.

        Per the original test's note, this mirrors transport.py's own
        behaviour for 4xx-not-409.
        """

        def __init__(self) -> None:
            self.calls = 0

        def ship_tarball(self, episode_id, tarball_path, sha256_hex,
                         commit=None):
            self.calls += 1
            from shipper.transport import ShipResult

            rejection_body = {
                "error": "missing X-Cis490-Code-Commit header",
                "remediation": "pull and reinstall",
            }
            return ShipResult(
                status="fatal",
                status_code=400,
                sha256=None,
                body=rejection_body,
                error="client error 400",
            )

    cfg = ShipperConfig(
        host_id="lab1",
        data_root=tmp_path / "lab-data",
        receiver=ReceiverEndpoint(url="http://127.0.0.1:1"),
        scan_interval_s=0.05,
    )
    queue = ShipperQueue(cfg, _Always400Transport())
    _make_episode(cfg, "01PRESTAMP")

    first_pass = queue.run_once()
    assert first_pass.scanned == 1
    assert first_pass.fatal == 1
    assert first_pass.shipped == 0

    # The episode dir left episodes/ and landed in quarantine/.
    assert not (cfg.episodes_dir / "01PRESTAMP").exists()
    quarantined = cfg.quarantine_dir / "01PRESTAMP"
    assert quarantined.exists()
    assert (quarantined / "meta.json").exists()

    # The reason file carries enough context for triage.
    reason_text = (quarantined / "quarantine_reason.json").read_text()
    reason = json.loads(reason_text)
    assert reason["status_code"] == 400
    assert reason["error"] == "client error 400"
    assert reason["body"]["error"] == "missing X-Cis490-Code-Commit header"
    assert "quarantined_at_wall" in reason

    # No stale tarball is left behind in the outbox.
    assert list(cfg.outbox_dir.iterdir()) == []

    # Critically, a second pass is a no-op: the fix would be useless if
    # quarantined episodes leaked back into the queue.
    second_pass = queue.run_once()
    assert second_pass.scanned == 0
    assert second_pass.fatal == 0
def test_tarball_round_trips_episode_dir(tmp_path: Path, receiver) -> None:
    """The stored tarball extracts back to the original episode layout.

    Only the set of file names is compared (modulo file order); this
    verifies the tar+zstd pipe is intact. Requires the ``zstd`` binary on
    PATH for decompression.
    """
    import subprocess
    import tarfile

    url, _ = receiver
    cfg, _, queue = _make_shipper(tmp_path, url)
    ep = _make_episode(cfg, "01ROUND", content=b"meta-bytes")
    expected_files = sorted(p.name for p in ep.iterdir())
    queue.run_once()

    # The receiver stored it; pull the bytes back, decompress + untar.
    # Check existence explicitly so a missing upload fails with a clear
    # message rather than a bare StopIteration from next(glob(...)).
    rcv_path = tmp_path / "rcv-episodes" / "lab1" / "01ROUND.tar.zst"
    assert rcv_path.exists(), "receiver did not store 01ROUND.tar.zst"

    decompressed = tmp_path / "01ROUND.tar"
    subprocess.check_call(
        ["zstd", "-q", "-d", "-o", str(decompressed), str(rcv_path)],
    )
    extract_dir = tmp_path / "extracted"
    extract_dir.mkdir()
    with tarfile.open(decompressed) as tf:
        # Calling extractall() without a filter is deprecated on Python
        # 3.12+; interpreters that predate extraction filters do not
        # accept the keyword at all, hence the feature check.
        if hasattr(tarfile, "data_filter"):
            tf.extractall(extract_dir, filter="data")
        else:
            tf.extractall(extract_dir)
    got_files = sorted(p.name for p in (extract_dir / "01ROUND").iterdir())
    assert got_files == expected_files