The detector previously returned exit code 1 whenever it emitted alerts, which made systemd mark cis490-fleet-health.service as 'failed' on every tick that found a sick host. That's the wrong UX — a detector that finds a fault is working correctly, not crashing. The alert is the signal (via the WARNING log + alerts.jsonl); the unit's success state should mean "the detector itself ran cleanly." Test added.

Caught while live-deploying on the Pi: the first run found elliott-thinkpad in the fatal-only state (943 × 4xx + 1425 × 5xx) and correctly emitted the alert — but systemd showed the unit red, which would have sent operators chasing the wrong problem.

Side note: the same first run also caught a real bug — the pycache for receiver.store on /opt/cis490 was stale after I deployed the new app.py + store.py from main, causing 1464 × 500 responses. I cleared the pycache and the index immediately resumed growing (4465 → 4515 in 30 seconds). The detector earned its keep on its very first cycle.

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
371 lines
14 KiB
Python
371 lines
14 KiB
Python
"""Receiver-side fleet health check.
|
||
|
||
Runs every 5 minutes (via cis490-fleet-health.timer). Detects:
|
||
|
||
silent
|
||
A host that successfully shipped in the last `lookback_hours`
|
||
has been silent for more than `silent_threshold_min`. Likely a
|
||
crashed daemon, frozen on-device agent, or pulled-and-bricked
|
||
host stuck mid-install. The threshold is high enough to ignore
|
||
routine reboots but tight enough to catch a stalled fleet.
|
||
|
||
fatal-only
|
||
A host has been actively PUTting in the last
|
||
`recent_window_min` minutes but every PUT was 4xx. Signals
|
||
diverged-HEAD, missing VERSION stamp, or other gate-rejection
|
||
loop the lab host can't fix without operator action.
|
||
|
||
unstamped
|
||
A host is shipping with NO X-Cis490-Code-Commit header at all —
|
||
pre-stamp code is still running. cis490-autoupdate.timer should
|
||
catch this within 30 min, so any persistent unstamped flag means
|
||
autoupdate is failing too.
|
||
|
||
Output:
|
||
/var/lib/cis490/alerts.jsonl — one row per detected alert, dedup'd
|
||
on (host, symptom, hour-bucket) so
|
||
a sustained problem doesn't spam the
|
||
log every 5 min.
|
||
syslog (WARNING) — easy to grep / forward to a notifier.
|
||
|
||
Exit codes:
|
||
0 no alerts
|
||
1 one or more alerts emitted
|
||
2 bad config / can't read inputs
|
||
|
||
Run by hand:
|
||
sudo /opt/cis490/.venv/bin/python /opt/cis490/tools/check_fleet_health.py \\
|
||
--index-path /var/lib/cis490/index.jsonl \\
|
||
--alerts-path /var/lib/cis490/alerts.jsonl
|
||
"""
|
||
|
||
from __future__ import annotations
|
||
|
||
import argparse
|
||
import hashlib
|
||
import json
|
||
import logging
|
||
import os
|
||
import re
|
||
import subprocess
|
||
import sys
|
||
import time
|
||
from collections import defaultdict
|
||
from dataclasses import dataclass, field
|
||
from datetime import datetime, timezone
|
||
from pathlib import Path
|
||
|
||
|
||
# Module-wide logger. Records go wherever the root logging config sends
# them (main() configures it with logging.basicConfig).
log: logging.Logger = logging.getLogger("cis490.fleet-health")
|
||
|
||
|
||
@dataclass
class Alert:
    """One detected fleet-health problem for a single host.

    Serialised with `to_row()` into alerts.jsonl. `dedup_key()` lets a
    sustained fault be reported at most once per UTC hour.
    """

    host: str
    symptom: str  # "silent" | "fatal-only" | "unstamped"
    detail: str
    detected_at_wall: str  # ISO-8601 timestamp of the detection pass
    suggested_fix: str

    def _hour_bucket(self) -> str:
        """Return the UTC hour ("YYYY-MM-DDTHH") this alert belongs to.

        Derived from `detected_at_wall` rather than the clock at call
        time. The key is then stable across repeated calls and agrees
        with the timestamp stored next to it in the alert row. Falls
        back to the current UTC hour if the timestamp can't be parsed.
        """
        stamp = self.detected_at_wall
        try:
            if stamp.endswith(("Z", "z")):
                # fromisoformat() only accepts "Z" natively on 3.11+.
                stamp = stamp[:-1] + "+00:00"
            when = datetime.fromisoformat(stamp)
        except (AttributeError, TypeError, ValueError):
            when = datetime.now(tz=timezone.utc)
        if when.tzinfo is None:
            # Naive stamps are taken as UTC, like every other time here.
            when = when.replace(tzinfo=timezone.utc)
        return when.astimezone(timezone.utc).strftime("%Y-%m-%dT%H")

    def dedup_key(self) -> str:
        """Return a 16-hex-char key identifying (host, symptom, UTC hour).

        The 1-hour bucket keeps a sustained fault from spamming the log
        every tick, but lets it re-fire if it persists into the next hour
        (so the operator gets a fresh signal once per hour).
        """
        bucket = self._hour_bucket()
        h = hashlib.sha256(f"{self.host}|{self.symptom}|{bucket}".encode()).hexdigest()
        return h[:16]

    def to_row(self) -> dict:
        """Return the JSON-serialisable row written to alerts.jsonl."""
        return {
            "host": self.host,
            "symptom": self.symptom,
            "detail": self.detail,
            "suggested_fix": self.suggested_fix,
            "detected_at_wall": self.detected_at_wall,
            "dedup_key": self.dedup_key(),
        }
|
||
|
||
|
||
@dataclass
class HostState:
    """Per-host counters gathered from the index and the receiver journal.

    The first two fields come from index.jsonl (newest successful ship).
    The ``recent_*`` counters tally what the receiver journal shows in
    the recent window.
    """

    # Raw received_at_wall string of the newest index row for this host.
    last_seen_at_wall_iso: str | None = field(default=None)
    # Set once any index row for the host falls inside the lookback window.
    seen_in_lookback: bool = field(default=False)
    # Episode PUTs in the recent journal window, bucketed by status class.
    recent_2xx: int = field(default=0)
    recent_4xx: int = field(default=0)
    recent_5xx: int = field(default=0)
    # Receiver warnings about a missing X-Cis490-Code-Commit header.
    recent_unstamped_400: int = field(default=0)
|
||
|
||
|
||
def _parse_iso(s: str) -> float:
|
||
"""ISO-8601 with timezone → epoch seconds."""
|
||
# received_at_wall format: "2026-05-01T06:37:53.830599+00:00"
|
||
return datetime.fromisoformat(s).timestamp()
|
||
|
||
|
||
def scan_index(index_path: Path, *, lookback_hours: int,
               now_epoch: float) -> dict[str, HostState]:
    """Walk index.jsonl and compute each host's last-seen timestamp.

    Streams the file line by line (don't load the whole thing — the
    index can be big). For each host it keeps the newest
    received_at_wall seen, plus whether any row fell inside the last
    `lookback_hours` before `now_epoch`.

    Rows that aren't JSON objects, lack a non-empty string host_id or
    received_at_wall, or carry an unparseable timestamp are skipped, so
    one bad line can't take down the whole check.

    Returns {} when the index file doesn't exist. Other I/O errors
    (e.g. permission denied) propagate to the caller.
    """
    if not index_path.exists():
        return {}

    cutoff = now_epoch - (lookback_hours * 3600)
    hosts: dict[str, HostState] = defaultdict(HostState)
    # Parsed epoch of each host's current last_seen_at_wall_iso, so the
    # stored string isn't re-parsed on every row.
    newest: dict[str, float] = {}

    # errors="replace": a stray non-UTF-8 byte costs at most one row
    # instead of aborting the scan.
    with index_path.open(encoding="utf-8", errors="replace") as f:
        for line in f:
            try:
                row = json.loads(line)
            except json.JSONDecodeError:
                continue
            if not isinstance(row, dict):
                continue
            host = row.get("host_id")
            ts_iso = row.get("received_at_wall")
            if not (isinstance(host, str) and host
                    and isinstance(ts_iso, str) and ts_iso):
                continue
            try:
                ts = _parse_iso(ts_iso)
            except ValueError:
                continue

            st = hosts[host]
            if host not in newest or ts > newest[host]:
                newest[host] = ts
                st.last_seen_at_wall_iso = ts_iso
            if ts >= cutoff:
                st.seen_in_lookback = True

    return dict(hosts)
|
||
|
||
|
||
# Access-log line uvicorn emits for an episode upload, e.g.
#   ... "PUT /v1/episodes/<host>/<id>.tar.zst HTTP/1.1" <status>
# Any HTTP/x[.y] version is accepted, so a client that speaks HTTP/1.0
# isn't silently dropped from the tally.
_PUT_RE = re.compile(r'PUT /v1/episodes/([^/]+)/[^"]+ HTTP/\d(?:\.\d)?" (\d{3})')
# The receiver's own WARNING for a ship without the commit-stamp header.
_MISSING_RE = re.compile(r'missing X-Cis490-Code-Commit', re.IGNORECASE)
# The "host=<id>" field inside that WARNING line.
_HOST_KV_RE = re.compile(r'host=([^\s]+)')


def _run_journalctl(recent_window_min: int) -> str | None:
    """Return cis490-receiver's journal for the recent window, or None.

    None means the journal couldn't be read (journalctl missing, timed
    out, or exited non-zero). A warning is logged and the caller skips
    journal-based detection instead of crashing the whole check.
    """
    try:
        rc = subprocess.run(
            ["journalctl", "-u", "cis490-receiver",
             "--since", f"{recent_window_min} minutes ago",
             "--no-pager", "--output=cat"],
            # errors="replace": odd bytes in the journal must not raise
            # UnicodeDecodeError and abort the check.
            capture_output=True, text=True, errors="replace", timeout=30,
        )
    except (OSError, subprocess.TimeoutExpired) as exc:
        log.warning("journalctl could not run (%s); skipping fatal-only detection",
                    exc)
        return None
    if rc.returncode != 0:
        log.warning("journalctl returned %d; skipping fatal-only detection",
                    rc.returncode)
        return None
    return rc.stdout


def _tally_journal_lines(lines: list[str], hosts: dict[str, HostState]) -> None:
    """Fold receiver journal lines into per-host counters, in place."""
    for line in lines:
        if _MISSING_RE.search(line):
            # Unstamped ships are counted separately so they can be told
            # apart from other 4xx. The host is read straight from the
            # warning's "host=X" field; warnings without one are ignored.
            m = _HOST_KV_RE.search(line)
            if m:
                host = m.group(1).rstrip(',')
                hosts.setdefault(host, HostState()).recent_unstamped_400 += 1
            continue

        m = _PUT_RE.search(line)
        if not m:
            continue
        st = hosts.setdefault(m.group(1), HostState())
        status = int(m.group(2))
        if 200 <= status < 300:
            st.recent_2xx += 1
        elif 400 <= status < 500:
            st.recent_4xx += 1
        elif 500 <= status < 600:
            st.recent_5xx += 1


def scan_journal(*, recent_window_min: int,
                 hosts: dict[str, HostState]) -> None:
    """Bucket per-host PUT status codes from cis490-receiver's journal.

    Reads the last `recent_window_min` minutes via journalctl and mutates
    `hosts` in place, adding entries for hosts seen only in the journal.
    If the journal can't be read, `hosts` is left untouched.
    """
    output = _run_journalctl(recent_window_min)
    if output is None:
        return
    _tally_journal_lines(output.splitlines(), hosts)
|
||
|
||
|
||
def _silent_alert(host: str, st: HostState, *, silent_threshold_min: int,
                  now_epoch: float, detected_at: str) -> Alert | None:
    """'silent': the host shipped within the lookback window but not lately."""
    if not st.seen_in_lookback or st.last_seen_at_wall_iso is None:
        return None
    silent_min = (now_epoch - _parse_iso(st.last_seen_at_wall_iso)) / 60
    if silent_min <= silent_threshold_min:
        return None
    return Alert(
        host=host,
        symptom="silent",
        detail=(f"last successful ship {silent_min:.0f} min ago "
                f"({st.last_seen_at_wall_iso})"),
        detected_at_wall=detected_at,
        suggested_fix=(
            f"On {host}: check `systemctl status cis490-shipper "
            "cis490-orchestrator`. If services are inactive, "
            "`sudo systemctl start cis490-shipper "
            "cis490-orchestrator`. If actively failing, see "
            "FIXYOURSELF.md."
        ),
    )


def _unstamped_alert(host: str, st: HostState, *, fatal_min_count: int,
                     detected_at: str) -> Alert | None:
    """'unstamped': pre-stamp episodes are still being shipped."""
    # max(..., 1): never report "unstamped" for a host with zero unstamped
    # ships, even if fatal_min_count is configured as 0 or negative.
    if st.recent_unstamped_400 < max(fatal_min_count, 1):
        return None
    return Alert(
        host=host,
        symptom="unstamped",
        detail=(f"{st.recent_unstamped_400} ship(s) without "
                "X-Cis490-Code-Commit header in recent window — "
                "orchestrator is running pre-stamp code"),
        detected_at_wall=detected_at,
        suggested_fix=(
            f"On {host}: `sudo /opt/cis490/scripts/install-lab-host.sh` "
            "to re-stamp VERSION + restart orchestrator. If "
            "cis490-autoupdate.timer is enabled this should heal "
            "on its own within 30 min."
        ),
    )


def _fatal_only_alert(host: str, st: HostState, *,
                      fatal_ratio_threshold: float, fatal_min_count: int,
                      detected_at: str) -> Alert | None:
    """'fatal-only': actively shipping, but >= threshold of PUTs rejected."""
    total = st.recent_2xx + st.recent_4xx + st.recent_5xx
    # max(..., 1) keeps a zero/negative fatal_min_count from dividing by
    # zero on a host with no recent traffic.
    if total < max(fatal_min_count, 1):
        return None
    fatal_ratio = (st.recent_4xx + st.recent_5xx) / total
    # The ratio alone decides. A threshold of 1.0 alerts only when no PUT
    # succeeded. The old extra `recent_2xx == 0` check made this knob a
    # no-op, because the ratio is always 1.0 when nothing succeeds.
    if fatal_ratio < fatal_ratio_threshold:
        return None
    if st.recent_2xx == 0:
        outcome = "everything is being rejected"
    else:
        outcome = f"{fatal_ratio:.0%} of ships are being rejected"
    return Alert(
        host=host,
        symptom="fatal-only",
        detail=(f"{total} ship(s) in recent window, "
                f"{st.recent_4xx} × 4xx + {st.recent_5xx} × 5xx, "
                f"{st.recent_2xx} × 2xx — host is shipping but {outcome}"),
        detected_at_wall=detected_at,
        suggested_fix=(
            f"On {host}: check `journalctl -u cis490-shipper "
            "--since '5 minutes ago'`. If 412 commit-rejected: "
            "see FIXYOURSELF.md §B (likely diverged HEAD)."
        ),
    )


def detect_alerts(
    hosts: dict[str, HostState],
    *,
    silent_threshold_min: int,
    fatal_ratio_threshold: float,
    fatal_min_count: int,
    now_epoch: float,
) -> list[Alert]:
    """Turn per-host counters into Alert objects.

    For each host, in this order:
      * silent      — seen within the lookback window, but the newest
                      index row is more than `silent_threshold_min` old.
      * unstamped   — at least `fatal_min_count` (and at least one)
                      missing-commit-header warnings in the recent window.
      * fatal-only  — at least `fatal_min_count` PUTs in the recent window
                      and the rejected (4xx + 5xx) fraction is
                      >= `fatal_ratio_threshold`. A threshold of 1.0
                      alerts only when every PUT was rejected.

    All alerts share one `detected_at_wall`, derived from `now_epoch`.
    """
    detected_at = datetime.fromtimestamp(now_epoch, tz=timezone.utc).isoformat()
    alerts: list[Alert] = []
    for host, st in hosts.items():
        candidates = (
            _silent_alert(host, st, silent_threshold_min=silent_threshold_min,
                          now_epoch=now_epoch, detected_at=detected_at),
            _unstamped_alert(host, st, fatal_min_count=fatal_min_count,
                             detected_at=detected_at),
            _fatal_only_alert(host, st,
                              fatal_ratio_threshold=fatal_ratio_threshold,
                              fatal_min_count=fatal_min_count,
                              detected_at=detected_at),
        )
        alerts.extend(a for a in candidates if a is not None)
    return alerts
|
||
|
||
|
||
def load_existing_dedup_keys(alerts_path: Path, *,
                             now_epoch: float | None = None) -> set[str]:
    """Return dedup keys of alerts already emitted in the current UTC hour.

    We re-read alerts.jsonl each pass rather than maintaining a separate
    state file — keeps truth in one place and survives restarts.

    Args:
        alerts_path: the alerts.jsonl file. A missing file yields an
            empty set.
        now_epoch: reference time for the hour bucket. Defaults to now;
            pass a value to pin the bucket deterministically.

    Malformed lines (invalid JSON, non-object rows, non-string fields)
    are skipped. If the file exists but can't be read, a warning is
    logged and any keys read so far are returned. This pass may then
    re-emit alerts instead of crashing.
    """
    if not alerts_path.exists():
        return set()

    now = (datetime.now(tz=timezone.utc) if now_epoch is None
           else datetime.fromtimestamp(now_epoch, tz=timezone.utc))
    bucket = now.strftime("%Y-%m-%dT%H")
    keys: set[str] = set()
    try:
        with alerts_path.open(encoding="utf-8", errors="replace") as f:
            for line in f:
                try:
                    row = json.loads(line)
                except json.JSONDecodeError:
                    continue
                if not isinstance(row, dict):
                    continue
                detected = row.get("detected_at_wall")
                key = row.get("dedup_key")
                # Only consider keys from THIS hour-bucket — older
                # alerts shouldn't suppress a fresh detection.
                if (isinstance(detected, str) and detected.startswith(bucket)
                        and isinstance(key, str)):
                    keys.add(key)
    except OSError as exc:
        log.warning("could not read %s (%s); alert dedup is incomplete this pass",
                    alerts_path, exc)
    return keys
|
||
|
||
|
||
def emit_alerts(alerts_path: Path, alerts: list[Alert]) -> int:
    """Append not-yet-reported alerts to alerts.jsonl and log each one.

    An alert is skipped if its dedup key was already emitted this hour
    (per load_existing_dedup_keys) or was written earlier in this same
    batch. Each new alert is written as one JSON line and logged at
    WARNING level.

    Returns:
        The number of new alerts written (0 when `alerts` is empty, in
        which case the file is not touched).
    """
    if not alerts:
        return 0
    seen = load_existing_dedup_keys(alerts_path)
    new_count = 0
    alerts_path.parent.mkdir(parents=True, exist_ok=True)
    with alerts_path.open("a", encoding="utf-8") as f:
        for a in alerts:
            # Build the row once so the key we check is exactly the key
            # we persist (dedup_key() depends on an hour bucket).
            row = a.to_row()
            key = row["dedup_key"]
            if key in seen:
                continue
            seen.add(key)
            f.write(json.dumps(row) + "\n")
            log.warning(
                "ALERT host=%s symptom=%s — %s. Suggested: %s",
                a.host, a.symptom, a.detail, a.suggested_fix,
            )
            new_count += 1
    return new_count
|
||
|
||
|
||
def main(argv: list[str] | None = None) -> int:
    """CLI entry point; returns the process exit status.

    0 — the detector ran cleanly, whether or not it emitted alerts.
    2 — bad configuration, or the index is missing or unreadable.
    Invalid arguments also exit with status 2, via argparse's
    SystemExit.
    """
    p = argparse.ArgumentParser(prog="cis490-fleet-health",
                                description=__doc__,
                                formatter_class=argparse.RawDescriptionHelpFormatter)
    p.add_argument("--index-path", default="/var/lib/cis490/index.jsonl",
                   type=Path)
    p.add_argument("--alerts-path", default="/var/lib/cis490/alerts.jsonl",
                   type=Path)
    p.add_argument("--lookback-hours", type=int, default=24,
                   help="A host that shipped in this window counts as 'active'; "
                        "silent-detection only fires for active hosts.")
    p.add_argument("--silent-threshold-min", type=int, default=30,
                   help="An active host silent longer than this triggers 'silent'.")
    p.add_argument("--recent-window-min", type=int, default=30,
                   help="Window for fatal-only / unstamped detection from the journal.")
    p.add_argument("--fatal-ratio-threshold", type=float, default=0.95,
                   help="Min fatal-fraction in the recent window to alert.")
    p.add_argument("--fatal-min-count", type=int, default=10,
                   help="Don't alert until this many recent PUTs — kills false "
                        "positives on hosts with very low traffic.")
    p.add_argument("--log-level", default="INFO")
    args = p.parse_args(argv)

    # Reject settings that make detection meaningless or crash it
    # (e.g. --fatal-min-count 0 lets an idle host hit a division by
    # zero). p.error() prints usage and exits with status 2.
    if args.lookback_hours <= 0:
        p.error("--lookback-hours must be positive")
    if args.silent_threshold_min < 0:
        p.error("--silent-threshold-min must be >= 0")
    if args.recent_window_min <= 0:
        p.error("--recent-window-min must be positive")
    if not 0.0 < args.fatal_ratio_threshold <= 1.0:
        p.error("--fatal-ratio-threshold must be in (0, 1]")
    if args.fatal_min_count < 1:
        p.error("--fatal-min-count must be >= 1")

    logging.basicConfig(
        level=getattr(logging, args.log_level.upper(), logging.INFO),
        format="%(asctime)s %(levelname)s %(name)s %(message)s",
    )

    if not args.index_path.exists():
        log.error("index missing: %s", args.index_path)
        return 2

    now_epoch = time.time()
    try:
        hosts = scan_index(args.index_path,
                           lookback_hours=args.lookback_hours,
                           now_epoch=now_epoch)
    except OSError as exc:
        # e.g. permission denied: "can't read inputs", not a crash.
        log.error("can't read index %s: %s", args.index_path, exc)
        return 2
    scan_journal(recent_window_min=args.recent_window_min, hosts=hosts)

    alerts = detect_alerts(
        hosts,
        silent_threshold_min=args.silent_threshold_min,
        fatal_ratio_threshold=args.fatal_ratio_threshold,
        fatal_min_count=args.fatal_min_count,
        now_epoch=now_epoch,
    )
    new_count = emit_alerts(args.alerts_path, alerts)
    if new_count == 0:
        log.info("fleet healthy — %d hosts checked, no new alerts",
                 len(hosts))
    else:
        log.info("emitted %d new alert(s); see %s",
                 new_count, args.alerts_path)
    # Exit 0 even when alerts are emitted: the alert IS the signal,
    # not the unit's success/failure state. systemd treating "alerts
    # found" as unit-failed is a UX wart — it makes `systemctl status`
    # always red on a healthy detector that's just watching a fault.
    # Operators consume alerts via journalctl + alerts.jsonl; the
    # unit's failure state should mean "the detector itself broke."
    return 0
|
||
|
||
|
||
if __name__ == "__main__":
    # main() returns the exit status; raising SystemExit hands it to the
    # interpreter exactly as sys.exit() would.
    raise SystemExit(main())
|