"""Tests for tools/quarantine_unstamped.py. This is the one-shot drain that we run on each lab host once after the commit-gate goes live. The behaviour we care about: - episodes WITH a 40-char-hex code_version.commit stay put - episodes WITHOUT that field move to quarantine/ - episodes lacking done.marker (still being written) are untouched - quarantine//quarantine_reason.json gets dropped beside it - re-running is a no-op (idempotent — pre-stamp episodes are gone, valid ones aren't touched) """ from __future__ import annotations import importlib.util import json import sys from pathlib import Path import pytest REPO_ROOT = Path(__file__).resolve().parent.parent spec = importlib.util.spec_from_file_location( "quarantine_unstamped", REPO_ROOT / "tools" / "quarantine_unstamped.py" ) qu = importlib.util.module_from_spec(spec) sys.modules["quarantine_unstamped"] = qu spec.loader.exec_module(qu) def _ep(root: Path, name: str, *, meta: dict | None, done: bool = True) -> Path: """Stage a fake episode under /episodes//.""" d = root / "episodes" / name d.mkdir(parents=True) if meta is not None: (d / "meta.json").write_text(json.dumps(meta)) if done: (d / "done.marker").touch() return d def test_drain_moves_unstamped_to_quarantine(tmp_path: Path) -> None: _ep(tmp_path, "01OLD", meta={"host_id": "lab1"}) # no code_version res = qu.drain(tmp_path) assert res.scanned == 1 assert res.quarantined == 1 assert res.kept_stamped == 0 assert not (tmp_path / "episodes" / "01OLD").exists() assert (tmp_path / "quarantine" / "01OLD" / "meta.json").exists() reason = json.loads( (tmp_path / "quarantine" / "01OLD" / "quarantine_reason.json").read_text() ) assert reason["status_code"] == 400 assert "pre-stamp" in reason["error"] def test_drain_keeps_stamped_episode(tmp_path: Path) -> None: """A stamped episode (40-char-hex commit) belongs in the live queue — the shipper will succeed against the receiver gate.""" _ep(tmp_path, "01NEW", meta={ "code_version": {"commit": "a" * 40, "branch": "main", "dirty": False}, }) res = qu.drain(tmp_path) assert res.scanned == 1 assert res.quarantined == 0 assert res.kept_stamped == 1 assert (tmp_path / "episodes" / "01NEW").exists() assert not (tmp_path / "quarantine" / "01NEW").exists() def test_drain_rejects_short_commit(tmp_path: Path) -> None: """A truncated/garbled commit is not accepted as 'stamped' — receiver would 400 it as bad-format anyway.""" _ep(tmp_path, "01SHORT", meta={"code_version": {"commit": "abc123"}}) res = qu.drain(tmp_path) assert res.quarantined == 1 def test_drain_rejects_non_hex_commit(tmp_path: Path) -> None: bad = "z" * 40 _ep(tmp_path, "01BADHEX", meta={"code_version": {"commit": bad}}) res = qu.drain(tmp_path) assert res.quarantined == 1 def test_drain_skips_in_progress_episode(tmp_path: Path) -> None: """No done.marker means the orchestrator is still writing to the dir — leave it alone, drainer is for 'finished and queued' only.""" _ep(tmp_path, "01PARTIAL", meta=None, done=False) res = qu.drain(tmp_path) assert res.scanned == 1 assert res.skipped_no_marker == 1 assert res.quarantined == 0 assert (tmp_path / "episodes" / "01PARTIAL").exists() def test_drain_handles_missing_meta_json(tmp_path: Path) -> None: """A done episode with no meta.json is corrupt — should be quarantined, not kept (it'd fail the gate too).""" _ep(tmp_path, "01NOMETA", meta=None, done=True) res = qu.drain(tmp_path) assert res.quarantined == 1 assert (tmp_path / "quarantine" / "01NOMETA" / "quarantine_reason.json").exists() def test_drain_is_idempotent(tmp_path: Path) -> None: _ep(tmp_path, "01OLD", meta={"host_id": "lab1"}) _ep(tmp_path, "01NEW", meta={"code_version": {"commit": "a" * 40}}) qu.drain(tmp_path) res2 = qu.drain(tmp_path) # Second pass: only the still-live stamped episode is scanned. assert res2.scanned == 1 assert res2.kept_stamped == 1 assert res2.quarantined == 0 def test_drain_missing_data_root_is_noop(tmp_path: Path) -> None: """First-boot: episodes/ may not exist yet. Drain shouldn't crash.""" res = qu.drain(tmp_path / "does-not-exist") assert res.scanned == 0 assert res.quarantined == 0 def test_drain_dry_run_moves_nothing(tmp_path: Path, capsys: pytest.CaptureFixture) -> None: _ep(tmp_path, "01OLD", meta={"host_id": "lab1"}) res = qu.drain(tmp_path, dry_run=True) assert res.quarantined == 1 # counted as if quarantined # But the episode is still in episodes/ — nothing actually moved. assert (tmp_path / "episodes" / "01OLD").exists() assert not (tmp_path / "quarantine" / "01OLD").exists() out = capsys.readouterr().out assert "would-quarantine 01OLD" in out def test_drain_collision_keeps_quarantine_copy(tmp_path: Path) -> None: """Re-running after a previous drain put the same id into quarantine. Should silently drop the live copy (matches the queue's _quarantine path semantics).""" _ep(tmp_path, "01DUP", meta={"host_id": "lab1"}) # Pre-existing quarantine entry from a previous run: (tmp_path / "quarantine" / "01DUP").mkdir(parents=True) (tmp_path / "quarantine" / "01DUP" / "meta.json").write_text("{}") res = qu.drain(tmp_path) assert res.quarantined == 1 assert not (tmp_path / "episodes" / "01DUP").exists() assert (tmp_path / "quarantine" / "01DUP" / "meta.json").exists()