Merge remote-tracking branch 'origin/main' into Dev_REL1_043026

This commit is contained in:
Elliott Kolden 2026-05-02 12:24:08 -06:00
commit 86bd9e21d7
21 changed files with 1734 additions and 81 deletions

192
AGENTS.md
View file

@ -5,17 +5,48 @@ The conventions below also apply to sibling repos (`wg-enroll`,
`wg-pki`, `caddy`, `iptmonads`, `matrix`, `forgejo`, `vault`,
`openclaw-deploy`).
> **READ THIS FIRST IF YOU'RE TROUBLESHOOTING A LAB-HOST BRING-UP:**
> If the shipper or doctor mentions mTLS, certs, CA bundles, or
> "secure the connection," **do not write any code, mint any keys,
> generate any CSRs, or call openssl.** Cert delivery is
> automated — jump to **"Securing the connection (mTLS) — DO NOT
> mint your own certs"** below for the one command you should run.
---
> **PULL `origin/main` BEFORE DEBUGGING.** Three install bugs and the
> shipper crash-loop on missing CA were closed in main as of
> 2026-04-30. If your clone is older than that, the symptoms below
> may already be fixed.
## ⚡ FIRST: are you an on-device agent on a lab host?
**Read this table before doing anything else.** Find your symptom in
the left column and run the EXACT command in the right column.
Do not invent variations. Do not paraphrase.
| Symptom (what you see) | What to run (verbatim) |
|---|---|
| `journalctl -u cis490-shipper` shows `412 commit-rejected` or `code commit rejected` | `cd /opt/cis490 && sudo -u cis490 git pull origin main && sudo /opt/cis490/scripts/install-lab-host.sh` |
| `journalctl -u cis490-shipper` shows `400` and `missing X-Cis490-Code-Commit` | Same command as above. |
| `systemctl status cis490-shipper` shows `inactive (dead)` after install | `sudo systemctl enable --now cis490-shipper cis490-orchestrator` |
| Shipper logs `waiting on mTLS material` | **Do nothing** — this is expected. The cert auto-arrives; the daemon retries each request. |
| Receiver index isn't growing (Pi side: `tail -f /var/lib/cis490/index.jsonl`) | `/opt/cis490/.venv/bin/python /opt/cis490/tools/cis490_doctor.py --role lab-host` — read the **red** rows; each one prints the exact fix command. |
| `bootstrap.wg fetch failed` after install | `echo '10.100.0.1 bootstrap.wg collector.wg' \| sudo tee -a /etc/hosts && sudo /opt/cis490/scripts/install-lab-host.sh` |
| `cis490-doctor` says VERSION stamp missing | `sudo /opt/cis490/scripts/install-lab-host.sh` |
| You're not sure what's wrong | `/opt/cis490/.venv/bin/python /opt/cis490/tools/cis490_doctor.py --role lab-host` |
| The canonical pull-and-reinstall command above doesn't fix it (auto-update timer keeps failing, `git pull` errors, you have a local commit, etc.) | **Read [FIXYOURSELF.md](./FIXYOURSELF.md).** It's the explicit decision tree for stuck states the auto-update timer can't recover from. |
**HARD RULES — do not break these even if they "feel like they'd help":**
- **Do not run `openssl`, `step-cli`, mint keys, or write CSRs.** Cert
delivery is fully automated by `install-lab-host.sh`. If you're
about to touch a private key on a lab host, stop.
- **Do not set `verify_tls = false`.** Skipping TLS verification is not
"skipping it for now"; it's leaving a hole.
- **Do not invent a `code_version` field in old episode `meta.json`
files.** That field exists to keep buggy data out of the index.
The drain helper handles them properly.
- **Do not disable, mask, or `systemctl stop` `cis490-shipper` to silence
log noise.** Fix the cause via the table above; the noise stops
on its own once the host has the new code.
- **Do not `git pull` and skip running `install-lab-host.sh`.** A bare
`git pull` updates files on disk but does NOT restart the systemd
units, so the running daemons keep executing the old code.
- **Do not file a Forgejo issue without first running `cis490-doctor`
and pasting its output.**
If your symptom isn't in the table or the canonical command above
doesn't fix it, file a Forgejo issue (see the "File an issue" section
near the bottom).
---
@ -35,15 +66,20 @@ sudo /home/max/.env/wg-pki/scripts/init-cis490-client-ca.sh # idempotent
sudo /home/max/.env/wg-pki/scripts/deploy-cis490-cert.sh \
<host_id> <wg_ip> # mints + scp's + extracts + chmods
# 1. (On the lab host.) Install the lab-host role. This copies the
# repo into /opt/cis490, builds the venv, drops systemd units,
# fetches the Alpine baseline qcow2, and builds the cidata ISO
# with the in-guest agent embedded.
# 1. (On the lab host.) Install the lab-host role. ONE COMMAND DOES
# EVERYTHING — repo to /opt/cis490, venv build, systemd units,
# Alpine baseline qcow2, cidata ISO, VERSION stamp, mTLS cert
# auto-fetch from bootstrap.wg, Tier-3+4 deploy (msfrpcd +
# Metasploitable2 + theZoo malware samples + bridge), pre-stamp
# queue drain, and a `daemon-reload + systemctl restart` of the
# shipper + orchestrator on re-runs. Idempotent — safe to re-run.
sudo /opt/cis490/scripts/install-lab-host.sh
# (or, if running from the manual clone:)
# (or, if running from a clone elsewhere:)
# sudo ./scripts/install-lab-host.sh
# 2. Edit /etc/cis490/lab-host.toml — set host_id and any overrides.
# 2. Edit /etc/cis490/lab-host.toml — set host_id (the only required
# edit). Then re-run step 1 so the cert auto-fetch can resolve
# bootstrap.wg/v1/cert/<host_id>.
# 3. Verify everything before enabling the timer-driven services:
/opt/cis490/.venv/bin/python /opt/cis490/tools/cis490_doctor.py \
@ -58,14 +94,25 @@ sudo systemctl enable --now cis490-shipper cis490-orchestrator
# 5. (On the Pi.) Watch the index grow:
sudo tail -f /var/lib/cis490/index.jsonl
# 6. (Optional, Tier 3.) Enable real exploit fire — needs metasploit.
sudo /opt/cis490/scripts/install-msfrpcd.sh
# Operator-supplied URL + sha256 (Rapid7 download is registration-walled):
IMAGE_URL='…' IMAGE_SHA256='…' sudo OUT_DIR=/var/lib/cis490/vm/images \
/opt/cis490/scripts/fetch-metasploitable2.sh
```
**There is no manual Tier-3 step.** Steps 1 + 2 deploy msfrpcd,
Metasploitable2 (auto-fetched from a public mirror with TOFU sha256
pinning — no Rapid7 registration), and Tier-4 real-malware samples
from theZoo (no API key, no signup). The orchestrator switches to
Tier-3 episodes automatically once the prereqs are on disk.
**Hosts self-update.** `install-lab-host.sh` enables
`cis490-autoupdate.timer`, which runs every 30 min (with up to 10 min
of randomized delay) and does `git fetch + git pull --ff-only +
install-lab-host.sh` whenever origin/main has moved. So once a host
has done the canonical bring-up ONCE, it self-heals on every
subsequent maintainer push — you don't need to remember to pull. The
timer logs to `journalctl -u cis490-autoupdate.service`. If the
host's checkout has diverged from origin (operator hand-edits,
half-applied changes), auto-update bails rather than guessing — that
shows up as a unit failure with a clear log message.
If `index.jsonl` doesn't grow within a wave-interval (~60 s after
`systemctl enable --now`), run `cis490-doctor` again. The most
common silent failures it catches:
@ -78,6 +125,44 @@ common silent failures it catches:
`cis490-doctor --json` is machine-readable for use by other agents.
## Shipper says "400 missing" or "412 commit-rejected": pull and reinstall
If `journalctl -u cis490-shipper` shows a steady stream of
`-> fatal (400)` or `-> 412 commit-rejected` lines, the receiver is
rejecting episodes because their `meta.json::code_version.commit`
isn't in the receiver's allow-list (or isn't being sent at all). This
happens when this lab host is running code older than the receiver
will accept.
The fix is always the same — pull main and re-run the installer:
```sh
cd /opt/cis490
sudo -u cis490 git pull origin main
sudo /opt/cis490/scripts/install-lab-host.sh
```
`install-lab-host.sh` does the rest:
1. Re-stamps `/opt/cis490/VERSION` to the new HEAD.
2. Drains pre-stamp episodes via
`tools/quarantine_unstamped.py` so the queue stops looping on
them. Drained episodes go to `/var/lib/cis490/data/quarantine/`
with a `quarantine_reason.json` per-episode for triage.
3. Restarts `cis490-shipper` and `cis490-orchestrator` so the new code
takes effect.
Do **not** disable the shipper to silence the log noise — once a host
has the new code, traffic resumes immediately. Do **not** mint a fake
`code_version` field in old episodes to bypass the gate; that field
exists specifically to keep buggy pre-fix data out of the training
index.
If the receiver is rejecting *new* episodes too (you've pulled and
restarted, but still see 412), the receiver's allow-list window may
not yet include your commit — wait 5s for its Forgejo refresh, or
push your commit to `origin/main` first if you're testing
unmerged work.
## Tier 3 + Tier 4 deploy (zero-touch via install-lab-host.sh)
`install-lab-host.sh` runs Tier-3 deploy automatically on its second
@ -100,8 +185,7 @@ except for the verify step. Inputs are all optional env vars:
|---|---|
| `SKIP_VERIFY` | skip the live `vsftpd_234_backdoor` smoke run |
| `SKIP_BRIDGE` | skip `br-malware` setup (limits to 2 of 5 modules) |
| `SKIP_TIER4` | skip the Tier-4 auto-fetch even if API key present |
| `MALWAREBAZAAR_API_KEY` | opt-in: present means Tier-4 auto-fetch runs |
| `SKIP_TIER4` | skip the Tier-4 auto-fetch (DEPRECATED — leaves you with mimic-only data, defeats the project) |
The fleet runner auto-detects Tier-3 readiness via
`orchestrator/fleet.py::_msfrpcd_available()`. Once
@ -140,22 +224,9 @@ git clone failure, or a family has no matching directory),
The orchestrator's next selection that picks a sample with
`kind == "real"` runs the real binary via the chunked-upload path
(`exploits.driver._resolve_workload`).
Set `MALWAREBAZAAR_API_KEY` (free signup at https://bazaar.abuse.ch/)
before running `install-tier-3-4.sh` and step 5 runs
`tools/auto_fetch_samples.py` automatically:
1. For each `[[sample]]` in `samples/manifest.toml` without a
`sha256`, query MalwareBazaar by `family` (signature match)
2. Download the first matching binary (sha256-verified on the way in)
3. Edit the manifest in place — add `source`, `sha256`, `url`
4. Episodes that select that sample now run the real binary via the
chunked-upload path (`exploits.driver._resolve_workload`)
The mimic profile remains the fallback for episodes that select a
sample whose binary isn't on disk. Trainers filter on
`meta.sample.kind ∈ {"real", "mimic"}`.
(`exploits.driver._resolve_workload`). The mimic profile remains the
fallback for episodes that select a sample whose binary isn't on
disk. Trainers filter on `meta.sample.kind ∈ {"real", "mimic"}`.
### Confirm Tier 3+4 are flowing
@ -191,10 +262,8 @@ hasn't kicked in yet on any lab host — re-run
`install-msfrpcd.sh`. The script wires the systemd unit + creds;
a manual install bypasses the orchestrator's
`_msfrpcd_available()` probe.
- DO NOT push your MalwareBazaar API key. `samples/.bazaar.token`
is gitignored.
- DO NOT add bogus sha256 entries to `manifest.toml`
`fetch_sample.py` hash-verifies every download.
`auto_fetch_samples.py` hash-verifies every binary it stages.
- DO NOT add real-binary entries by hand when `auto_fetch_samples.py`
exists. Hand-edits are racy with the auto-fetcher's tempfile
rewrite.
@ -308,19 +377,36 @@ transport logs `mTLS material now on disk; shipper transport ready`.
the config, or hand-rolling certs — just confirm the Pi-side step
ran and wait one scan interval.
### `install-lab-host.sh` failures
### Outdated clone? Pull main first.
Three install bugs were fixed in commit `95ac56a`. If you're on an
older clone:
A long list of install-time bugs (cp self-copy, missing service
restart, fatal-loop quarantine, ca_bundle pointing at the wrong
chain, busybox pgrep flags, pycdlib in the wrong dep group, missing
vm/images/ symlink target, doctor sys.path) have been fixed and are
all resolved in main. **If you hit any "this used to work" symptom
on a host that hasn't pulled in a while, the canonical command is
always the same:**
| Symptom | Cause | Fix |
|---|---|---|
| `ModuleNotFoundError: pycdlib` during cidata build | `pycdlib` was in `dev` deps, service venv only installs main deps | Pull main; `pycdlib` is in `dependencies` now |
| Episodes exit `rc=1` in 15 s; `launch_demo.sh` can't find image | `vm/images/` dir wasn't created before symlinking | Pull main; install script now `install -d`'s the directory |
| `cis490-doctor` reports "tier3: No module named exploits" | `sys.path` didn't include repo root | Pull main; doctor inserts `repo_root` into `sys.path` |
```sh
cd /opt/cis490 && sudo -u cis490 git pull origin main && \
sudo /opt/cis490/scripts/install-lab-host.sh
```
**If you hit any of these on a fresh install, pull main first** before
filing an issue — the issue is probably already closed.
That one command:
- Re-stamps `/opt/cis490/VERSION` so episodes get a valid
`code_version.commit` — required by the receiver's gate.
- Drains pre-stamp episodes from `data/episodes/` to
`data/quarantine/` via `tools/quarantine_unstamped.py` so the queue
stops looping on them.
- Runs `daemon-reload` and `systemctl restart cis490-shipper
cis490-orchestrator` so the live daemons pick up the new code
(a bare `git pull` does NOT do this — Python module objects in the
running process are frozen at last service start).
- Re-runs the Tier-3+4 deploy idempotently if the cert is on disk.
After it returns, the shipper will be running as `Type=notify` with
`WatchdogSec=180` — systemd kills + restarts it if a scan pass hangs.
### The classifier is multi-source — don't gut episodes on /proc alone

249
FIXYOURSELF.md Normal file
View file

@ -0,0 +1,249 @@
# FIXYOURSELF.md — when the lab host can't unstick itself automatically
**You are reading this because you (an on-device AI agent) are running
on a CIS490 lab host and one of these is true:**
- `cis490-autoupdate.timer` keeps failing.
- The shipper logs show steady `412 commit-rejected` or `400 missing`.
- `cis490-doctor --role lab-host` flagged you red.
The normal "pull and reinstall" path (AGENTS.md row 1) sometimes can't
recover on its own — most often because someone made a local commit
on this host, or `/opt/cis490` got into a state `git pull --ff-only`
refuses to fix. This doc is the explicit decision tree for those
cases. Run the commands literally. Do not paraphrase.
---
## Step 1 — Identify your state
Run all of these. Save the output. Match the result against the
table.
```sh
cd /opt/cis490
sudo -u cis490 git rev-parse HEAD # → LOCAL
sudo -u cis490 git rev-parse origin/main 2>&1 # → REMOTE (or error)
sudo -u cis490 git fetch origin main 2>&1 # → fresh fetch result
sudo -u cis490 git log -1 --format='%H %ci %s' # → what HEAD is
sudo -u cis490 git status --porcelain # → uncommitted changes?
```
Then:
| If you see… | Your state | Go to |
|---|---|---|
| LOCAL == REMOTE and `git status` empty | Not stuck — run §Z to be safe | §Z |
| LOCAL != REMOTE and `git merge-base --is-ancestor HEAD origin/main` returns 0 | Behind main, no local commits | §A |
| LOCAL != REMOTE and the merge-base check returns 1 (NON-zero) | **You have a local commit not on origin/main** | §B |
| `git fetch` prints a network error | Connectivity broken | §C |
| `/opt/cis490/.git` is missing | No git checkout — populated via `cp -aT` originally | §D |
| `git status` shows tracked files modified | Uncommitted edits on this host | §E |
If multiple match: §C blocks everything else (fix network first), then
§D, then §E, then §B, then §A.
---
## §A — Behind main, clean tree
```sh
cd /opt/cis490
sudo -u cis490 git pull --ff-only origin main
sudo /opt/cis490/scripts/install-lab-host.sh
```
`install-lab-host.sh` re-stamps VERSION, drains the pre-stamp queue,
and restarts the daemons. Verify with §Z.
---
## §B — You have a local commit not on origin/main
This is the elliott-thinkpad case (2026-05-01..02). You committed
something locally, the maintainer's `origin/main` doesn't have it,
and the receiver's allow-list rejects every episode you ship. Pick
ONE of B.1, B.2, B.3 — read all three first.
### B.1 — Push your commit to origin/main (RECOMMENDED if your change is real)
```sh
cd /opt/cis490
sudo -u cis490 git log -3 --stat HEAD # what you're about to push — read it
sudo -u cis490 git push origin HEAD:main
```
If `git push` succeeds: the receiver allow-list picks it up within 5
seconds, new episodes start landing immediately. Verify with §Z.
If `git push` fails with "permission denied" or "auth required": you
don't have push credentials. Skip to B.2 or B.3.
If `git push` fails with "non-fast-forward": your local commit
diverged AROUND origin/main (you committed against an older base).
Try:
```sh
sudo -u cis490 git pull --rebase origin main
# If the rebase has conflicts you can't resolve in 60 seconds, abort:
# sudo -u cis490 git rebase --abort
# Then go to B.2 or B.3.
sudo -u cis490 git push origin HEAD:main
```
### B.2 — Discard your local commit, reset to origin
**This deletes your local commit.** Only do this if you know it
wasn't a real fix or the maintainer told you to discard it.
```sh
cd /opt/cis490
sudo -u cis490 git fetch origin main
sudo -u cis490 git log --oneline HEAD ^origin/main # what you're about to lose
sudo -u cis490 git reset --hard origin/main
sudo /opt/cis490/scripts/install-lab-host.sh
```
### B.3 — Stop, file an issue, wait
If you can't decide between B.1 and B.2 (e.g. you don't know if your
commit is a real fix), do this:
```sh
cd /opt/cis490
HOST_ID=$(grep '^host_id' /etc/cis490/lab-host.toml | cut -d'"' -f2)
LOCAL_SHA=$(sudo -u cis490 git rev-parse HEAD)
DETAIL=$(sudo -u cis490 git log -3 --stat HEAD | head -100)
# File the issue (replace <TOKEN> with the operator's Forgejo token —
# do NOT embed yours in commits)
curl -sS -X POST \
-H "Authorization: token <TOKEN>" \
-H "Content-Type: application/json" \
http://10.100.0.1:3000/api/v1/repos/spectral/CIS490/issues \
-d "$(python3 -c "import json,os; print(json.dumps({
'title': f\"$HOST_ID: stuck on local commit ${LOCAL_SHA:0:12}\",
'body': f\"### What's at HEAD\n\n\`\`\`\n${DETAIL}\n\`\`\`\n\nNeed maintainer to choose: push HEAD to main, or reset --hard origin/main here?\"
}))")"
```
Then leave the daemons running. The shipper will keep auto-quarantining
the 412s — backlog grows but doesn't crash anything. Wait for a
maintainer comment.
---
## §C — Network broken
```sh
ping -c 1 10.100.0.1 # the Pi
sudo wg show # is wg0 up?
sudo systemctl restart wg-quick@wg0 # bring it back up
sudo systemctl restart cis490-shipper cis490-orchestrator
```
If `ping 10.100.0.1` still fails after a `wg-quick` restart, this is
a WireGuard / wg-enroll / iptmonads problem outside this repo. File
an issue at `spectral/wg-enroll` or `spectral/iptmonads` and stop.
---
## §D — `/opt/cis490/.git` missing
The host was originally set up with `cp -aT` (no `.git/`). That makes
auto-update impossible. Re-clone:
```sh
# Stop services so we don't race with the orchestrator mid-episode
sudo systemctl stop cis490-shipper cis490-orchestrator
# Preserve config/data — only /opt/cis490 (the code) gets replaced.
# /etc/cis490/ and /var/lib/cis490/ are NOT touched.
sudo mv /opt/cis490 /opt/cis490.pre-fix
sudo git clone http://maxgit.wg:3000/spectral/CIS490.git /opt/cis490
sudo chown -R cis490:cis490 /opt/cis490
sudo /opt/cis490/scripts/install-lab-host.sh
# Once verified, you can drop the backup:
# sudo rm -rf /opt/cis490.pre-fix
```
---
## §E — Uncommitted edits on tracked files
```sh
cd /opt/cis490
sudo -u cis490 git status --short # see what's modified
sudo -u cis490 git diff # see exactly what changed
```
If the changes are intentional (e.g. you fixed a bug), commit them
first and then go to §B:
```sh
sudo -u cis490 git add <files>
sudo -u cis490 git commit -m "<short description>"
# Now go to §B.
```
If the changes are accidental / left over from debugging, discard
them:
```sh
sudo -u cis490 git checkout -- .
# Now go to §A.
```
---
## §Z — Verify you're unstuck
```sh
# 1. Daemons up?
systemctl is-active cis490-shipper cis490-orchestrator
# Both should say "active".
# 2. VERSION present and matches HEAD?
cat /opt/cis490/VERSION
sudo -u cis490 git -C /opt/cis490 rev-parse HEAD
# The "commit" in VERSION should equal the rev-parse output.
# 3. Recent ship results — should NOT be 412/400-fatal-only:
sudo journalctl -u cis490-shipper --since '2 minutes ago' | tail -20
# 4. Maintainer-side confirmation (if you can reach the Pi):
# sudo tail -5 /var/lib/cis490/index.jsonl
# Should show recent received_at_wall timestamps with your host_id.
# 5. Run the doctor — every red row prints its own fix:
/opt/cis490/.venv/bin/python /opt/cis490/tools/cis490_doctor.py --role lab-host
```
---
## Hard rules (don't break these)
These are repeated from AGENTS.md because the situations covered in
this doc are exactly when an agent is most tempted to break them.
- **Do NOT** edit `meta.json` files to add a fake `code_version`. The
gate exists to keep buggy data out of the index.
- **Do NOT** disable / mask / `systemctl stop` `cis490-shipper` to
silence log noise. Fix the cause.
- **Do NOT** set `verify_tls = false` in `lab-host.toml`. That's a
bypass, not a fix.
- **Do NOT** `git commit` on `/opt/cis490` to "make the gate
happy" — committing without pushing is exactly how the
elliott-thinkpad incident started. If you commit, push (§B.1) or
don't commit at all.
- **Do NOT** delete `/var/lib/cis490/data/quarantine/`. Those are
forensic records of what the gate rejected; they're capped at 30
days by the cleanup pass.
- **Do NOT** clobber `/etc/cis490/certs/`. Cert delivery is
automated; rerun `install-lab-host.sh` if certs are missing.
If you find yourself wanting to do any of the above, stop and file
an issue (§B.3 has the curl command). The maintainer would much
rather resolve a stale lab host by reading an issue than by
reverse-engineering what an agent did to escape a stuck state.

View file

@ -0,0 +1,21 @@
[Unit]
Description=CIS490 lab-host auto-update from origin/main
Documentation=https://maxgit.wg/spectral/CIS490
After=network-online.target wg-quick@wg0.service
# We don't Want network-online so that a host that's offline just
# silently skips the update tick instead of pinning a unit failure.
[Service]
Type=oneshot
# Runs as root because install-lab-host.sh writes to /etc/, /opt/, and
# calls systemctl. The script drops to the cis490 user via `sudo -u`
# for the git fetch + pull.
ExecStart=/opt/cis490/scripts/auto-update.sh
StandardOutput=journal
StandardError=journal
[Install]
# The TIMER is what gets enabled, not the service itself. We still set
# WantedBy here so that an operator can `systemctl start
# cis490-autoupdate.service` manually for a one-shot pull.
WantedBy=multi-user.target

View file

@ -0,0 +1,18 @@
[Unit]
Description=Run CIS490 lab-host auto-update every 30 minutes
Documentation=https://maxgit.wg/spectral/CIS490
[Timer]
# 5 min after boot so a freshly-flashed host catches up promptly.
OnBootSec=5min
# Then every 30 min. RandomizedDelaySec spreads across hosts so
# the receiver / forgejo aren't hammered all at once when several
# lab hosts come up together.
OnUnitActiveSec=30min
RandomizedDelaySec=10min
# If the host was off when a tick was due, run on next boot.
Persistent=true
Unit=cis490-autoupdate.service
[Install]
WantedBy=timers.target

View file

@ -7,7 +7,14 @@ Wants=network-online.target
Requires=wg-quick@wg0.service
[Service]
Type=simple
# Type=notify so systemd waits for sd_notify("READY=1") before
# considering the unit started, and so WatchdogSec= can kick in.
# Without this, Restart=on-failure only catches process crashes —
# silent stalls (deadlock, blocked I/O past timeout, hung tar
# subprocess) leave a zombie running with the data backlog growing.
Type=notify
NotifyAccess=main
WatchdogSec=180
User=cis490
Group=cis490
WorkingDirectory=/opt/cis490

View file

@ -33,5 +33,13 @@ branch = "main"
# Optional Forgejo token for private repos; remove for public.
# auth_token = "..."
#
# Dev-only fallback (used iff forgejo_url is unset):
# local_repo_path = "/home/max/cis490"
# Optional local-git fallback. When BOTH forgejo_url and
# local_repo_path are set, the gate first asks Forgejo; if that fails
# (e.g. simultaneous restart of receiver + Forgejo on the same Pi) it
# falls back to `git log` against this checkout instead of locking
# out every shipper. When forgejo_url is unset, this is the only
# backend.
#
# Auto-detected: if you don't set this, the receiver checks for
# /opt/cis490/.git at startup and uses that path when present.
# local_repo_path = "/opt/cis490"

View file

@ -3,6 +3,7 @@ from __future__ import annotations
import argparse
import logging
import os
from pathlib import Path
import uvicorn
@ -34,8 +35,16 @@ def main() -> None:
)
version_gate = None
if cfg.version_gate_enabled:
# Auto-detect /opt/cis490/.git as a fallback so a forgejo blip
# at startup doesn't reject every PUT with not-in-window. The
# receiver service has read access to /opt under
# ProtectSystem=strict, and that path is where the production
# install lands — so it's the natural local source of truth.
repo_path = cfg.version_gate_local_repo
if repo_path is None and Path("/opt/cis490/.git").is_dir():
repo_path = Path("/opt/cis490")
version_gate = VersionGate(
repo_path=cfg.version_gate_local_repo,
repo_path=repo_path,
window=cfg.version_gate_window,
forgejo_url=cfg.version_gate_forgejo_url,
repo_owner=cfg.version_gate_repo_owner,

View file

@ -19,7 +19,16 @@ log = logging.getLogger("cis490.receiver")
SUFFIX = ".tar.zst"
SCHEMA_VERSION = 1
SCHEMA_VERSION = 2
# Mirrored from orchestrator.benign so the receiver can validate the
# benign-profile header without taking a dependency on the orchestrator
# package. Keep in sync if BENIGN_PROFILES grows.
_VALID_BENIGN_PROFILES: frozenset[str] = frozenset({
"idle", "web_visitor", "admin_session", "cron_burst",
"file_browse", "db_query", "package_check",
})
_VALID_EPISODE_TYPES: frozenset[str] = frozenset({"control", "infected"})
def _bearer_check(request: Request, expected: str | None) -> Response | None:
@ -87,7 +96,7 @@ def make_app(
expected_sha = expected_sha.lower()
try:
schema_version = int(request.headers.get("x-schema-version", "1"))
schema_version = int(request.headers.get("x-schema-version", "2"))
except ValueError:
return JSONResponse({"error": "bad X-Schema-Version"}, status_code=400)
@ -106,10 +115,14 @@ def make_app(
"error": "missing X-Cis490-Code-Commit header",
"remediation": (
"Lab-host is shipping with no code_version stamp. "
"Pull origin/main and re-run install-lab-host.sh "
"so the orchestrator emits meta.json.code_version "
"and the shipper forwards X-Cis490-Code-Commit."
"On the lab host:\n"
" cd /opt/cis490 && sudo -u cis490 git pull origin main && "
"sudo /opt/cis490/scripts/install-lab-host.sh\n"
"If that errors out, read FIXYOURSELF.md at the repo "
"root — it's a six-branch decision tree for stuck "
"states the auto-update timer can't recover from."
),
"see_also": "FIXYOURSELF.md",
}
return JSONResponse(body, status_code=400)
if reason == "bad-format":
@ -117,22 +130,29 @@ def make_app(
{"error": "X-Cis490-Code-Commit must be 40 lowercase hex"},
status_code=400,
)
# not-in-window: out-of-date lab host
# not-in-window: out-of-date lab host OR diverged-HEAD
body = {
"error": "code commit rejected: not in receiver's allow-list",
"your_commit": commit,
"valid_window_size": version_gate.valid_count(),
"head_commit": head,
"remediation": (
"Pull origin/main on this lab host and rebuild before "
"shipping further:\n"
" cd /opt/cis490 && sudo -u cis490 git pull origin main\n"
" sudo /opt/cis490/scripts/install-lab-host.sh\n"
" sudo systemctl restart cis490-orchestrator\n"
"Episodes from old code stay queued; the next ship will "
"succeed once the lab-host's HEAD is in the receiver's "
"allow-list. Do NOT bypass this check — it exists to "
"keep buggy pre-fix data out of the training set."
"Your commit isn't on origin/main. Two cases:\n"
"\n"
"(1) You're just behind. Run on the lab host:\n"
" cd /opt/cis490 && sudo -u cis490 git pull --ff-only "
"origin main && sudo /opt/cis490/scripts/install-lab-host.sh\n"
"\n"
"(2) You have a LOCAL commit that's not on origin/main "
"(git pull --ff-only fails). This is the diverged-HEAD "
"case — the auto-update timer will refuse to fix it. "
"Read FIXYOURSELF.md §B at the repo root: three options "
"(push your commit, reset --hard origin/main, or file an "
"issue and wait). Pick one.\n"
"\n"
"Do NOT bypass this check by faking code_version in "
"meta.json — the gate exists to keep buggy data out of "
"the training set."
),
}
log.warning(
@ -141,6 +161,21 @@ def make_app(
)
return JSONResponse(body, status_code=412)
# Optional matrix-stratification headers. Validated against the
# closed enums so a misbehaving shipper can't write garbage into
# the index. Unknown values are dropped (header treated as absent)
# and logged so the operator can spot a version drift quickly.
episode_type = (request.headers.get("x-episode-type") or "").strip().lower()
if episode_type and episode_type not in _VALID_EPISODE_TYPES:
log.warning("dropping unknown X-Episode-Type=%r host=%s id=%s",
episode_type, host_id, episode_id)
episode_type = ""
benign_profile = (request.headers.get("x-benign-profile") or "").strip().lower()
if benign_profile and benign_profile not in _VALID_BENIGN_PROFILES:
log.warning("dropping unknown X-Benign-Profile=%r host=%s id=%s",
benign_profile, host_id, episode_id)
benign_profile = ""
cl = request.headers.get("content-length")
if cl is not None:
try:
@ -157,6 +192,8 @@ def make_app(
expected_sha256=expected_sha,
schema_version=schema_version,
commit=commit or None,
episode_type=episode_type or None,
benign_profile=benign_profile or None,
body=request.stream(),
max_bytes=max_episode_bytes,
)

View file

@ -84,8 +84,22 @@ class VersionGate:
# ---- backend dispatch -----------------------------------------------
def _refresh(self) -> None:
# Try forgejo first when configured. If it fails AND we also
# have a local repo path, fall back to git — this lets the
# receiver keep working through a forgejo blip (e.g. both
# services restarting on the same host) without flat-rejecting
# every PUT with not-in-window. The local repo on the
# receiver's own host is the closest thing to ground truth we
# have when the canonical source is unreachable.
source = "forgejo" if self.forgejo_url else "git"
if self.forgejo_url:
hashes, head = self._refresh_forgejo()
if not hashes and self.repo_path is not None:
log.info("forgejo refresh empty; falling back to local git "
"at %s", self.repo_path)
hashes, head = self._refresh_git()
if hashes:
source = "git-fallback"
else:
hashes, head = self._refresh_git()
if not hashes:
@ -98,8 +112,7 @@ class VersionGate:
self._cached_at = time.monotonic()
self._head = head
log.info("version-gate refreshed: %d valid hashes, head=%s, source=%s",
len(hashes), head[:12] if head else "?",
"forgejo" if self.forgejo_url else "git")
len(hashes), head[:12] if head else "?", source)
def _refresh_forgejo(self) -> tuple[set[str], str | None]:
"""GET /api/v1/repos/<owner>/<name>/commits?sha=<branch>&limit=<n>."""

76
scripts/auto-update.sh Executable file
View file

@ -0,0 +1,76 @@
#!/usr/bin/env bash
# Lab-host auto-update. Pulls origin/main and re-runs install-lab-host.sh
# when there's a newer commit on the canonical remote.
#
# Run by cis490-autoupdate.timer. Idempotent; safe to re-invoke.
#
# Why this exists: when the receiver's commit-allow-list rolls forward,
# any lab host running older code starts getting 412/400 on every PUT.
# Without auto-update, that requires either the on-device AI agent or
# the operator to notice and run `git pull && install-lab-host.sh` —
# neither of which happens reliably (k-gamingcom + elliott-thinkpad
# both stalled silently on the post-cutover 2026-05-01 incident).
# With auto-update, hosts catch up within RandomizedDelaySec of the
# next timer fire (≤ 40 min) on their own.
#
# Safety:
# - git pull is `--ff-only` — never rewrites or merges; if local
# diverged from origin (operator hand-edit, partial install) it
# bails rather than guess.
# - install-lab-host.sh is the SAME script the operator runs by hand.
# No special "auto" path; we want exactly one path through bring-up.
# - On any failure we exit non-zero so systemd records it; the timer
# re-fires next interval. Failures don't disable the timer.
# - The version gate provides quality control: even if auto-update
# pulls a known-bad commit, the receiver's allow-list catches it
# downstream.
set -euo pipefail
INSTALL_ROOT="${INSTALL_ROOT:-/opt/cis490}"
SERVICE_USER="${SERVICE_USER:-cis490}"
log() { printf '[auto-update] %s\n' "$*" >&2; }
[[ -d "$INSTALL_ROOT/.git" ]] || {
log "no .git in $INSTALL_ROOT — auto-update only supports git checkouts"
exit 0
}
cd "$INSTALL_ROOT"
# All git ops run as the service user (the owner of $INSTALL_ROOT).
# Running as root would trip git's "dubious ownership" guard.
GIT() { sudo -u "$SERVICE_USER" git -C "$INSTALL_ROOT" "$@"; }
if ! GIT fetch --quiet origin main; then
log "git fetch failed — network blip or remote down; will retry next tick"
exit 0 # don't fail the unit; this is expected on offline hosts
fi
LOCAL="$(GIT rev-parse HEAD)"
REMOTE="$(GIT rev-parse origin/main)"
if [[ "$LOCAL" == "$REMOTE" ]]; then
log "up to date at ${LOCAL:0:12}"
exit 0
fi
# Branch divergence check — operator hand-edits or partial installs
# could leave HEAD on a non-main commit. We don't want to silently
# overwrite that.
if ! GIT merge-base --is-ancestor HEAD origin/main; then
log "WARN: local HEAD ${LOCAL:0:12} is not an ancestor of origin/main"
log " ${REMOTE:0:12} — refusing to fast-forward. Investigate via"
log " 'git -C $INSTALL_ROOT log --all --oneline -10' on the host."
exit 1
fi
log "updating ${LOCAL:0:12} -> ${REMOTE:0:12}"
GIT pull --ff-only --quiet origin main
# install-lab-host.sh handles VERSION re-stamp, queue drain, daemon-reload,
# and systemctl restart of the lab-host services. Pass control to it
# directly via exec so its exit code is ours.
log "re-running install-lab-host.sh to apply new code"
exec "$INSTALL_ROOT/scripts/install-lab-host.sh"

View file

@ -65,8 +65,18 @@ install -d -o "$SERVICE_USER" -g "$SERVICE_USER" -m 0755 \
# --- 3. repo + venv ----------------------------------------------------
log "syncing repo into $INSTALL_ROOT"
install -d -o "$SERVICE_USER" -g "$SERVICE_USER" -m 0755 "$INSTALL_ROOT"
# We use a clean cp -aT rather than rsync to avoid an extra dep.
cp -aT "$REPO_ROOT" "$INSTALL_ROOT"
# The expected lab-host upgrade path is `cd /opt/cis490 && git pull &&
# sudo ./scripts/install-lab-host.sh`, which means REPO_ROOT and
# INSTALL_ROOT can be the same directory. cp -aT errors out in that
# case ("are the same file") and `set -e` aborts before the systemd
# units get installed and services restart, leaving the host running
# whatever code was loaded at last service start.
if [[ "$(readlink -f "$REPO_ROOT")" == "$(readlink -f "$INSTALL_ROOT")" ]]; then
log "REPO_ROOT == INSTALL_ROOT ($INSTALL_ROOT); git pull already updated tree, skipping cp"
else
# Clean cp -aT to avoid an extra rsync dep.
cp -aT "$REPO_ROOT" "$INSTALL_ROOT"
fi
chown -R "$SERVICE_USER":"$SERVICE_USER" "$INSTALL_ROOT"
# Stamp a VERSION file at install time so episodes can record the
@ -107,7 +117,19 @@ install -m 0644 "$REPO_ROOT/etc/cis490-shipper.service" \
/etc/systemd/system/cis490-shipper.service
install -m 0644 "$REPO_ROOT/etc/cis490-orchestrator.service" \
/etc/systemd/system/cis490-orchestrator.service
# Auto-update: a 30-min timer that does git fetch + (if behind) pull
# and re-run this script. Prevents host-falls-behind incidents when
# the receiver's allow-list rolls forward and an on-device agent
# fails to act on the 412 remediation. See AGENTS.md "Auto-update".
install -m 0644 "$REPO_ROOT/etc/cis490-autoupdate.service" \
/etc/systemd/system/cis490-autoupdate.service
install -m 0644 "$REPO_ROOT/etc/cis490-autoupdate.timer" \
/etc/systemd/system/cis490-autoupdate.timer
systemctl daemon-reload
# Enable the timer immediately — the operator gets self-healing on the
# next 30-min tick without an extra `systemctl enable`. Idempotent.
systemctl enable --now cis490-autoupdate.timer 2>/dev/null || \
log "WARN: could not enable cis490-autoupdate.timer (will retry next install)"
# --- 5. config template (only on first install) -----------------------
if [[ ! -f "$ETC_ROOT/lab-host.toml" ]]; then
@ -281,6 +303,35 @@ if [[ "$NEW_INSTALL" == "1" ]]; then
log "================================================================="
fi
# --- 9. drain pre-stamp queue + restart services -----------------------
# On a re-run (not first install), the running services are still
# executing whatever code they loaded at last start, so the new module
# objects we just dropped on disk don't take effect until restart. And
# any episodes the orchestrator generated against pre-stamping code
# are missing meta.json::code_version — the receiver returns 400 for
# every PUT, the queue's fatal path didn't quarantine them in older
# code, and the shipper burns cycles re-tarring them on every pass.
# Drain them once, then restart so the new code reaches the live
# daemon.
if [[ "$NEW_INSTALL" != "1" ]]; then
PY="$INSTALL_ROOT/.venv/bin/python"
if [[ -x "$PY" && -f "$INSTALL_ROOT/tools/quarantine_unstamped.py" ]]; then
log "draining pre-stamp episodes from queue (idempotent)"
sudo -u "$SERVICE_USER" -- "$PY" \
"$INSTALL_ROOT/tools/quarantine_unstamped.py" \
--data-root "$DATA_ROOT/data" || \
log "WARN: quarantine drain returned non-zero — see output above"
fi
systemctl daemon-reload
for svc in cis490-shipper cis490-orchestrator; do
if systemctl is-enabled --quiet "$svc" 2>/dev/null; then
log "restarting $svc to pick up new code"
systemctl restart "$svc" || \
log "WARN: $svc restart failed — check 'journalctl -u $svc'"
fi
done
fi
log "lab-host install complete."
log ""
log "Cloning this repo and running the launchers manually is NOT enough."

View file

@ -152,6 +152,15 @@ else
log "Tier-4 ✓ ($REAL_COUNT real binaries staged in $INSTALL_ROOT/samples/store/)"
fi
# Restart the orchestrator now so the next wave actually runs Tier-3
# against the freshly-staged msfrpcd + samples. Skipped only if the
# unit isn't enabled yet (first install hasn't run `systemctl enable`).
if systemctl is-enabled --quiet cis490-orchestrator 2>/dev/null; then
log "restarting cis490-orchestrator to pick up new modules + samples"
systemctl restart cis490-orchestrator || \
log "WARN: orchestrator restart failed — check 'journalctl -u cis490-orchestrator'"
fi
log ""
log "================================================================="
log " Tier-3 deploy complete on $(hostname)"
@ -160,7 +169,4 @@ log " - metasploit-framework + cis490-msfrpcd.service active"
log " - $OUT_DIR/metasploitable2.qcow2 staged"
log " - bridge: $(ip link show br-malware >/dev/null 2>&1 && echo up || echo skipped)"
log " - Tier-4: $(ls "$INSTALL_ROOT/samples/store/" 2>/dev/null | wc -l) real binaries staged"
log ""
log " Restart the orchestrator so the next wave runs Tier-3:"
log " sudo systemctl restart cis490-orchestrator"
log "================================================================="

View file

@ -13,7 +13,9 @@ from __future__ import annotations
import argparse
import json
import logging
import os
import signal
import socket
import sys
from pathlib import Path
@ -22,6 +24,30 @@ from .queue import ShipperQueue
from .transport import ShipperTransport
def _sd_notify(msg: str) -> None:
"""Send ``msg`` to systemd's notify socket. No-op when running
outside systemd (NOTIFY_SOCKET unset) so the same binary works
fine under `--once`, manual invocation, or tests.
See sd_notify(3). The protocol is one-line key=value messages
over an AF_UNIX SOCK_DGRAM socket. We don't need the libsystemd
dep talking to the socket directly is stdlib."""
sock_path = os.environ.get("NOTIFY_SOCKET")
if not sock_path:
return
if sock_path.startswith("@"):
# Abstract socket: prepend NUL and strip the leading '@'.
sock_path = "\0" + sock_path[1:]
try:
with socket.socket(socket.AF_UNIX, socket.SOCK_DGRAM) as s:
s.sendto(msg.encode("ascii"), sock_path)
except OSError:
# Failing to notify isn't fatal — at worst systemd's
# WatchdogSec fires and we get restarted, which is the
# behaviour the watchdog exists to provide.
pass
def _setup_logging(level: str) -> None:
logging.basicConfig(
level=getattr(logging, level.upper(), logging.INFO),
@ -98,7 +124,16 @@ def main(argv: list[str] | None = None) -> int:
"shipper starting: host_id=%s data_root=%s receiver=%s",
cfg.host_id, cfg.data_root, cfg.receiver.url,
)
queue.run_forever(stop_check=lambda: stopping)
# Tell systemd we're ready to take work — gates Type=notify in
# the unit file. The systemd unit's WatchdogSec= will then expect
# WATCHDOG=1 messages at least every <WatchdogSec> seconds; a
# missed one means stalled-mid-loop and triggers a kill+restart.
_sd_notify("READY=1")
queue.run_forever(
stop_check=lambda: stopping,
heartbeat=lambda: _sd_notify("WATCHDOG=1"),
)
_sd_notify("STOPPING=1")
return 0

View file

@ -33,6 +33,15 @@ class ShipperConfig:
backoff_seconds: tuple[float, ...] = (1.0, 2.0, 4.0, 8.0, 16.0, 32.0, 60.0, 120.0, 300.0)
# Local retention before pruning data/shipped/.
keep_local_for_days: int = 7
# Quarantine retention. Episodes that the receiver permanently
# rejected (400/412) sit here as evidence; without an upper bound
# they grow forever. Set to 0 to disable cleanup (operator
# responsibility).
quarantine_keep_days: int = 30
# How often the quarantine cleanup pass actually runs. Gated
# because a 5-second scan tick checking mtimes against a
# 30-day-old cutoff is wasteful — once an hour is plenty.
quarantine_cleanup_interval_s: float = 3600.0
@property
def episodes_dir(self) -> Path:
@ -46,6 +55,14 @@ class ShipperConfig:
def shipped_dir(self) -> Path:
return self.data_root / "shipped"
@property
def quarantine_dir(self) -> Path:
# Episodes the receiver has refused permanently (4xx other than
# 409 — typically 400 missing-commit or 412 not-in-window). They
# don't belong in shipped/ (we have nothing to compare against)
# and re-shipping them would just re-burn the queue.
return self.data_root / "quarantine"
@classmethod
def load(cls, path: str | Path) -> "ShipperConfig":
with open(path, "rb") as f:
@ -80,6 +97,7 @@ class ShipperConfig:
scan_interval_s=float(data.get("shipper", {}).get("scan_interval_s", 5.0)),
request_timeout_s=float(data.get("shipper", {}).get("request_timeout_s", 60.0)),
keep_local_for_days=int(retention.get("keep_local_for_days", 7)),
quarantine_keep_days=int(retention.get("quarantine_keep_days", 30)),
)

View file

@ -64,11 +64,27 @@ class ShipperQueue:
cfg.episodes_dir.mkdir(parents=True, exist_ok=True)
cfg.outbox_dir.mkdir(parents=True, exist_ok=True)
cfg.shipped_dir.mkdir(parents=True, exist_ok=True)
cfg.quarantine_dir.mkdir(parents=True, exist_ok=True)
# Last wall-clock time we walked quarantine/ for cleanup. Set
# to 0.0 so the first pass always sweeps (covers daemon
# startup after a long downtime).
self._last_quarantine_cleanup_at: float = 0.0
# ---- main entry point ---------------------------------------------
def run_once(self) -> PassResult:
"""One scan pass. Returns counts for logging / tests."""
# Sweep stale outbox tarballs first. Normal lifecycle has the
# tarball deleted alongside retire / quarantine, but operator
# intervention (rm of an episode dir) or a crash between
# rename(2) and the post-ship cleanup can leave one orphaned.
# Bounded by the number of files in outbox/, so cheap to do
# every pass.
self._sweep_outbox()
# Drop quarantine entries older than keep_days — gated on a
# once-per-hour check so the 5-second scan tick doesn't
# statx() the whole quarantine tree on every pass.
self._maybe_cleanup_quarantine()
ready = self._ready_episodes()
scanned = len(ready)
shipped = 0
@ -115,6 +131,11 @@ class ShipperQueue:
elif res.status == "transient":
transient += 1
else: # fatal
# Move the episode out of the live queue so the next
# scan doesn't re-tar/re-PUT the same dir forever. The
# tarball gets deleted; meta.json + telemetry survive
# in quarantine/ for operator triage.
self._quarantine(ep_dir, tarball, res)
fatal += 1
return PassResult(
@ -125,12 +146,36 @@ class ShipperQueue:
fatal=fatal,
)
def run_forever(self, *, stop_check=lambda: False) -> None:
def run_forever(
self,
*,
stop_check=lambda: False,
heartbeat=lambda: None,
) -> None:
"""Long-running scan loop.
Args:
stop_check: returns True when the daemon should exit (SIGTERM
handler). Checked between passes and inside the inter-pass
sleep so SIGTERM isn't blocked for up to scan_interval_s.
heartbeat: invoked once per completed pass. Wired to
sd_notify("WATCHDOG=1") in production so systemd can
kill+restart the daemon if a pass hangs longer than the
unit's WatchdogSec — catches silent stalls (deadlock,
blocked I/O past timeout) that Restart=on-failure misses.
"""
while not stop_check():
try:
self.run_once()
except Exception:
log.exception("scan pass crashed; sleeping anyway")
try:
heartbeat()
except Exception:
# A heartbeat failure mustn't take down the daemon —
# if the watchdog wire is broken, we want at least the
# ship loop to keep running.
log.exception("heartbeat callback failed")
# Coarse sleep: we don't need precise scheduling and we
# don't want a tight loop on errors.
t0 = time.monotonic()
@ -150,6 +195,70 @@ class ShipperQueue:
out.append(ep)
return out
def _maybe_cleanup_quarantine(self) -> None:
"""Walk quarantine/ and remove episodes older than keep_days.
Cheap on a daemon that's been running a while because the
once-per-hour gate prevents the scan tick from statx()-ing
the whole tree every 5s. On the first pass after startup, the
gate's 0.0 sentinel means we always sweep — that catches a
daemon that was offline through a backlog accumulation."""
keep_days = self.cfg.quarantine_keep_days
if keep_days <= 0:
return
now = time.time()
if (now - self._last_quarantine_cleanup_at
< self.cfg.quarantine_cleanup_interval_s):
return
self._last_quarantine_cleanup_at = now
cutoff = now - (keep_days * 86400)
removed = 0
if not self.cfg.quarantine_dir.exists():
return
for ep in self.cfg.quarantine_dir.iterdir():
if not ep.is_dir():
continue
try:
# Use mtime — quarantine dirs are written once
# (the rename + the reason file), so mtime tracks
# quarantine age, not the original episode age.
if ep.stat().st_mtime < cutoff:
shutil.rmtree(ep, ignore_errors=True)
removed += 1
except OSError:
log.exception("quarantine cleanup failed for %s", ep.name)
if removed:
log.info("quarantine cleanup: removed %d episode(s) older than %d days",
removed, keep_days)
def _sweep_outbox(self) -> None:
"""Delete tarballs in outbox/ that have no matching episode dir.
The invariant the queue maintains: ``outbox/<id>.tar.zst``
only exists while ``episodes/<id>/`` is also present.
retire+quarantine both delete the tarball when they move the
episode out, and tar overwrites any prior ``.partial`` on
each pass. So a stray tarball means an external actor (or an
OS crash) broke the invariant clean it up rather than
carrying dead bytes on disk forever."""
outbox = self.cfg.outbox_dir
if not outbox.exists():
return
for f in outbox.iterdir():
name = f.name
if name.endswith(".tar.zst"):
ep_id = name[: -len(".tar.zst")]
elif name.endswith(".tar.zst.partial"):
ep_id = name[: -len(".tar.zst.partial")]
else:
continue
if not (self.cfg.episodes_dir / ep_id).exists():
try:
f.unlink()
log.info("swept orphan tarball %s", name)
except OSError:
log.exception("failed to sweep orphan tarball %s", name)
def _read_episode_commit(self, ep_dir: Path) -> str | None:
"""Pull meta.json::code_version.commit so the shipper can send
it as X-Cis490-Code-Commit. Returns None if the file is
@ -223,6 +332,30 @@ class ShipperQueue:
ep_dir.replace(target)
tarball.unlink(missing_ok=True)
def _quarantine(self, ep_dir: Path, tarball: Path, res: ShipResult) -> None:
"""Move a permanently-rejected episode out of the live queue.
Drops a small ``quarantine_reason.json`` next to the episode so
an operator (or a future backfill tool) can see why without
having to dig through journalctl. The tarball in outbox/ goes
away re-shipping won't help and it just burns disk."""
target = self.cfg.quarantine_dir / ep_dir.name
if target.exists():
shutil.rmtree(ep_dir, ignore_errors=True)
else:
ep_dir.replace(target)
reason = {
"status_code": res.status_code,
"error": res.error,
"body": res.body,
"quarantined_at_wall": time.strftime("%Y-%m-%dT%H:%M:%SZ", time.gmtime()),
}
try:
(target / "quarantine_reason.json").write_text(json.dumps(reason))
except OSError:
log.exception("quarantine reason write failed for %s", ep_dir.name)
tarball.unlink(missing_ok=True)
def _which_zstd() -> bool:
return shutil.which("zstd") is not None

View file

@ -0,0 +1,142 @@
"""Unit tests for cis490_doctor's recent-shipping-errors parser.
The parser scans `journalctl -u cis490-shipper` output for the same
strings the receiver/shipper actually emit. We monkeypatch `_run` so
the test doesn't need a real systemd journal.
"""
from __future__ import annotations
import importlib.util
import sys
from pathlib import Path
import pytest
REPO_ROOT = Path(__file__).resolve().parent.parent
spec = importlib.util.spec_from_file_location(
"cis490_doctor", REPO_ROOT / "tools" / "cis490_doctor.py"
)
doctor = importlib.util.module_from_spec(spec)
sys.modules["cis490_doctor"] = doctor
spec.loader.exec_module(doctor)
def _run_journal(monkeypatch, output: str, rc: int = 0) -> None:
"""Stub `cis490_doctor._run` to return ``output`` for journalctl
calls, real subprocess for everything else."""
real = doctor._run
def fake(cmd):
if cmd and cmd[0] == "journalctl":
return rc, output, ""
return real(cmd)
monkeypatch.setattr(doctor, "_run", fake)
def _check_for(report: doctor.Report, name_substr: str) -> doctor.Check | None:
for c in report.checks:
if name_substr in c.name:
return c
return None
def test_412_pattern_surfaces_as_fail(monkeypatch: pytest.MonkeyPatch) -> None:
"""5 minutes of 412 commit-rejected logs is the exact symptom we
saw on k-gamingcom for 5+ hours pre-fix. Doctor must point the
operator at the canonical pull+reinstall path."""
log = "\n".join(
f"ship 01EP{i:02d} -> 412 commit-rejected. your_commit=abc..."
for i in range(20)
)
_run_journal(monkeypatch, log)
report = doctor.Report(role="lab-host")
doctor.check_recent_shipping_errors(report)
c = _check_for(report, "recent ship results")
assert c is not None and c.status == "fail"
assert "out-of-window" in c.detail
assert "git pull origin main" in c.fix
assert "install-lab-host.sh" in c.fix
def test_400_missing_commit_pattern(monkeypatch: pytest.MonkeyPatch) -> None:
log = "\n".join(
f'episode 01EP{i:02d} -> fatal (400) "missing X-Cis490-Code-Commit header"'
for i in range(5)
)
_run_journal(monkeypatch, log)
report = doctor.Report(role="lab-host")
doctor.check_recent_shipping_errors(report)
c = _check_for(report, "recent ship results")
assert c is not None and c.status == "fail"
assert "missing-commit-header" in c.detail
assert "install-lab-host.sh" in c.fix
def test_clean_journal_is_ok(monkeypatch: pytest.MonkeyPatch) -> None:
log = "\n".join(
f"ship 01EP{i:02d} -> stored (201) " for i in range(10)
)
_run_journal(monkeypatch, log)
report = doctor.Report(role="lab-host")
doctor.check_recent_shipping_errors(report)
c = _check_for(report, "recent ship results")
assert c is not None and c.status == "ok"
def test_empty_journal_is_ok_idle(monkeypatch: pytest.MonkeyPatch) -> None:
"""No log output ≠ broken — could be a daemon that finished
everything and is idle."""
_run_journal(monkeypatch, "")
report = doctor.Report(role="lab-host")
doctor.check_recent_shipping_errors(report)
c = _check_for(report, "recent ship results")
assert c is not None and c.status == "ok"
assert "idle" in c.detail
def test_journalctl_unavailable_skips(monkeypatch: pytest.MonkeyPatch) -> None:
"""Doctor often runs as the unprivileged user and reading the
system journal can need systemd-journal group membership.
journalctl returning non-zero (perm denied / not installed)
should produce a `skip` row, not noise."""
_run_journal(monkeypatch, "", rc=1)
report = doctor.Report(role="lab-host")
doctor.check_recent_shipping_errors(report)
c = _check_for(report, "recent log scan")
assert c is not None and c.status == "skip"
def test_transient_failures_warn_at_threshold(
monkeypatch: pytest.MonkeyPatch,
) -> None:
"""A pile of transient (5xx/network) failures isn't fatal but
it's worth flagging — the receiver might be down or WG might be
flaking. The threshold (>5) is tuned to ignore an occasional
blip."""
log = "\n".join(
f"ship 01EP{i:02d} -> transient: timeout" for i in range(10)
)
_run_journal(monkeypatch, log)
report = doctor.Report(role="lab-host")
doctor.check_recent_shipping_errors(report)
c = _check_for(report, "recent ship results")
assert c is not None and c.status == "warn"
assert "transient" in c.detail
def test_412_takes_priority_over_400(monkeypatch: pytest.MonkeyPatch) -> None:
"""If both error classes are present, 412 (out-of-date code)
is the more actionable diagnosis fixing it fixes both, since
a re-install also writes VERSION."""
log = (
"ship 01OLD -> 412 commit-rejected\n"
'ship 01OLD2 -> fatal (400) "missing X-Cis490-Code-Commit header"\n'
)
_run_journal(monkeypatch, log)
report = doctor.Report(role="lab-host")
doctor.check_recent_shipping_errors(report)
c = _check_for(report, "recent ship results")
assert c is not None and c.status == "fail"
assert "out-of-window" in c.detail

View file

@ -0,0 +1,149 @@
"""Tests for tools/quarantine_unstamped.py.
This is the one-shot drain that we run on each lab host once after the
commit-gate goes live. The behaviour we care about:
- episodes WITH a 40-char-hex code_version.commit stay put
- episodes WITHOUT that field move to quarantine/
- episodes lacking done.marker (still being written) are untouched
- quarantine/<id>/quarantine_reason.json gets dropped beside it
- re-running is a no-op (idempotent pre-stamp episodes are gone,
valid ones aren't touched)
"""
from __future__ import annotations
import importlib.util
import json
import sys
from pathlib import Path
import pytest
REPO_ROOT = Path(__file__).resolve().parent.parent
spec = importlib.util.spec_from_file_location(
"quarantine_unstamped", REPO_ROOT / "tools" / "quarantine_unstamped.py"
)
qu = importlib.util.module_from_spec(spec)
sys.modules["quarantine_unstamped"] = qu
spec.loader.exec_module(qu)
def _ep(root: Path, name: str, *, meta: dict | None, done: bool = True) -> Path:
"""Stage a fake episode under <root>/episodes/<name>/."""
d = root / "episodes" / name
d.mkdir(parents=True)
if meta is not None:
(d / "meta.json").write_text(json.dumps(meta))
if done:
(d / "done.marker").touch()
return d
def test_drain_moves_unstamped_to_quarantine(tmp_path: Path) -> None:
_ep(tmp_path, "01OLD", meta={"host_id": "lab1"}) # no code_version
res = qu.drain(tmp_path)
assert res.scanned == 1
assert res.quarantined == 1
assert res.kept_stamped == 0
assert not (tmp_path / "episodes" / "01OLD").exists()
assert (tmp_path / "quarantine" / "01OLD" / "meta.json").exists()
reason = json.loads(
(tmp_path / "quarantine" / "01OLD" / "quarantine_reason.json").read_text()
)
assert reason["status_code"] == 400
assert "pre-stamp" in reason["error"]
def test_drain_keeps_stamped_episode(tmp_path: Path) -> None:
"""A stamped episode (40-char-hex commit) belongs in the live
queue the shipper will succeed against the receiver gate."""
_ep(tmp_path, "01NEW", meta={
"code_version": {"commit": "a" * 40, "branch": "main", "dirty": False},
})
res = qu.drain(tmp_path)
assert res.scanned == 1
assert res.quarantined == 0
assert res.kept_stamped == 1
assert (tmp_path / "episodes" / "01NEW").exists()
assert not (tmp_path / "quarantine" / "01NEW").exists()
def test_drain_rejects_short_commit(tmp_path: Path) -> None:
"""A truncated/garbled commit is not accepted as 'stamped'
receiver would 400 it as bad-format anyway."""
_ep(tmp_path, "01SHORT", meta={"code_version": {"commit": "abc123"}})
res = qu.drain(tmp_path)
assert res.quarantined == 1
def test_drain_rejects_non_hex_commit(tmp_path: Path) -> None:
bad = "z" * 40
_ep(tmp_path, "01BADHEX", meta={"code_version": {"commit": bad}})
res = qu.drain(tmp_path)
assert res.quarantined == 1
def test_drain_skips_in_progress_episode(tmp_path: Path) -> None:
"""No done.marker means the orchestrator is still writing to the
dir leave it alone, drainer is for 'finished and queued' only."""
_ep(tmp_path, "01PARTIAL", meta=None, done=False)
res = qu.drain(tmp_path)
assert res.scanned == 1
assert res.skipped_no_marker == 1
assert res.quarantined == 0
assert (tmp_path / "episodes" / "01PARTIAL").exists()
def test_drain_handles_missing_meta_json(tmp_path: Path) -> None:
"""A done episode with no meta.json is corrupt — should be
quarantined, not kept (it'd fail the gate too)."""
_ep(tmp_path, "01NOMETA", meta=None, done=True)
res = qu.drain(tmp_path)
assert res.quarantined == 1
assert (tmp_path / "quarantine" / "01NOMETA" / "quarantine_reason.json").exists()
def test_drain_is_idempotent(tmp_path: Path) -> None:
_ep(tmp_path, "01OLD", meta={"host_id": "lab1"})
_ep(tmp_path, "01NEW", meta={"code_version": {"commit": "a" * 40}})
qu.drain(tmp_path)
res2 = qu.drain(tmp_path)
# Second pass: only the still-live stamped episode is scanned.
assert res2.scanned == 1
assert res2.kept_stamped == 1
assert res2.quarantined == 0
def test_drain_missing_data_root_is_noop(tmp_path: Path) -> None:
"""First-boot: episodes/ may not exist yet. Drain shouldn't crash."""
res = qu.drain(tmp_path / "does-not-exist")
assert res.scanned == 0
assert res.quarantined == 0
def test_drain_dry_run_moves_nothing(tmp_path: Path, capsys: pytest.CaptureFixture) -> None:
_ep(tmp_path, "01OLD", meta={"host_id": "lab1"})
res = qu.drain(tmp_path, dry_run=True)
assert res.quarantined == 1 # counted as if quarantined
# But the episode is still in episodes/ — nothing actually moved.
assert (tmp_path / "episodes" / "01OLD").exists()
assert not (tmp_path / "quarantine" / "01OLD").exists()
out = capsys.readouterr().out
assert "would-quarantine 01OLD" in out
def test_drain_collision_keeps_quarantine_copy(tmp_path: Path) -> None:
"""Re-running after a previous drain put the same id into
quarantine. Should silently drop the live copy (matches the
queue's _quarantine path semantics)."""
_ep(tmp_path, "01DUP", meta={"host_id": "lab1"})
# Pre-existing quarantine entry from a previous run:
(tmp_path / "quarantine" / "01DUP").mkdir(parents=True)
(tmp_path / "quarantine" / "01DUP" / "meta.json").write_text("{}")
res = qu.drain(tmp_path)
assert res.quarantined == 1
assert not (tmp_path / "episodes" / "01DUP").exists()
assert (tmp_path / "quarantine" / "01DUP" / "meta.json").exists()

View file

@ -320,6 +320,274 @@ def test_run_once_handles_transient_when_receiver_is_down(tmp_path: Path) -> Non
assert (cfg.outbox_dir / "01DOWN.tar.zst").exists()
def test_quarantine_cleanup_removes_old_entries(tmp_path: Path) -> None:
"""Without an upper bound, quarantine/ grows forever. The cleanup
pass walks it once per cleanup_interval and drops anything past
keep_days bounded by directory size since it just statx()s
each entry's mtime.
We run with cleanup_interval_s=0 so the gate fires on every pass,
and overload `os.utime` to age a fixture entry past the cutoff
without sleeping for real time."""
import os as _os
import time as _time
cfg = ShipperConfig(
host_id="lab1",
data_root=tmp_path / "lab-data",
receiver=ReceiverEndpoint(url="http://127.0.0.1:1"),
scan_interval_s=0.05,
quarantine_keep_days=7,
quarantine_cleanup_interval_s=0.0, # always run on every pass
)
class _NoTransport:
def ship_tarball(self, *a, **kw): raise AssertionError("not used")
queue = ShipperQueue(cfg, _NoTransport())
old = cfg.quarantine_dir / "01OLD"
old.mkdir()
(old / "meta.json").write_text("{}")
new = cfg.quarantine_dir / "01NEW"
new.mkdir()
(new / "meta.json").write_text("{}")
# Backdate the OLD entry by 8 days. The directory's own mtime
# is what cleanup checks.
eight_days_ago = _time.time() - (8 * 86400)
_os.utime(old, (eight_days_ago, eight_days_ago))
queue._maybe_cleanup_quarantine()
assert not old.exists(), "8-day-old entry should be cleaned up"
assert new.exists(), "fresh entry should survive"
def test_quarantine_cleanup_disabled_when_keep_days_zero(tmp_path: Path) -> None:
import os as _os
import time as _time
cfg = ShipperConfig(
host_id="lab1",
data_root=tmp_path / "lab-data",
receiver=ReceiverEndpoint(url="http://127.0.0.1:1"),
scan_interval_s=0.05,
quarantine_keep_days=0, # disabled
quarantine_cleanup_interval_s=0.0,
)
class _NoTransport:
def ship_tarball(self, *a, **kw): raise AssertionError("not used")
queue = ShipperQueue(cfg, _NoTransport())
old = cfg.quarantine_dir / "01OLD"
old.mkdir()
eight_days_ago = _time.time() - (8 * 86400)
_os.utime(old, (eight_days_ago, eight_days_ago))
queue._maybe_cleanup_quarantine()
assert old.exists(), "cleanup must be a no-op when keep_days=0"
def test_quarantine_cleanup_respects_interval_gate(tmp_path: Path) -> None:
"""The interval gate prevents the 5s scan tick from statx()-ing
the whole quarantine tree on every pass."""
import os as _os
import time as _time
cfg = ShipperConfig(
host_id="lab1",
data_root=tmp_path / "lab-data",
receiver=ReceiverEndpoint(url="http://127.0.0.1:1"),
scan_interval_s=0.05,
quarantine_keep_days=7,
quarantine_cleanup_interval_s=3600.0,
)
class _NoTransport:
def ship_tarball(self, *a, **kw): raise AssertionError("not used")
queue = ShipperQueue(cfg, _NoTransport())
# First pass: gate's 0.0 sentinel means we sweep.
queue._maybe_cleanup_quarantine()
first_at = queue._last_quarantine_cleanup_at
assert first_at > 0
# Stage an old entry AFTER the first sweep. The gate should
# block the next sweep until cleanup_interval_s has elapsed.
old = cfg.quarantine_dir / "01OLD"
old.mkdir()
_os.utime(old, (_time.time() - 8 * 86400,) * 2)
queue._maybe_cleanup_quarantine()
assert old.exists(), "gate should defer the next sweep"
assert queue._last_quarantine_cleanup_at == first_at
def test_run_forever_calls_heartbeat(tmp_path: Path) -> None:
"""The heartbeat callback fires once per completed pass. In
production this is wired to sd_notify(WATCHDOG=1) so systemd's
WatchdogSec catches a hung scan loop."""
import threading
cfg = ShipperConfig(
host_id="lab1",
data_root=tmp_path / "lab-data",
receiver=ReceiverEndpoint(url="http://127.0.0.1:1"),
scan_interval_s=0.05,
)
class _NoTransport:
def ship_tarball(self, *a, **kw): raise AssertionError("not used")
queue = ShipperQueue(cfg, _NoTransport())
beats = []
stop = threading.Event()
def _heartbeat() -> None:
beats.append(time.monotonic())
if len(beats) >= 3:
stop.set()
queue.run_forever(stop_check=stop.is_set, heartbeat=_heartbeat)
assert len(beats) >= 3
def test_run_forever_survives_heartbeat_exception(tmp_path: Path) -> None:
"""A broken heartbeat (e.g. NOTIFY_SOCKET vanished) must not take
down the daemon the loss of watchdog is tolerable; the loss
of the ship loop is not."""
cfg = ShipperConfig(
host_id="lab1",
data_root=tmp_path / "lab-data",
receiver=ReceiverEndpoint(url="http://127.0.0.1:1"),
scan_interval_s=0.05,
)
class _NoTransport:
def ship_tarball(self, *a, **kw): raise AssertionError("not used")
queue = ShipperQueue(cfg, _NoTransport())
pass_count = [0]
def _stop() -> bool:
return pass_count[0] >= 3
def _broken_heartbeat() -> None:
pass_count[0] += 1
raise RuntimeError("simulated NOTIFY_SOCKET failure")
# Should NOT raise.
queue.run_forever(stop_check=_stop, heartbeat=_broken_heartbeat)
assert pass_count[0] >= 3
def test_run_once_sweeps_orphaned_outbox_tarball(tmp_path: Path, receiver) -> None:
"""A tarball in outbox/ with no matching episode dir should get
cleaned up at the start of the next scan. The lifecycle invariant
is `outbox/<id>.tar.zst exists episodes/<id>/ exists`; a
violation means external interference (operator rm-ed the
episode, OS crash, leftover from older buggy code) and we'd
otherwise carry dead bytes forever."""
url, _ = receiver
cfg, _, queue = _make_shipper(tmp_path, url)
# Stage an orphan: a tarball in outbox/ with no corresponding
# episodes/01ORPHAN/ directory.
cfg.outbox_dir.mkdir(parents=True, exist_ok=True)
orphan = cfg.outbox_dir / "01ORPHAN.tar.zst"
orphan.write_bytes(b"\x28\xb5\x2f\xfd") # zstd magic, not a real tarball
# Also a partial — same orphan rule applies.
partial = cfg.outbox_dir / "01PARTIAL.tar.zst.partial"
partial.write_bytes(b"x")
# And a non-orphan: tarball backed by an actual episode dir.
ep = _make_episode(cfg, "01LIVE")
queue.run_once()
assert not orphan.exists(), "orphan tarball must be swept"
assert not partial.exists(), "orphan partial must be swept"
# 01LIVE got shipped+retired in the same pass; both its tarball
# and its episode dir are gone (moved to shipped/).
assert (cfg.shipped_dir / "01LIVE").exists()
def test_run_once_quarantines_fatal_episode(tmp_path: Path) -> None:
"""A 4xx-other-than-409 (e.g. 400 missing-commit) means re-shipping
won't succeed. The shipper must move the episode out of the live
queue so the next scan doesn't burn a PUT on the same dir, AND
drop the outbox tarball so disk doesn't fill up with stale .zst.
Regression: pre-fix queue.py left fatal episodes in episodes/ on
every pass, so 4465+ pre-stamp episodes on k-gamingcom kept
fatal-looping at ~1 PUT/sec for 5+ hours after the receiver gate
went live."""
class _Always400Transport:
"""Stub transport that always rejects with a fatal 400.
Mirrors transport.py's own behaviour for 4xx-not-409."""
def __init__(self) -> None:
self.calls = 0
def ship_tarball(self, episode_id, tarball_path, sha256_hex,
commit=None):
self.calls += 1
from shipper.transport import ShipResult
return ShipResult(
status="fatal",
status_code=400,
sha256=None,
body={"error": "missing X-Cis490-Code-Commit header",
"remediation": "pull and reinstall"},
error="client error 400",
)
cfg = ShipperConfig(
host_id="lab1",
data_root=tmp_path / "lab-data",
receiver=ReceiverEndpoint(url="http://127.0.0.1:1"),
scan_interval_s=0.05,
)
queue = ShipperQueue(cfg, _Always400Transport())
_make_episode(cfg, "01PRESTAMP")
result = queue.run_once()
assert result.scanned == 1
assert result.fatal == 1
assert result.shipped == 0
# Episode dir is OUT of episodes/ and IN quarantine/.
assert not (cfg.episodes_dir / "01PRESTAMP").exists()
assert (cfg.quarantine_dir / "01PRESTAMP").exists()
assert (cfg.quarantine_dir / "01PRESTAMP" / "meta.json").exists()
# The reason file carries enough context for triage.
reason = json.loads(
(cfg.quarantine_dir / "01PRESTAMP" / "quarantine_reason.json").read_text()
)
assert reason["status_code"] == 400
assert reason["error"] == "client error 400"
assert reason["body"]["error"] == "missing X-Cis490-Code-Commit header"
assert "quarantined_at_wall" in reason
# Outbox is empty — no stale tarball.
assert list(cfg.outbox_dir.iterdir()) == []
# Critically: a second pass is a no-op. The fix would be useless if
# quarantined episodes leaked back in.
result2 = queue.run_once()
assert result2.scanned == 0
assert result2.fatal == 0
def test_tarball_round_trips_episode_dir(tmp_path: Path, receiver) -> None:
"""The receiver-side tarball must extract back to the original
episode dir layout (modulo file order). Verifies the tar+zstd

View file

@ -248,6 +248,52 @@ def test_e2e_receiver_returns_400_when_commit_header_missing(repo: Path, tmp_pat
assert "missing" in r.json()["error"].lower()
def test_forgejo_falls_back_to_local_git_when_unreachable(repo: Path) -> None:
"""If forgejo is unreachable AND a local repo path is configured,
the gate must fall back to git so the receiver doesn't reject
every PUT during a forgejo blip (e.g. the receiver and forgejo
restarting together on the same Pi). Without the fallback, the
cache stays empty and `check()` returns not-in-window for
everything breaking all lab hosts simultaneously even though
their commits are perfectly valid."""
# Pick a port nothing's bound to. urllib will fail-fast.
import socket as _s
with _s.socket(_s.AF_INET, _s.SOCK_STREAM) as sock:
sock.bind(("127.0.0.1", 0))
dead_port = sock.getsockname()[1]
g = VersionGate(
repo_path=repo,
forgejo_url=f"http://127.0.0.1:{dead_port}",
repo_owner="spectral", repo_name="CIS490", branch="main",
window=50, cache_ttl_s=0,
)
head = _commits(repo)[0]
# Forgejo is unreachable, but the local repo has the commit.
# check() must still return ok.
ok, reason = g.check(head)
assert ok, f"fallback didn't trigger: reason={reason}, head={head}"
assert g.valid_count() >= 1
def test_forgejo_no_fallback_without_local_repo(repo: Path) -> None:
"""Without a local repo configured, a forgejo blip is fatal — the
gate has nothing to fall back to and returns not-in-window. This
confirms the fallback is opt-in via repo_path, not a free side
effect of the forgejo backend."""
import socket as _s
with _s.socket(_s.AF_INET, _s.SOCK_STREAM) as sock:
sock.bind(("127.0.0.1", 0))
dead_port = sock.getsockname()[1]
g = VersionGate(
forgejo_url=f"http://127.0.0.1:{dead_port}",
repo_owner="spectral", repo_name="CIS490", branch="main",
window=50, cache_ttl_s=0,
)
ok, reason = g.check("a" * 40)
assert not ok and reason == "not-in-window"
assert g.valid_count() == 0
def test_missing_repo_keeps_prior_cache(repo: Path) -> None:
"""If the maintainer's clone disappears (or git fails), the gate
keeps its last-known allow-list better than locking out every

View file

@ -202,6 +202,48 @@ def check_install(report: Report, role: str) -> None:
fix=f"sudo /opt/cis490/scripts/install-{role}.sh",
))
# VERSION file — written by install-lab-host.sh on every successful
# run. Its absence means the install never finished step 3, so the
# orchestrator falls back to git rev-parse (or "unknown" if no .git/
# is here either). Stamping "unknown" gets every episode rejected
# by the receiver gate as bad-format → drained to quarantine/. The
# fix is the same git-pull-and-reinstall as for stale code.
version_file = install_root / "VERSION"
if role == "lab-host" and _path_exists(version_file):
try:
v = json.loads(version_file.read_text())
commit = v.get("commit", "")
branch = v.get("branch", "?")
dirty = " [dirty]" if v.get("dirty") else ""
if isinstance(commit, str) and len(commit) == 40:
report.add(Check(
"install: VERSION stamp",
"ok",
detail=f"{branch}@{commit[:8]}{dirty}",
))
else:
report.add(Check(
"install: VERSION stamp",
"fail",
detail=f"commit field malformed: {commit!r}",
fix=f"sudo /opt/cis490/scripts/install-{role}.sh",
))
except (OSError, json.JSONDecodeError) as e:
report.add(Check(
"install: VERSION stamp",
"fail",
detail=f"unreadable: {e}",
fix=f"sudo /opt/cis490/scripts/install-{role}.sh",
))
elif role == "lab-host":
report.add(Check(
"install: VERSION stamp",
"fail",
detail="missing — orchestrator will stamp 'unknown' and the "
"receiver gate will reject every PUT",
fix=f"sudo /opt/cis490/scripts/install-{role}.sh",
))
cfg_name = "lab-host.toml" if role == "lab-host" else "receiver.toml"
cfg = Path("/etc/cis490") / cfg_name
if _path_exists(cfg):
@ -312,6 +354,100 @@ def check_services(report: Report, role: str) -> None:
))
# ---------------------------------------------------------------------------
# checks — recent shipping errors (lab-host)
# ---------------------------------------------------------------------------
def check_recent_shipping_errors(report: Report) -> None:
"""Tail the last 10 minutes of cis490-shipper logs and surface
any 400/412 patterns. The shipper logs every PUT outcome, so a
fresh stream of fatals means the lab host's code is older than
the receiver's allow-list — exactly the loop our gate-cutover
fixes were meant to prevent. Surfacing here gives the operator
a one-line "what's broken" instead of having to grep the journal.
Skipped silently if journalctl isn't accessible (doctor often
runs as the unprivileged user and reading the system journal
needs the systemd-journal group)."""
rc, out, err = _run([
"journalctl", "-u", "cis490-shipper",
"--since", "10 minutes ago", "--no-pager", "--output=cat",
])
if rc != 0:
# Permission denied / journalctl not available / unit not
# installed yet — none of these merit a red row.
report.add(Check(
"shipper: recent log scan",
"skip",
detail=(err.strip().splitlines()[-1] if err else "no output")[:80],
))
return
lines = out.splitlines()
if not lines:
report.add(Check(
"shipper: recent ship results",
"ok",
detail="no output in last 10 minutes (daemon may be idle)",
))
return
# Match what queue.py / app.py actually log. We're conservative:
# only count lines that explicitly identify a ship outcome so
# we don't false-positive on unrelated 400s the receiver might
# log (e.g. health-check probes).
fatal_400 = sum(1 for ln in lines if "missing X-Cis490-Code-Commit" in ln)
fatal_412 = sum(1 for ln in lines if "412 commit-rejected" in ln
or "code commit rejected" in ln)
other_fatal = sum(1 for ln in lines
if "ship " in ln and "fatal" in ln
and "missing X-Cis490-Code-Commit" not in ln
and "commit rejected" not in ln)
transient = sum(1 for ln in lines
if "ship " in ln and "transient" in ln)
if fatal_412 > 0:
report.add(Check(
"shipper: recent ship results",
"fail",
detail=f"{fatal_412} ship(s) rejected as out-of-window in last 10 min",
fix=("cd /opt/cis490 && sudo -u cis490 git pull origin main && "
"sudo /opt/cis490/scripts/install-lab-host.sh "
"# pulls new code + drains stale queue + restarts daemon"),
))
elif fatal_400 > 0:
report.add(Check(
"shipper: recent ship results",
"fail",
detail=(
f"{fatal_400} ship(s) rejected as missing-commit-header — "
"orchestrator is emitting episodes without code_version"
),
fix=("sudo /opt/cis490/scripts/install-lab-host.sh "
"# rewrites VERSION + restarts orchestrator"),
))
elif other_fatal > 0:
report.add(Check(
"shipper: recent ship results",
"warn",
detail=f"{other_fatal} fatal ship(s) in last 10 min (other 4xx)",
fix="sudo journalctl -u cis490-shipper --since '10 minutes ago' "
"| grep -E 'ship .*fatal'",
))
elif transient > 5:
report.add(Check(
"shipper: recent ship results",
"warn",
detail=f"{transient} transient failures in last 10 min — receiver reachable?",
fix="sudo /opt/cis490/.venv/bin/python -m shipper "
"--config /etc/cis490/lab-host.toml --ping",
))
else:
# At least one line of output, but no error patterns matched.
report.add(Check("shipper: recent ship results", "ok"))
# ---------------------------------------------------------------------------
# checks — network (lab-host)
# ---------------------------------------------------------------------------
@ -605,6 +741,7 @@ def main(argv: list[str] | None = None) -> int:
check_bridge(report)
if not args.no_tier3:
check_tier3(report)
check_recent_shipping_errors(report)
check_end_to_end(report)
summary = report.summary()

144
tools/quarantine_unstamped.py Executable file
View file

@ -0,0 +1,144 @@
#!/usr/bin/env python3
"""One-shot drain for pre-stamp episodes stuck in a lab-host's queue.
Scans /var/lib/cis490/data/episodes/ and moves any episode whose
meta.json lacks a 40-char-hex code_version.commit (or has no meta.json
at all) into data/quarantine/<id>/, dropping a quarantine_reason.json
beside it.
Why this exists: the receiver's commit-allow-list went live on
2026-05-01; everything generated by the lab host before that has no
``code_version`` field, so every PUT 400s. The shipper's normal
fatal-quarantine path (queue.py::_quarantine) covers new episodes that
get rejected from here on, but a host with a few thousand pre-stamp
episodes already in episodes/ is going to spend hours just clearing
those before any new (stamped) episode gets shipped. Run this once
per lab host to drain that backlog instantly.
Idempotent. Safe to run while cis490-shipper is active episodes are
moved with rename(2), so the shipper either sees the dir before or
after the move, never partway. If a name collision in quarantine/ does
happen (e.g. a previous run quarantined the same id), the existing
quarantine entry wins and the live copy is removed.
Usage:
sudo -u cis490 /opt/cis490/.venv/bin/python \\
/opt/cis490/tools/quarantine_unstamped.py \\
--data-root /var/lib/cis490/data
"""
from __future__ import annotations
import argparse
import json
import shutil
import sys
import time
from dataclasses import dataclass
from pathlib import Path
HEX40 = set("0123456789abcdef")
def _looks_stamped(meta_path: Path) -> bool:
"""True iff meta.json carries a plausible 40-char-hex commit."""
try:
meta = json.loads(meta_path.read_text())
except (OSError, json.JSONDecodeError):
return False
cv = meta.get("code_version") or {}
commit = cv.get("commit")
if not isinstance(commit, str) or len(commit) != 40:
return False
return all(c in HEX40 for c in commit.lower())
@dataclass
class Result:
scanned: int
quarantined: int
skipped_no_marker: int
kept_stamped: int
errors: int
def drain(data_root: Path, *, dry_run: bool = False) -> Result:
episodes_dir = data_root / "episodes"
quarantine_dir = data_root / "quarantine"
quarantine_dir.mkdir(parents=True, exist_ok=True)
res = Result(0, 0, 0, 0, 0)
if not episodes_dir.exists():
return res
for ep in sorted(episodes_dir.iterdir()):
if not ep.is_dir():
continue
res.scanned += 1
# Only touch episodes the orchestrator finished writing — an
# in-progress dir without done.marker should be left alone so
# the orchestrator can finish it normally.
if not (ep / "done.marker").exists():
res.skipped_no_marker += 1
continue
meta = ep / "meta.json"
if _looks_stamped(meta):
res.kept_stamped += 1
continue
target = quarantine_dir / ep.name
try:
if dry_run:
print(f"would-quarantine {ep.name}")
else:
if target.exists():
shutil.rmtree(ep, ignore_errors=True)
else:
ep.replace(target)
reason = {
"status_code": 400,
"error": "pre-stamp episode (no code_version) drained by quarantine_unstamped.py",
"body": None,
"quarantined_at_wall": time.strftime(
"%Y-%m-%dT%H:%M:%SZ", time.gmtime()
),
}
try:
(target / "quarantine_reason.json").write_text(
json.dumps(reason)
)
except OSError:
pass
res.quarantined += 1
except OSError as e:
print(f"error: failed to quarantine {ep.name}: {e}", file=sys.stderr)
res.errors += 1
return res
def main() -> int:
p = argparse.ArgumentParser(description=__doc__)
p.add_argument(
"--data-root",
default="/var/lib/cis490/data",
type=Path,
help="Lab-host data root (contains episodes/, outbox/, etc.).",
)
p.add_argument(
"--dry-run", action="store_true",
help="Print what would be moved without moving anything.",
)
args = p.parse_args()
res = drain(args.data_root, dry_run=args.dry_run)
print(
f"scanned={res.scanned} quarantined={res.quarantined} "
f"kept_stamped={res.kept_stamped} skipped_no_marker={res.skipped_no_marker} "
f"errors={res.errors}"
)
return 1 if res.errors else 0
if __name__ == "__main__":
sys.exit(main())