CIS490/tools/check_fleet_health.py
max 05bf785f0a fleet-health: exit 0 when alerts found (don't mark unit failed)
The detector previously returned 1 on alerts, which made systemd
mark cis490-fleet-health.service as 'failed' every tick that found
a sick host. That's the wrong UX — a detector finding a fault is
working correctly, not crashing. The alert is the signal (via
WARNING log + alerts.jsonl); the unit's success state should mean
"the detector itself ran cleanly." Test added.

Caught while live-deploying on the Pi: the first run found
elliott-thinkpad fatal-only at 943×4xx + 1425×5xx and correctly
emitted the alert — but systemd showed the unit red, which would
have caused operators to chase the wrong tail.

Side note: the same first run also caught a real bug — pycache for
receiver.store on /opt/cis490 was stale after I deployed the new
app.py + store.py from main, causing 1464 × 500 responses. Cleared
the pycache and the index immediately resumed growing (4465 →
4515 in 30 seconds). The detector earned its keep on the very
first cycle.

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
2026-05-02 13:51:20 -05:00

371 lines
14 KiB
Python
Raw Permalink Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

"""Receiver-side fleet health check.
Runs every 5 minutes (via cis490-fleet-health.timer). Detects:
silent
A host that successfully shipped in the last `lookback_hours`
has been silent for more than `silent_threshold_min`. Likely a
crashed daemon, frozen on-device agent, or pulled-and-bricked
host stuck mid-install. The threshold is high enough to ignore
routine reboots but tight enough to catch a stalled fleet.
fatal-only
A host has been actively PUTting in the last
`recent_window_min` minutes but every PUT was 4xx. Signals
diverged-HEAD, missing VERSION stamp, or other gate-rejection
loop the lab host can't fix without operator action.
unstamped
A host is shipping with NO X-Cis490-Code-Commit header at all —
pre-stamp code is still running. cis490-autoupdate.timer should
catch this within 30 min, so any persistent unstamped flag means
autoupdate is failing too.
Output:
/var/lib/cis490/alerts.jsonl — one row per detected alert, dedup'd
on (host, symptom, hour-bucket) so
a sustained problem doesn't spam the
log every 5 min.
syslog (WARNING) — easy to grep / forward to a notifier.
Exit codes:
0 no alerts
1 one or more alerts emitted
2 bad config / can't read inputs
Run by hand:
sudo /opt/cis490/.venv/bin/python /opt/cis490/tools/check_fleet_health.py \\
--index-path /var/lib/cis490/index.jsonl \\
--alerts-path /var/lib/cis490/alerts.jsonl
"""
from __future__ import annotations
import argparse
import hashlib
import json
import logging
import os
import re
import subprocess
import sys
import time
from collections import defaultdict
from dataclasses import dataclass, field
from datetime import datetime, timezone
from pathlib import Path
log = logging.getLogger("cis490.fleet-health")
@dataclass
class Alert:
host: str
symptom: str # "silent" | "fatal-only" | "unstamped"
detail: str
detected_at_wall: str
suggested_fix: str
def dedup_key(self) -> str:
# 1-hour bucket keeps a sustained fault from spamming the log
# every tick, but lets us re-fire if it persists into the next
# hour (so the operator gets a fresh signal once per hour).
bucket = datetime.now(tz=timezone.utc).strftime("%Y-%m-%dT%H")
h = hashlib.sha256(f"{self.host}|{self.symptom}|{bucket}".encode()).hexdigest()
return h[:16]
def to_row(self) -> dict:
return {
"host": self.host,
"symptom": self.symptom,
"detail": self.detail,
"suggested_fix": self.suggested_fix,
"detected_at_wall": self.detected_at_wall,
"dedup_key": self.dedup_key(),
}
@dataclass
class HostState:
last_seen_at_wall_iso: str | None = None
seen_in_lookback: bool = False
recent_2xx: int = 0
recent_4xx: int = 0
recent_5xx: int = 0
recent_unstamped_400: int = 0
def _parse_iso(s: str) -> float:
"""ISO-8601 with timezone → epoch seconds."""
# received_at_wall format: "2026-05-01T06:37:53.830599+00:00"
return datetime.fromisoformat(s).timestamp()
def scan_index(index_path: Path, *, lookback_hours: int,
now_epoch: float) -> dict[str, HostState]:
"""Walk index.jsonl, compute per-host last-seen timestamp.
Streams the file (don't load the whole thing — index can be big).
Tail end is the most recent, so iterating linearly + keeping the
max per host is fine."""
cutoff = now_epoch - (lookback_hours * 3600)
hosts: dict[str, HostState] = defaultdict(HostState)
if not index_path.exists():
return {}
with index_path.open() as f:
for line in f:
try:
row = json.loads(line)
except json.JSONDecodeError:
continue
host = row.get("host_id")
ts_iso = row.get("received_at_wall")
if not host or not ts_iso:
continue
try:
ts = _parse_iso(ts_iso)
except ValueError:
continue
st = hosts[host]
if (st.last_seen_at_wall_iso is None
or ts > _parse_iso(st.last_seen_at_wall_iso)):
st.last_seen_at_wall_iso = ts_iso
if ts >= cutoff:
st.seen_in_lookback = True
return dict(hosts)
def scan_journal(*, recent_window_min: int,
hosts: dict[str, HostState]) -> None:
"""Run journalctl over cis490-receiver and bucket per-host status
codes for the recent window. Mutates hosts in place."""
rc = subprocess.run(
["journalctl", "-u", "cis490-receiver",
"--since", f"{recent_window_min} minutes ago",
"--no-pager", "--output=cat"],
capture_output=True, text=True, timeout=30,
)
if rc.returncode != 0:
log.warning("journalctl returned %d; skipping fatal-only detection",
rc.returncode)
return
# Match the access log format uvicorn emits:
# "PUT /v1/episodes/<host>/<id>.tar.zst HTTP/1.1" <status>
put_re = re.compile(
r'PUT /v1/episodes/([^/]+)/[^"]+ HTTP/1\.1" (\d{3})'
)
# The receiver's own WARNING line for "missing X-Cis490-Code-Commit"
# — we count those separately so we can distinguish unstamped from
# other 4xx.
missing_re = re.compile(r'missing X-Cis490-Code-Commit', re.IGNORECASE)
last_was_missing_for: str | None = None
for line in rc.stdout.splitlines():
if missing_re.search(line):
# The next access-log PUT line (a few lines later) is the
# one this WARNING refers to. Mark a flag.
# Simplification: if we see "rejected episode host=X",
# extract the host directly.
m = re.search(r'host=([^\s]+)', line)
if m:
host = m.group(1).rstrip(',')
hosts.setdefault(host, HostState()).recent_unstamped_400 += 1
last_was_missing_for = host
continue
m = put_re.search(line)
if not m:
continue
host = m.group(1)
st = hosts.setdefault(host, HostState())
status = int(m.group(2))
if 200 <= status < 300:
st.recent_2xx += 1
elif 400 <= status < 500:
st.recent_4xx += 1
elif 500 <= status < 600:
st.recent_5xx += 1
def detect_alerts(
hosts: dict[str, HostState],
*,
silent_threshold_min: int,
fatal_ratio_threshold: float,
fatal_min_count: int,
now_epoch: float,
) -> list[Alert]:
alerts: list[Alert] = []
detected_at = datetime.fromtimestamp(now_epoch, tz=timezone.utc).isoformat()
for host, st in hosts.items():
# silent: shipped before, hasn't shipped recently
if st.seen_in_lookback and st.last_seen_at_wall_iso is not None:
last_seen = _parse_iso(st.last_seen_at_wall_iso)
silent_min = (now_epoch - last_seen) / 60
if silent_min > silent_threshold_min:
alerts.append(Alert(
host=host,
symptom="silent",
detail=(f"last successful ship {silent_min:.0f} min ago "
f"({st.last_seen_at_wall_iso})"),
detected_at_wall=detected_at,
suggested_fix=(
f"On {host}: check `systemctl status cis490-shipper "
f"cis490-orchestrator`. If services are inactive, "
f"`sudo systemctl start cis490-shipper "
f"cis490-orchestrator`. If actively failing, see "
f"FIXYOURSELF.md."
),
))
# unstamped: pre-stamp episodes still being shipped
if st.recent_unstamped_400 >= fatal_min_count:
alerts.append(Alert(
host=host,
symptom="unstamped",
detail=(f"{st.recent_unstamped_400} ship(s) without "
f"X-Cis490-Code-Commit header in recent window — "
f"orchestrator is running pre-stamp code"),
detected_at_wall=detected_at,
suggested_fix=(
f"On {host}: `sudo /opt/cis490/scripts/install-lab-host.sh` "
f"to re-stamp VERSION + restart orchestrator. If "
f"cis490-autoupdate.timer is enabled this should heal "
f"on its own within 30 min."
),
))
# fatal-only: actively shipping but >threshold% rejected
total = st.recent_2xx + st.recent_4xx + st.recent_5xx
if total >= fatal_min_count:
fatal_ratio = (st.recent_4xx + st.recent_5xx) / total
if fatal_ratio >= fatal_ratio_threshold and st.recent_2xx == 0:
alerts.append(Alert(
host=host,
symptom="fatal-only",
detail=(f"{total} ship(s) in recent window, "
f"{st.recent_4xx} × 4xx + {st.recent_5xx} × 5xx, "
f"0 × 2xx — host is shipping but everything "
f"is being rejected"),
detected_at_wall=detected_at,
suggested_fix=(
f"On {host}: check `journalctl -u cis490-shipper "
f"--since '5 minutes ago'`. If 412 commit-rejected: "
f"see FIXYOURSELF.md §B (likely diverged HEAD)."
),
))
return alerts
def load_existing_dedup_keys(alerts_path: Path) -> set[str]:
"""The current hour's already-emitted alerts. We re-read the file
each pass rather than maintaining a separate state file — keeps
truth in one place and survives restarts."""
if not alerts_path.exists():
return set()
keys: set[str] = set()
bucket = datetime.now(tz=timezone.utc).strftime("%Y-%m-%dT%H")
try:
with alerts_path.open() as f:
for line in f:
try:
row = json.loads(line)
except json.JSONDecodeError:
continue
# Only consider keys from THIS hour-bucket — older
# alerts shouldn't suppress a fresh detection.
detected = row.get("detected_at_wall", "")
if detected.startswith(bucket):
if isinstance(row.get("dedup_key"), str):
keys.add(row["dedup_key"])
except OSError:
pass
return keys
def emit_alerts(alerts_path: Path, alerts: list[Alert]) -> int:
if not alerts:
return 0
seen = load_existing_dedup_keys(alerts_path)
new_count = 0
alerts_path.parent.mkdir(parents=True, exist_ok=True)
with alerts_path.open("a") as f:
for a in alerts:
if a.dedup_key() in seen:
continue
f.write(json.dumps(a.to_row()) + "\n")
log.warning(
"ALERT host=%s symptom=%s%s. Suggested: %s",
a.host, a.symptom, a.detail, a.suggested_fix,
)
new_count += 1
return new_count
def main(argv: list[str] | None = None) -> int:
p = argparse.ArgumentParser(prog="cis490-fleet-health",
description=__doc__,
formatter_class=argparse.RawDescriptionHelpFormatter)
p.add_argument("--index-path", default="/var/lib/cis490/index.jsonl",
type=Path)
p.add_argument("--alerts-path", default="/var/lib/cis490/alerts.jsonl",
type=Path)
p.add_argument("--lookback-hours", type=int, default=24,
help="A host that shipped in this window counts as 'active'; "
"silent-detection only fires for active hosts.")
p.add_argument("--silent-threshold-min", type=int, default=30,
help="An active host silent longer than this triggers 'silent'.")
p.add_argument("--recent-window-min", type=int, default=30,
help="Window for fatal-only / unstamped detection from the journal.")
p.add_argument("--fatal-ratio-threshold", type=float, default=0.95,
help="Min fatal-fraction in the recent window to alert.")
p.add_argument("--fatal-min-count", type=int, default=10,
help="Don't alert until this many recent PUTs — kills false "
"positives on hosts with very low traffic.")
p.add_argument("--log-level", default="INFO")
args = p.parse_args(argv)
logging.basicConfig(
level=getattr(logging, args.log_level.upper(), logging.INFO),
format="%(asctime)s %(levelname)s %(name)s %(message)s",
)
if not args.index_path.exists():
log.error("index missing: %s", args.index_path)
return 2
now_epoch = time.time()
hosts = scan_index(args.index_path,
lookback_hours=args.lookback_hours,
now_epoch=now_epoch)
scan_journal(recent_window_min=args.recent_window_min, hosts=hosts)
alerts = detect_alerts(
hosts,
silent_threshold_min=args.silent_threshold_min,
fatal_ratio_threshold=args.fatal_ratio_threshold,
fatal_min_count=args.fatal_min_count,
now_epoch=now_epoch,
)
new_count = emit_alerts(args.alerts_path, alerts)
if new_count == 0:
log.info("fleet healthy — %d hosts checked, no new alerts",
len(hosts))
else:
log.info("emitted %d new alert(s); see %s",
new_count, args.alerts_path)
# Exit 0 even when alerts are emitted: the alert IS the signal,
# not the unit's success/failure state. systemd treating "alerts
# found" as unit-failed is a UX wart — it makes `systemctl status`
# always red on a healthy detector that's just watching a fault.
# Operators consume alerts via journalctl + alerts.jsonl; the
# unit's failure state should mean "the detector itself broke."
return 0
if __name__ == "__main__":
sys.exit(main())