"""Tests for tools/check_fleet_health.py. The detector is the "did the fleet break and we didn't notice?" eye on the receiver. Bugs here are silent failures of monitoring — bad. We exercise the three detectable symptoms (silent, fatal-only, unstamped) and the dedup behaviour (sustained problem ≠ alert spam). """ from __future__ import annotations import importlib.util import json import sys import time from datetime import datetime, timezone from pathlib import Path import pytest REPO_ROOT = Path(__file__).resolve().parent.parent spec = importlib.util.spec_from_file_location( "check_fleet_health", REPO_ROOT / "tools" / "check_fleet_health.py" ) fh = importlib.util.module_from_spec(spec) sys.modules["check_fleet_health"] = fh spec.loader.exec_module(fh) def _iso(epoch: float) -> str: return datetime.fromtimestamp(epoch, tz=timezone.utc).isoformat() def _write_index(path: Path, rows: list[dict]) -> None: path.parent.mkdir(parents=True, exist_ok=True) with path.open("w") as f: for r in rows: f.write(json.dumps(r) + "\n") # --------------------------------------------------------------------------- # scan_index # --------------------------------------------------------------------------- def test_scan_index_picks_latest_per_host(tmp_path: Path) -> None: now = time.time() idx = tmp_path / "index.jsonl" _write_index(idx, [ {"host_id": "lab1", "received_at_wall": _iso(now - 3600)}, {"host_id": "lab1", "received_at_wall": _iso(now - 1800)}, {"host_id": "lab2", "received_at_wall": _iso(now - 60)}, ]) hosts = fh.scan_index(idx, lookback_hours=24, now_epoch=now) assert hosts["lab1"].last_seen_at_wall_iso == _iso(now - 1800) assert hosts["lab2"].last_seen_at_wall_iso == _iso(now - 60) assert hosts["lab1"].seen_in_lookback is True assert hosts["lab2"].seen_in_lookback is True def test_scan_index_marks_old_hosts_outside_lookback(tmp_path: Path) -> None: """A host last seen 48h ago shouldn't trigger silent-detection just because it's been quiet — it might be a decommissioned lab host. seen_in_lookback gates the silent alert.""" now = time.time() idx = tmp_path / "index.jsonl" _write_index(idx, [ {"host_id": "decom-host", "received_at_wall": _iso(now - 48 * 3600)}, ]) hosts = fh.scan_index(idx, lookback_hours=24, now_epoch=now) assert hosts["decom-host"].seen_in_lookback is False def test_scan_index_handles_malformed_rows(tmp_path: Path) -> None: """Real index.jsonl has occasional partial lines from prune operations. Don't crash on them.""" idx = tmp_path / "index.jsonl" idx.write_text( '{"host_id": "lab1", "received_at_wall": "2026-05-01T00:00:00+00:00"}\n' 'not-json\n' '{"host_id": "lab1"}\n' # missing received_at_wall '{"received_at_wall": "2026-05-01T00:00:00+00:00"}\n' # missing host_id '{"host_id": "lab2", "received_at_wall": "bogus-timestamp"}\n' '{"host_id": "lab3", "received_at_wall": "2026-05-01T00:00:01+00:00"}\n' ) hosts = fh.scan_index(idx, lookback_hours=24, now_epoch=time.time()) # lab1 + lab3 parsed successfully; lab2's bad timestamp is dropped assert "lab1" in hosts assert "lab3" in hosts def test_scan_index_missing_file_returns_empty(tmp_path: Path) -> None: """Fresh receiver, index hasn't been created yet.""" hosts = fh.scan_index(tmp_path / "no-such-index.jsonl", lookback_hours=24, now_epoch=time.time()) assert hosts == {} # --------------------------------------------------------------------------- # detect_alerts # --------------------------------------------------------------------------- def test_detect_silent_fires_for_active_host_gone_quiet() -> None: now = time.time() hosts = { "lab1": fh.HostState( last_seen_at_wall_iso=_iso(now - 45 * 60), # 45 min ago seen_in_lookback=True, ), } alerts = fh.detect_alerts( hosts, silent_threshold_min=30, fatal_ratio_threshold=0.95, fatal_min_count=10, now_epoch=now, ) assert any(a.symptom == "silent" and a.host == "lab1" for a in alerts) def test_detect_silent_does_not_fire_below_threshold() -> None: now = time.time() hosts = { "lab1": fh.HostState( last_seen_at_wall_iso=_iso(now - 10 * 60), # 10 min ago seen_in_lookback=True, ), } alerts = fh.detect_alerts( hosts, silent_threshold_min=30, fatal_ratio_threshold=0.95, fatal_min_count=10, now_epoch=now, ) assert not any(a.symptom == "silent" for a in alerts) def test_detect_silent_skips_inactive_hosts() -> None: """Don't bug the operator about a host they haven't seen in days.""" now = time.time() hosts = { "old-decom": fh.HostState( last_seen_at_wall_iso=_iso(now - 5 * 86400), seen_in_lookback=False, ), } alerts = fh.detect_alerts( hosts, silent_threshold_min=30, fatal_ratio_threshold=0.95, fatal_min_count=10, now_epoch=now, ) assert not any(a.symptom == "silent" for a in alerts) def test_detect_fatal_only_fires_when_ratio_exceeds_and_no_2xx() -> None: now = time.time() hosts = { "lab1": fh.HostState( last_seen_at_wall_iso=_iso(now - 5 * 60), seen_in_lookback=True, recent_2xx=0, recent_4xx=200, recent_5xx=0, ), } alerts = fh.detect_alerts( hosts, silent_threshold_min=30, fatal_ratio_threshold=0.95, fatal_min_count=10, now_epoch=now, ) assert any(a.symptom == "fatal-only" and a.host == "lab1" for a in alerts) def test_detect_fatal_only_quiet_when_some_2xx_landing() -> None: """A host with mixed 2xx+4xx is making progress — not stuck.""" now = time.time() hosts = { "lab1": fh.HostState( last_seen_at_wall_iso=_iso(now - 5 * 60), seen_in_lookback=True, recent_2xx=20, recent_4xx=80, ), } alerts = fh.detect_alerts( hosts, silent_threshold_min=30, fatal_ratio_threshold=0.95, fatal_min_count=10, now_epoch=now, ) assert not any(a.symptom == "fatal-only" for a in alerts) def test_detect_fatal_only_quiet_below_min_count() -> None: """Don't fire on 2 PUTs both 4xx — could be a normal startup blip.""" now = time.time() hosts = { "lab1": fh.HostState( last_seen_at_wall_iso=_iso(now - 5 * 60), seen_in_lookback=True, recent_4xx=2, ), } alerts = fh.detect_alerts( hosts, silent_threshold_min=30, fatal_ratio_threshold=0.95, fatal_min_count=10, now_epoch=now, ) assert not any(a.symptom == "fatal-only" for a in alerts) def test_detect_unstamped_fires_above_min_count() -> None: now = time.time() hosts = { "lab1": fh.HostState(recent_unstamped_400=50), } alerts = fh.detect_alerts( hosts, silent_threshold_min=30, fatal_ratio_threshold=0.95, fatal_min_count=10, now_epoch=now, ) a = next(x for x in alerts if x.symptom == "unstamped") assert "pre-stamp" in a.detail assert "install-lab-host.sh" in a.suggested_fix # --------------------------------------------------------------------------- # emit_alerts (dedup behaviour) # --------------------------------------------------------------------------- def test_emit_alerts_writes_jsonl_and_dedups_within_hour(tmp_path: Path) -> None: """A sustained problem fires ONCE per hour — not every 5-min tick.""" alerts_path = tmp_path / "alerts.jsonl" a = fh.Alert( host="lab1", symptom="silent", detail="last shipped 45 min ago", suggested_fix="check status", detected_at_wall=_iso(time.time()), ) # First pass: alert is new, gets written. n1 = fh.emit_alerts(alerts_path, [a]) assert n1 == 1 assert alerts_path.exists() rows = [json.loads(l) for l in alerts_path.read_text().splitlines()] assert len(rows) == 1 assert rows[0]["host"] == "lab1" assert rows[0]["symptom"] == "silent" # Second pass with the same alert: dedup kicks in, file unchanged. n2 = fh.emit_alerts(alerts_path, [a]) assert n2 == 0 rows = [json.loads(l) for l in alerts_path.read_text().splitlines()] assert len(rows) == 1 def test_emit_alerts_distinguishes_symptoms(tmp_path: Path) -> None: """Same host with two DIFFERENT symptoms shouldn't dedup against each other.""" alerts_path = tmp_path / "alerts.jsonl" now_iso = _iso(time.time()) a1 = fh.Alert(host="lab1", symptom="silent", detail="d1", suggested_fix="f1", detected_at_wall=now_iso) a2 = fh.Alert(host="lab1", symptom="fatal-only", detail="d2", suggested_fix="f2", detected_at_wall=now_iso) n = fh.emit_alerts(alerts_path, [a1, a2]) assert n == 2 def test_emit_alerts_idempotent_when_no_alerts(tmp_path: Path) -> None: alerts_path = tmp_path / "alerts.jsonl" n = fh.emit_alerts(alerts_path, []) assert n == 0 assert not alerts_path.exists() # --------------------------------------------------------------------------- # main() smoke # --------------------------------------------------------------------------- def test_main_returns_zero_when_no_alerts(tmp_path: Path, monkeypatch: pytest.MonkeyPatch) -> None: """End-to-end smoke. journalctl is stubbed (the runner is unlikely to have systemd-journal access in test env).""" idx = tmp_path / "index.jsonl" now = time.time() _write_index(idx, [ {"host_id": "lab1", "received_at_wall": _iso(now - 60)}, ]) alerts = tmp_path / "alerts.jsonl" real_run = fh.subprocess.run def fake_run(cmd, *args, **kwargs): if cmd and cmd[0] == "journalctl": class R: returncode = 0 stdout = "" stderr = "" return R() return real_run(cmd, *args, **kwargs) monkeypatch.setattr(fh.subprocess, "run", fake_run) rc = fh.main(["--index-path", str(idx), "--alerts-path", str(alerts)]) assert rc == 0 def test_main_returns_zero_even_when_alerts_emitted(tmp_path: Path, monkeypatch: pytest.MonkeyPatch) -> None: """The detector exits 0 even when alerts are present — the alert IS the signal (via alerts.jsonl + WARNING log), not the unit's success state. systemd showing the unit as 'failed' on every detection would be misleading: the detector itself is working correctly.""" idx = tmp_path / "index.jsonl" now = time.time() # Active host but silent for >30 min → triggers 'silent' alert. _write_index(idx, [ {"host_id": "lab1", "received_at_wall": _iso(now - 60 * 60)}, ]) alerts = tmp_path / "alerts.jsonl" real_run = fh.subprocess.run def fake_run(cmd, *args, **kwargs): if cmd and cmd[0] == "journalctl": class R: returncode = 0 stdout = "" stderr = "" return R() return real_run(cmd, *args, **kwargs) monkeypatch.setattr(fh.subprocess, "run", fake_run) rc = fh.main(["--index-path", str(idx), "--alerts-path", str(alerts)]) assert rc == 0 # The alert WAS emitted, just not via exit code. assert alerts.exists() rows = alerts.read_text().splitlines() assert len(rows) >= 1