"""Receiver-side fleet health check. Runs every 5 minutes (via cis490-fleet-health.timer). Detects: silent A host that successfully shipped in the last `lookback_hours` has been silent for more than `silent_threshold_min`. Likely a crashed daemon, frozen on-device agent, or pulled-and-bricked host stuck mid-install. The threshold is high enough to ignore routine reboots but tight enough to catch a stalled fleet. fatal-only A host has been actively PUTting in the last `recent_window_min` minutes but every PUT was 4xx. Signals diverged-HEAD, missing VERSION stamp, or other gate-rejection loop the lab host can't fix without operator action. unstamped A host is shipping with NO X-Cis490-Code-Commit header at all — pre-stamp code is still running. cis490-autoupdate.timer should catch this within 30 min, so any persistent unstamped flag means autoupdate is failing too. Output: /var/lib/cis490/alerts.jsonl — one row per detected alert, dedup'd on (host, symptom, hour-bucket) so a sustained problem doesn't spam the log every 5 min. syslog (WARNING) — easy to grep / forward to a notifier. Exit codes: 0 no alerts 1 one or more alerts emitted 2 bad config / can't read inputs Run by hand: sudo /opt/cis490/.venv/bin/python /opt/cis490/tools/check_fleet_health.py \\ --index-path /var/lib/cis490/index.jsonl \\ --alerts-path /var/lib/cis490/alerts.jsonl """ from __future__ import annotations import argparse import hashlib import json import logging import os import re import subprocess import sys import time from collections import defaultdict from dataclasses import dataclass, field from datetime import datetime, timezone from pathlib import Path log = logging.getLogger("cis490.fleet-health") @dataclass class Alert: host: str symptom: str # "silent" | "fatal-only" | "unstamped" detail: str detected_at_wall: str suggested_fix: str def dedup_key(self) -> str: # 1-hour bucket keeps a sustained fault from spamming the log # every tick, but lets us re-fire if it persists into the next # hour (so the operator gets a fresh signal once per hour). bucket = datetime.now(tz=timezone.utc).strftime("%Y-%m-%dT%H") h = hashlib.sha256(f"{self.host}|{self.symptom}|{bucket}".encode()).hexdigest() return h[:16] def to_row(self) -> dict: return { "host": self.host, "symptom": self.symptom, "detail": self.detail, "suggested_fix": self.suggested_fix, "detected_at_wall": self.detected_at_wall, "dedup_key": self.dedup_key(), } @dataclass class HostState: last_seen_at_wall_iso: str | None = None seen_in_lookback: bool = False recent_2xx: int = 0 recent_4xx: int = 0 recent_5xx: int = 0 recent_unstamped_400: int = 0 def _parse_iso(s: str) -> float: """ISO-8601 with timezone → epoch seconds.""" # received_at_wall format: "2026-05-01T06:37:53.830599+00:00" return datetime.fromisoformat(s).timestamp() def scan_index(index_path: Path, *, lookback_hours: int, now_epoch: float) -> dict[str, HostState]: """Walk index.jsonl, compute per-host last-seen timestamp. Streams the file (don't load the whole thing — index can be big). Tail end is the most recent, so iterating linearly + keeping the max per host is fine.""" cutoff = now_epoch - (lookback_hours * 3600) hosts: dict[str, HostState] = defaultdict(HostState) if not index_path.exists(): return {} with index_path.open() as f: for line in f: try: row = json.loads(line) except json.JSONDecodeError: continue host = row.get("host_id") ts_iso = row.get("received_at_wall") if not host or not ts_iso: continue try: ts = _parse_iso(ts_iso) except ValueError: continue st = hosts[host] if (st.last_seen_at_wall_iso is None or ts > _parse_iso(st.last_seen_at_wall_iso)): st.last_seen_at_wall_iso = ts_iso if ts >= cutoff: st.seen_in_lookback = True return dict(hosts) def scan_journal(*, recent_window_min: int, hosts: dict[str, HostState]) -> None: """Run journalctl over cis490-receiver and bucket per-host status codes for the recent window. Mutates hosts in place.""" rc = subprocess.run( ["journalctl", "-u", "cis490-receiver", "--since", f"{recent_window_min} minutes ago", "--no-pager", "--output=cat"], capture_output=True, text=True, timeout=30, ) if rc.returncode != 0: log.warning("journalctl returned %d; skipping fatal-only detection", rc.returncode) return # Match the access log format uvicorn emits: # "PUT /v1/episodes//.tar.zst HTTP/1.1" put_re = re.compile( r'PUT /v1/episodes/([^/]+)/[^"]+ HTTP/1\.1" (\d{3})' ) # The receiver's own WARNING line for "missing X-Cis490-Code-Commit" # — we count those separately so we can distinguish unstamped from # other 4xx. missing_re = re.compile(r'missing X-Cis490-Code-Commit', re.IGNORECASE) last_was_missing_for: str | None = None for line in rc.stdout.splitlines(): if missing_re.search(line): # The next access-log PUT line (a few lines later) is the # one this WARNING refers to. Mark a flag. # Simplification: if we see "rejected episode host=X", # extract the host directly. m = re.search(r'host=([^\s]+)', line) if m: host = m.group(1).rstrip(',') hosts.setdefault(host, HostState()).recent_unstamped_400 += 1 last_was_missing_for = host continue m = put_re.search(line) if not m: continue host = m.group(1) st = hosts.setdefault(host, HostState()) status = int(m.group(2)) if 200 <= status < 300: st.recent_2xx += 1 elif 400 <= status < 500: st.recent_4xx += 1 elif 500 <= status < 600: st.recent_5xx += 1 def detect_alerts( hosts: dict[str, HostState], *, silent_threshold_min: int, fatal_ratio_threshold: float, fatal_min_count: int, now_epoch: float, ) -> list[Alert]: alerts: list[Alert] = [] detected_at = datetime.fromtimestamp(now_epoch, tz=timezone.utc).isoformat() for host, st in hosts.items(): # silent: shipped before, hasn't shipped recently if st.seen_in_lookback and st.last_seen_at_wall_iso is not None: last_seen = _parse_iso(st.last_seen_at_wall_iso) silent_min = (now_epoch - last_seen) / 60 if silent_min > silent_threshold_min: alerts.append(Alert( host=host, symptom="silent", detail=(f"last successful ship {silent_min:.0f} min ago " f"({st.last_seen_at_wall_iso})"), detected_at_wall=detected_at, suggested_fix=( f"On {host}: check `systemctl status cis490-shipper " f"cis490-orchestrator`. If services are inactive, " f"`sudo systemctl start cis490-shipper " f"cis490-orchestrator`. If actively failing, see " f"FIXYOURSELF.md." ), )) # unstamped: pre-stamp episodes still being shipped if st.recent_unstamped_400 >= fatal_min_count: alerts.append(Alert( host=host, symptom="unstamped", detail=(f"{st.recent_unstamped_400} ship(s) without " f"X-Cis490-Code-Commit header in recent window — " f"orchestrator is running pre-stamp code"), detected_at_wall=detected_at, suggested_fix=( f"On {host}: `sudo /opt/cis490/scripts/install-lab-host.sh` " f"to re-stamp VERSION + restart orchestrator. If " f"cis490-autoupdate.timer is enabled this should heal " f"on its own within 30 min." ), )) # fatal-only: actively shipping but >threshold% rejected total = st.recent_2xx + st.recent_4xx + st.recent_5xx if total >= fatal_min_count: fatal_ratio = (st.recent_4xx + st.recent_5xx) / total if fatal_ratio >= fatal_ratio_threshold and st.recent_2xx == 0: alerts.append(Alert( host=host, symptom="fatal-only", detail=(f"{total} ship(s) in recent window, " f"{st.recent_4xx} × 4xx + {st.recent_5xx} × 5xx, " f"0 × 2xx — host is shipping but everything " f"is being rejected"), detected_at_wall=detected_at, suggested_fix=( f"On {host}: check `journalctl -u cis490-shipper " f"--since '5 minutes ago'`. If 412 commit-rejected: " f"see FIXYOURSELF.md §B (likely diverged HEAD)." ), )) return alerts def load_existing_dedup_keys(alerts_path: Path) -> set[str]: """The current hour's already-emitted alerts. We re-read the file each pass rather than maintaining a separate state file — keeps truth in one place and survives restarts.""" if not alerts_path.exists(): return set() keys: set[str] = set() bucket = datetime.now(tz=timezone.utc).strftime("%Y-%m-%dT%H") try: with alerts_path.open() as f: for line in f: try: row = json.loads(line) except json.JSONDecodeError: continue # Only consider keys from THIS hour-bucket — older # alerts shouldn't suppress a fresh detection. detected = row.get("detected_at_wall", "") if detected.startswith(bucket): if isinstance(row.get("dedup_key"), str): keys.add(row["dedup_key"]) except OSError: pass return keys def emit_alerts(alerts_path: Path, alerts: list[Alert]) -> int: if not alerts: return 0 seen = load_existing_dedup_keys(alerts_path) new_count = 0 alerts_path.parent.mkdir(parents=True, exist_ok=True) with alerts_path.open("a") as f: for a in alerts: if a.dedup_key() in seen: continue f.write(json.dumps(a.to_row()) + "\n") log.warning( "ALERT host=%s symptom=%s — %s. Suggested: %s", a.host, a.symptom, a.detail, a.suggested_fix, ) new_count += 1 return new_count def main(argv: list[str] | None = None) -> int: p = argparse.ArgumentParser(prog="cis490-fleet-health", description=__doc__, formatter_class=argparse.RawDescriptionHelpFormatter) p.add_argument("--index-path", default="/var/lib/cis490/index.jsonl", type=Path) p.add_argument("--alerts-path", default="/var/lib/cis490/alerts.jsonl", type=Path) p.add_argument("--lookback-hours", type=int, default=24, help="A host that shipped in this window counts as 'active'; " "silent-detection only fires for active hosts.") p.add_argument("--silent-threshold-min", type=int, default=30, help="An active host silent longer than this triggers 'silent'.") p.add_argument("--recent-window-min", type=int, default=30, help="Window for fatal-only / unstamped detection from the journal.") p.add_argument("--fatal-ratio-threshold", type=float, default=0.95, help="Min fatal-fraction in the recent window to alert.") p.add_argument("--fatal-min-count", type=int, default=10, help="Don't alert until this many recent PUTs — kills false " "positives on hosts with very low traffic.") p.add_argument("--log-level", default="INFO") args = p.parse_args(argv) logging.basicConfig( level=getattr(logging, args.log_level.upper(), logging.INFO), format="%(asctime)s %(levelname)s %(name)s %(message)s", ) if not args.index_path.exists(): log.error("index missing: %s", args.index_path) return 2 now_epoch = time.time() hosts = scan_index(args.index_path, lookback_hours=args.lookback_hours, now_epoch=now_epoch) scan_journal(recent_window_min=args.recent_window_min, hosts=hosts) alerts = detect_alerts( hosts, silent_threshold_min=args.silent_threshold_min, fatal_ratio_threshold=args.fatal_ratio_threshold, fatal_min_count=args.fatal_min_count, now_epoch=now_epoch, ) new_count = emit_alerts(args.alerts_path, alerts) if new_count == 0: log.info("fleet healthy — %d hosts checked, no new alerts", len(hosts)) else: log.info("emitted %d new alert(s); see %s", new_count, args.alerts_path) # Exit 0 even when alerts are emitted: the alert IS the signal, # not the unit's success/failure state. systemd treating "alerts # found" as unit-failed is a UX wart — it makes `systemctl status` # always red on a healthy detector that's just watching a fault. # Operators consume alerts via journalctl + alerts.jsonl; the # unit's failure state should mean "the detector itself broke." return 0 if __name__ == "__main__": sys.exit(main())