From f9b2e5c4e65e8fe648be16a803c6207163961c47 Mon Sep 17 00:00:00 2001 From: max Date: Fri, 1 May 2026 12:02:59 -0500 Subject: [PATCH] shipper: systemd watchdog, quarantine cleanup; doctor surfaces ship errors MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Three robustness items off the future-work list: 1. Shipper sd_notify watchdog. Type=notify + WatchdogSec=180. The daemon sends READY=1 after queue construction and WATCHDOG=1 once per scan pass via a heartbeat callback wired into run_forever. Restart=on-failure only catches process death — silent stalls (deadlock, hung tar subprocess, blocked I/O past timeout) used to leave a zombie running with the data backlog growing. Now systemd kills + restarts the daemon if no WATCHDOG=1 arrives within 180s. Verified end-to-end against systemd via `systemd-run --transient --property=Type=notify --property=WatchdogSec=10`: unit transitions to active on READY=1; SIGSTOP'ing the process triggers `Watchdog timeout (limit 10s)! Killing process N with SIGABRT` at exactly t+10s, then unit goes failed → restart cycle. 2. Quarantine cleanup. Without an upper bound, data/quarantine/ grew forever as fatal episodes piled up. New ShipperConfig fields: quarantine_keep_days = 30 # opt-out: 0 disables quarantine_cleanup_interval_s = 3600 # gate so 5s tick doesn't # statx() the whole tree Cleanup runs at the start of run_once() but is gated to once per hour. Removed entries logged. 3. Doctor surfaces shipping errors. Tails 10 minutes of cis490-shipper journal and surfaces 412/400/transient patterns as red/yellow rows with the canonical fix command. An on-device agent running cis490_doctor.py now sees one line ("12 ship(s) rejected as out-of-window") instead of needing to grep the journal. Tests: 200/200 (was 188). New coverage: heartbeat callback fires + survives exceptions; quarantine cleanup respects keep_days, gate, and opt-out; doctor parser correctly classifies 412/400/transient/clean/ empty/journalctl-denied; both error classes prioritise 412 (more actionable) when present together. Co-Authored-By: Claude Opus 4.7 (1M context) --- etc/cis490-shipper.service | 9 +- shipper/__main__.py | 37 +++++++- shipper/config.py | 10 ++ shipper/queue.py | 70 +++++++++++++- tests/test_doctor_shipping.py | 142 +++++++++++++++++++++++++++++ tests/test_shipper.py | 167 ++++++++++++++++++++++++++++++++++ tools/cis490_doctor.py | 95 +++++++++++++++++++ 7 files changed, 527 insertions(+), 3 deletions(-) create mode 100644 tests/test_doctor_shipping.py diff --git a/etc/cis490-shipper.service b/etc/cis490-shipper.service index 8175100..541fdaf 100644 --- a/etc/cis490-shipper.service +++ b/etc/cis490-shipper.service @@ -7,7 +7,14 @@ Wants=network-online.target Requires=wg-quick@wg0.service [Service] -Type=simple +# Type=notify so systemd waits for sd_notify("READY=1") before +# considering the unit started, and so WatchdogSec= can kick in. +# Without this, Restart=on-failure only catches process crashes — +# silent stalls (deadlock, blocked I/O past timeout, hung tar +# subprocess) leave a zombie running with the data backlog growing. +Type=notify +NotifyAccess=main +WatchdogSec=180 User=cis490 Group=cis490 WorkingDirectory=/opt/cis490 diff --git a/shipper/__main__.py b/shipper/__main__.py index 1051488..6c442f6 100644 --- a/shipper/__main__.py +++ b/shipper/__main__.py @@ -13,7 +13,9 @@ from __future__ import annotations import argparse import json import logging +import os import signal +import socket import sys from pathlib import Path @@ -22,6 +24,30 @@ from .queue import ShipperQueue from .transport import ShipperTransport +def _sd_notify(msg: str) -> None: + """Send ``msg`` to systemd's notify socket. No-op when running + outside systemd (NOTIFY_SOCKET unset) so the same binary works + fine under `--once`, manual invocation, or tests. + + See sd_notify(3). The protocol is one-line key=value messages + over an AF_UNIX SOCK_DGRAM socket. We don't need the libsystemd + dep — talking to the socket directly is stdlib.""" + sock_path = os.environ.get("NOTIFY_SOCKET") + if not sock_path: + return + if sock_path.startswith("@"): + # Abstract socket: prepend NUL and strip the leading '@'. + sock_path = "\0" + sock_path[1:] + try: + with socket.socket(socket.AF_UNIX, socket.SOCK_DGRAM) as s: + s.sendto(msg.encode("ascii"), sock_path) + except OSError: + # Failing to notify isn't fatal — at worst systemd's + # WatchdogSec fires and we get restarted, which is the + # behaviour the watchdog exists to provide. + pass + + def _setup_logging(level: str) -> None: logging.basicConfig( level=getattr(logging, level.upper(), logging.INFO), @@ -98,7 +124,16 @@ def main(argv: list[str] | None = None) -> int: "shipper starting: host_id=%s data_root=%s receiver=%s", cfg.host_id, cfg.data_root, cfg.receiver.url, ) - queue.run_forever(stop_check=lambda: stopping) + # Tell systemd we're ready to take work — gates Type=notify in + # the unit file. The systemd unit's WatchdogSec= will then expect + # WATCHDOG=1 messages at least every seconds; a + # missed one means stalled-mid-loop and triggers a kill+restart. + _sd_notify("READY=1") + queue.run_forever( + stop_check=lambda: stopping, + heartbeat=lambda: _sd_notify("WATCHDOG=1"), + ) + _sd_notify("STOPPING=1") return 0 diff --git a/shipper/config.py b/shipper/config.py index 7a302ec..784c9b6 100644 --- a/shipper/config.py +++ b/shipper/config.py @@ -33,6 +33,15 @@ class ShipperConfig: backoff_seconds: tuple[float, ...] = (1.0, 2.0, 4.0, 8.0, 16.0, 32.0, 60.0, 120.0, 300.0) # Local retention before pruning data/shipped/. keep_local_for_days: int = 7 + # Quarantine retention. Episodes that the receiver permanently + # rejected (400/412) sit here as evidence; without an upper bound + # they grow forever. Set to 0 to disable cleanup (operator + # responsibility). + quarantine_keep_days: int = 30 + # How often the quarantine cleanup pass actually runs. Gated + # because a 5-second scan tick checking mtimes against a + # 30-day-old cutoff is wasteful — once an hour is plenty. + quarantine_cleanup_interval_s: float = 3600.0 @property def episodes_dir(self) -> Path: @@ -88,6 +97,7 @@ class ShipperConfig: scan_interval_s=float(data.get("shipper", {}).get("scan_interval_s", 5.0)), request_timeout_s=float(data.get("shipper", {}).get("request_timeout_s", 60.0)), keep_local_for_days=int(retention.get("keep_local_for_days", 7)), + quarantine_keep_days=int(retention.get("quarantine_keep_days", 30)), ) diff --git a/shipper/queue.py b/shipper/queue.py index 473d94f..2a92647 100644 --- a/shipper/queue.py +++ b/shipper/queue.py @@ -65,6 +65,10 @@ class ShipperQueue: cfg.outbox_dir.mkdir(parents=True, exist_ok=True) cfg.shipped_dir.mkdir(parents=True, exist_ok=True) cfg.quarantine_dir.mkdir(parents=True, exist_ok=True) + # Last wall-clock time we walked quarantine/ for cleanup. Set + # to 0.0 so the first pass always sweeps (covers daemon + # startup after a long downtime). + self._last_quarantine_cleanup_at: float = 0.0 # ---- main entry point --------------------------------------------- @@ -77,6 +81,10 @@ class ShipperQueue: # Bounded by the number of files in outbox/, so cheap to do # every pass. self._sweep_outbox() + # Drop quarantine entries older than keep_days — gated on a + # once-per-hour check so the 5-second scan tick doesn't + # statx() the whole quarantine tree on every pass. + self._maybe_cleanup_quarantine() ready = self._ready_episodes() scanned = len(ready) shipped = 0 @@ -138,12 +146,36 @@ class ShipperQueue: fatal=fatal, ) - def run_forever(self, *, stop_check=lambda: False) -> None: + def run_forever( + self, + *, + stop_check=lambda: False, + heartbeat=lambda: None, + ) -> None: + """Long-running scan loop. + + Args: + stop_check: returns True when the daemon should exit (SIGTERM + handler). Checked between passes and inside the inter-pass + sleep so SIGTERM isn't blocked for up to scan_interval_s. + heartbeat: invoked once per completed pass. Wired to + sd_notify("WATCHDOG=1") in production so systemd can + kill+restart the daemon if a pass hangs longer than the + unit's WatchdogSec — catches silent stalls (deadlock, + blocked I/O past timeout) that Restart=on-failure misses. + """ while not stop_check(): try: self.run_once() except Exception: log.exception("scan pass crashed; sleeping anyway") + try: + heartbeat() + except Exception: + # A heartbeat failure mustn't take down the daemon — + # if the watchdog wire is broken, we want at least the + # ship loop to keep running. + log.exception("heartbeat callback failed") # Coarse sleep: we don't need precise scheduling and we # don't want a tight loop on errors. t0 = time.monotonic() @@ -163,6 +195,42 @@ class ShipperQueue: out.append(ep) return out + def _maybe_cleanup_quarantine(self) -> None: + """Walk quarantine/ and remove episodes older than keep_days. + + Cheap on a daemon that's been running a while because the + once-per-hour gate prevents the scan tick from statx()-ing + the whole tree every 5s. On the first pass after startup, the + gate's 0.0 sentinel means we always sweep — that catches a + daemon that was offline through a backlog accumulation.""" + keep_days = self.cfg.quarantine_keep_days + if keep_days <= 0: + return + now = time.time() + if (now - self._last_quarantine_cleanup_at + < self.cfg.quarantine_cleanup_interval_s): + return + self._last_quarantine_cleanup_at = now + cutoff = now - (keep_days * 86400) + removed = 0 + if not self.cfg.quarantine_dir.exists(): + return + for ep in self.cfg.quarantine_dir.iterdir(): + if not ep.is_dir(): + continue + try: + # Use mtime — quarantine dirs are written once + # (the rename + the reason file), so mtime tracks + # quarantine age, not the original episode age. + if ep.stat().st_mtime < cutoff: + shutil.rmtree(ep, ignore_errors=True) + removed += 1 + except OSError: + log.exception("quarantine cleanup failed for %s", ep.name) + if removed: + log.info("quarantine cleanup: removed %d episode(s) older than %d days", + removed, keep_days) + def _sweep_outbox(self) -> None: """Delete tarballs in outbox/ that have no matching episode dir. diff --git a/tests/test_doctor_shipping.py b/tests/test_doctor_shipping.py new file mode 100644 index 0000000..dac7398 --- /dev/null +++ b/tests/test_doctor_shipping.py @@ -0,0 +1,142 @@ +"""Unit tests for cis490_doctor's recent-shipping-errors parser. + +The parser scans `journalctl -u cis490-shipper` output for the same +strings the receiver/shipper actually emit. We monkeypatch `_run` so +the test doesn't need a real systemd journal. +""" + +from __future__ import annotations + +import importlib.util +import sys +from pathlib import Path + +import pytest + + +REPO_ROOT = Path(__file__).resolve().parent.parent +spec = importlib.util.spec_from_file_location( + "cis490_doctor", REPO_ROOT / "tools" / "cis490_doctor.py" +) +doctor = importlib.util.module_from_spec(spec) +sys.modules["cis490_doctor"] = doctor +spec.loader.exec_module(doctor) + + +def _run_journal(monkeypatch, output: str, rc: int = 0) -> None: + """Stub `cis490_doctor._run` to return ``output`` for journalctl + calls, real subprocess for everything else.""" + real = doctor._run + + def fake(cmd): + if cmd and cmd[0] == "journalctl": + return rc, output, "" + return real(cmd) + monkeypatch.setattr(doctor, "_run", fake) + + +def _check_for(report: doctor.Report, name_substr: str) -> doctor.Check | None: + for c in report.checks: + if name_substr in c.name: + return c + return None + + +def test_412_pattern_surfaces_as_fail(monkeypatch: pytest.MonkeyPatch) -> None: + """5 minutes of 412 commit-rejected logs is the exact symptom we + saw on k-gamingcom for 5+ hours pre-fix. Doctor must point the + operator at the canonical pull+reinstall path.""" + log = "\n".join( + f"ship 01EP{i:02d} -> 412 commit-rejected. your_commit=abc..." + for i in range(20) + ) + _run_journal(monkeypatch, log) + report = doctor.Report(role="lab-host") + doctor.check_recent_shipping_errors(report) + c = _check_for(report, "recent ship results") + assert c is not None and c.status == "fail" + assert "out-of-window" in c.detail + assert "git pull origin main" in c.fix + assert "install-lab-host.sh" in c.fix + + +def test_400_missing_commit_pattern(monkeypatch: pytest.MonkeyPatch) -> None: + log = "\n".join( + f'episode 01EP{i:02d} -> fatal (400) "missing X-Cis490-Code-Commit header"' + for i in range(5) + ) + _run_journal(monkeypatch, log) + report = doctor.Report(role="lab-host") + doctor.check_recent_shipping_errors(report) + c = _check_for(report, "recent ship results") + assert c is not None and c.status == "fail" + assert "missing-commit-header" in c.detail + assert "install-lab-host.sh" in c.fix + + +def test_clean_journal_is_ok(monkeypatch: pytest.MonkeyPatch) -> None: + log = "\n".join( + f"ship 01EP{i:02d} -> stored (201) " for i in range(10) + ) + _run_journal(monkeypatch, log) + report = doctor.Report(role="lab-host") + doctor.check_recent_shipping_errors(report) + c = _check_for(report, "recent ship results") + assert c is not None and c.status == "ok" + + +def test_empty_journal_is_ok_idle(monkeypatch: pytest.MonkeyPatch) -> None: + """No log output ≠ broken — could be a daemon that finished + everything and is idle.""" + _run_journal(monkeypatch, "") + report = doctor.Report(role="lab-host") + doctor.check_recent_shipping_errors(report) + c = _check_for(report, "recent ship results") + assert c is not None and c.status == "ok" + assert "idle" in c.detail + + +def test_journalctl_unavailable_skips(monkeypatch: pytest.MonkeyPatch) -> None: + """Doctor often runs as the unprivileged user and reading the + system journal can need systemd-journal group membership. + journalctl returning non-zero (perm denied / not installed) + should produce a `skip` row, not noise.""" + _run_journal(monkeypatch, "", rc=1) + report = doctor.Report(role="lab-host") + doctor.check_recent_shipping_errors(report) + c = _check_for(report, "recent log scan") + assert c is not None and c.status == "skip" + + +def test_transient_failures_warn_at_threshold( + monkeypatch: pytest.MonkeyPatch, +) -> None: + """A pile of transient (5xx/network) failures isn't fatal but + it's worth flagging — the receiver might be down or WG might be + flaking. The threshold (>5) is tuned to ignore an occasional + blip.""" + log = "\n".join( + f"ship 01EP{i:02d} -> transient: timeout" for i in range(10) + ) + _run_journal(monkeypatch, log) + report = doctor.Report(role="lab-host") + doctor.check_recent_shipping_errors(report) + c = _check_for(report, "recent ship results") + assert c is not None and c.status == "warn" + assert "transient" in c.detail + + +def test_412_takes_priority_over_400(monkeypatch: pytest.MonkeyPatch) -> None: + """If both error classes are present, 412 (out-of-date code) + is the more actionable diagnosis — fixing it fixes both, since + a re-install also writes VERSION.""" + log = ( + "ship 01OLD -> 412 commit-rejected\n" + 'ship 01OLD2 -> fatal (400) "missing X-Cis490-Code-Commit header"\n' + ) + _run_journal(monkeypatch, log) + report = doctor.Report(role="lab-host") + doctor.check_recent_shipping_errors(report) + c = _check_for(report, "recent ship results") + assert c is not None and c.status == "fail" + assert "out-of-window" in c.detail diff --git a/tests/test_shipper.py b/tests/test_shipper.py index 29e0035..7fb1e72 100644 --- a/tests/test_shipper.py +++ b/tests/test_shipper.py @@ -320,6 +320,173 @@ def test_run_once_handles_transient_when_receiver_is_down(tmp_path: Path) -> Non assert (cfg.outbox_dir / "01DOWN.tar.zst").exists() +def test_quarantine_cleanup_removes_old_entries(tmp_path: Path) -> None: + """Without an upper bound, quarantine/ grows forever. The cleanup + pass walks it once per cleanup_interval and drops anything past + keep_days — bounded by directory size since it just statx()s + each entry's mtime. + + We run with cleanup_interval_s=0 so the gate fires on every pass, + and overload `os.utime` to age a fixture entry past the cutoff + without sleeping for real time.""" + import os as _os + import time as _time + + cfg = ShipperConfig( + host_id="lab1", + data_root=tmp_path / "lab-data", + receiver=ReceiverEndpoint(url="http://127.0.0.1:1"), + scan_interval_s=0.05, + quarantine_keep_days=7, + quarantine_cleanup_interval_s=0.0, # always run on every pass + ) + + class _NoTransport: + def ship_tarball(self, *a, **kw): raise AssertionError("not used") + + queue = ShipperQueue(cfg, _NoTransport()) + + old = cfg.quarantine_dir / "01OLD" + old.mkdir() + (old / "meta.json").write_text("{}") + new = cfg.quarantine_dir / "01NEW" + new.mkdir() + (new / "meta.json").write_text("{}") + + # Backdate the OLD entry by 8 days. The directory's own mtime + # is what cleanup checks. + eight_days_ago = _time.time() - (8 * 86400) + _os.utime(old, (eight_days_ago, eight_days_ago)) + + queue._maybe_cleanup_quarantine() + + assert not old.exists(), "8-day-old entry should be cleaned up" + assert new.exists(), "fresh entry should survive" + + +def test_quarantine_cleanup_disabled_when_keep_days_zero(tmp_path: Path) -> None: + import os as _os + import time as _time + + cfg = ShipperConfig( + host_id="lab1", + data_root=tmp_path / "lab-data", + receiver=ReceiverEndpoint(url="http://127.0.0.1:1"), + scan_interval_s=0.05, + quarantine_keep_days=0, # disabled + quarantine_cleanup_interval_s=0.0, + ) + + class _NoTransport: + def ship_tarball(self, *a, **kw): raise AssertionError("not used") + + queue = ShipperQueue(cfg, _NoTransport()) + + old = cfg.quarantine_dir / "01OLD" + old.mkdir() + eight_days_ago = _time.time() - (8 * 86400) + _os.utime(old, (eight_days_ago, eight_days_ago)) + + queue._maybe_cleanup_quarantine() + assert old.exists(), "cleanup must be a no-op when keep_days=0" + + +def test_quarantine_cleanup_respects_interval_gate(tmp_path: Path) -> None: + """The interval gate prevents the 5s scan tick from statx()-ing + the whole quarantine tree on every pass.""" + import os as _os + import time as _time + + cfg = ShipperConfig( + host_id="lab1", + data_root=tmp_path / "lab-data", + receiver=ReceiverEndpoint(url="http://127.0.0.1:1"), + scan_interval_s=0.05, + quarantine_keep_days=7, + quarantine_cleanup_interval_s=3600.0, + ) + + class _NoTransport: + def ship_tarball(self, *a, **kw): raise AssertionError("not used") + + queue = ShipperQueue(cfg, _NoTransport()) + + # First pass: gate's 0.0 sentinel means we sweep. + queue._maybe_cleanup_quarantine() + first_at = queue._last_quarantine_cleanup_at + assert first_at > 0 + + # Stage an old entry AFTER the first sweep. The gate should + # block the next sweep until cleanup_interval_s has elapsed. + old = cfg.quarantine_dir / "01OLD" + old.mkdir() + _os.utime(old, (_time.time() - 8 * 86400,) * 2) + + queue._maybe_cleanup_quarantine() + assert old.exists(), "gate should defer the next sweep" + assert queue._last_quarantine_cleanup_at == first_at + + +def test_run_forever_calls_heartbeat(tmp_path: Path) -> None: + """The heartbeat callback fires once per completed pass. In + production this is wired to sd_notify(WATCHDOG=1) so systemd's + WatchdogSec catches a hung scan loop.""" + import threading + + cfg = ShipperConfig( + host_id="lab1", + data_root=tmp_path / "lab-data", + receiver=ReceiverEndpoint(url="http://127.0.0.1:1"), + scan_interval_s=0.05, + ) + + class _NoTransport: + def ship_tarball(self, *a, **kw): raise AssertionError("not used") + + queue = ShipperQueue(cfg, _NoTransport()) + + beats = [] + stop = threading.Event() + + def _heartbeat() -> None: + beats.append(time.monotonic()) + if len(beats) >= 3: + stop.set() + + queue.run_forever(stop_check=stop.is_set, heartbeat=_heartbeat) + assert len(beats) >= 3 + + +def test_run_forever_survives_heartbeat_exception(tmp_path: Path) -> None: + """A broken heartbeat (e.g. NOTIFY_SOCKET vanished) must not take + down the daemon — the loss of watchdog is tolerable; the loss + of the ship loop is not.""" + cfg = ShipperConfig( + host_id="lab1", + data_root=tmp_path / "lab-data", + receiver=ReceiverEndpoint(url="http://127.0.0.1:1"), + scan_interval_s=0.05, + ) + + class _NoTransport: + def ship_tarball(self, *a, **kw): raise AssertionError("not used") + + queue = ShipperQueue(cfg, _NoTransport()) + + pass_count = [0] + + def _stop() -> bool: + return pass_count[0] >= 3 + + def _broken_heartbeat() -> None: + pass_count[0] += 1 + raise RuntimeError("simulated NOTIFY_SOCKET failure") + + # Should NOT raise. + queue.run_forever(stop_check=_stop, heartbeat=_broken_heartbeat) + assert pass_count[0] >= 3 + + def test_run_once_sweeps_orphaned_outbox_tarball(tmp_path: Path, receiver) -> None: """A tarball in outbox/ with no matching episode dir should get cleaned up at the start of the next scan. The lifecycle invariant diff --git a/tools/cis490_doctor.py b/tools/cis490_doctor.py index 7121a74..b6e8cf3 100644 --- a/tools/cis490_doctor.py +++ b/tools/cis490_doctor.py @@ -354,6 +354,100 @@ def check_services(report: Report, role: str) -> None: )) +# --------------------------------------------------------------------------- +# checks — recent shipping errors (lab-host) +# --------------------------------------------------------------------------- + + +def check_recent_shipping_errors(report: Report) -> None: + """Tail the last 10 minutes of cis490-shipper logs and surface + any 400/412 patterns. The shipper logs every PUT outcome, so a + fresh stream of fatals means the lab host's code is older than + the receiver's allow-list — exactly the loop our gate-cutover + fixes were meant to prevent. Surfacing here gives the operator + a one-line "what's broken" instead of having to grep the journal. + + Skipped silently if journalctl isn't accessible (doctor often + runs as the unprivileged user and reading the system journal + needs the systemd-journal group).""" + rc, out, err = _run([ + "journalctl", "-u", "cis490-shipper", + "--since", "10 minutes ago", "--no-pager", "--output=cat", + ]) + if rc != 0: + # Permission denied / journalctl not available / unit not + # installed yet — none of these merit a red row. + report.add(Check( + "shipper: recent log scan", + "skip", + detail=(err.strip().splitlines()[-1] if err else "no output")[:80], + )) + return + + lines = out.splitlines() + if not lines: + report.add(Check( + "shipper: recent ship results", + "ok", + detail="no output in last 10 minutes (daemon may be idle)", + )) + return + + # Match what queue.py / app.py actually log. We're conservative: + # only count lines that explicitly identify a ship outcome so + # we don't false-positive on unrelated 400s the receiver might + # log (e.g. health-check probes). + fatal_400 = sum(1 for ln in lines if "missing X-Cis490-Code-Commit" in ln) + fatal_412 = sum(1 for ln in lines if "412 commit-rejected" in ln + or "code commit rejected" in ln) + other_fatal = sum(1 for ln in lines + if "ship " in ln and "fatal" in ln + and "missing X-Cis490-Code-Commit" not in ln + and "commit rejected" not in ln) + transient = sum(1 for ln in lines + if "ship " in ln and "transient" in ln) + + if fatal_412 > 0: + report.add(Check( + "shipper: recent ship results", + "fail", + detail=f"{fatal_412} ship(s) rejected as out-of-window in last 10 min", + fix=("cd /opt/cis490 && sudo -u cis490 git pull origin main && " + "sudo /opt/cis490/scripts/install-lab-host.sh " + "# pulls new code + drains stale queue + restarts daemon"), + )) + elif fatal_400 > 0: + report.add(Check( + "shipper: recent ship results", + "fail", + detail=( + f"{fatal_400} ship(s) rejected as missing-commit-header — " + "orchestrator is emitting episodes without code_version" + ), + fix=("sudo /opt/cis490/scripts/install-lab-host.sh " + "# rewrites VERSION + restarts orchestrator"), + )) + elif other_fatal > 0: + report.add(Check( + "shipper: recent ship results", + "warn", + detail=f"{other_fatal} fatal ship(s) in last 10 min (other 4xx)", + fix="sudo journalctl -u cis490-shipper --since '10 minutes ago' " + "| grep -E 'ship .*fatal'", + )) + elif transient > 5: + report.add(Check( + "shipper: recent ship results", + "warn", + detail=f"{transient} transient failures in last 10 min — receiver reachable?", + fix="sudo /opt/cis490/.venv/bin/python -m shipper " + "--config /etc/cis490/lab-host.toml --ping", + )) + else: + # At least one line of output, but no error patterns matched. + report.add(Check("shipper: recent ship results", "ok")) + + # --------------------------------------------------------------------------- # checks — network (lab-host) # --------------------------------------------------------------------------- @@ -647,6 +741,7 @@ def main(argv: list[str] | None = None) -> int: check_bridge(report) if not args.no_tier3: check_tier3(report) + check_recent_shipping_errors(report) check_end_to_end(report) summary = report.summary()