fleet-health: proactive alerts on the Pi + per-host doctor reports

Two pieces of self-monitoring so the maintainer isn't the alarm:

(2) Receiver-side fleet health monitor

cis490-fleet-health.timer runs check_fleet_health.py every 5 min.
Detects three symptoms and writes them to
/var/lib/cis490/alerts.jsonl + a syslog WARNING (greppable / easy
to forward to a notifier):

  silent      — host shipped in last 24h but has been quiet >30 min
  fatal-only  — actively shipping but every PUT 4xx
  unstamped   — shipping without X-Cis490-Code-Commit header

Dedup is keyed on (host, symptom, hour-bucket) so a sustained fault
fires once per hour, not every 5 min. 15 unit tests cover the index
parser, three detectors, and dedup.

(3) Per-host doctor snapshots

Lab hosts run cis490-doctor-check.timer once a day (10 min after
boot, then daily with 30-min jitter). The timer runs
cis490_doctor.py --json and PUTs the result to a new endpoint:

  PUT /v1/host-health/<host>   →  /var/lib/cis490/host-health/<host>.json
  GET /v1/host-health          →  aggregate across all hosts

Endpoint is NOT gated by version_gate — sick hosts running stale
code MUST still be able to report sickness. 11 unit tests cover
PUT/GET, atomic-write semantics, bearer auth, and the
not-gated-by-version-gate property.

ship_health_check.py reuses the existing shipper transport (mTLS +
bearer + receiver URL from lab-host.toml) so we don't reimplement
auth.

Both timers wired into install-lab-host.sh — the loop also enables
the previously-added autoupdate + cert-fetch timers, so a single
install run gives a host all four self-healing mechanisms.

Tests: 293 pass (26 new — 15 fleet-health, 11 host-health). 2
pre-existing test_fleet.py failures from the elliott-ThinkPad
merge (667f042) are unrelated to this change.

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
This commit is contained in:
max 2026-05-02 13:48:26 -05:00
parent 1bac2f0135
commit 49eba2fd60
12 changed files with 1157 additions and 4 deletions

View file

@ -0,0 +1,22 @@
[Unit]
Description=CIS490 lab-host daily doctor check (ships JSON to receiver)
Documentation=https://maxgit.wg/spectral/CIS490
After=network-online.target wg-quick@wg0.service cis490-shipper.service
[Service]
Type=oneshot
User=cis490
Group=cis490
WorkingDirectory=/opt/cis490
ExecStart=/opt/cis490/.venv/bin/python /opt/cis490/tools/ship_health_check.py \
--config /etc/cis490/lab-host.toml
StandardOutput=journal
StandardError=journal
NoNewPrivileges=true
PrivateTmp=true
ProtectSystem=strict
ProtectHome=true
[Install]
WantedBy=multi-user.target

View file

@ -0,0 +1,16 @@
[Unit]
Description=Run CIS490 lab-host doctor check daily and ship to receiver
Documentation=https://maxgit.wg/spectral/CIS490
[Timer]
# 10 minutes after boot for the first snapshot, then once a day at
# a randomized offset to avoid all hosts hitting the receiver
# simultaneously.
OnBootSec=10min
OnUnitActiveSec=1d
RandomizedDelaySec=30min
Persistent=true
Unit=cis490-doctor-check.service
[Install]
WantedBy=timers.target

View file

@ -0,0 +1,27 @@
[Unit]
Description=CIS490 fleet health check (silent / fatal-only / unstamped detection)
Documentation=https://maxgit.wg/spectral/CIS490
After=cis490-receiver.service
[Service]
Type=oneshot
# Reads /var/lib/cis490/index.jsonl + journalctl, writes
# /var/lib/cis490/alerts.jsonl. journalctl needs the systemd-journal
# group; this unit runs as root so we don't have to fiddle with that.
User=root
Group=root
WorkingDirectory=/opt/cis490
ExecStart=/opt/cis490/.venv/bin/python /opt/cis490/tools/check_fleet_health.py
StandardOutput=journal
StandardError=journal
# Hardening — read /var/lib/cis490 for index + alerts, write the
# alerts file there.
NoNewPrivileges=true
PrivateTmp=true
ProtectSystem=strict
ProtectHome=true
ReadWritePaths=/var/lib/cis490
[Install]
WantedBy=multi-user.target

View file

@ -0,0 +1,13 @@
[Unit]
Description=Run CIS490 fleet health check every 5 minutes
Documentation=https://maxgit.wg/spectral/CIS490
[Timer]
OnBootSec=2min
OnUnitActiveSec=5min
AccuracySec=10sec
Persistent=true
Unit=cis490-fleet-health.service
[Install]
WantedBy=timers.target

View file

@ -57,6 +57,7 @@ def main() -> None:
max_episode_bytes=cfg.max_episode_bytes, max_episode_bytes=cfg.max_episode_bytes,
bearer_token=cfg.bearer_token, bearer_token=cfg.bearer_token,
version_gate=version_gate, version_gate=version_gate,
health_root=cfg.health_root,
) )
uvicorn.run( uvicorn.run(
app, app,

View file

@ -1,5 +1,6 @@
from __future__ import annotations from __future__ import annotations
import json
import logging import logging
import secrets import secrets
import time import time
@ -49,6 +50,7 @@ def make_app(
max_episode_bytes: int, max_episode_bytes: int,
bearer_token: str | None = None, bearer_token: str | None = None,
version_gate: VersionGate | None = None, version_gate: VersionGate | None = None,
health_root: "Path | None" = None,
) -> Starlette: ) -> Starlette:
async def health(request: Request) -> JSONResponse: async def health(request: Request) -> JSONResponse:
return JSONResponse({"status": "ok"}) return JSONResponse({"status": "ok"})
@ -231,6 +233,75 @@ def make_app(
return JSONResponse({"error": "unknown ingest result"}, status_code=500) return JSONResponse({"error": "unknown ingest result"}, status_code=500)
async def put_host_health(request: Request) -> JSONResponse:
"""Lab hosts PUT their cis490-doctor.py JSON output here once
a day. We persist the latest snapshot per host at
<health_root>/<host_id>.json. Not gated by version_gate
this is metadata about the host itself, not training data,
and we want sick hosts to be ABLE to report sickness even if
their code is out-of-date.
No size cap on the body beyond the global Starlette/uvicorn
max doctor JSON is small (<10 KiB)."""
guard = _bearer_check(request, bearer_token)
if guard is not None:
return guard
if health_root is None:
return JSONResponse(
{"error": "health endpoint disabled (no health_root configured)"},
status_code=404,
)
host_id = request.path_params["host_id"]
if not is_valid_id(host_id):
return JSONResponse({"error": "bad host_id"}, status_code=400)
body_bytes = await request.body()
try:
body = json.loads(body_bytes)
except (ValueError, json.JSONDecodeError):
return JSONResponse({"error": "body must be JSON"}, status_code=400)
if not isinstance(body, dict):
return JSONResponse({"error": "body must be a JSON object"},
status_code=400)
# Stamp the receiver's view of when this arrived. Lab-host
# clocks are coarse so we don't trust them for "freshness."
record = {
"host_id": host_id,
"received_at_wall": time.strftime("%Y-%m-%dT%H:%M:%SZ",
time.gmtime()),
"doctor": body,
}
health_root.mkdir(parents=True, exist_ok=True)
target = health_root / f"{host_id}.json"
# Write to a sibling tmp + atomic rename so a concurrent
# reader never sees a half-written file.
tmp = health_root / f".{host_id}.json.tmp"
tmp.write_text(json.dumps(record, indent=2))
tmp.replace(target)
return JSONResponse({"status": "stored", "host_id": host_id},
status_code=200)
async def get_fleet_health(request: Request) -> JSONResponse:
"""Aggregate view across all hosts that have reported. Used by
the maintainer / fleet-health monitor to spot sick hosts
without grepping per-host files."""
guard = _bearer_check(request, bearer_token)
if guard is not None:
return guard
if health_root is None or not health_root.exists():
return JSONResponse({"hosts": []})
out: list[dict] = []
for f in sorted(health_root.glob("*.json")):
if f.name.startswith("."): # tmp files
continue
try:
out.append(json.loads(f.read_text()))
except (OSError, json.JSONDecodeError):
continue
return JSONResponse({"hosts": out})
routes = [ routes = [
Route("/v1/health", health, methods=["GET"]), Route("/v1/health", health, methods=["GET"]),
Route("/v1/ping", ping, methods=["POST"]), Route("/v1/ping", ping, methods=["POST"]),
@ -239,5 +310,15 @@ def make_app(
put_episode, put_episode,
methods=["PUT"], methods=["PUT"],
), ),
Route(
"/v1/host-health/{host_id}",
put_host_health,
methods=["PUT"],
),
Route(
"/v1/host-health",
get_fleet_health,
methods=["GET"],
),
] ]
return Starlette(routes=routes) return Starlette(routes=routes)

View file

@ -28,6 +28,11 @@ class ReceiverConfig:
version_gate_branch: str version_gate_branch: str
version_gate_auth_token: str | None version_gate_auth_token: str | None
version_gate_local_repo: Path | None version_gate_local_repo: Path | None
# Per-host doctor snapshots (PUT /v1/host-health/<host>). Disabled
# if unset — the receiver returns 404. Default points alongside
# the index in /var/lib/cis490 since that's the receiver's only
# persistent writable area under hardening.
health_root: Path | None
@classmethod @classmethod
def load(cls, path: str | Path) -> "ReceiverConfig": def load(cls, path: str | Path) -> "ReceiverConfig":
@ -56,4 +61,9 @@ class ReceiverConfig:
version_gate_branch=version_gate.get("branch", "main"), version_gate_branch=version_gate.get("branch", "main"),
version_gate_auth_token=version_gate.get("auth_token"), version_gate_auth_token=version_gate.get("auth_token"),
version_gate_local_repo=Path(local_repo).resolve() if local_repo else None, version_gate_local_repo=Path(local_repo).resolve() if local_repo else None,
health_root=(
Path(data["health_root"]).resolve()
if "health_root" in data
else Path("/var/lib/cis490/host-health").resolve()
),
) )

View file

@ -133,13 +133,20 @@ install -m 0644 "$REPO_ROOT/etc/cis490-cert-fetch.service" \
/etc/systemd/system/cis490-cert-fetch.service /etc/systemd/system/cis490-cert-fetch.service
install -m 0644 "$REPO_ROOT/etc/cis490-cert-fetch.timer" \ install -m 0644 "$REPO_ROOT/etc/cis490-cert-fetch.timer" \
/etc/systemd/system/cis490-cert-fetch.timer /etc/systemd/system/cis490-cert-fetch.timer
# Daily doctor check — runs cis490_doctor.py and PUTs JSON to the
# receiver's /v1/host-health/<host>. Lets the maintainer see fleet
# health without grepping per-host logs. See AGENTS.md "Fleet health".
install -m 0644 "$REPO_ROOT/etc/cis490-doctor-check.service" \
/etc/systemd/system/cis490-doctor-check.service
install -m 0644 "$REPO_ROOT/etc/cis490-doctor-check.timer" \
/etc/systemd/system/cis490-doctor-check.timer
systemctl daemon-reload systemctl daemon-reload
# Enable timers immediately — the operator gets self-healing on the # Enable timers immediately — the operator gets self-healing on the
# next tick without an extra `systemctl enable`. Idempotent. # next tick without an extra `systemctl enable`. Idempotent.
systemctl enable --now cis490-autoupdate.timer 2>/dev/null || \ for t in cis490-autoupdate.timer cis490-cert-fetch.timer cis490-doctor-check.timer; do
log "WARN: could not enable cis490-autoupdate.timer (will retry next install)" systemctl enable --now "$t" 2>/dev/null || \
systemctl enable --now cis490-cert-fetch.timer 2>/dev/null || \ log "WARN: could not enable $t (will retry next install)"
log "WARN: could not enable cis490-cert-fetch.timer (will retry next install)" done
# --- 5. config template (only on first install) ----------------------- # --- 5. config template (only on first install) -----------------------
if [[ ! -f "$ETC_ROOT/lab-host.toml" ]]; then if [[ ! -f "$ETC_ROOT/lab-host.toml" ]]; then

302
tests/test_fleet_health.py Normal file
View file

@ -0,0 +1,302 @@
"""Tests for tools/check_fleet_health.py.
The detector is the "did the fleet break and we didn't notice?" eye
on the receiver. Bugs here are silent failures of monitoring bad.
We exercise the three detectable symptoms (silent, fatal-only,
unstamped) and the dedup behaviour (sustained problem alert spam).
"""
from __future__ import annotations
import importlib.util
import json
import sys
import time
from datetime import datetime, timezone
from pathlib import Path
import pytest
REPO_ROOT = Path(__file__).resolve().parent.parent
spec = importlib.util.spec_from_file_location(
"check_fleet_health", REPO_ROOT / "tools" / "check_fleet_health.py"
)
fh = importlib.util.module_from_spec(spec)
sys.modules["check_fleet_health"] = fh
spec.loader.exec_module(fh)
def _iso(epoch: float) -> str:
return datetime.fromtimestamp(epoch, tz=timezone.utc).isoformat()
def _write_index(path: Path, rows: list[dict]) -> None:
path.parent.mkdir(parents=True, exist_ok=True)
with path.open("w") as f:
for r in rows:
f.write(json.dumps(r) + "\n")
# ---------------------------------------------------------------------------
# scan_index
# ---------------------------------------------------------------------------
def test_scan_index_picks_latest_per_host(tmp_path: Path) -> None:
now = time.time()
idx = tmp_path / "index.jsonl"
_write_index(idx, [
{"host_id": "lab1", "received_at_wall": _iso(now - 3600)},
{"host_id": "lab1", "received_at_wall": _iso(now - 1800)},
{"host_id": "lab2", "received_at_wall": _iso(now - 60)},
])
hosts = fh.scan_index(idx, lookback_hours=24, now_epoch=now)
assert hosts["lab1"].last_seen_at_wall_iso == _iso(now - 1800)
assert hosts["lab2"].last_seen_at_wall_iso == _iso(now - 60)
assert hosts["lab1"].seen_in_lookback is True
assert hosts["lab2"].seen_in_lookback is True
def test_scan_index_marks_old_hosts_outside_lookback(tmp_path: Path) -> None:
"""A host last seen 48h ago shouldn't trigger silent-detection
just because it's been quiet — it might be a decommissioned lab
host. seen_in_lookback gates the silent alert."""
now = time.time()
idx = tmp_path / "index.jsonl"
_write_index(idx, [
{"host_id": "decom-host", "received_at_wall": _iso(now - 48 * 3600)},
])
hosts = fh.scan_index(idx, lookback_hours=24, now_epoch=now)
assert hosts["decom-host"].seen_in_lookback is False
def test_scan_index_handles_malformed_rows(tmp_path: Path) -> None:
"""Real index.jsonl has occasional partial lines from prune
operations. Don't crash on them."""
idx = tmp_path / "index.jsonl"
idx.write_text(
'{"host_id": "lab1", "received_at_wall": "2026-05-01T00:00:00+00:00"}\n'
'not-json\n'
'{"host_id": "lab1"}\n' # missing received_at_wall
'{"received_at_wall": "2026-05-01T00:00:00+00:00"}\n' # missing host_id
'{"host_id": "lab2", "received_at_wall": "bogus-timestamp"}\n'
'{"host_id": "lab3", "received_at_wall": "2026-05-01T00:00:01+00:00"}\n'
)
hosts = fh.scan_index(idx, lookback_hours=24, now_epoch=time.time())
# lab1 + lab3 parsed successfully; lab2's bad timestamp is dropped
assert "lab1" in hosts
assert "lab3" in hosts
def test_scan_index_missing_file_returns_empty(tmp_path: Path) -> None:
"""Fresh receiver, index hasn't been created yet."""
hosts = fh.scan_index(tmp_path / "no-such-index.jsonl",
lookback_hours=24, now_epoch=time.time())
assert hosts == {}
# ---------------------------------------------------------------------------
# detect_alerts
# ---------------------------------------------------------------------------
def test_detect_silent_fires_for_active_host_gone_quiet() -> None:
now = time.time()
hosts = {
"lab1": fh.HostState(
last_seen_at_wall_iso=_iso(now - 45 * 60), # 45 min ago
seen_in_lookback=True,
),
}
alerts = fh.detect_alerts(
hosts, silent_threshold_min=30,
fatal_ratio_threshold=0.95, fatal_min_count=10,
now_epoch=now,
)
assert any(a.symptom == "silent" and a.host == "lab1" for a in alerts)
def test_detect_silent_does_not_fire_below_threshold() -> None:
now = time.time()
hosts = {
"lab1": fh.HostState(
last_seen_at_wall_iso=_iso(now - 10 * 60), # 10 min ago
seen_in_lookback=True,
),
}
alerts = fh.detect_alerts(
hosts, silent_threshold_min=30,
fatal_ratio_threshold=0.95, fatal_min_count=10,
now_epoch=now,
)
assert not any(a.symptom == "silent" for a in alerts)
def test_detect_silent_skips_inactive_hosts() -> None:
"""Don't bug the operator about a host they haven't seen in days."""
now = time.time()
hosts = {
"old-decom": fh.HostState(
last_seen_at_wall_iso=_iso(now - 5 * 86400),
seen_in_lookback=False,
),
}
alerts = fh.detect_alerts(
hosts, silent_threshold_min=30,
fatal_ratio_threshold=0.95, fatal_min_count=10,
now_epoch=now,
)
assert not any(a.symptom == "silent" for a in alerts)
def test_detect_fatal_only_fires_when_ratio_exceeds_and_no_2xx() -> None:
now = time.time()
hosts = {
"lab1": fh.HostState(
last_seen_at_wall_iso=_iso(now - 5 * 60),
seen_in_lookback=True,
recent_2xx=0, recent_4xx=200, recent_5xx=0,
),
}
alerts = fh.detect_alerts(
hosts, silent_threshold_min=30,
fatal_ratio_threshold=0.95, fatal_min_count=10,
now_epoch=now,
)
assert any(a.symptom == "fatal-only" and a.host == "lab1" for a in alerts)
def test_detect_fatal_only_quiet_when_some_2xx_landing() -> None:
"""A host with mixed 2xx+4xx is making progress — not stuck."""
now = time.time()
hosts = {
"lab1": fh.HostState(
last_seen_at_wall_iso=_iso(now - 5 * 60),
seen_in_lookback=True,
recent_2xx=20, recent_4xx=80,
),
}
alerts = fh.detect_alerts(
hosts, silent_threshold_min=30,
fatal_ratio_threshold=0.95, fatal_min_count=10,
now_epoch=now,
)
assert not any(a.symptom == "fatal-only" for a in alerts)
def test_detect_fatal_only_quiet_below_min_count() -> None:
"""Don't fire on 2 PUTs both 4xx — could be a normal startup blip."""
now = time.time()
hosts = {
"lab1": fh.HostState(
last_seen_at_wall_iso=_iso(now - 5 * 60),
seen_in_lookback=True,
recent_4xx=2,
),
}
alerts = fh.detect_alerts(
hosts, silent_threshold_min=30,
fatal_ratio_threshold=0.95, fatal_min_count=10,
now_epoch=now,
)
assert not any(a.symptom == "fatal-only" for a in alerts)
def test_detect_unstamped_fires_above_min_count() -> None:
now = time.time()
hosts = {
"lab1": fh.HostState(recent_unstamped_400=50),
}
alerts = fh.detect_alerts(
hosts, silent_threshold_min=30,
fatal_ratio_threshold=0.95, fatal_min_count=10,
now_epoch=now,
)
a = next(x for x in alerts if x.symptom == "unstamped")
assert "pre-stamp" in a.detail
assert "install-lab-host.sh" in a.suggested_fix
# ---------------------------------------------------------------------------
# emit_alerts (dedup behaviour)
# ---------------------------------------------------------------------------
def test_emit_alerts_writes_jsonl_and_dedups_within_hour(tmp_path: Path) -> None:
"""A sustained problem fires ONCE per hour — not every 5-min tick."""
alerts_path = tmp_path / "alerts.jsonl"
a = fh.Alert(
host="lab1", symptom="silent",
detail="last shipped 45 min ago",
suggested_fix="check status",
detected_at_wall=_iso(time.time()),
)
# First pass: alert is new, gets written.
n1 = fh.emit_alerts(alerts_path, [a])
assert n1 == 1
assert alerts_path.exists()
rows = [json.loads(l) for l in alerts_path.read_text().splitlines()]
assert len(rows) == 1
assert rows[0]["host"] == "lab1"
assert rows[0]["symptom"] == "silent"
# Second pass with the same alert: dedup kicks in, file unchanged.
n2 = fh.emit_alerts(alerts_path, [a])
assert n2 == 0
rows = [json.loads(l) for l in alerts_path.read_text().splitlines()]
assert len(rows) == 1
def test_emit_alerts_distinguishes_symptoms(tmp_path: Path) -> None:
"""Same host with two DIFFERENT symptoms shouldn't dedup against
each other."""
alerts_path = tmp_path / "alerts.jsonl"
now_iso = _iso(time.time())
a1 = fh.Alert(host="lab1", symptom="silent", detail="d1",
suggested_fix="f1", detected_at_wall=now_iso)
a2 = fh.Alert(host="lab1", symptom="fatal-only", detail="d2",
suggested_fix="f2", detected_at_wall=now_iso)
n = fh.emit_alerts(alerts_path, [a1, a2])
assert n == 2
def test_emit_alerts_idempotent_when_no_alerts(tmp_path: Path) -> None:
alerts_path = tmp_path / "alerts.jsonl"
n = fh.emit_alerts(alerts_path, [])
assert n == 0
assert not alerts_path.exists()
# ---------------------------------------------------------------------------
# main() smoke
# ---------------------------------------------------------------------------
def test_main_returns_zero_when_no_alerts(tmp_path: Path,
monkeypatch: pytest.MonkeyPatch) -> None:
"""End-to-end smoke. journalctl is stubbed (the runner is unlikely
to have systemd-journal access in test env)."""
idx = tmp_path / "index.jsonl"
now = time.time()
_write_index(idx, [
{"host_id": "lab1", "received_at_wall": _iso(now - 60)},
])
alerts = tmp_path / "alerts.jsonl"
real_run = fh.subprocess.run
def fake_run(cmd, *args, **kwargs):
if cmd and cmd[0] == "journalctl":
class R:
returncode = 0
stdout = ""
stderr = ""
return R()
return real_run(cmd, *args, **kwargs)
monkeypatch.setattr(fh.subprocess, "run", fake_run)
rc = fh.main(["--index-path", str(idx),
"--alerts-path", str(alerts)])
assert rc == 0

198
tests/test_host_health.py Normal file
View file

@ -0,0 +1,198 @@
"""Tests for the receiver's /v1/host-health endpoints.
PUT /v1/host-health/<host> store the lab host's daily doctor JSON
GET /v1/host-health aggregate snapshot across all hosts
The endpoint is deliberately NOT gated by version_gate sick hosts
running stale code still need to be able to report sickness.
"""
from __future__ import annotations
import json
from pathlib import Path
import pytest
from starlette.testclient import TestClient
from receiver.app import make_app
from receiver.store import EpisodeStore
@pytest.fixture
def app(tmp_path: Path):
store = EpisodeStore(
store_root=tmp_path / "store",
incoming_root=tmp_path / "incoming",
index_path=tmp_path / "index.jsonl",
)
return make_app(
store=store, max_episode_bytes=10_000_000,
bearer_token=None,
health_root=tmp_path / "host-health",
)
@pytest.fixture
def app_with_bearer(tmp_path: Path):
store = EpisodeStore(
store_root=tmp_path / "store",
incoming_root=tmp_path / "incoming",
index_path=tmp_path / "index.jsonl",
)
return make_app(
store=store, max_episode_bytes=10_000_000,
bearer_token="s3cret",
health_root=tmp_path / "host-health",
)
@pytest.fixture
def app_no_health(tmp_path: Path):
"""health_root=None ⇒ endpoint returns 404."""
store = EpisodeStore(
store_root=tmp_path / "store",
incoming_root=tmp_path / "incoming",
index_path=tmp_path / "index.jsonl",
)
return make_app(
store=store, max_episode_bytes=10_000_000,
bearer_token=None,
health_root=None,
)
def test_put_health_stores_doctor_json(app, tmp_path: Path) -> None:
snapshot = {
"role": "lab-host",
"checks": [
{"name": "install: VERSION stamp", "status": "ok", "detail": "main@abc123"},
{"name": "shipper: recent ship results", "status": "fail",
"detail": "12 412s in last 10 min"},
],
}
with TestClient(app) as c:
r = c.put("/v1/host-health/lab1", json=snapshot)
assert r.status_code == 200
assert r.json()["status"] == "stored"
assert r.json()["host_id"] == "lab1"
# File written
target = tmp_path / "host-health" / "lab1.json"
assert target.exists()
body = json.loads(target.read_text())
assert body["host_id"] == "lab1"
assert body["doctor"] == snapshot
assert "received_at_wall" in body
def test_put_health_overwrites_previous_snapshot(app, tmp_path: Path) -> None:
"""The endpoint stores the LATEST snapshot per host, not history."""
with TestClient(app) as c:
r1 = c.put("/v1/host-health/lab1", json={"checks": [{"v": 1}]})
r2 = c.put("/v1/host-health/lab1", json={"checks": [{"v": 2}]})
assert r1.status_code == 200 and r2.status_code == 200
body = json.loads((tmp_path / "host-health" / "lab1.json").read_text())
assert body["doctor"] == {"checks": [{"v": 2}]}
def test_put_health_rejects_invalid_host_id(app) -> None:
"""is_valid_id rejects path-traversal-y strings + extreme lengths.
URL-routing-level traversal (../) is normalised by the test
client before reaching us, so probe with a chunk of bad chars
that survive routing."""
with TestClient(app) as c:
r = c.put("/v1/host-health/lab%20with%20space", json={"checks": []})
assert r.status_code == 400
def test_put_health_rejects_non_json_body(app) -> None:
with TestClient(app) as c:
r = c.put("/v1/host-health/lab1", content=b"not json",
headers={"Content-Type": "application/json"})
assert r.status_code == 400
def test_put_health_rejects_array_body(app) -> None:
"""Body must be an OBJECT, not a list — the doctor's --json output
is always {role, checks: [...]}, never bare list."""
with TestClient(app) as c:
r = c.put("/v1/host-health/lab1", json=["x", "y"])
assert r.status_code == 400
def test_put_health_404_when_disabled(app_no_health) -> None:
"""Receivers without health_root configured return 404 — lets a
deployment opt out without removing the routes."""
with TestClient(app_no_health) as c:
r = c.put("/v1/host-health/lab1", json={"checks": []})
assert r.status_code == 404
def test_put_health_not_gated_by_version_gate(tmp_path: Path) -> None:
"""A sick host with stale code MUST still be able to report
sickness. Confirm we don't check X-Cis490-Code-Commit on the
health endpoint."""
store = EpisodeStore(
store_root=tmp_path / "store",
incoming_root=tmp_path / "incoming",
index_path=tmp_path / "index.jsonl",
)
# Build a gate that rejects everything to prove we don't run it.
class _RejectAll:
def check(self, commit): return False, "not-in-window"
def head(self): return None
def valid_count(self): return 0
app = make_app(
store=store, max_episode_bytes=10_000_000,
bearer_token=None,
version_gate=_RejectAll(),
health_root=tmp_path / "host-health",
)
with TestClient(app) as c:
r = c.put("/v1/host-health/lab1", json={"checks": []})
assert r.status_code == 200
def test_put_health_requires_bearer_when_configured(app_with_bearer) -> None:
with TestClient(app_with_bearer) as c:
r = c.put("/v1/host-health/lab1", json={"checks": []})
assert r.status_code == 401
r2 = c.put("/v1/host-health/lab1", json={"checks": []},
headers={"Authorization": "Bearer s3cret"})
assert r2.status_code == 200
def test_get_fleet_health_returns_all_snapshots(app, tmp_path: Path) -> None:
with TestClient(app) as c:
c.put("/v1/host-health/lab1", json={"checks": [{"v": 1}]})
c.put("/v1/host-health/lab2", json={"checks": [{"v": 2}]})
r = c.get("/v1/host-health")
assert r.status_code == 200
body = r.json()
hosts = {h["host_id"]: h for h in body["hosts"]}
assert hosts["lab1"]["doctor"] == {"checks": [{"v": 1}]}
assert hosts["lab2"]["doctor"] == {"checks": [{"v": 2}]}
def test_get_fleet_health_empty_when_no_reports(app) -> None:
with TestClient(app) as c:
r = c.get("/v1/host-health")
assert r.status_code == 200
assert r.json() == {"hosts": []}
def test_get_fleet_health_ignores_temp_files(app, tmp_path: Path) -> None:
"""Atomic-write tmpfiles (.lab1.json.tmp) shouldn't show up in the
aggregate listing if a write was in-flight."""
health_dir = tmp_path / "host-health"
health_dir.mkdir()
(health_dir / ".lab1.json.tmp").write_text('{"host_id": "lab1"}')
(health_dir / "lab2.json").write_text(
'{"host_id": "lab2", "doctor": {"checks": []}}'
)
with TestClient(app) as c:
r = c.get("/v1/host-health")
body = r.json()
hosts = [h.get("host_id") for h in body["hosts"]]
assert hosts == ["lab2"]

362
tools/check_fleet_health.py Normal file
View file

@ -0,0 +1,362 @@
"""Receiver-side fleet health check.
Runs every 5 minutes (via cis490-fleet-health.timer). Detects:
silent
A host that successfully shipped in the last `lookback_hours`
has been silent for more than `silent_threshold_min`. Likely a
crashed daemon, frozen on-device agent, or pulled-and-bricked
host stuck mid-install. The threshold is high enough to ignore
routine reboots but tight enough to catch a stalled fleet.
fatal-only
A host has been actively PUTting in the last
`recent_window_min` minutes but every PUT was 4xx. Signals
diverged-HEAD, missing VERSION stamp, or other gate-rejection
loop the lab host can't fix without operator action.
unstamped
A host is shipping with NO X-Cis490-Code-Commit header at all
pre-stamp code is still running. cis490-autoupdate.timer should
catch this within 30 min, so any persistent unstamped flag means
autoupdate is failing too.
Output:
/var/lib/cis490/alerts.jsonl one row per detected alert, dedup'd
on (host, symptom, hour-bucket) so
a sustained problem doesn't spam the
log every 5 min.
syslog (WARNING) easy to grep / forward to a notifier.
Exit codes:
0 no alerts
1 one or more alerts emitted
2 bad config / can't read inputs
Run by hand:
sudo /opt/cis490/.venv/bin/python /opt/cis490/tools/check_fleet_health.py \\
--index-path /var/lib/cis490/index.jsonl \\
--alerts-path /var/lib/cis490/alerts.jsonl
"""
from __future__ import annotations
import argparse
import hashlib
import json
import logging
import os
import re
import subprocess
import sys
import time
from collections import defaultdict
from dataclasses import dataclass, field
from datetime import datetime, timezone
from pathlib import Path
log = logging.getLogger("cis490.fleet-health")
@dataclass
class Alert:
host: str
symptom: str # "silent" | "fatal-only" | "unstamped"
detail: str
detected_at_wall: str
suggested_fix: str
def dedup_key(self) -> str:
# 1-hour bucket keeps a sustained fault from spamming the log
# every tick, but lets us re-fire if it persists into the next
# hour (so the operator gets a fresh signal once per hour).
bucket = datetime.now(tz=timezone.utc).strftime("%Y-%m-%dT%H")
h = hashlib.sha256(f"{self.host}|{self.symptom}|{bucket}".encode()).hexdigest()
return h[:16]
def to_row(self) -> dict:
return {
"host": self.host,
"symptom": self.symptom,
"detail": self.detail,
"suggested_fix": self.suggested_fix,
"detected_at_wall": self.detected_at_wall,
"dedup_key": self.dedup_key(),
}
@dataclass
class HostState:
last_seen_at_wall_iso: str | None = None
seen_in_lookback: bool = False
recent_2xx: int = 0
recent_4xx: int = 0
recent_5xx: int = 0
recent_unstamped_400: int = 0
def _parse_iso(s: str) -> float:
"""ISO-8601 with timezone → epoch seconds."""
# received_at_wall format: "2026-05-01T06:37:53.830599+00:00"
return datetime.fromisoformat(s).timestamp()
def scan_index(index_path: Path, *, lookback_hours: int,
now_epoch: float) -> dict[str, HostState]:
"""Walk index.jsonl, compute per-host last-seen timestamp.
Streams the file (don't load the whole thing — index can be big).
Tail end is the most recent, so iterating linearly + keeping the
max per host is fine."""
cutoff = now_epoch - (lookback_hours * 3600)
hosts: dict[str, HostState] = defaultdict(HostState)
if not index_path.exists():
return {}
with index_path.open() as f:
for line in f:
try:
row = json.loads(line)
except json.JSONDecodeError:
continue
host = row.get("host_id")
ts_iso = row.get("received_at_wall")
if not host or not ts_iso:
continue
try:
ts = _parse_iso(ts_iso)
except ValueError:
continue
st = hosts[host]
if (st.last_seen_at_wall_iso is None
or ts > _parse_iso(st.last_seen_at_wall_iso)):
st.last_seen_at_wall_iso = ts_iso
if ts >= cutoff:
st.seen_in_lookback = True
return dict(hosts)
def scan_journal(*, recent_window_min: int,
hosts: dict[str, HostState]) -> None:
"""Run journalctl over cis490-receiver and bucket per-host status
codes for the recent window. Mutates hosts in place."""
rc = subprocess.run(
["journalctl", "-u", "cis490-receiver",
"--since", f"{recent_window_min} minutes ago",
"--no-pager", "--output=cat"],
capture_output=True, text=True, timeout=30,
)
if rc.returncode != 0:
log.warning("journalctl returned %d; skipping fatal-only detection",
rc.returncode)
return
# Match the access log format uvicorn emits:
# "PUT /v1/episodes/<host>/<id>.tar.zst HTTP/1.1" <status>
put_re = re.compile(
r'PUT /v1/episodes/([^/]+)/[^"]+ HTTP/1\.1" (\d{3})'
)
# The receiver's own WARNING line for "missing X-Cis490-Code-Commit"
# — we count those separately so we can distinguish unstamped from
# other 4xx.
missing_re = re.compile(r'missing X-Cis490-Code-Commit', re.IGNORECASE)
last_was_missing_for: str | None = None
for line in rc.stdout.splitlines():
if missing_re.search(line):
# The next access-log PUT line (a few lines later) is the
# one this WARNING refers to. Mark a flag.
# Simplification: if we see "rejected episode host=X",
# extract the host directly.
m = re.search(r'host=([^\s]+)', line)
if m:
host = m.group(1).rstrip(',')
hosts.setdefault(host, HostState()).recent_unstamped_400 += 1
last_was_missing_for = host
continue
m = put_re.search(line)
if not m:
continue
host = m.group(1)
st = hosts.setdefault(host, HostState())
status = int(m.group(2))
if 200 <= status < 300:
st.recent_2xx += 1
elif 400 <= status < 500:
st.recent_4xx += 1
elif 500 <= status < 600:
st.recent_5xx += 1
def detect_alerts(
hosts: dict[str, HostState],
*,
silent_threshold_min: int,
fatal_ratio_threshold: float,
fatal_min_count: int,
now_epoch: float,
) -> list[Alert]:
alerts: list[Alert] = []
detected_at = datetime.fromtimestamp(now_epoch, tz=timezone.utc).isoformat()
for host, st in hosts.items():
# silent: shipped before, hasn't shipped recently
if st.seen_in_lookback and st.last_seen_at_wall_iso is not None:
last_seen = _parse_iso(st.last_seen_at_wall_iso)
silent_min = (now_epoch - last_seen) / 60
if silent_min > silent_threshold_min:
alerts.append(Alert(
host=host,
symptom="silent",
detail=(f"last successful ship {silent_min:.0f} min ago "
f"({st.last_seen_at_wall_iso})"),
detected_at_wall=detected_at,
suggested_fix=(
f"On {host}: check `systemctl status cis490-shipper "
f"cis490-orchestrator`. If services are inactive, "
f"`sudo systemctl start cis490-shipper "
f"cis490-orchestrator`. If actively failing, see "
f"FIXYOURSELF.md."
),
))
# unstamped: pre-stamp episodes still being shipped
if st.recent_unstamped_400 >= fatal_min_count:
alerts.append(Alert(
host=host,
symptom="unstamped",
detail=(f"{st.recent_unstamped_400} ship(s) without "
f"X-Cis490-Code-Commit header in recent window — "
f"orchestrator is running pre-stamp code"),
detected_at_wall=detected_at,
suggested_fix=(
f"On {host}: `sudo /opt/cis490/scripts/install-lab-host.sh` "
f"to re-stamp VERSION + restart orchestrator. If "
f"cis490-autoupdate.timer is enabled this should heal "
f"on its own within 30 min."
),
))
# fatal-only: actively shipping but >threshold% rejected
total = st.recent_2xx + st.recent_4xx + st.recent_5xx
if total >= fatal_min_count:
fatal_ratio = (st.recent_4xx + st.recent_5xx) / total
if fatal_ratio >= fatal_ratio_threshold and st.recent_2xx == 0:
alerts.append(Alert(
host=host,
symptom="fatal-only",
detail=(f"{total} ship(s) in recent window, "
f"{st.recent_4xx} × 4xx + {st.recent_5xx} × 5xx, "
f"0 × 2xx — host is shipping but everything "
f"is being rejected"),
detected_at_wall=detected_at,
suggested_fix=(
f"On {host}: check `journalctl -u cis490-shipper "
f"--since '5 minutes ago'`. If 412 commit-rejected: "
f"see FIXYOURSELF.md §B (likely diverged HEAD)."
),
))
return alerts
def load_existing_dedup_keys(alerts_path: Path) -> set[str]:
"""The current hour's already-emitted alerts. We re-read the file
each pass rather than maintaining a separate state file keeps
truth in one place and survives restarts."""
if not alerts_path.exists():
return set()
keys: set[str] = set()
bucket = datetime.now(tz=timezone.utc).strftime("%Y-%m-%dT%H")
try:
with alerts_path.open() as f:
for line in f:
try:
row = json.loads(line)
except json.JSONDecodeError:
continue
# Only consider keys from THIS hour-bucket — older
# alerts shouldn't suppress a fresh detection.
detected = row.get("detected_at_wall", "")
if detected.startswith(bucket):
if isinstance(row.get("dedup_key"), str):
keys.add(row["dedup_key"])
except OSError:
pass
return keys
def emit_alerts(alerts_path: Path, alerts: list[Alert]) -> int:
if not alerts:
return 0
seen = load_existing_dedup_keys(alerts_path)
new_count = 0
alerts_path.parent.mkdir(parents=True, exist_ok=True)
with alerts_path.open("a") as f:
for a in alerts:
if a.dedup_key() in seen:
continue
f.write(json.dumps(a.to_row()) + "\n")
log.warning(
"ALERT host=%s symptom=%s%s. Suggested: %s",
a.host, a.symptom, a.detail, a.suggested_fix,
)
new_count += 1
return new_count
def main(argv: list[str] | None = None) -> int:
p = argparse.ArgumentParser(prog="cis490-fleet-health",
description=__doc__,
formatter_class=argparse.RawDescriptionHelpFormatter)
p.add_argument("--index-path", default="/var/lib/cis490/index.jsonl",
type=Path)
p.add_argument("--alerts-path", default="/var/lib/cis490/alerts.jsonl",
type=Path)
p.add_argument("--lookback-hours", type=int, default=24,
help="A host that shipped in this window counts as 'active'; "
"silent-detection only fires for active hosts.")
p.add_argument("--silent-threshold-min", type=int, default=30,
help="An active host silent longer than this triggers 'silent'.")
p.add_argument("--recent-window-min", type=int, default=30,
help="Window for fatal-only / unstamped detection from the journal.")
p.add_argument("--fatal-ratio-threshold", type=float, default=0.95,
help="Min fatal-fraction in the recent window to alert.")
p.add_argument("--fatal-min-count", type=int, default=10,
help="Don't alert until this many recent PUTs — kills false "
"positives on hosts with very low traffic.")
p.add_argument("--log-level", default="INFO")
args = p.parse_args(argv)
logging.basicConfig(
level=getattr(logging, args.log_level.upper(), logging.INFO),
format="%(asctime)s %(levelname)s %(name)s %(message)s",
)
if not args.index_path.exists():
log.error("index missing: %s", args.index_path)
return 2
now_epoch = time.time()
hosts = scan_index(args.index_path,
lookback_hours=args.lookback_hours,
now_epoch=now_epoch)
scan_journal(recent_window_min=args.recent_window_min, hosts=hosts)
alerts = detect_alerts(
hosts,
silent_threshold_min=args.silent_threshold_min,
fatal_ratio_threshold=args.fatal_ratio_threshold,
fatal_min_count=args.fatal_min_count,
now_epoch=now_epoch,
)
new_count = emit_alerts(args.alerts_path, alerts)
if new_count == 0:
log.info("fleet healthy — %d hosts checked, no new alerts",
len(hosts))
return 1 if new_count else 0
if __name__ == "__main__":
sys.exit(main())

114
tools/ship_health_check.py Normal file
View file

@ -0,0 +1,114 @@
"""Run cis490_doctor.py and PUT the JSON output to the receiver.
Triggered by cis490-doctor-check.timer (once a day) or invoked by
hand. Best-effort: a doctor that exits with red rows still ships its
output that's the most useful case.
Reuses the shipper's transport (mTLS + bearer + receiver URL from
lab-host.toml) so we don't reimplement auth.
Failure modes:
- doctor crashes exit 2, log error
- PUT fails (non-2xx) exit 1, log error (timer fires next day)
- PUT succeeds exit 0
- mTLS not yet on disk exit 0 (silent first-boot path)
"""
from __future__ import annotations
import json
import logging
import subprocess
import sys
from pathlib import Path
import httpx
from shipper.config import ShipperConfig
from shipper.transport import ShipperTransport, _build_ssl_context, _CertNotReadyError
log = logging.getLogger("cis490.shipper.health-check")
def run_doctor(doctor_path: Path, role: str = "lab-host") -> dict:
"""Run cis490_doctor.py --json --role lab-host. Returns the parsed
JSON (which always has a `checks` array even when reds are
present, the doctor exits non-zero but still prints the report).
Raises RuntimeError if the doctor crashed without printing JSON."""
venv_py = Path("/opt/cis490/.venv/bin/python")
py = str(venv_py) if venv_py.exists() else sys.executable
rc = subprocess.run(
[py, str(doctor_path), "--role", role, "--json"],
capture_output=True, text=True, timeout=120,
)
# Doctor exits non-zero when red rows are present — that's
# exactly when we MOST want to ship the snapshot. Don't gate on
# exit code; gate on whether parseable JSON came out.
try:
return json.loads(rc.stdout)
except json.JSONDecodeError as e:
raise RuntimeError(
f"doctor produced no JSON (exit={rc.returncode}, "
f"stderr={rc.stderr[:500]!r})"
) from e
def ship_health(cfg: ShipperConfig, snapshot: dict) -> tuple[int, str]:
"""PUT snapshot to /v1/host-health/<host_id>. Reuses the shipper's
SSL context build so we get mTLS + the cert-not-ready deferral
behaviour for free."""
try:
verify = _build_ssl_context(cfg.receiver)
except _CertNotReadyError as e:
log.info("mTLS material not on disk yet; skipping health ship: %s", e)
return 0, "deferred"
url = f"{cfg.receiver.url}/v1/host-health/{cfg.host_id}"
headers = {"X-Lab-Host": cfg.host_id, "Content-Type": "application/json"}
if cfg.receiver.bearer_token:
headers["Authorization"] = f"Bearer {cfg.receiver.bearer_token}"
try:
with httpx.Client(verify=verify, timeout=cfg.request_timeout_s) as c:
r = c.put(url, headers=headers, content=json.dumps(snapshot))
except httpx.HTTPError as e:
return 1, f"HTTP error: {e}"
if 200 <= r.status_code < 300:
return 0, f"ok ({r.status_code})"
return 1, f"non-2xx: {r.status_code} {r.text[:200]}"
def main(argv: list[str] | None = None) -> int:
import argparse
p = argparse.ArgumentParser(prog="cis490-ship-health-check")
p.add_argument("--config", default="/etc/cis490/lab-host.toml")
p.add_argument("--doctor",
default="/opt/cis490/tools/cis490_doctor.py", type=Path)
p.add_argument("--log-level", default="INFO")
args = p.parse_args(argv)
logging.basicConfig(
level=getattr(logging, args.log_level.upper(), logging.INFO),
format="%(asctime)s %(levelname)s %(name)s %(message)s",
)
try:
cfg = ShipperConfig.load(args.config)
except (FileNotFoundError, ValueError) as e:
log.error("config error: %s", e)
return 2
try:
snapshot = run_doctor(args.doctor)
except (RuntimeError, subprocess.TimeoutExpired, FileNotFoundError) as e:
log.error("doctor failed: %s", e)
return 2
rc, msg = ship_health(cfg, snapshot)
log.info("health ship: %s", msg)
return rc
if __name__ == "__main__":
sys.exit(main())