fix: lab-host install loop after commit-gate cutover

Why services weren't starting after the gate went live:

1. install-lab-host.sh self-copy. The receiver's 400 remediation tells
   the agent to `cd /opt/cis490 && git pull && sudo
   ./scripts/install-lab-host.sh`. That makes REPO_ROOT==INSTALL_ROOT
   and `cp -aT $REPO_ROOT $INSTALL_ROOT` errors with "are the same
   file"; `set -e` aborts before the systemd units install or anything
   restarts. Detect the same-dir case and skip the cp; chown still
   runs.

2. Services never restart. install-lab-host.sh and install-tier-3-4.sh
   both ended by *telling the operator* to restart, then exiting. The
   running shipper/orchestrator kept executing pre-gate code from the
   old module objects, so new `code_version` stamping never reached an
   episode. Both scripts now `systemctl restart` the units they own
   when those units are enabled.

3. Shipper queue fatal-loop. queue.py incremented `fatal++` but didn't
   move the episode out of `data/episodes/`. Next scan re-tarred and
   re-PUT the same dir, getting 400 again. With 4465+ pre-stamp
   episodes on k-gamingcom this burned ~1 PUT/sec for 5+ hours of
   receiver log. Fatal episodes now move to data/quarantine/<id>/ with
   a quarantine_reason.json beside them; the outbox tarball is
   deleted.

4. Pre-stamp backlog drain. tools/quarantine_unstamped.py is a
   one-shot that scans data/episodes/ and quarantines anything without
   a 40-char-hex code_version.commit. Wired into install-lab-host.sh
   step 9 so a re-install drains the queue automatically. Idempotent;
   safe to run while the shipper is active.

Tests cover the queue's new fatal-quarantine path and every drain
behaviour (kept/quarantined/dry-run/idempotent/missing-meta/collision).

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
This commit is contained in:
max 2026-05-01 11:36:21 -05:00
parent e2bb76144f
commit eda6164897
7 changed files with 450 additions and 5 deletions

View file

@ -65,8 +65,18 @@ install -d -o "$SERVICE_USER" -g "$SERVICE_USER" -m 0755 \
# --- 3. repo + venv ----------------------------------------------------
log "syncing repo into $INSTALL_ROOT"
install -d -o "$SERVICE_USER" -g "$SERVICE_USER" -m 0755 "$INSTALL_ROOT"
# We use a clean cp -aT rather than rsync to avoid an extra dep.
cp -aT "$REPO_ROOT" "$INSTALL_ROOT"
# The expected lab-host upgrade path is `cd /opt/cis490 && git pull &&
# sudo ./scripts/install-lab-host.sh`, which means REPO_ROOT and
# INSTALL_ROOT can be the same directory. cp -aT errors out in that
# case ("are the same file") and `set -e` aborts before the systemd
# units get installed and services restart, leaving the host running
# whatever code was loaded at last service start.
if [[ "$(readlink -f "$REPO_ROOT")" == "$(readlink -f "$INSTALL_ROOT")" ]]; then
log "REPO_ROOT == INSTALL_ROOT ($INSTALL_ROOT); git pull already updated tree, skipping cp"
else
# Clean cp -aT to avoid an extra rsync dep.
cp -aT "$REPO_ROOT" "$INSTALL_ROOT"
fi
chown -R "$SERVICE_USER":"$SERVICE_USER" "$INSTALL_ROOT"
# Stamp a VERSION file at install time so episodes can record the
@ -281,6 +291,35 @@ if [[ "$NEW_INSTALL" == "1" ]]; then
log "================================================================="
fi
# --- 9. drain pre-stamp queue + restart services -----------------------
# On a re-run (not first install), the running services are still
# executing whatever code they loaded at last start, so the new module
# objects we just dropped on disk don't take effect until restart. And
# any episodes the orchestrator generated against pre-stamping code
# are missing meta.json::code_version — the receiver returns 400 for
# every PUT, the queue's fatal path didn't quarantine them in older
# code, and the shipper burns cycles re-tarring them on every pass.
# Drain them once, then restart so the new code reaches the live
# daemon.
if [[ "$NEW_INSTALL" != "1" ]]; then
PY="$INSTALL_ROOT/.venv/bin/python"
if [[ -x "$PY" && -f "$INSTALL_ROOT/tools/quarantine_unstamped.py" ]]; then
log "draining pre-stamp episodes from queue (idempotent)"
sudo -u "$SERVICE_USER" -- "$PY" \
"$INSTALL_ROOT/tools/quarantine_unstamped.py" \
--data-root "$DATA_ROOT/data" || \
log "WARN: quarantine drain returned non-zero — see output above"
fi
systemctl daemon-reload
for svc in cis490-shipper cis490-orchestrator; do
if systemctl is-enabled --quiet "$svc" 2>/dev/null; then
log "restarting $svc to pick up new code"
systemctl restart "$svc" || \
log "WARN: $svc restart failed — check 'journalctl -u $svc'"
fi
done
fi
log "lab-host install complete."
log ""
log "Cloning this repo and running the launchers manually is NOT enough."

View file

@ -152,6 +152,15 @@ else
log "Tier-4 ✓ ($REAL_COUNT real binaries staged in $INSTALL_ROOT/samples/store/)"
fi
# Restart the orchestrator now so the next wave actually runs Tier-3
# against the freshly-staged msfrpcd + samples. Skipped only if the
# unit isn't enabled yet (first install hasn't run `systemctl enable`).
if systemctl is-enabled --quiet cis490-orchestrator 2>/dev/null; then
log "restarting cis490-orchestrator to pick up new modules + samples"
systemctl restart cis490-orchestrator || \
log "WARN: orchestrator restart failed — check 'journalctl -u cis490-orchestrator'"
fi
log ""
log "================================================================="
log " Tier-3 deploy complete on $(hostname)"
@ -160,7 +169,4 @@ log " - metasploit-framework + cis490-msfrpcd.service active"
log " - $OUT_DIR/metasploitable2.qcow2 staged"
log " - bridge: $(ip link show br-malware >/dev/null 2>&1 && echo up || echo skipped)"
log " - Tier-4: $(ls "$INSTALL_ROOT/samples/store/" 2>/dev/null | wc -l) real binaries staged"
log ""
log " Restart the orchestrator so the next wave runs Tier-3:"
log " sudo systemctl restart cis490-orchestrator"
log "================================================================="

View file

@ -46,6 +46,14 @@ class ShipperConfig:
def shipped_dir(self) -> Path:
return self.data_root / "shipped"
@property
def quarantine_dir(self) -> Path:
# Episodes the receiver has refused permanently (4xx other than
# 409 — typically 400 missing-commit or 412 not-in-window). They
# don't belong in shipped/ (we have nothing to compare against)
# and re-shipping them would just re-burn the queue.
return self.data_root / "quarantine"
@classmethod
def load(cls, path: str | Path) -> "ShipperConfig":
with open(path, "rb") as f:

View file

@ -64,6 +64,7 @@ class ShipperQueue:
cfg.episodes_dir.mkdir(parents=True, exist_ok=True)
cfg.outbox_dir.mkdir(parents=True, exist_ok=True)
cfg.shipped_dir.mkdir(parents=True, exist_ok=True)
cfg.quarantine_dir.mkdir(parents=True, exist_ok=True)
# ---- main entry point ---------------------------------------------
@ -115,6 +116,11 @@ class ShipperQueue:
elif res.status == "transient":
transient += 1
else: # fatal
# Move the episode out of the live queue so the next
# scan doesn't re-tar/re-PUT the same dir forever. The
# tarball gets deleted; meta.json + telemetry survive
# in quarantine/ for operator triage.
self._quarantine(ep_dir, tarball, res)
fatal += 1
return PassResult(
@ -223,6 +229,30 @@ class ShipperQueue:
ep_dir.replace(target)
tarball.unlink(missing_ok=True)
def _quarantine(self, ep_dir: Path, tarball: Path, res: ShipResult) -> None:
"""Move a permanently-rejected episode out of the live queue.
Drops a small ``quarantine_reason.json`` next to the episode so
an operator (or a future backfill tool) can see why without
having to dig through journalctl. The tarball in outbox/ goes
away re-shipping won't help and it just burns disk."""
target = self.cfg.quarantine_dir / ep_dir.name
if target.exists():
shutil.rmtree(ep_dir, ignore_errors=True)
else:
ep_dir.replace(target)
reason = {
"status_code": res.status_code,
"error": res.error,
"body": res.body,
"quarantined_at_wall": time.strftime("%Y-%m-%dT%H:%M:%SZ", time.gmtime()),
}
try:
(target / "quarantine_reason.json").write_text(json.dumps(reason))
except OSError:
log.exception("quarantine reason write failed for %s", ep_dir.name)
tarball.unlink(missing_ok=True)
def _which_zstd() -> bool:
return shutil.which("zstd") is not None

View file

@ -0,0 +1,149 @@
"""Tests for tools/quarantine_unstamped.py.
This is the one-shot drain that we run on each lab host once after the
commit-gate goes live. The behaviour we care about:
- episodes WITH a 40-char-hex code_version.commit stay put
- episodes WITHOUT that field move to quarantine/
- episodes lacking done.marker (still being written) are untouched
- quarantine/<id>/quarantine_reason.json gets dropped beside it
- re-running is a no-op (idempotent pre-stamp episodes are gone,
valid ones aren't touched)
"""
from __future__ import annotations
import importlib.util
import json
import sys
from pathlib import Path
import pytest
REPO_ROOT = Path(__file__).resolve().parent.parent
spec = importlib.util.spec_from_file_location(
"quarantine_unstamped", REPO_ROOT / "tools" / "quarantine_unstamped.py"
)
qu = importlib.util.module_from_spec(spec)
sys.modules["quarantine_unstamped"] = qu
spec.loader.exec_module(qu)
def _ep(root: Path, name: str, *, meta: dict | None, done: bool = True) -> Path:
"""Stage a fake episode under <root>/episodes/<name>/."""
d = root / "episodes" / name
d.mkdir(parents=True)
if meta is not None:
(d / "meta.json").write_text(json.dumps(meta))
if done:
(d / "done.marker").touch()
return d
def test_drain_moves_unstamped_to_quarantine(tmp_path: Path) -> None:
_ep(tmp_path, "01OLD", meta={"host_id": "lab1"}) # no code_version
res = qu.drain(tmp_path)
assert res.scanned == 1
assert res.quarantined == 1
assert res.kept_stamped == 0
assert not (tmp_path / "episodes" / "01OLD").exists()
assert (tmp_path / "quarantine" / "01OLD" / "meta.json").exists()
reason = json.loads(
(tmp_path / "quarantine" / "01OLD" / "quarantine_reason.json").read_text()
)
assert reason["status_code"] == 400
assert "pre-stamp" in reason["error"]
def test_drain_keeps_stamped_episode(tmp_path: Path) -> None:
"""A stamped episode (40-char-hex commit) belongs in the live
queue the shipper will succeed against the receiver gate."""
_ep(tmp_path, "01NEW", meta={
"code_version": {"commit": "a" * 40, "branch": "main", "dirty": False},
})
res = qu.drain(tmp_path)
assert res.scanned == 1
assert res.quarantined == 0
assert res.kept_stamped == 1
assert (tmp_path / "episodes" / "01NEW").exists()
assert not (tmp_path / "quarantine" / "01NEW").exists()
def test_drain_rejects_short_commit(tmp_path: Path) -> None:
"""A truncated/garbled commit is not accepted as 'stamped'
receiver would 400 it as bad-format anyway."""
_ep(tmp_path, "01SHORT", meta={"code_version": {"commit": "abc123"}})
res = qu.drain(tmp_path)
assert res.quarantined == 1
def test_drain_rejects_non_hex_commit(tmp_path: Path) -> None:
bad = "z" * 40
_ep(tmp_path, "01BADHEX", meta={"code_version": {"commit": bad}})
res = qu.drain(tmp_path)
assert res.quarantined == 1
def test_drain_skips_in_progress_episode(tmp_path: Path) -> None:
"""No done.marker means the orchestrator is still writing to the
dir leave it alone, drainer is for 'finished and queued' only."""
_ep(tmp_path, "01PARTIAL", meta=None, done=False)
res = qu.drain(tmp_path)
assert res.scanned == 1
assert res.skipped_no_marker == 1
assert res.quarantined == 0
assert (tmp_path / "episodes" / "01PARTIAL").exists()
def test_drain_handles_missing_meta_json(tmp_path: Path) -> None:
"""A done episode with no meta.json is corrupt — should be
quarantined, not kept (it'd fail the gate too)."""
_ep(tmp_path, "01NOMETA", meta=None, done=True)
res = qu.drain(tmp_path)
assert res.quarantined == 1
assert (tmp_path / "quarantine" / "01NOMETA" / "quarantine_reason.json").exists()
def test_drain_is_idempotent(tmp_path: Path) -> None:
_ep(tmp_path, "01OLD", meta={"host_id": "lab1"})
_ep(tmp_path, "01NEW", meta={"code_version": {"commit": "a" * 40}})
qu.drain(tmp_path)
res2 = qu.drain(tmp_path)
# Second pass: only the still-live stamped episode is scanned.
assert res2.scanned == 1
assert res2.kept_stamped == 1
assert res2.quarantined == 0
def test_drain_missing_data_root_is_noop(tmp_path: Path) -> None:
"""First-boot: episodes/ may not exist yet. Drain shouldn't crash."""
res = qu.drain(tmp_path / "does-not-exist")
assert res.scanned == 0
assert res.quarantined == 0
def test_drain_dry_run_moves_nothing(tmp_path: Path, capsys: pytest.CaptureFixture) -> None:
_ep(tmp_path, "01OLD", meta={"host_id": "lab1"})
res = qu.drain(tmp_path, dry_run=True)
assert res.quarantined == 1 # counted as if quarantined
# But the episode is still in episodes/ — nothing actually moved.
assert (tmp_path / "episodes" / "01OLD").exists()
assert not (tmp_path / "quarantine" / "01OLD").exists()
out = capsys.readouterr().out
assert "would-quarantine 01OLD" in out
def test_drain_collision_keeps_quarantine_copy(tmp_path: Path) -> None:
"""Re-running after a previous drain put the same id into
quarantine. Should silently drop the live copy (matches the
queue's _quarantine path semantics)."""
_ep(tmp_path, "01DUP", meta={"host_id": "lab1"})
# Pre-existing quarantine entry from a previous run:
(tmp_path / "quarantine" / "01DUP").mkdir(parents=True)
(tmp_path / "quarantine" / "01DUP" / "meta.json").write_text("{}")
res = qu.drain(tmp_path)
assert res.quarantined == 1
assert not (tmp_path / "episodes" / "01DUP").exists()
assert (tmp_path / "quarantine" / "01DUP" / "meta.json").exists()

View file

@ -320,6 +320,75 @@ def test_run_once_handles_transient_when_receiver_is_down(tmp_path: Path) -> Non
assert (cfg.outbox_dir / "01DOWN.tar.zst").exists()
def test_run_once_quarantines_fatal_episode(tmp_path: Path) -> None:
"""A 4xx-other-than-409 (e.g. 400 missing-commit) means re-shipping
won't succeed. The shipper must move the episode out of the live
queue so the next scan doesn't burn a PUT on the same dir, AND
drop the outbox tarball so disk doesn't fill up with stale .zst.
Regression: pre-fix queue.py left fatal episodes in episodes/ on
every pass, so 4465+ pre-stamp episodes on k-gamingcom kept
fatal-looping at ~1 PUT/sec for 5+ hours after the receiver gate
went live."""
class _Always400Transport:
"""Stub transport that always rejects with a fatal 400.
Mirrors transport.py's own behaviour for 4xx-not-409."""
def __init__(self) -> None:
self.calls = 0
def ship_tarball(self, episode_id, tarball_path, sha256_hex,
commit=None):
self.calls += 1
from shipper.transport import ShipResult
return ShipResult(
status="fatal",
status_code=400,
sha256=None,
body={"error": "missing X-Cis490-Code-Commit header",
"remediation": "pull and reinstall"},
error="client error 400",
)
cfg = ShipperConfig(
host_id="lab1",
data_root=tmp_path / "lab-data",
receiver=ReceiverEndpoint(url="http://127.0.0.1:1"),
scan_interval_s=0.05,
)
queue = ShipperQueue(cfg, _Always400Transport())
_make_episode(cfg, "01PRESTAMP")
result = queue.run_once()
assert result.scanned == 1
assert result.fatal == 1
assert result.shipped == 0
# Episode dir is OUT of episodes/ and IN quarantine/.
assert not (cfg.episodes_dir / "01PRESTAMP").exists()
assert (cfg.quarantine_dir / "01PRESTAMP").exists()
assert (cfg.quarantine_dir / "01PRESTAMP" / "meta.json").exists()
# The reason file carries enough context for triage.
reason = json.loads(
(cfg.quarantine_dir / "01PRESTAMP" / "quarantine_reason.json").read_text()
)
assert reason["status_code"] == 400
assert reason["error"] == "client error 400"
assert reason["body"]["error"] == "missing X-Cis490-Code-Commit header"
assert "quarantined_at_wall" in reason
# Outbox is empty — no stale tarball.
assert list(cfg.outbox_dir.iterdir()) == []
# Critically: a second pass is a no-op. The fix would be useless if
# quarantined episodes leaked back in.
result2 = queue.run_once()
assert result2.scanned == 0
assert result2.fatal == 0
def test_tarball_round_trips_episode_dir(tmp_path: Path, receiver) -> None:
"""The receiver-side tarball must extract back to the original
episode dir layout (modulo file order). Verifies the tar+zstd

144
tools/quarantine_unstamped.py Executable file
View file

@ -0,0 +1,144 @@
#!/usr/bin/env python3
"""One-shot drain for pre-stamp episodes stuck in a lab-host's queue.
Scans /var/lib/cis490/data/episodes/ and moves any episode whose
meta.json lacks a 40-char-hex code_version.commit (or has no meta.json
at all) into data/quarantine/<id>/, dropping a quarantine_reason.json
beside it.
Why this exists: the receiver's commit-allow-list went live on
2026-05-01; everything generated by the lab host before that has no
``code_version`` field, so every PUT 400s. The shipper's normal
fatal-quarantine path (queue.py::_quarantine) covers new episodes that
get rejected from here on, but a host with a few thousand pre-stamp
episodes already in episodes/ is going to spend hours just clearing
those before any new (stamped) episode gets shipped. Run this once
per lab host to drain that backlog instantly.
Idempotent. Safe to run while cis490-shipper is active episodes are
moved with rename(2), so the shipper either sees the dir before or
after the move, never partway. If a name collision in quarantine/ does
happen (e.g. a previous run quarantined the same id), the existing
quarantine entry wins and the live copy is removed.
Usage:
sudo -u cis490 /opt/cis490/.venv/bin/python \\
/opt/cis490/tools/quarantine_unstamped.py \\
--data-root /var/lib/cis490/data
"""
from __future__ import annotations
import argparse
import json
import shutil
import sys
import time
from dataclasses import dataclass
from pathlib import Path
HEX40 = set("0123456789abcdef")
def _looks_stamped(meta_path: Path) -> bool:
"""True iff meta.json carries a plausible 40-char-hex commit."""
try:
meta = json.loads(meta_path.read_text())
except (OSError, json.JSONDecodeError):
return False
cv = meta.get("code_version") or {}
commit = cv.get("commit")
if not isinstance(commit, str) or len(commit) != 40:
return False
return all(c in HEX40 for c in commit.lower())
@dataclass
class Result:
scanned: int
quarantined: int
skipped_no_marker: int
kept_stamped: int
errors: int
def drain(data_root: Path, *, dry_run: bool = False) -> Result:
episodes_dir = data_root / "episodes"
quarantine_dir = data_root / "quarantine"
quarantine_dir.mkdir(parents=True, exist_ok=True)
res = Result(0, 0, 0, 0, 0)
if not episodes_dir.exists():
return res
for ep in sorted(episodes_dir.iterdir()):
if not ep.is_dir():
continue
res.scanned += 1
# Only touch episodes the orchestrator finished writing — an
# in-progress dir without done.marker should be left alone so
# the orchestrator can finish it normally.
if not (ep / "done.marker").exists():
res.skipped_no_marker += 1
continue
meta = ep / "meta.json"
if _looks_stamped(meta):
res.kept_stamped += 1
continue
target = quarantine_dir / ep.name
try:
if dry_run:
print(f"would-quarantine {ep.name}")
else:
if target.exists():
shutil.rmtree(ep, ignore_errors=True)
else:
ep.replace(target)
reason = {
"status_code": 400,
"error": "pre-stamp episode (no code_version) drained by quarantine_unstamped.py",
"body": None,
"quarantined_at_wall": time.strftime(
"%Y-%m-%dT%H:%M:%SZ", time.gmtime()
),
}
try:
(target / "quarantine_reason.json").write_text(
json.dumps(reason)
)
except OSError:
pass
res.quarantined += 1
except OSError as e:
print(f"error: failed to quarantine {ep.name}: {e}", file=sys.stderr)
res.errors += 1
return res
def main() -> int:
p = argparse.ArgumentParser(description=__doc__)
p.add_argument(
"--data-root",
default="/var/lib/cis490/data",
type=Path,
help="Lab-host data root (contains episodes/, outbox/, etc.).",
)
p.add_argument(
"--dry-run", action="store_true",
help="Print what would be moved without moving anything.",
)
args = p.parse_args()
res = drain(args.data_root, dry_run=args.dry_run)
print(
f"scanned={res.scanned} quarantined={res.quarantined} "
f"kept_stamped={res.kept_stamped} skipped_no_marker={res.skipped_no_marker} "
f"errors={res.errors}"
)
return 1 if res.errors else 0
if __name__ == "__main__":
sys.exit(main())