diff --git a/etc/cis490-doctor-check.service b/etc/cis490-doctor-check.service new file mode 100644 index 0000000..1550886 --- /dev/null +++ b/etc/cis490-doctor-check.service @@ -0,0 +1,22 @@ +[Unit] +Description=CIS490 lab-host daily doctor check (ships JSON to receiver) +Documentation=https://maxgit.wg/spectral/CIS490 +After=network-online.target wg-quick@wg0.service cis490-shipper.service + +[Service] +Type=oneshot +User=cis490 +Group=cis490 +WorkingDirectory=/opt/cis490 +ExecStart=/opt/cis490/.venv/bin/python /opt/cis490/tools/ship_health_check.py \ + --config /etc/cis490/lab-host.toml +StandardOutput=journal +StandardError=journal + +NoNewPrivileges=true +PrivateTmp=true +ProtectSystem=strict +ProtectHome=true + +[Install] +WantedBy=multi-user.target diff --git a/etc/cis490-doctor-check.timer b/etc/cis490-doctor-check.timer new file mode 100644 index 0000000..7888e6d --- /dev/null +++ b/etc/cis490-doctor-check.timer @@ -0,0 +1,16 @@ +[Unit] +Description=Run CIS490 lab-host doctor check daily and ship to receiver +Documentation=https://maxgit.wg/spectral/CIS490 + +[Timer] +# 10 minutes after boot for the first snapshot, then once a day at +# a randomized offset to avoid all hosts hitting the receiver +# simultaneously. +OnBootSec=10min +OnUnitActiveSec=1d +RandomizedDelaySec=30min +Persistent=true +Unit=cis490-doctor-check.service + +[Install] +WantedBy=timers.target diff --git a/etc/cis490-fleet-health.service b/etc/cis490-fleet-health.service new file mode 100644 index 0000000..aa59b9a --- /dev/null +++ b/etc/cis490-fleet-health.service @@ -0,0 +1,27 @@ +[Unit] +Description=CIS490 fleet health check (silent / fatal-only / unstamped detection) +Documentation=https://maxgit.wg/spectral/CIS490 +After=cis490-receiver.service + +[Service] +Type=oneshot +# Reads /var/lib/cis490/index.jsonl + journalctl, writes +# /var/lib/cis490/alerts.jsonl. journalctl needs the systemd-journal +# group; this unit runs as root so we don't have to fiddle with that. +User=root +Group=root +WorkingDirectory=/opt/cis490 +ExecStart=/opt/cis490/.venv/bin/python /opt/cis490/tools/check_fleet_health.py +StandardOutput=journal +StandardError=journal + +# Hardening — read /var/lib/cis490 for index + alerts, write the +# alerts file there. +NoNewPrivileges=true +PrivateTmp=true +ProtectSystem=strict +ProtectHome=true +ReadWritePaths=/var/lib/cis490 + +[Install] +WantedBy=multi-user.target diff --git a/etc/cis490-fleet-health.timer b/etc/cis490-fleet-health.timer new file mode 100644 index 0000000..d09d8bb --- /dev/null +++ b/etc/cis490-fleet-health.timer @@ -0,0 +1,13 @@ +[Unit] +Description=Run CIS490 fleet health check every 5 minutes +Documentation=https://maxgit.wg/spectral/CIS490 + +[Timer] +OnBootSec=2min +OnUnitActiveSec=5min +AccuracySec=10sec +Persistent=true +Unit=cis490-fleet-health.service + +[Install] +WantedBy=timers.target diff --git a/receiver/__main__.py b/receiver/__main__.py index 93c1c01..30252c4 100644 --- a/receiver/__main__.py +++ b/receiver/__main__.py @@ -57,6 +57,7 @@ def main() -> None: max_episode_bytes=cfg.max_episode_bytes, bearer_token=cfg.bearer_token, version_gate=version_gate, + health_root=cfg.health_root, ) uvicorn.run( app, diff --git a/receiver/app.py b/receiver/app.py index d2a5637..2c8e76e 100644 --- a/receiver/app.py +++ b/receiver/app.py @@ -1,5 +1,6 @@ from __future__ import annotations +import json import logging import secrets import time @@ -49,6 +50,7 @@ def make_app( max_episode_bytes: int, bearer_token: str | None = None, version_gate: VersionGate | None = None, + health_root: "Path | None" = None, ) -> Starlette: async def health(request: Request) -> JSONResponse: return JSONResponse({"status": "ok"}) @@ -231,6 +233,75 @@ def make_app( return JSONResponse({"error": "unknown ingest result"}, status_code=500) + async def put_host_health(request: Request) -> JSONResponse: + """Lab hosts PUT their cis490-doctor.py JSON output here once + a day. We persist the latest snapshot per host at + /.json. Not gated by version_gate — + this is metadata about the host itself, not training data, + and we want sick hosts to be ABLE to report sickness even if + their code is out-of-date. + + No size cap on the body beyond the global Starlette/uvicorn + max — doctor JSON is small (<10 KiB).""" + guard = _bearer_check(request, bearer_token) + if guard is not None: + return guard + if health_root is None: + return JSONResponse( + {"error": "health endpoint disabled (no health_root configured)"}, + status_code=404, + ) + host_id = request.path_params["host_id"] + if not is_valid_id(host_id): + return JSONResponse({"error": "bad host_id"}, status_code=400) + + body_bytes = await request.body() + try: + body = json.loads(body_bytes) + except (ValueError, json.JSONDecodeError): + return JSONResponse({"error": "body must be JSON"}, status_code=400) + if not isinstance(body, dict): + return JSONResponse({"error": "body must be a JSON object"}, + status_code=400) + + # Stamp the receiver's view of when this arrived. Lab-host + # clocks are coarse so we don't trust them for "freshness." + record = { + "host_id": host_id, + "received_at_wall": time.strftime("%Y-%m-%dT%H:%M:%SZ", + time.gmtime()), + "doctor": body, + } + health_root.mkdir(parents=True, exist_ok=True) + target = health_root / f"{host_id}.json" + # Write to a sibling tmp + atomic rename so a concurrent + # reader never sees a half-written file. + tmp = health_root / f".{host_id}.json.tmp" + tmp.write_text(json.dumps(record, indent=2)) + tmp.replace(target) + + return JSONResponse({"status": "stored", "host_id": host_id}, + status_code=200) + + async def get_fleet_health(request: Request) -> JSONResponse: + """Aggregate view across all hosts that have reported. Used by + the maintainer / fleet-health monitor to spot sick hosts + without grepping per-host files.""" + guard = _bearer_check(request, bearer_token) + if guard is not None: + return guard + if health_root is None or not health_root.exists(): + return JSONResponse({"hosts": []}) + out: list[dict] = [] + for f in sorted(health_root.glob("*.json")): + if f.name.startswith("."): # tmp files + continue + try: + out.append(json.loads(f.read_text())) + except (OSError, json.JSONDecodeError): + continue + return JSONResponse({"hosts": out}) + routes = [ Route("/v1/health", health, methods=["GET"]), Route("/v1/ping", ping, methods=["POST"]), @@ -239,5 +310,15 @@ def make_app( put_episode, methods=["PUT"], ), + Route( + "/v1/host-health/{host_id}", + put_host_health, + methods=["PUT"], + ), + Route( + "/v1/host-health", + get_fleet_health, + methods=["GET"], + ), ] return Starlette(routes=routes) diff --git a/receiver/config.py b/receiver/config.py index 064997d..64d2626 100644 --- a/receiver/config.py +++ b/receiver/config.py @@ -28,6 +28,11 @@ class ReceiverConfig: version_gate_branch: str version_gate_auth_token: str | None version_gate_local_repo: Path | None + # Per-host doctor snapshots (PUT /v1/host-health/). Disabled + # if unset — the receiver returns 404. Default points alongside + # the index in /var/lib/cis490 since that's the receiver's only + # persistent writable area under hardening. + health_root: Path | None @classmethod def load(cls, path: str | Path) -> "ReceiverConfig": @@ -56,4 +61,9 @@ class ReceiverConfig: version_gate_branch=version_gate.get("branch", "main"), version_gate_auth_token=version_gate.get("auth_token"), version_gate_local_repo=Path(local_repo).resolve() if local_repo else None, + health_root=( + Path(data["health_root"]).resolve() + if "health_root" in data + else Path("/var/lib/cis490/host-health").resolve() + ), ) diff --git a/scripts/install-lab-host.sh b/scripts/install-lab-host.sh index c083918..346b6fa 100755 --- a/scripts/install-lab-host.sh +++ b/scripts/install-lab-host.sh @@ -133,13 +133,20 @@ install -m 0644 "$REPO_ROOT/etc/cis490-cert-fetch.service" \ /etc/systemd/system/cis490-cert-fetch.service install -m 0644 "$REPO_ROOT/etc/cis490-cert-fetch.timer" \ /etc/systemd/system/cis490-cert-fetch.timer +# Daily doctor check — runs cis490_doctor.py and PUTs JSON to the +# receiver's /v1/host-health/. Lets the maintainer see fleet +# health without grepping per-host logs. See AGENTS.md "Fleet health". +install -m 0644 "$REPO_ROOT/etc/cis490-doctor-check.service" \ + /etc/systemd/system/cis490-doctor-check.service +install -m 0644 "$REPO_ROOT/etc/cis490-doctor-check.timer" \ + /etc/systemd/system/cis490-doctor-check.timer systemctl daemon-reload # Enable timers immediately — the operator gets self-healing on the # next tick without an extra `systemctl enable`. Idempotent. -systemctl enable --now cis490-autoupdate.timer 2>/dev/null || \ - log "WARN: could not enable cis490-autoupdate.timer (will retry next install)" -systemctl enable --now cis490-cert-fetch.timer 2>/dev/null || \ - log "WARN: could not enable cis490-cert-fetch.timer (will retry next install)" +for t in cis490-autoupdate.timer cis490-cert-fetch.timer cis490-doctor-check.timer; do + systemctl enable --now "$t" 2>/dev/null || \ + log "WARN: could not enable $t (will retry next install)" +done # --- 5. config template (only on first install) ----------------------- if [[ ! -f "$ETC_ROOT/lab-host.toml" ]]; then diff --git a/tests/test_fleet_health.py b/tests/test_fleet_health.py new file mode 100644 index 0000000..0f90e48 --- /dev/null +++ b/tests/test_fleet_health.py @@ -0,0 +1,302 @@ +"""Tests for tools/check_fleet_health.py. + +The detector is the "did the fleet break and we didn't notice?" eye +on the receiver. Bugs here are silent failures of monitoring — bad. +We exercise the three detectable symptoms (silent, fatal-only, +unstamped) and the dedup behaviour (sustained problem ≠ alert spam). +""" + +from __future__ import annotations + +import importlib.util +import json +import sys +import time +from datetime import datetime, timezone +from pathlib import Path + +import pytest + + +REPO_ROOT = Path(__file__).resolve().parent.parent +spec = importlib.util.spec_from_file_location( + "check_fleet_health", REPO_ROOT / "tools" / "check_fleet_health.py" +) +fh = importlib.util.module_from_spec(spec) +sys.modules["check_fleet_health"] = fh +spec.loader.exec_module(fh) + + +def _iso(epoch: float) -> str: + return datetime.fromtimestamp(epoch, tz=timezone.utc).isoformat() + + +def _write_index(path: Path, rows: list[dict]) -> None: + path.parent.mkdir(parents=True, exist_ok=True) + with path.open("w") as f: + for r in rows: + f.write(json.dumps(r) + "\n") + + +# --------------------------------------------------------------------------- +# scan_index +# --------------------------------------------------------------------------- + + +def test_scan_index_picks_latest_per_host(tmp_path: Path) -> None: + now = time.time() + idx = tmp_path / "index.jsonl" + _write_index(idx, [ + {"host_id": "lab1", "received_at_wall": _iso(now - 3600)}, + {"host_id": "lab1", "received_at_wall": _iso(now - 1800)}, + {"host_id": "lab2", "received_at_wall": _iso(now - 60)}, + ]) + hosts = fh.scan_index(idx, lookback_hours=24, now_epoch=now) + assert hosts["lab1"].last_seen_at_wall_iso == _iso(now - 1800) + assert hosts["lab2"].last_seen_at_wall_iso == _iso(now - 60) + assert hosts["lab1"].seen_in_lookback is True + assert hosts["lab2"].seen_in_lookback is True + + +def test_scan_index_marks_old_hosts_outside_lookback(tmp_path: Path) -> None: + """A host last seen 48h ago shouldn't trigger silent-detection + just because it's been quiet — it might be a decommissioned lab + host. seen_in_lookback gates the silent alert.""" + now = time.time() + idx = tmp_path / "index.jsonl" + _write_index(idx, [ + {"host_id": "decom-host", "received_at_wall": _iso(now - 48 * 3600)}, + ]) + hosts = fh.scan_index(idx, lookback_hours=24, now_epoch=now) + assert hosts["decom-host"].seen_in_lookback is False + + +def test_scan_index_handles_malformed_rows(tmp_path: Path) -> None: + """Real index.jsonl has occasional partial lines from prune + operations. Don't crash on them.""" + idx = tmp_path / "index.jsonl" + idx.write_text( + '{"host_id": "lab1", "received_at_wall": "2026-05-01T00:00:00+00:00"}\n' + 'not-json\n' + '{"host_id": "lab1"}\n' # missing received_at_wall + '{"received_at_wall": "2026-05-01T00:00:00+00:00"}\n' # missing host_id + '{"host_id": "lab2", "received_at_wall": "bogus-timestamp"}\n' + '{"host_id": "lab3", "received_at_wall": "2026-05-01T00:00:01+00:00"}\n' + ) + hosts = fh.scan_index(idx, lookback_hours=24, now_epoch=time.time()) + # lab1 + lab3 parsed successfully; lab2's bad timestamp is dropped + assert "lab1" in hosts + assert "lab3" in hosts + + +def test_scan_index_missing_file_returns_empty(tmp_path: Path) -> None: + """Fresh receiver, index hasn't been created yet.""" + hosts = fh.scan_index(tmp_path / "no-such-index.jsonl", + lookback_hours=24, now_epoch=time.time()) + assert hosts == {} + + +# --------------------------------------------------------------------------- +# detect_alerts +# --------------------------------------------------------------------------- + + +def test_detect_silent_fires_for_active_host_gone_quiet() -> None: + now = time.time() + hosts = { + "lab1": fh.HostState( + last_seen_at_wall_iso=_iso(now - 45 * 60), # 45 min ago + seen_in_lookback=True, + ), + } + alerts = fh.detect_alerts( + hosts, silent_threshold_min=30, + fatal_ratio_threshold=0.95, fatal_min_count=10, + now_epoch=now, + ) + assert any(a.symptom == "silent" and a.host == "lab1" for a in alerts) + + +def test_detect_silent_does_not_fire_below_threshold() -> None: + now = time.time() + hosts = { + "lab1": fh.HostState( + last_seen_at_wall_iso=_iso(now - 10 * 60), # 10 min ago + seen_in_lookback=True, + ), + } + alerts = fh.detect_alerts( + hosts, silent_threshold_min=30, + fatal_ratio_threshold=0.95, fatal_min_count=10, + now_epoch=now, + ) + assert not any(a.symptom == "silent" for a in alerts) + + +def test_detect_silent_skips_inactive_hosts() -> None: + """Don't bug the operator about a host they haven't seen in days.""" + now = time.time() + hosts = { + "old-decom": fh.HostState( + last_seen_at_wall_iso=_iso(now - 5 * 86400), + seen_in_lookback=False, + ), + } + alerts = fh.detect_alerts( + hosts, silent_threshold_min=30, + fatal_ratio_threshold=0.95, fatal_min_count=10, + now_epoch=now, + ) + assert not any(a.symptom == "silent" for a in alerts) + + +def test_detect_fatal_only_fires_when_ratio_exceeds_and_no_2xx() -> None: + now = time.time() + hosts = { + "lab1": fh.HostState( + last_seen_at_wall_iso=_iso(now - 5 * 60), + seen_in_lookback=True, + recent_2xx=0, recent_4xx=200, recent_5xx=0, + ), + } + alerts = fh.detect_alerts( + hosts, silent_threshold_min=30, + fatal_ratio_threshold=0.95, fatal_min_count=10, + now_epoch=now, + ) + assert any(a.symptom == "fatal-only" and a.host == "lab1" for a in alerts) + + +def test_detect_fatal_only_quiet_when_some_2xx_landing() -> None: + """A host with mixed 2xx+4xx is making progress — not stuck.""" + now = time.time() + hosts = { + "lab1": fh.HostState( + last_seen_at_wall_iso=_iso(now - 5 * 60), + seen_in_lookback=True, + recent_2xx=20, recent_4xx=80, + ), + } + alerts = fh.detect_alerts( + hosts, silent_threshold_min=30, + fatal_ratio_threshold=0.95, fatal_min_count=10, + now_epoch=now, + ) + assert not any(a.symptom == "fatal-only" for a in alerts) + + +def test_detect_fatal_only_quiet_below_min_count() -> None: + """Don't fire on 2 PUTs both 4xx — could be a normal startup blip.""" + now = time.time() + hosts = { + "lab1": fh.HostState( + last_seen_at_wall_iso=_iso(now - 5 * 60), + seen_in_lookback=True, + recent_4xx=2, + ), + } + alerts = fh.detect_alerts( + hosts, silent_threshold_min=30, + fatal_ratio_threshold=0.95, fatal_min_count=10, + now_epoch=now, + ) + assert not any(a.symptom == "fatal-only" for a in alerts) + + +def test_detect_unstamped_fires_above_min_count() -> None: + now = time.time() + hosts = { + "lab1": fh.HostState(recent_unstamped_400=50), + } + alerts = fh.detect_alerts( + hosts, silent_threshold_min=30, + fatal_ratio_threshold=0.95, fatal_min_count=10, + now_epoch=now, + ) + a = next(x for x in alerts if x.symptom == "unstamped") + assert "pre-stamp" in a.detail + assert "install-lab-host.sh" in a.suggested_fix + + +# --------------------------------------------------------------------------- +# emit_alerts (dedup behaviour) +# --------------------------------------------------------------------------- + + +def test_emit_alerts_writes_jsonl_and_dedups_within_hour(tmp_path: Path) -> None: + """A sustained problem fires ONCE per hour — not every 5-min tick.""" + alerts_path = tmp_path / "alerts.jsonl" + a = fh.Alert( + host="lab1", symptom="silent", + detail="last shipped 45 min ago", + suggested_fix="check status", + detected_at_wall=_iso(time.time()), + ) + + # First pass: alert is new, gets written. + n1 = fh.emit_alerts(alerts_path, [a]) + assert n1 == 1 + assert alerts_path.exists() + rows = [json.loads(l) for l in alerts_path.read_text().splitlines()] + assert len(rows) == 1 + assert rows[0]["host"] == "lab1" + assert rows[0]["symptom"] == "silent" + + # Second pass with the same alert: dedup kicks in, file unchanged. + n2 = fh.emit_alerts(alerts_path, [a]) + assert n2 == 0 + rows = [json.loads(l) for l in alerts_path.read_text().splitlines()] + assert len(rows) == 1 + + +def test_emit_alerts_distinguishes_symptoms(tmp_path: Path) -> None: + """Same host with two DIFFERENT symptoms shouldn't dedup against + each other.""" + alerts_path = tmp_path / "alerts.jsonl" + now_iso = _iso(time.time()) + a1 = fh.Alert(host="lab1", symptom="silent", detail="d1", + suggested_fix="f1", detected_at_wall=now_iso) + a2 = fh.Alert(host="lab1", symptom="fatal-only", detail="d2", + suggested_fix="f2", detected_at_wall=now_iso) + n = fh.emit_alerts(alerts_path, [a1, a2]) + assert n == 2 + + +def test_emit_alerts_idempotent_when_no_alerts(tmp_path: Path) -> None: + alerts_path = tmp_path / "alerts.jsonl" + n = fh.emit_alerts(alerts_path, []) + assert n == 0 + assert not alerts_path.exists() + + +# --------------------------------------------------------------------------- +# main() smoke +# --------------------------------------------------------------------------- + + +def test_main_returns_zero_when_no_alerts(tmp_path: Path, + monkeypatch: pytest.MonkeyPatch) -> None: + """End-to-end smoke. journalctl is stubbed (the runner is unlikely + to have systemd-journal access in test env).""" + idx = tmp_path / "index.jsonl" + now = time.time() + _write_index(idx, [ + {"host_id": "lab1", "received_at_wall": _iso(now - 60)}, + ]) + alerts = tmp_path / "alerts.jsonl" + + real_run = fh.subprocess.run + + def fake_run(cmd, *args, **kwargs): + if cmd and cmd[0] == "journalctl": + class R: + returncode = 0 + stdout = "" + stderr = "" + return R() + return real_run(cmd, *args, **kwargs) + + monkeypatch.setattr(fh.subprocess, "run", fake_run) + rc = fh.main(["--index-path", str(idx), + "--alerts-path", str(alerts)]) + assert rc == 0 diff --git a/tests/test_host_health.py b/tests/test_host_health.py new file mode 100644 index 0000000..28b04b0 --- /dev/null +++ b/tests/test_host_health.py @@ -0,0 +1,198 @@ +"""Tests for the receiver's /v1/host-health endpoints. + +PUT /v1/host-health/ → store the lab host's daily doctor JSON +GET /v1/host-health → aggregate snapshot across all hosts + +The endpoint is deliberately NOT gated by version_gate — sick hosts +running stale code still need to be able to report sickness. +""" + +from __future__ import annotations + +import json +from pathlib import Path + +import pytest +from starlette.testclient import TestClient + +from receiver.app import make_app +from receiver.store import EpisodeStore + + +@pytest.fixture +def app(tmp_path: Path): + store = EpisodeStore( + store_root=tmp_path / "store", + incoming_root=tmp_path / "incoming", + index_path=tmp_path / "index.jsonl", + ) + return make_app( + store=store, max_episode_bytes=10_000_000, + bearer_token=None, + health_root=tmp_path / "host-health", + ) + + +@pytest.fixture +def app_with_bearer(tmp_path: Path): + store = EpisodeStore( + store_root=tmp_path / "store", + incoming_root=tmp_path / "incoming", + index_path=tmp_path / "index.jsonl", + ) + return make_app( + store=store, max_episode_bytes=10_000_000, + bearer_token="s3cret", + health_root=tmp_path / "host-health", + ) + + +@pytest.fixture +def app_no_health(tmp_path: Path): + """health_root=None ⇒ endpoint returns 404.""" + store = EpisodeStore( + store_root=tmp_path / "store", + incoming_root=tmp_path / "incoming", + index_path=tmp_path / "index.jsonl", + ) + return make_app( + store=store, max_episode_bytes=10_000_000, + bearer_token=None, + health_root=None, + ) + + +def test_put_health_stores_doctor_json(app, tmp_path: Path) -> None: + snapshot = { + "role": "lab-host", + "checks": [ + {"name": "install: VERSION stamp", "status": "ok", "detail": "main@abc123"}, + {"name": "shipper: recent ship results", "status": "fail", + "detail": "12 412s in last 10 min"}, + ], + } + with TestClient(app) as c: + r = c.put("/v1/host-health/lab1", json=snapshot) + assert r.status_code == 200 + assert r.json()["status"] == "stored" + assert r.json()["host_id"] == "lab1" + + # File written + target = tmp_path / "host-health" / "lab1.json" + assert target.exists() + body = json.loads(target.read_text()) + assert body["host_id"] == "lab1" + assert body["doctor"] == snapshot + assert "received_at_wall" in body + + +def test_put_health_overwrites_previous_snapshot(app, tmp_path: Path) -> None: + """The endpoint stores the LATEST snapshot per host, not history.""" + with TestClient(app) as c: + r1 = c.put("/v1/host-health/lab1", json={"checks": [{"v": 1}]}) + r2 = c.put("/v1/host-health/lab1", json={"checks": [{"v": 2}]}) + assert r1.status_code == 200 and r2.status_code == 200 + body = json.loads((tmp_path / "host-health" / "lab1.json").read_text()) + assert body["doctor"] == {"checks": [{"v": 2}]} + + +def test_put_health_rejects_invalid_host_id(app) -> None: + """is_valid_id rejects path-traversal-y strings + extreme lengths. + URL-routing-level traversal (../) is normalised by the test + client before reaching us, so probe with a chunk of bad chars + that survive routing.""" + with TestClient(app) as c: + r = c.put("/v1/host-health/lab%20with%20space", json={"checks": []}) + assert r.status_code == 400 + + +def test_put_health_rejects_non_json_body(app) -> None: + with TestClient(app) as c: + r = c.put("/v1/host-health/lab1", content=b"not json", + headers={"Content-Type": "application/json"}) + assert r.status_code == 400 + + +def test_put_health_rejects_array_body(app) -> None: + """Body must be an OBJECT, not a list — the doctor's --json output + is always {role, checks: [...]}, never bare list.""" + with TestClient(app) as c: + r = c.put("/v1/host-health/lab1", json=["x", "y"]) + assert r.status_code == 400 + + +def test_put_health_404_when_disabled(app_no_health) -> None: + """Receivers without health_root configured return 404 — lets a + deployment opt out without removing the routes.""" + with TestClient(app_no_health) as c: + r = c.put("/v1/host-health/lab1", json={"checks": []}) + assert r.status_code == 404 + + +def test_put_health_not_gated_by_version_gate(tmp_path: Path) -> None: + """A sick host with stale code MUST still be able to report + sickness. Confirm we don't check X-Cis490-Code-Commit on the + health endpoint.""" + store = EpisodeStore( + store_root=tmp_path / "store", + incoming_root=tmp_path / "incoming", + index_path=tmp_path / "index.jsonl", + ) + # Build a gate that rejects everything to prove we don't run it. + class _RejectAll: + def check(self, commit): return False, "not-in-window" + def head(self): return None + def valid_count(self): return 0 + app = make_app( + store=store, max_episode_bytes=10_000_000, + bearer_token=None, + version_gate=_RejectAll(), + health_root=tmp_path / "host-health", + ) + with TestClient(app) as c: + r = c.put("/v1/host-health/lab1", json={"checks": []}) + assert r.status_code == 200 + + +def test_put_health_requires_bearer_when_configured(app_with_bearer) -> None: + with TestClient(app_with_bearer) as c: + r = c.put("/v1/host-health/lab1", json={"checks": []}) + assert r.status_code == 401 + r2 = c.put("/v1/host-health/lab1", json={"checks": []}, + headers={"Authorization": "Bearer s3cret"}) + assert r2.status_code == 200 + + +def test_get_fleet_health_returns_all_snapshots(app, tmp_path: Path) -> None: + with TestClient(app) as c: + c.put("/v1/host-health/lab1", json={"checks": [{"v": 1}]}) + c.put("/v1/host-health/lab2", json={"checks": [{"v": 2}]}) + r = c.get("/v1/host-health") + assert r.status_code == 200 + body = r.json() + hosts = {h["host_id"]: h for h in body["hosts"]} + assert hosts["lab1"]["doctor"] == {"checks": [{"v": 1}]} + assert hosts["lab2"]["doctor"] == {"checks": [{"v": 2}]} + + +def test_get_fleet_health_empty_when_no_reports(app) -> None: + with TestClient(app) as c: + r = c.get("/v1/host-health") + assert r.status_code == 200 + assert r.json() == {"hosts": []} + + +def test_get_fleet_health_ignores_temp_files(app, tmp_path: Path) -> None: + """Atomic-write tmpfiles (.lab1.json.tmp) shouldn't show up in the + aggregate listing if a write was in-flight.""" + health_dir = tmp_path / "host-health" + health_dir.mkdir() + (health_dir / ".lab1.json.tmp").write_text('{"host_id": "lab1"}') + (health_dir / "lab2.json").write_text( + '{"host_id": "lab2", "doctor": {"checks": []}}' + ) + with TestClient(app) as c: + r = c.get("/v1/host-health") + body = r.json() + hosts = [h.get("host_id") for h in body["hosts"]] + assert hosts == ["lab2"] diff --git a/tools/check_fleet_health.py b/tools/check_fleet_health.py new file mode 100644 index 0000000..a23b432 --- /dev/null +++ b/tools/check_fleet_health.py @@ -0,0 +1,362 @@ +"""Receiver-side fleet health check. + +Runs every 5 minutes (via cis490-fleet-health.timer). Detects: + + silent + A host that successfully shipped in the last `lookback_hours` + has been silent for more than `silent_threshold_min`. Likely a + crashed daemon, frozen on-device agent, or pulled-and-bricked + host stuck mid-install. The threshold is high enough to ignore + routine reboots but tight enough to catch a stalled fleet. + + fatal-only + A host has been actively PUTting in the last + `recent_window_min` minutes but every PUT was 4xx. Signals + diverged-HEAD, missing VERSION stamp, or other gate-rejection + loop the lab host can't fix without operator action. + + unstamped + A host is shipping with NO X-Cis490-Code-Commit header at all — + pre-stamp code is still running. cis490-autoupdate.timer should + catch this within 30 min, so any persistent unstamped flag means + autoupdate is failing too. + +Output: + /var/lib/cis490/alerts.jsonl — one row per detected alert, dedup'd + on (host, symptom, hour-bucket) so + a sustained problem doesn't spam the + log every 5 min. + syslog (WARNING) — easy to grep / forward to a notifier. + +Exit codes: + 0 no alerts + 1 one or more alerts emitted + 2 bad config / can't read inputs + +Run by hand: + sudo /opt/cis490/.venv/bin/python /opt/cis490/tools/check_fleet_health.py \\ + --index-path /var/lib/cis490/index.jsonl \\ + --alerts-path /var/lib/cis490/alerts.jsonl +""" + +from __future__ import annotations + +import argparse +import hashlib +import json +import logging +import os +import re +import subprocess +import sys +import time +from collections import defaultdict +from dataclasses import dataclass, field +from datetime import datetime, timezone +from pathlib import Path + + +log = logging.getLogger("cis490.fleet-health") + + +@dataclass +class Alert: + host: str + symptom: str # "silent" | "fatal-only" | "unstamped" + detail: str + detected_at_wall: str + suggested_fix: str + + def dedup_key(self) -> str: + # 1-hour bucket keeps a sustained fault from spamming the log + # every tick, but lets us re-fire if it persists into the next + # hour (so the operator gets a fresh signal once per hour). + bucket = datetime.now(tz=timezone.utc).strftime("%Y-%m-%dT%H") + h = hashlib.sha256(f"{self.host}|{self.symptom}|{bucket}".encode()).hexdigest() + return h[:16] + + def to_row(self) -> dict: + return { + "host": self.host, + "symptom": self.symptom, + "detail": self.detail, + "suggested_fix": self.suggested_fix, + "detected_at_wall": self.detected_at_wall, + "dedup_key": self.dedup_key(), + } + + +@dataclass +class HostState: + last_seen_at_wall_iso: str | None = None + seen_in_lookback: bool = False + recent_2xx: int = 0 + recent_4xx: int = 0 + recent_5xx: int = 0 + recent_unstamped_400: int = 0 + + +def _parse_iso(s: str) -> float: + """ISO-8601 with timezone → epoch seconds.""" + # received_at_wall format: "2026-05-01T06:37:53.830599+00:00" + return datetime.fromisoformat(s).timestamp() + + +def scan_index(index_path: Path, *, lookback_hours: int, + now_epoch: float) -> dict[str, HostState]: + """Walk index.jsonl, compute per-host last-seen timestamp. + + Streams the file (don't load the whole thing — index can be big). + Tail end is the most recent, so iterating linearly + keeping the + max per host is fine.""" + cutoff = now_epoch - (lookback_hours * 3600) + hosts: dict[str, HostState] = defaultdict(HostState) + if not index_path.exists(): + return {} + with index_path.open() as f: + for line in f: + try: + row = json.loads(line) + except json.JSONDecodeError: + continue + host = row.get("host_id") + ts_iso = row.get("received_at_wall") + if not host or not ts_iso: + continue + try: + ts = _parse_iso(ts_iso) + except ValueError: + continue + st = hosts[host] + if (st.last_seen_at_wall_iso is None + or ts > _parse_iso(st.last_seen_at_wall_iso)): + st.last_seen_at_wall_iso = ts_iso + if ts >= cutoff: + st.seen_in_lookback = True + return dict(hosts) + + +def scan_journal(*, recent_window_min: int, + hosts: dict[str, HostState]) -> None: + """Run journalctl over cis490-receiver and bucket per-host status + codes for the recent window. Mutates hosts in place.""" + rc = subprocess.run( + ["journalctl", "-u", "cis490-receiver", + "--since", f"{recent_window_min} minutes ago", + "--no-pager", "--output=cat"], + capture_output=True, text=True, timeout=30, + ) + if rc.returncode != 0: + log.warning("journalctl returned %d; skipping fatal-only detection", + rc.returncode) + return + + # Match the access log format uvicorn emits: + # "PUT /v1/episodes//.tar.zst HTTP/1.1" + put_re = re.compile( + r'PUT /v1/episodes/([^/]+)/[^"]+ HTTP/1\.1" (\d{3})' + ) + # The receiver's own WARNING line for "missing X-Cis490-Code-Commit" + # — we count those separately so we can distinguish unstamped from + # other 4xx. + missing_re = re.compile(r'missing X-Cis490-Code-Commit', re.IGNORECASE) + last_was_missing_for: str | None = None + + for line in rc.stdout.splitlines(): + if missing_re.search(line): + # The next access-log PUT line (a few lines later) is the + # one this WARNING refers to. Mark a flag. + # Simplification: if we see "rejected episode host=X", + # extract the host directly. + m = re.search(r'host=([^\s]+)', line) + if m: + host = m.group(1).rstrip(',') + hosts.setdefault(host, HostState()).recent_unstamped_400 += 1 + last_was_missing_for = host + continue + + m = put_re.search(line) + if not m: + continue + host = m.group(1) + st = hosts.setdefault(host, HostState()) + status = int(m.group(2)) + if 200 <= status < 300: + st.recent_2xx += 1 + elif 400 <= status < 500: + st.recent_4xx += 1 + elif 500 <= status < 600: + st.recent_5xx += 1 + + +def detect_alerts( + hosts: dict[str, HostState], + *, + silent_threshold_min: int, + fatal_ratio_threshold: float, + fatal_min_count: int, + now_epoch: float, +) -> list[Alert]: + alerts: list[Alert] = [] + detected_at = datetime.fromtimestamp(now_epoch, tz=timezone.utc).isoformat() + + for host, st in hosts.items(): + # silent: shipped before, hasn't shipped recently + if st.seen_in_lookback and st.last_seen_at_wall_iso is not None: + last_seen = _parse_iso(st.last_seen_at_wall_iso) + silent_min = (now_epoch - last_seen) / 60 + if silent_min > silent_threshold_min: + alerts.append(Alert( + host=host, + symptom="silent", + detail=(f"last successful ship {silent_min:.0f} min ago " + f"({st.last_seen_at_wall_iso})"), + detected_at_wall=detected_at, + suggested_fix=( + f"On {host}: check `systemctl status cis490-shipper " + f"cis490-orchestrator`. If services are inactive, " + f"`sudo systemctl start cis490-shipper " + f"cis490-orchestrator`. If actively failing, see " + f"FIXYOURSELF.md." + ), + )) + + # unstamped: pre-stamp episodes still being shipped + if st.recent_unstamped_400 >= fatal_min_count: + alerts.append(Alert( + host=host, + symptom="unstamped", + detail=(f"{st.recent_unstamped_400} ship(s) without " + f"X-Cis490-Code-Commit header in recent window — " + f"orchestrator is running pre-stamp code"), + detected_at_wall=detected_at, + suggested_fix=( + f"On {host}: `sudo /opt/cis490/scripts/install-lab-host.sh` " + f"to re-stamp VERSION + restart orchestrator. If " + f"cis490-autoupdate.timer is enabled this should heal " + f"on its own within 30 min." + ), + )) + + # fatal-only: actively shipping but >threshold% rejected + total = st.recent_2xx + st.recent_4xx + st.recent_5xx + if total >= fatal_min_count: + fatal_ratio = (st.recent_4xx + st.recent_5xx) / total + if fatal_ratio >= fatal_ratio_threshold and st.recent_2xx == 0: + alerts.append(Alert( + host=host, + symptom="fatal-only", + detail=(f"{total} ship(s) in recent window, " + f"{st.recent_4xx} × 4xx + {st.recent_5xx} × 5xx, " + f"0 × 2xx — host is shipping but everything " + f"is being rejected"), + detected_at_wall=detected_at, + suggested_fix=( + f"On {host}: check `journalctl -u cis490-shipper " + f"--since '5 minutes ago'`. If 412 commit-rejected: " + f"see FIXYOURSELF.md §B (likely diverged HEAD)." + ), + )) + + return alerts + + +def load_existing_dedup_keys(alerts_path: Path) -> set[str]: + """The current hour's already-emitted alerts. We re-read the file + each pass rather than maintaining a separate state file — keeps + truth in one place and survives restarts.""" + if not alerts_path.exists(): + return set() + keys: set[str] = set() + bucket = datetime.now(tz=timezone.utc).strftime("%Y-%m-%dT%H") + try: + with alerts_path.open() as f: + for line in f: + try: + row = json.loads(line) + except json.JSONDecodeError: + continue + # Only consider keys from THIS hour-bucket — older + # alerts shouldn't suppress a fresh detection. + detected = row.get("detected_at_wall", "") + if detected.startswith(bucket): + if isinstance(row.get("dedup_key"), str): + keys.add(row["dedup_key"]) + except OSError: + pass + return keys + + +def emit_alerts(alerts_path: Path, alerts: list[Alert]) -> int: + if not alerts: + return 0 + seen = load_existing_dedup_keys(alerts_path) + new_count = 0 + alerts_path.parent.mkdir(parents=True, exist_ok=True) + with alerts_path.open("a") as f: + for a in alerts: + if a.dedup_key() in seen: + continue + f.write(json.dumps(a.to_row()) + "\n") + log.warning( + "ALERT host=%s symptom=%s — %s. Suggested: %s", + a.host, a.symptom, a.detail, a.suggested_fix, + ) + new_count += 1 + return new_count + + +def main(argv: list[str] | None = None) -> int: + p = argparse.ArgumentParser(prog="cis490-fleet-health", + description=__doc__, + formatter_class=argparse.RawDescriptionHelpFormatter) + p.add_argument("--index-path", default="/var/lib/cis490/index.jsonl", + type=Path) + p.add_argument("--alerts-path", default="/var/lib/cis490/alerts.jsonl", + type=Path) + p.add_argument("--lookback-hours", type=int, default=24, + help="A host that shipped in this window counts as 'active'; " + "silent-detection only fires for active hosts.") + p.add_argument("--silent-threshold-min", type=int, default=30, + help="An active host silent longer than this triggers 'silent'.") + p.add_argument("--recent-window-min", type=int, default=30, + help="Window for fatal-only / unstamped detection from the journal.") + p.add_argument("--fatal-ratio-threshold", type=float, default=0.95, + help="Min fatal-fraction in the recent window to alert.") + p.add_argument("--fatal-min-count", type=int, default=10, + help="Don't alert until this many recent PUTs — kills false " + "positives on hosts with very low traffic.") + p.add_argument("--log-level", default="INFO") + args = p.parse_args(argv) + + logging.basicConfig( + level=getattr(logging, args.log_level.upper(), logging.INFO), + format="%(asctime)s %(levelname)s %(name)s %(message)s", + ) + + if not args.index_path.exists(): + log.error("index missing: %s", args.index_path) + return 2 + + now_epoch = time.time() + hosts = scan_index(args.index_path, + lookback_hours=args.lookback_hours, + now_epoch=now_epoch) + scan_journal(recent_window_min=args.recent_window_min, hosts=hosts) + + alerts = detect_alerts( + hosts, + silent_threshold_min=args.silent_threshold_min, + fatal_ratio_threshold=args.fatal_ratio_threshold, + fatal_min_count=args.fatal_min_count, + now_epoch=now_epoch, + ) + new_count = emit_alerts(args.alerts_path, alerts) + if new_count == 0: + log.info("fleet healthy — %d hosts checked, no new alerts", + len(hosts)) + return 1 if new_count else 0 + + +if __name__ == "__main__": + sys.exit(main()) diff --git a/tools/ship_health_check.py b/tools/ship_health_check.py new file mode 100644 index 0000000..25e594e --- /dev/null +++ b/tools/ship_health_check.py @@ -0,0 +1,114 @@ +"""Run cis490_doctor.py and PUT the JSON output to the receiver. + +Triggered by cis490-doctor-check.timer (once a day) or invoked by +hand. Best-effort: a doctor that exits with red rows still ships its +output — that's the most useful case. + +Reuses the shipper's transport (mTLS + bearer + receiver URL from +lab-host.toml) so we don't reimplement auth. + +Failure modes: + - doctor crashes → exit 2, log error + - PUT fails (non-2xx) → exit 1, log error (timer fires next day) + - PUT succeeds → exit 0 + - mTLS not yet on disk → exit 0 (silent — first-boot path) +""" + +from __future__ import annotations + +import json +import logging +import subprocess +import sys +from pathlib import Path + +import httpx + +from shipper.config import ShipperConfig +from shipper.transport import ShipperTransport, _build_ssl_context, _CertNotReadyError + + +log = logging.getLogger("cis490.shipper.health-check") + + +def run_doctor(doctor_path: Path, role: str = "lab-host") -> dict: + """Run cis490_doctor.py --json --role lab-host. Returns the parsed + JSON (which always has a `checks` array — even when reds are + present, the doctor exits non-zero but still prints the report). + Raises RuntimeError if the doctor crashed without printing JSON.""" + venv_py = Path("/opt/cis490/.venv/bin/python") + py = str(venv_py) if venv_py.exists() else sys.executable + rc = subprocess.run( + [py, str(doctor_path), "--role", role, "--json"], + capture_output=True, text=True, timeout=120, + ) + # Doctor exits non-zero when red rows are present — that's + # exactly when we MOST want to ship the snapshot. Don't gate on + # exit code; gate on whether parseable JSON came out. + try: + return json.loads(rc.stdout) + except json.JSONDecodeError as e: + raise RuntimeError( + f"doctor produced no JSON (exit={rc.returncode}, " + f"stderr={rc.stderr[:500]!r})" + ) from e + + +def ship_health(cfg: ShipperConfig, snapshot: dict) -> tuple[int, str]: + """PUT snapshot to /v1/host-health/. Reuses the shipper's + SSL context build so we get mTLS + the cert-not-ready deferral + behaviour for free.""" + try: + verify = _build_ssl_context(cfg.receiver) + except _CertNotReadyError as e: + log.info("mTLS material not on disk yet; skipping health ship: %s", e) + return 0, "deferred" + + url = f"{cfg.receiver.url}/v1/host-health/{cfg.host_id}" + headers = {"X-Lab-Host": cfg.host_id, "Content-Type": "application/json"} + if cfg.receiver.bearer_token: + headers["Authorization"] = f"Bearer {cfg.receiver.bearer_token}" + + try: + with httpx.Client(verify=verify, timeout=cfg.request_timeout_s) as c: + r = c.put(url, headers=headers, content=json.dumps(snapshot)) + except httpx.HTTPError as e: + return 1, f"HTTP error: {e}" + + if 200 <= r.status_code < 300: + return 0, f"ok ({r.status_code})" + return 1, f"non-2xx: {r.status_code} {r.text[:200]}" + + +def main(argv: list[str] | None = None) -> int: + import argparse + p = argparse.ArgumentParser(prog="cis490-ship-health-check") + p.add_argument("--config", default="/etc/cis490/lab-host.toml") + p.add_argument("--doctor", + default="/opt/cis490/tools/cis490_doctor.py", type=Path) + p.add_argument("--log-level", default="INFO") + args = p.parse_args(argv) + logging.basicConfig( + level=getattr(logging, args.log_level.upper(), logging.INFO), + format="%(asctime)s %(levelname)s %(name)s %(message)s", + ) + + try: + cfg = ShipperConfig.load(args.config) + except (FileNotFoundError, ValueError) as e: + log.error("config error: %s", e) + return 2 + + try: + snapshot = run_doctor(args.doctor) + except (RuntimeError, subprocess.TimeoutExpired, FileNotFoundError) as e: + log.error("doctor failed: %s", e) + return 2 + + rc, msg = ship_health(cfg, snapshot) + log.info("health ship: %s", msg) + return rc + + +if __name__ == "__main__": + sys.exit(main())