Lab-host shipper + receiver /v1/ping + install scripts

Implements the deployment loop end-to-end on the CIS490 side:

shipper/
  config.py      ShipperConfig (host_id, paths, receiver endpoint, mTLS)
  transport.py   httpx-based PUT + ping with mTLS + bearer support
  queue.py       scan data/episodes/, tar+zstd via system zstd, ship,
                 retire to data/shipped/. Idempotent across crashes per
                 the state machine in docs/transport.md.
  __main__.py    CLI: --ping (smoke test), --once (one pass), or daemon

receiver/app.py: new POST /v1/ping that requires the same auth as PUT
  /v1/episodes but writes nothing. Used by `cis490-shipper --ping`
  during lab-host bring-up to verify the WG/Caddy/mTLS path before
  shipping any real bytes.

etc/
  cis490-shipper.service       systemd unit for the lab-host shipper
  cis490-orchestrator.service  systemd unit for the lab-host queue
                               (kept disabled by default until queue
                               mode lands)
  lab-host.toml.example        config template

scripts/
  install-lab-host.sh   idempotent installer; verifies prereqs,
                        creates cis490 service user, syncs repo to
                        /opt/cis490, builds venv, drops systemd units
                        and config template
  install-receiver.sh   same, for the receiver role on the central WG
                        node (Pi5 in our setup)

tests/test_shipper.py  11 end-to-end tests against a real Uvicorn
                       server hosting the receiver app. Exercises
                       ping, tar+ship, idempotent re-ship, 409
                       conflict, transient (receiver down), tarball
                       round-trip via system zstd.

AGENTS.md  guidance for AI agents working on this and sibling repos.
           Headline: when you hit an issue you can't fully fix in
           scope, file a Forgejo issue rather than leaving a TODO.

51/51 tests pass.

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
max 2026-04-29 23:41:32 -05:00
parent 613c6fa223
commit 7c9f9582ca
13 changed files with 1365 additions and 0 deletions

AGENTS.md Normal file

@@ -0,0 +1,91 @@
# AGENTS.md — guidance for AI agents working on this repo
This project is part of the spectral lab (`http://maxgit.wg/spectral/`).
The conventions below also apply to sibling repos (`wg-enroll`,
`wg-pki`, `caddy`, `iptmonads`, `matrix`, `forgejo`, `vault`,
`openclaw-deploy`).
## File an issue when you hit a problem
**When you run into an issue you cannot fully resolve in the current
turn, file it as a Forgejo issue on the relevant repo before moving
on.** Do not silently log a TODO comment, leave a partial workaround,
or assume someone else will remember. The issue tracker is the
durable record.
This applies to:
- a build / test / typecheck failure you can't fix in scope
- a bug you discover but aren't tasked with fixing
- a missing dep, missing config, or env-only failure that blocks E2E
- a design gap you've worked around but want a follow-up to fix
properly
- a scope-out you made (e.g. "deferred Tier 4 sample fetch") that
needs an owner so it doesn't get lost
Don't file an issue when:
- the user is in the conversation and you can just *tell* them
- it's already filed (search first: `GET /api/v1/repos/<owner>/<repo>/issues?state=open&q=<keyword>`)
- it's truly a non-issue (a one-line edit you're about to make this
same turn)
## How to file (Forgejo API)
The local Forgejo at `http://10.100.0.1:3000` accepts API calls with a
token-bearer header:
```sh
curl -s -X POST \
-H "Authorization: token <TOKEN>" \
-H "Content-Type: application/json" \
http://10.100.0.1:3000/api/v1/repos/spectral/<repo>/issues \
-d '{
"title": "<short, action-oriented title>",
"body": "<context, repro, attempted fixes, suggested next step>"
}'
```
The token comes from the user's session — never embed one in code or
commits.
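The same call can be scripted from Python. The sketch below is a minimal, standard-library-only illustration that builds the request without sending it; `build_issue_request` is a hypothetical helper name, and where the token comes from is left to the caller.

```python
import json
import urllib.request

FORGEJO = "http://10.100.0.1:3000"  # base URL from the section above


def build_issue_request(repo: str, title: str, body: str, token: str) -> urllib.request.Request:
    """Build (but do not send) the POST that files an issue on spectral/<repo>.

    The token is passed in by the caller; it is never read from or
    written to disk here.
    """
    payload = json.dumps({"title": title, "body": body}).encode("utf-8")
    return urllib.request.Request(
        f"{FORGEJO}/api/v1/repos/spectral/{repo}/issues",
        data=payload,
        method="POST",
        headers={
            "Authorization": f"token {token}",
            "Content-Type": "application/json",
        },
    )
```

To actually file the issue, pass the returned object to `urllib.request.urlopen(req, timeout=10)`. Keeping construction separate from sending makes the payload easy to inspect before anything reaches the tracker.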
### What a good issue body contains
1. **Context** — one sentence on what was being attempted.
2. **What happened** — the actual error, log line, or unexpected
behavior. Paste exact output.
3. **What was tried** — every workaround you attempted and why it
didn't stick.
4. **Suggested next step** — the smallest change that would resolve
it, if you have a guess. "Unknown" is a fine answer.
5. **Related** — link the commit / PR / file:line where the issue
surfaced.
### What a good title looks like
| Bad | Good |
|---|---|
| `tests broken` | `tests/test_episode.py: race when t_mono_origin_ns is set in run() not __init__` |
| `caddy thing` | `Caddy: client_auth requires absolute path; relative trusted_ca_cert_file silently fails` |
| `fix later` | `shipper: 5xx backoff cap is 5min, doc says 1min — pick one` |
## After filing
- Reference the issue number in the next commit message:
`Refs spectral/<repo>#<n>` or `Closes spectral/<repo>#<n>` if your
current change actually fixes it.
- If the issue is on a different repo than the one you're committing
to, fully qualify: `spectral/wg-pki#3`.
## Other conventions
- **Naming:** never coin USB / device / service names on the user's
behalf. Ask first. Reusing an old name is especially bad.
- **`/etc` configs:** `Read` first, copy second. Never overwrite a
`/etc/...` file from a template without checking what's actually
there.
- **wg-enroll scope:** creation-only. Don't add admin /
service-activation features to it.
- **Don't expand a project's binary name beyond its own boundary:**
`openclaw` is the queue/permissions binary in `openclaw-deploy`.
This repo is `wg-enroll` (or its caller). Don't conflate.

etc/cis490-orchestrator.service Normal file

@@ -0,0 +1,31 @@
[Unit]
Description=CIS490 lab-host episode orchestrator (queue mode)
Documentation=https://maxgit.wg/spectral/CIS490
# Episodes need KVM and (for Tier 3+) msfrpcd up. msfrpcd is brought
# up out-of-band; this unit only requires the kernel + WG.
After=network-online.target wg-quick@wg0.service
Wants=network-online.target
[Service]
Type=simple
User=cis490
Group=cis490
WorkingDirectory=/opt/cis490
# Queue mode is currently a TODO — the binary takes a job-spec file
# and runs episodes in a loop. Until that lands, this unit stays
# disabled by default; lab-host operators kick off episodes by hand
# via tools/run_*.py and let the shipper pick them up.
ExecStart=/opt/cis490/.venv/bin/python -m orchestrator --queue /var/lib/cis490/data/queue
Restart=on-failure
RestartSec=10
# Hardening
NoNewPrivileges=true
PrivateTmp=true
ProtectSystem=strict
ProtectHome=true
ReadWritePaths=/var/lib/cis490
SupplementaryGroups=kvm
[Install]
WantedBy=multi-user.target

etc/cis490-shipper.service Normal file

@@ -0,0 +1,33 @@
[Unit]
Description=CIS490 lab-host episode shipper
Documentation=https://maxgit.wg/spectral/CIS490
# WG must be up before the shipper can reach the receiver.
After=network-online.target wg-quick@wg0.service
Wants=network-online.target
Requires=wg-quick@wg0.service
[Service]
Type=simple
User=cis490
Group=cis490
WorkingDirectory=/opt/cis490
ExecStart=/opt/cis490/.venv/bin/python -m shipper --config /etc/cis490/lab-host.toml
Restart=on-failure
RestartSec=5
# Hardening
NoNewPrivileges=true
PrivateTmp=true
ProtectSystem=strict
ProtectHome=true
ReadWritePaths=/var/lib/cis490
ProtectKernelTunables=true
ProtectKernelModules=true
ProtectControlGroups=true
LockPersonality=true
RestrictNamespaces=true
RestrictRealtime=true
SystemCallArchitectures=native
[Install]
WantedBy=multi-user.target

etc/lab-host.toml.example Normal file

@@ -0,0 +1,50 @@
# CIS490 lab-host — copy to /etc/cis490/lab-host.toml and edit.
#
# This config drives BOTH the orchestrator (which runs episodes) and
# the shipper (which uploads completed episodes to the central
# receiver over WG).
# Stable identity for this lab host. Used in the receiver path
# (/v1/episodes/<host_id>/...) and in the X-Lab-Host header. Pick
# something short, stable, and DNS-safe — letters, digits, _.- only.
host_id = "REPLACE_ME"
[paths]
data_root = "/var/lib/cis490/data"
samples_store = "/var/lib/cis490/samples/store"
qcow_image = "/var/lib/cis490/vm/images/metasploitable2.qcow2"
[receiver]
# The receiver lives behind Caddy on the WG-side collector host. The
# hostname must resolve over WG (collector.wg in the canonical
# spectral lab). The wg-pki CA must be on every lab-host so the
# Caddy-issued internal cert validates.
url = "https://collector.wg"
ca_bundle = "/etc/cis490/certs/wg-ca.pem"
# mTLS: leaf cert + private key issued by wg-pki for THIS host_id.
# Comment these out to fall back to bearer-token auth during early
# bring-up.
client_cert = "/etc/cis490/certs/lab-host.pem"
client_key = "/etc/cis490/certs/lab-host.key"
# Bearer is optional. It is meant as a stand-in while mTLS isn't yet
# configured. When both are set, mTLS does the actual authn and the
# bearer is a belt-and-suspenders check.
# bearer_token = "REPLACE_ME_WITH_SECRET"
# Set to false ONLY for local-loopback dev against an unsigned cert.
# verify_tls = true
[shipper]
scan_interval_s = 5.0
request_timeout_s = 60.0
[episode]
baseline_seconds = 30
infected_seconds = 90
dormant_seconds = 60
[retention]
keep_local_for_days = 7
prune_at_disk_pct = 80

receiver/app.py

@@ -2,6 +2,7 @@ from __future__ import annotations
import logging
import secrets
import time
from pathlib import Path
from typing import Awaitable, Callable
@@ -17,6 +18,7 @@ log = logging.getLogger("cis490.receiver")
SUFFIX = ".tar.zst"
SCHEMA_VERSION = 1


def _bearer_check(request: Request, expected: str | None) -> Response | None:
@@ -40,6 +42,23 @@ def make_app(
    async def health(request: Request) -> JSONResponse:
        return JSONResponse({"status": "ok"})

    async def ping(request: Request) -> JSONResponse:
        """Smoke-test endpoint. Verifies that the auth layer and the
        WG/Caddy/receiver pipe are alive end-to-end without persisting
        anything; index.jsonl is untouched. Used by ``cis490-shipper
        --ping`` during initial bring-up of a new lab host."""
        guard = _bearer_check(request, bearer_token)
        if guard is not None:
            return guard
        return JSONResponse(
            {
                "ok": True,
                "host_id": request.headers.get("x-lab-host"),
                "t_wall_ns": time.time_ns(),
                "schema_version": SCHEMA_VERSION,
            }
        )

    async def put_episode(request: Request) -> JSONResponse:
        guard = _bearer_check(request, bearer_token)
        if guard is not None:
@@ -124,6 +143,7 @@ def make_app(
    routes = [
        Route("/v1/health", health, methods=["GET"]),
        Route("/v1/ping", ping, methods=["POST"]),
        Route(
            "/v1/episodes/{host_id}/{filename}",
            put_episode,

scripts/install-lab-host.sh Executable file

@@ -0,0 +1,112 @@
#!/usr/bin/env bash
# Install / refresh the CIS490 lab-host role.
#
# Idempotent — safe to re-run after `git pull`. Does NOT enroll the
# host into WireGuard (that's wg-enroll's job, run separately and
# *first*) and does NOT mint TLS certs (that's wg-pki's job).
#
# Steps:
# 1. Verify prereqs (KVM, zstd, qemu, python3.11+, systemd).
# 2. Create the cis490 service user + /var/lib/cis490 layout.
# 3. Sync the repo into /opt/cis490 and build a uv-managed venv.
# 4. Install systemd units from etc/.
# 5. Drop /etc/cis490/lab-host.toml (only on first install).
#
# Operator finishes by:
# - editing /etc/cis490/lab-host.toml (host_id, receiver URL, certs)
# - placing leaf certs at /etc/cis490/certs/{lab-host.pem,lab-host.key,wg-ca.pem}
# - `systemctl enable --now cis490-shipper`
set -euo pipefail
REPO_ROOT="$(cd "$(dirname "$0")/.." && pwd)"
INSTALL_ROOT="${INSTALL_ROOT:-/opt/cis490}"
DATA_ROOT="${DATA_ROOT:-/var/lib/cis490}"
ETC_ROOT="${ETC_ROOT:-/etc/cis490}"
SERVICE_USER="${SERVICE_USER:-cis490}"
log() { printf '[install-lab-host] %s\n' "$*" >&2; }
die() { log "FATAL: $*"; exit 1; }
# --- 1. prereqs --------------------------------------------------------
log "checking prereqs"
if [[ $EUID -ne 0 ]]; then
die "must run as root (writes to /opt, /etc, /var/lib, and systemd)"
fi
command -v systemctl >/dev/null || die "systemd not found"
command -v qemu-system-x86_64 >/dev/null || die "qemu-system-x86_64 not on PATH"
command -v zstd >/dev/null || die "zstd not on PATH (apt install zstd)"
[[ -e /dev/kvm ]] || die "/dev/kvm missing — KVM not available"
# The shipper needs python >=3.11 (it imports tomllib); same check as
# install-receiver.sh.
command -v python3 >/dev/null || die "python3 not on PATH"
if ! python3 -c 'import sys; sys.exit(0 if sys.version_info >= (3,11) else 1)'; then
die "python >=3.11 required"
fi
# uv is preferred (lockfile-driven). Fall back to system pip if absent.
USE_UV=0
if command -v uv >/dev/null; then USE_UV=1; fi
# --- 2. user + layout --------------------------------------------------
log "ensuring service user $SERVICE_USER"
if ! id -u "$SERVICE_USER" >/dev/null 2>&1; then
useradd --system --no-create-home --shell /usr/sbin/nologin \
--home-dir "$INSTALL_ROOT" "$SERVICE_USER"
fi
# kvm group lets the service spawn VMs.
if getent group kvm >/dev/null 2>&1; then
usermod -a -G kvm "$SERVICE_USER" || true
fi
install -d -o root -g root -m 0755 "$ETC_ROOT" "$ETC_ROOT/certs"
install -d -o "$SERVICE_USER" -g "$SERVICE_USER" -m 0755 \
"$DATA_ROOT" "$DATA_ROOT/data" \
"$DATA_ROOT/data/episodes" "$DATA_ROOT/data/outbox" \
"$DATA_ROOT/data/shipped" "$DATA_ROOT/data/queue" \
"$DATA_ROOT/samples" "$DATA_ROOT/samples/store" \
"$DATA_ROOT/vm" "$DATA_ROOT/vm/images"
# --- 3. repo + venv ----------------------------------------------------
log "syncing repo into $INSTALL_ROOT"
install -d -o "$SERVICE_USER" -g "$SERVICE_USER" -m 0755 "$INSTALL_ROOT"
# We use a clean cp -aT rather than rsync to avoid an extra dep.
cp -aT "$REPO_ROOT" "$INSTALL_ROOT"
chown -R "$SERVICE_USER":"$SERVICE_USER" "$INSTALL_ROOT"
log "building venv"
if [[ "$USE_UV" -eq 1 ]]; then
sudo -u "$SERVICE_USER" -- env HOME="$INSTALL_ROOT" \
uv sync --project "$INSTALL_ROOT"
else
sudo -u "$SERVICE_USER" -- python3 -m venv "$INSTALL_ROOT/.venv"
sudo -u "$SERVICE_USER" -- "$INSTALL_ROOT/.venv/bin/pip" install \
--quiet --upgrade pip
sudo -u "$SERVICE_USER" -- "$INSTALL_ROOT/.venv/bin/pip" install \
--quiet starlette 'uvicorn[standard]' httpx msgpack
fi
# --- 4. systemd --------------------------------------------------------
log "installing systemd units"
install -m 0644 "$REPO_ROOT/etc/cis490-shipper.service" \
/etc/systemd/system/cis490-shipper.service
install -m 0644 "$REPO_ROOT/etc/cis490-orchestrator.service" \
/etc/systemd/system/cis490-orchestrator.service
systemctl daemon-reload
# --- 5. config template (only on first install) -----------------------
if [[ ! -f "$ETC_ROOT/lab-host.toml" ]]; then
log "writing $ETC_ROOT/lab-host.toml (template)"
install -m 0640 -o root -g "$SERVICE_USER" \
"$REPO_ROOT/etc/lab-host.toml.example" "$ETC_ROOT/lab-host.toml"
log ""
log "FIRST-INSTALL NEXT STEPS:"
log " 1. Edit $ETC_ROOT/lab-host.toml — set host_id and receiver URL."
log " 2. Place TLS material at:"
log " $ETC_ROOT/certs/wg-ca.pem"
log " $ETC_ROOT/certs/lab-host.pem"
log " $ETC_ROOT/certs/lab-host.key (mode 0600, owner $SERVICE_USER)"
log " 3. Smoke-test the receiver pipe:"
log " sudo -u $SERVICE_USER $INSTALL_ROOT/.venv/bin/python -m shipper \\"
log " --config $ETC_ROOT/lab-host.toml --ping"
log " 4. systemctl enable --now cis490-shipper"
else
log "$ETC_ROOT/lab-host.toml exists; leaving in place"
fi
log "lab-host install complete."

scripts/install-receiver.sh Executable file

@@ -0,0 +1,106 @@
#!/usr/bin/env bash
# Install / refresh the CIS490 receiver role on the central WG node
# (the Pi5 in our setup). Idempotent — safe to re-run.
#
# Steps:
# 1. Verify prereqs (python3.11+, systemd).
# 2. Create the cis490 service user + /var/lib/cis490 layout.
# 3. Sync the repo into /opt/cis490 and build a venv.
# 4. Install cis490-receiver.service.
# 5. Drop /etc/cis490/receiver.toml on first install.
#
# This script does NOT:
# - configure Caddy. Add a `collector.wg` block to your spectral/caddy
# config to terminate TLS and reverse-proxy to 127.0.0.1:8443.
# - issue server / client certs. wg-pki owns CA + leaf issuance.
# - open firewall ports. iptmonads owns the WG-side ruleset.
set -euo pipefail
REPO_ROOT="$(cd "$(dirname "$0")/.." && pwd)"
INSTALL_ROOT="${INSTALL_ROOT:-/opt/cis490}"
DATA_ROOT="${DATA_ROOT:-/var/lib/cis490}"
ETC_ROOT="${ETC_ROOT:-/etc/cis490}"
SERVICE_USER="${SERVICE_USER:-cis490}"
log() { printf '[install-receiver] %s\n' "$*" >&2; }
die() { log "FATAL: $*"; exit 1; }
# --- 1. prereqs --------------------------------------------------------
log "checking prereqs"
if [[ $EUID -ne 0 ]]; then
die "must run as root"
fi
command -v systemctl >/dev/null || die "systemd not found"
command -v python3 >/dev/null || die "python3 not on PATH"
PY_VER="$(python3 -c 'import sys; print(f"{sys.version_info.major}.{sys.version_info.minor}")')"
if ! python3 -c 'import sys; sys.exit(0 if sys.version_info >= (3,11) else 1)'; then
die "python >=3.11 required, found $PY_VER"
fi
USE_UV=0
if command -v uv >/dev/null; then USE_UV=1; fi
# --- 2. user + layout --------------------------------------------------
log "ensuring service user $SERVICE_USER"
if ! id -u "$SERVICE_USER" >/dev/null 2>&1; then
useradd --system --no-create-home --shell /usr/sbin/nologin \
--home-dir "$INSTALL_ROOT" "$SERVICE_USER"
fi
install -d -o root -g root -m 0755 "$ETC_ROOT" "$ETC_ROOT/certs"
install -d -o "$SERVICE_USER" -g "$SERVICE_USER" -m 0755 \
"$DATA_ROOT" "$DATA_ROOT/episodes" "$DATA_ROOT/incoming"
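# Re-apply with 0750 so the top-level data dir is not world-readable
# (this overrides the 0755 set on $DATA_ROOT just above).
install -d -o "$SERVICE_USER" -g "$SERVICE_USER" -m 0750 "$DATA_ROOT"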
# Pre-create the index file so the first PUT doesn't race on creation.
sudo -u "$SERVICE_USER" -- touch "$DATA_ROOT/index.jsonl"
# --- 3. repo + venv ----------------------------------------------------
log "syncing repo into $INSTALL_ROOT"
install -d -o "$SERVICE_USER" -g "$SERVICE_USER" -m 0755 "$INSTALL_ROOT"
cp -aT "$REPO_ROOT" "$INSTALL_ROOT"
chown -R "$SERVICE_USER":"$SERVICE_USER" "$INSTALL_ROOT"
log "building venv"
if [[ "$USE_UV" -eq 1 ]]; then
sudo -u "$SERVICE_USER" -- env HOME="$INSTALL_ROOT" \
uv sync --project "$INSTALL_ROOT"
else
sudo -u "$SERVICE_USER" -- python3 -m venv "$INSTALL_ROOT/.venv"
sudo -u "$SERVICE_USER" -- "$INSTALL_ROOT/.venv/bin/pip" install \
--quiet --upgrade pip
sudo -u "$SERVICE_USER" -- "$INSTALL_ROOT/.venv/bin/pip" install \
--quiet starlette 'uvicorn[standard]'
fi
# --- 4. systemd --------------------------------------------------------
log "installing systemd unit"
install -m 0644 "$REPO_ROOT/etc/cis490-receiver.service" \
/etc/systemd/system/cis490-receiver.service
systemctl daemon-reload
# --- 5. config template (only on first install) -----------------------
if [[ ! -f "$ETC_ROOT/receiver.toml" ]]; then
log "writing $ETC_ROOT/receiver.toml (template)"
install -m 0640 -o root -g "$SERVICE_USER" \
"$REPO_ROOT/etc/receiver.toml.example" "$ETC_ROOT/receiver.toml"
log ""
log "FIRST-INSTALL NEXT STEPS:"
log " 1. Verify $ETC_ROOT/receiver.toml paths."
log " 2. Add a collector.wg block to your spectral/caddy config."
log " Example:"
log " collector.wg {"
log " tls internal"
log " reverse_proxy 127.0.0.1:8443"
log " }"
log " (mTLS to clients is enforced by the wg-pki CA bundle on"
log " the receiver side once leaf certs are issued.)"
log " 3. Open the WG-side port via iptmonads."
log " 4. systemctl enable --now cis490-receiver"
log " 5. From a lab host: cis490-shipper --ping"
else
log "$ETC_ROOT/receiver.toml exists; leaving in place"
fi
log "receiver install complete."

shipper/__init__.py Normal file (empty)

shipper/__main__.py Normal file

@@ -0,0 +1,106 @@
"""``cis490-shipper`` CLI entrypoint.

Modes:
  --ping      hit /v1/ping; exit 0 if 200/ok, non-zero otherwise.
              No tarball flow; index.jsonl on the receiver is untouched.
  --once      one scan pass over data/episodes/, ship anything done, exit.
  (default)   long-running daemon; rescans every scan_interval_s.
"""
from __future__ import annotations

import argparse
import json
import logging
import signal
import sys
from pathlib import Path

from .config import ShipperConfig
from .queue import ShipperQueue
from .transport import ShipperTransport


def _setup_logging(level: str) -> None:
    logging.basicConfig(
        level=getattr(logging, level.upper(), logging.INFO),
        format="%(asctime)s %(levelname)s %(name)s %(message)s",
    )


def main(argv: list[str] | None = None) -> int:
    parser = argparse.ArgumentParser(prog="cis490-shipper")
    parser.add_argument(
        "--config",
        default="/etc/cis490/lab-host.toml",
        help="Path to lab-host config (TOML)",
    )
    parser.add_argument(
        "--ping",
        action="store_true",
        help="Hit /v1/ping on the receiver and exit",
    )
    parser.add_argument(
        "--once",
        action="store_true",
        help="One scan pass, then exit (default is long-running daemon)",
    )
    parser.add_argument("--log-level", default="INFO")
    args = parser.parse_args(argv)

    _setup_logging(args.log_level)
    log = logging.getLogger("cis490.shipper")

    try:
        cfg = ShipperConfig.load(args.config)
    except (FileNotFoundError, ValueError) as e:
        log.error("config error: %s", e)
        return 2

    transport = ShipperTransport(cfg)

    if args.ping:
        result = transport.ping()
        # Print structured one-liner for CI / test pipelines.
        print(json.dumps({
            "ok": result.ok,
            "status_code": result.status_code,
            "host_id": cfg.host_id,
            "receiver": cfg.receiver.url,
            "body": result.body,
            "error": result.error,
        }))
        return 0 if result.ok else 1

    queue = ShipperQueue(cfg, transport)

    if args.once:
        result = queue.run_once()
        log.info(
            "scan complete: scanned=%d shipped=%d transient=%d conflicts=%d fatal=%d",
            result.scanned, result.shipped, result.transient_failures,
            result.conflicts, result.fatal,
        )
        # Exit code reflects fatal-only; transient failures aren't an error
        # because the next pass / service restart will retry.
        return 1 if result.fatal else 0

    # Daemon mode
    stopping = False

    def _stop(signum, frame):  # noqa: ARG001
        nonlocal stopping
        log.info("received signal %s; finishing pass and exiting", signum)
        stopping = True

    signal.signal(signal.SIGTERM, _stop)
    signal.signal(signal.SIGINT, _stop)

    log.info(
        "shipper starting: host_id=%s data_root=%s receiver=%s",
        cfg.host_id, cfg.data_root, cfg.receiver.url,
    )
    queue.run_forever(stop_check=lambda: stopping)
    return 0


if __name__ == "__main__":
    sys.exit(main())

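Because `--ping` prints a single JSON object, a CI step or bring-up script can turn it into a pass/fail with a readable reason. The sketch below is a hypothetical consumer (`summarize_ping` is not part of the repo) that relies only on the keys printed by the CLI above.

```python
import json


def summarize_ping(line: str) -> tuple[bool, str]:
    """Turn the one-line JSON from `--ping` into (ok, human-readable summary)."""
    try:
        rec = json.loads(line)
    except json.JSONDecodeError as e:
        return False, f"unparseable output: {e.msg}"
    if not isinstance(rec, dict):
        return False, "unparseable output: expected a JSON object"

    where = f"{rec.get('host_id')} -> {rec.get('receiver')}"
    if rec.get("ok"):
        return True, f"{where}: ok"
    # Prefer the transport's error string; fall back to the HTTP status.
    reason = rec.get("error") or f"status {rec.get('status_code')}"
    return False, f"{where}: {reason}"
```

The process exit code already encodes success, so a wrapper like this is only useful when the failure reason needs to surface in a log or report.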
shipper/config.py Normal file

@@ -0,0 +1,91 @@
"""Lab-host shipper config — loaded from /etc/cis490/lab-host.toml."""
from __future__ import annotations

import tomllib
from dataclasses import dataclass, field
from pathlib import Path


@dataclass(frozen=True)
class ReceiverEndpoint:
    url: str  # e.g. "https://collector.wg"
    ca_bundle: Path | None = None
    client_cert: Path | None = None
    client_key: Path | None = None
    bearer_token: str | None = None
    verify_tls: bool = True


@dataclass(frozen=True)
class ShipperConfig:
    host_id: str
    data_root: Path  # Lab-host data root; episodes/, outbox/, shipped/ live here.
    receiver: ReceiverEndpoint
    # Daemon mode: how often to scan for new done.marker files.
    scan_interval_s: float = 5.0
    # PUT timeout per episode. Tarballs are bounded by max_episode_bytes;
    # at WG speeds this is well under 60s for a typical episode.
    request_timeout_s: float = 60.0
    # Backoff schedule on transient (5xx / network) failures, in seconds,
    # capped at the last entry. The shipper's scan loop will pick the
    # episode up again on the next pass regardless.
    backoff_seconds: tuple[float, ...] = (1.0, 2.0, 4.0, 8.0, 16.0, 32.0, 60.0, 120.0, 300.0)
    # Local retention before pruning data/shipped/.
    keep_local_for_days: int = 7

    @property
    def episodes_dir(self) -> Path:
        return self.data_root / "episodes"

    @property
    def outbox_dir(self) -> Path:
        return self.data_root / "outbox"

    @property
    def shipped_dir(self) -> Path:
        return self.data_root / "shipped"

    @classmethod
    def load(cls, path: str | Path) -> "ShipperConfig":
        with open(path, "rb") as f:
            data = tomllib.load(f)

        host_id = data.get("host_id")
        if not isinstance(host_id, str) or not host_id:
            raise ValueError("lab-host config: host_id (string) required at top level")

        paths = data.get("paths", {})
        data_root = Path(paths.get("data_root", "/var/lib/cis490/data")).resolve()

        rcv = data.get("receiver", {})
        url = rcv.get("url")
        if not isinstance(url, str) or not url:
            raise ValueError("lab-host config: receiver.url required")
        receiver = ReceiverEndpoint(
            url=url.rstrip("/"),
            ca_bundle=_optional_path(rcv.get("ca_bundle")),
            client_cert=_optional_path(rcv.get("client_cert")),
            client_key=_optional_path(rcv.get("client_key")),
            bearer_token=rcv.get("bearer_token"),
            verify_tls=bool(rcv.get("verify_tls", True)),
        )

        retention = data.get("retention", {})
        return cls(
            host_id=host_id,
            data_root=data_root,
            receiver=receiver,
            scan_interval_s=float(data.get("shipper", {}).get("scan_interval_s", 5.0)),
            request_timeout_s=float(data.get("shipper", {}).get("request_timeout_s", 60.0)),
            keep_local_for_days=int(retention.get("keep_local_for_days", 7)),
        )


def _optional_path(v: object) -> Path | None:
    if v in (None, ""):
        return None
    if isinstance(v, str):
        return Path(v).expanduser()
    # ValueError (not TypeError) so the CLI's config-error handler, which
    # catches FileNotFoundError and ValueError, reports this cleanly.
    raise ValueError(f"expected path string, got {type(v).__name__}")

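The `backoff_seconds` comment says the schedule is "capped at the last entry". One way to read that rule is sketched below; `backoff_delay` is a hypothetical helper, and the part of the diff shown here does not reveal where, or whether, the shipper applies the schedule this way.

```python
# Default schedule copied from ShipperConfig.backoff_seconds.
BACKOFF_SECONDS = (1.0, 2.0, 4.0, 8.0, 16.0, 32.0, 60.0, 120.0, 300.0)


def backoff_delay(attempt: int, schedule: tuple[float, ...] = BACKOFF_SECONDS) -> float:
    """Delay before retry number `attempt` (0-based), capped at the last entry."""
    if attempt < 0:
        raise ValueError("attempt must be >= 0")
    if not schedule:
        raise ValueError("schedule must not be empty")
    # Past the end of the schedule, keep returning the final (largest) delay.
    return schedule[min(attempt, len(schedule) - 1)]
```

With the default schedule, the first retry waits 1 second and every retry from the ninth onward waits 300 seconds.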
shipper/queue.py Normal file

@@ -0,0 +1,195 @@
"""Shipper episode queue — scan, compress, ship, retire.

State machine, mirroring docs/transport.md:

    data/episodes/<id>/done.marker
              |
              v
    tar+zstd  data/outbox/<id>.tar.zst.partial
              |
              v
    rename    data/outbox/<id>.tar.zst
              |
              v
    PUT to receiver
              |
              +-- 200/201  mv data/episodes/<id> data/shipped/<id>
              |            rm data/outbox/<id>.tar.zst
              |
              +-- 409      leave files in place (the local + remote tarball
              |            differ; manual triage)
              |
              +-- 5xx/net  leave outbox tarball; retry on next pass
              |
              +-- 4xx      log + skip (caller-side bug, doesn't self-heal)

Idempotent on every pass. A crash mid-tar leaves only a ``.partial``
which the next pass overwrites. A crash mid-PUT leaves the tarball in
``outbox/`` and the next pass re-ships it; the receiver responds 200
on a matching sha256, 409 on a divergent one.
"""
from __future__ import annotations

import logging
import shutil
import subprocess
import tarfile
import tempfile
import time
from dataclasses import dataclass
from pathlib import Path

from .config import ShipperConfig
from .transport import ShipperTransport, ShipResult, hash_file

log = logging.getLogger("cis490.shipper.queue")


@dataclass(frozen=True)
class PassResult:
    scanned: int
    shipped: int
    transient_failures: int
    conflicts: int
    fatal: int


class ShipperQueue:
    def __init__(self, cfg: ShipperConfig, transport: ShipperTransport) -> None:
        self.cfg = cfg
        self.transport = transport
        cfg.episodes_dir.mkdir(parents=True, exist_ok=True)
        cfg.outbox_dir.mkdir(parents=True, exist_ok=True)
        cfg.shipped_dir.mkdir(parents=True, exist_ok=True)

    # ---- main entry point ---------------------------------------------

    def run_once(self) -> PassResult:
        """One scan pass. Returns counts for logging / tests."""
        ready = self._ready_episodes()
        scanned = len(ready)
        shipped = 0
        transient = 0
        conflicts = 0
        fatal = 0
        for ep_dir in ready:
            episode_id = ep_dir.name
            try:
                tarball, sha = self._tar_episode(ep_dir)
            except Exception:
                log.exception("tar failed for %s", episode_id)
                transient += 1
                continue
            res = self.transport.ship_tarball(episode_id, tarball, sha)
            log.info(
                "ship %s -> %s (%d) %s",
                episode_id, res.status, res.status_code, res.error or "",
            )
            if res.status in ("stored", "already-present"):
                self._retire(ep_dir, tarball)
                shipped += 1
            elif res.status == "conflict":
                conflicts += 1
                # Keep the tarball + episode dir in place. Operator must
                # decide whether to drop our copy or fix the remote one.
            elif res.status == "transient":
                transient += 1
            else:  # fatal
                fatal += 1
        return PassResult(
            scanned=scanned,
            shipped=shipped,
            transient_failures=transient,
            conflicts=conflicts,
            fatal=fatal,
        )

    def run_forever(self, *, stop_check=lambda: False) -> None:
        while not stop_check():
            try:
                self.run_once()
            except Exception:
                log.exception("scan pass crashed; sleeping anyway")
            # Coarse sleep: we don't need precise scheduling and we
            # don't want a tight loop on errors.
            t0 = time.monotonic()
            while time.monotonic() - t0 < self.cfg.scan_interval_s:
                if stop_check():
                    return
                time.sleep(0.5)

    # ---- internals -----------------------------------------------------

    def _ready_episodes(self) -> list[Path]:
        out: list[Path] = []
        if not self.cfg.episodes_dir.exists():
            return out
        for ep in sorted(self.cfg.episodes_dir.iterdir()):
            if ep.is_dir() and (ep / "done.marker").exists():
                out.append(ep)
        return out

    def _tar_episode(self, ep_dir: Path) -> tuple[Path, str]:
        """Tar+zstd the episode dir into outbox. Idempotent — overwrites
        any prior partial. Returns ``(tarball_path, sha256_hex)``."""
        episode_id = ep_dir.name
        outbox = self.cfg.outbox_dir
        partial = outbox / f"{episode_id}.tar.zst.partial"
        final = outbox / f"{episode_id}.tar.zst"
        partial.unlink(missing_ok=True)
        # We use the system `zstd` for streaming compression: pipe a
        # tar stream into `zstd -T0 -19` to get a deterministic tarball
        # without buffering the whole tar in memory or pulling in the
        # python-zstandard dependency. There is no in-process fallback:
        # if the binary isn't on PATH we raise (see the else branch).
        if _which_zstd():
            with partial.open("wb") as zout:
                proc = subprocess.Popen(
                    ["zstd", "-q", "-T0", "-19", "--stdout"],
                    stdin=subprocess.PIPE, stdout=zout,
                )
                assert proc.stdin is not None
                with tarfile.open(fileobj=proc.stdin, mode="w|") as tf:
                    tf.add(ep_dir, arcname=episode_id, recursive=True)
                proc.stdin.close()
                rc = proc.wait()
            if rc != 0:
                partial.unlink(missing_ok=True)
                raise RuntimeError(f"zstd exited {rc}")
        else:
            # No fallback: gzip/zlib output is not zstd-compatible.
            # Surface the missing binary rather than silently producing
            # a non-zstd tarball.
            partial.unlink(missing_ok=True)
            raise RuntimeError(
                "the `zstd` binary is required on the lab host. "
                "Install it via your package manager."
            )
        sha = hash_file(partial)
        partial.replace(final)
        return final, sha

    def _retire(self, ep_dir: Path, tarball: Path) -> None:
        """Move episode dir → shipped/, drop the tarball."""
        target = self.cfg.shipped_dir / ep_dir.name
        if target.exists():
            # Belt-and-suspenders: re-shipping an already-retired
            # episode shouldn't happen (the dir was moved), but if it
            # does, prefer the existing copy and just clean up.
            shutil.rmtree(ep_dir, ignore_errors=True)
        else:
            ep_dir.replace(target)
        tarball.unlink(missing_ok=True)


def _which_zstd() -> bool:
    return shutil.which("zstd") is not None

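The crash-safety of the outbox rests on the "write to `.partial`, then rename" step. The sketch below isolates that pattern with plain bytes instead of a tar stream; `publish_atomically` is a hypothetical name, and the `fsync` call is an extra durability step that the original code is not shown to perform.

```python
import hashlib
import os
from pathlib import Path


def publish_atomically(data: bytes, final: Path) -> str:
    """Write `data` next to `final` as <name>.partial, then rename into place.

    Readers either see the previous state or the complete new file,
    never a half-written one. Returns the sha256 hex digest.
    """
    partial = final.with_name(final.name + ".partial")
    partial.unlink(missing_ok=True)  # discard leftovers from a crashed pass
    with partial.open("wb") as f:
        f.write(data)
        f.flush()
        os.fsync(f.fileno())  # assumption: added here, not in the original
    sha = hashlib.sha256(partial.read_bytes()).hexdigest()
    # Path.replace is an atomic rename on POSIX when both paths are on
    # the same filesystem, which holds for files in one outbox directory.
    partial.replace(final)
    return sha
```

Hashing the partial before the rename mirrors the order in `_tar_episode`, so the digest always describes exactly the bytes that were published.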
203
shipper/transport.py Normal file
View file

@ -0,0 +1,203 @@
"""HTTP transport for the lab-host shipper.
Two operations against the receiver:
POST /v1/ping smoke test
PUT /v1/episodes/<host>/<episode>.tar.zst episode upload
Auth is mTLS (client cert from wg-pki) when configured. A bearer token
is supported as a stand-in during early bring-up before the cert is
issued; production runs should set both.
The transport returns small dataclasses rather than throwing the
caller (shipper queue) decides whether to retry, move to shipped/, or
alert. This keeps the retry policy in one place.
"""
from __future__ import annotations
import hashlib
import logging
import ssl
from dataclasses import dataclass
from pathlib import Path
from typing import Any
import httpx
from .config import ReceiverEndpoint, ShipperConfig
log = logging.getLogger("cis490.shipper.transport")
SCHEMA_VERSION = 1
@dataclass(frozen=True)
class PingResult:
ok: bool
status_code: int
body: dict[str, Any] | None
error: str | None
@dataclass(frozen=True)
class ShipResult:
status: str # "stored" | "already-present" | "conflict" | "transient" | "fatal"
status_code: int
sha256: str | None
body: dict[str, Any] | None
error: str | None
def _build_ssl_context(rcv: ReceiverEndpoint) -> ssl.SSLContext | bool:
"""Build an SSL context honoring the wg-pki CA bundle + client cert.
Returns True / a bundle path / a context. httpx accepts all three;
we use a context so we can attach the client cert for mTLS."""
if not rcv.url.lower().startswith("https://"):
return False
ctx = ssl.create_default_context(
cafile=str(rcv.ca_bundle) if rcv.ca_bundle else None,
)
if not rcv.verify_tls:
# Dev-only path; production lab-hosts should always pin the
# wg-pki CA. Logged loudly so it doesn't slip through.
log.warning("TLS verification disabled — dev-only configuration")
ctx.check_hostname = False
ctx.verify_mode = ssl.CERT_NONE
if rcv.client_cert and rcv.client_key:
ctx.load_cert_chain(str(rcv.client_cert), str(rcv.client_key))
return ctx
class ShipperTransport:
    def __init__(self, cfg: ShipperConfig) -> None:
        self.cfg = cfg
        self._verify = _build_ssl_context(cfg.receiver)

    # ---- ping ----------------------------------------------------------
    def ping(self) -> PingResult:
        url = f"{self.cfg.receiver.url}/v1/ping"
        headers = self._common_headers()
        try:
            with httpx.Client(verify=self._verify, timeout=self.cfg.request_timeout_s) as c:
                r = c.post(url, headers=headers, content=b"")
        except httpx.HTTPError as e:
            return PingResult(ok=False, status_code=0, body=None, error=str(e))
        body: dict[str, Any] | None = None
        try:
            body = r.json()
        except Exception:
            pass
        if r.status_code == 200 and isinstance(body, dict) and body.get("ok"):
            return PingResult(ok=True, status_code=200, body=body, error=None)
        return PingResult(
            ok=False,
            status_code=r.status_code,
            body=body,
            error=f"unexpected status {r.status_code}",
        )
    # ---- ship ----------------------------------------------------------
    def ship_tarball(
        self,
        episode_id: str,
        tarball_path: Path,
        sha256_hex: str,
    ) -> ShipResult:
        url = (
            f"{self.cfg.receiver.url}/v1/episodes/"
            f"{self.cfg.host_id}/{episode_id}.tar.zst"
        )
        size = tarball_path.stat().st_size
        headers = self._common_headers() | {
            "Content-Type": "application/zstd",
            "Content-Length": str(size),
            "X-Content-SHA256": sha256_hex,
            "X-Episode-Id": episode_id,
        }
        try:
            with httpx.Client(verify=self._verify, timeout=self.cfg.request_timeout_s) as c, \
                    tarball_path.open("rb") as body:
                # httpx streams from a file-like object via the `content=` kwarg.
                r = c.put(url, headers=headers, content=body)
        except httpx.HTTPError as e:
            return ShipResult(
                status="transient",
                status_code=0,
                sha256=None,
                body=None,
                error=str(e),
            )
        body_json: dict[str, Any] | None = None
        try:
            body_json = r.json()
        except Exception:
            pass
        if r.status_code == 201:
            return ShipResult(
                status="stored",
                status_code=201,
                sha256=sha256_hex,
                body=body_json,
                error=None,
            )
        if r.status_code == 200:
            return ShipResult(
                status="already-present",
                status_code=200,
                sha256=sha256_hex,
                body=body_json,
                error=None,
            )
        if r.status_code == 409:
            return ShipResult(
                status="conflict",
                status_code=409,
                sha256=sha256_hex,
                body=body_json,
                error="receiver already has a different sha256 for this id",
            )
        if 500 <= r.status_code < 600:
            return ShipResult(
                status="transient",
                status_code=r.status_code,
                sha256=None,
                body=body_json,
                error=f"server error {r.status_code}",
            )
        # 4xx other than 409: caller-side bug — don't retry.
        return ShipResult(
            status="fatal",
            status_code=r.status_code,
            sha256=None,
            body=body_json,
            error=f"client error {r.status_code}",
        )
    # ---- helpers -------------------------------------------------------
    def _common_headers(self) -> dict[str, str]:
        h: dict[str, str] = {
            "X-Lab-Host": self.cfg.host_id,
            "X-Schema-Version": str(SCHEMA_VERSION),
        }
        if self.cfg.receiver.bearer_token:
            h["Authorization"] = f"Bearer {self.cfg.receiver.bearer_token}"
        return h


def hash_file(path: Path) -> str:
    h = hashlib.sha256()
    with path.open("rb") as f:
        for chunk in iter(lambda: f.read(1024 * 1024), b""):
            h.update(chunk)
    return h.hexdigest()
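
Review note: the module docstring says the caller decides whether to retry, move to shipped/, or alert. The queue code is not part of this file, so the following is only a minimal sketch of what that dispatch might look like. `next_action` and its return labels are hypothetical names, and the `ShipResult` here is a stand-in that mirrors the dataclass above so the snippet runs on its own.

```python
from __future__ import annotations

from dataclasses import dataclass
from typing import Any


@dataclass(frozen=True)
class ShipResult:
    # Stand-in mirroring the dataclass in shipper/transport.py.
    status: str  # "stored" | "already-present" | "conflict" | "transient" | "fatal"
    status_code: int
    sha256: str | None
    body: dict[str, Any] | None
    error: str | None


def next_action(result: ShipResult) -> str:
    """Hypothetical caller-side policy keyed on ShipResult.status."""
    if result.status in ("stored", "already-present"):
        # The receiver holds these exact bytes, so the local dir can be retired.
        return "retire"
    if result.status == "transient":
        # Network error or 5xx: leave everything in place for the next pass.
        return "retry-later"
    if result.status == "conflict":
        # Same id, different sha256: keep the local dir for operator triage.
        return "hold-for-operator"
    # "fatal" (4xx other than 409) or any unknown status: do not retry.
    return "alert"
```

Keeping this mapping on the caller side means the transport only classifies responses, which matches the stated goal of holding the retry policy in one place.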

tests/test_shipper.py Normal file

@ -0,0 +1,327 @@
"""End-to-end shipper tests.
These run a real Uvicorn server bound to 127.0.0.1 on a free port,
hosting the actual receiver Starlette app over an EpisodeStore on a
temp dir. The shipper then talks to that server with its real
`httpx.Client`, the same code path as production. This catches things
the receiver-side ASGI tests can't (HTTP framing, header handling,
sync httpx behaviour, content-length quirks).
"""
from __future__ import annotations
import json
import socket
import threading
import time
from pathlib import Path
import httpx
import pytest
import uvicorn
from receiver.app import make_app
from receiver.store import EpisodeStore
from shipper.config import ReceiverEndpoint, ShipperConfig
from shipper.queue import ShipperQueue
from shipper.transport import ShipperTransport
# ---------------------------------------------------------------------------
# Live-receiver fixture
# ---------------------------------------------------------------------------
def _free_port() -> int:
    with socket.socket(socket.AF_INET, socket.SOCK_STREAM) as s:
        s.bind(("127.0.0.1", 0))
        return s.getsockname()[1]


class _ServerThread(threading.Thread):
    def __init__(self, app, port: int) -> None:
        super().__init__(daemon=True)
        cfg = uvicorn.Config(
            app,
            host="127.0.0.1",
            port=port,
            log_level="error",
            lifespan="off",
            access_log=False,
        )
        self.server = uvicorn.Server(cfg)

    def run(self) -> None:
        self.server.run()

    def stop(self) -> None:
        self.server.should_exit = True
def _wait_for_port(port: int, timeout_s: float = 5.0) -> None:
    deadline = time.monotonic() + timeout_s
    while time.monotonic() < deadline:
        try:
            with httpx.Client(timeout=0.5) as c:
                r = c.get(f"http://127.0.0.1:{port}/v1/health")
                if r.status_code == 200:
                    return
        except httpx.HTTPError:
            pass
        time.sleep(0.05)
    raise TimeoutError(f"receiver on 127.0.0.1:{port} did not come up")
@pytest.fixture
def store(tmp_path: Path) -> EpisodeStore:
    return EpisodeStore(
        store_root=tmp_path / "rcv-episodes",
        incoming_root=tmp_path / "rcv-incoming",
        index_path=tmp_path / "rcv-index.jsonl",
    )


@pytest.fixture
def receiver(store: EpisodeStore):
    app = make_app(store=store, max_episode_bytes=10_000_000, bearer_token=None)
    port = _free_port()
    server = _ServerThread(app, port)
    server.start()
    try:
        _wait_for_port(port)
        yield f"http://127.0.0.1:{port}", store
    finally:
        server.stop()
        server.join(timeout=2)


@pytest.fixture
def receiver_with_bearer(store: EpisodeStore):
    app = make_app(store=store, max_episode_bytes=10_000_000, bearer_token="s3cret")
    port = _free_port()
    server = _ServerThread(app, port)
    server.start()
    try:
        _wait_for_port(port)
        yield f"http://127.0.0.1:{port}", store
    finally:
        server.stop()
        server.join(timeout=2)
def _make_shipper(
    tmp_path: Path,
    receiver_url: str,
    *,
    host_id: str = "lab1",
    bearer: str | None = None,
) -> tuple[ShipperConfig, ShipperTransport, ShipperQueue]:
    data_root = tmp_path / "lab-data"
    cfg = ShipperConfig(
        host_id=host_id,
        data_root=data_root,
        receiver=ReceiverEndpoint(url=receiver_url, bearer_token=bearer),
        scan_interval_s=0.05,
    )
    transport = ShipperTransport(cfg)
    queue = ShipperQueue(cfg, transport)
    return cfg, transport, queue


def _make_episode(cfg: ShipperConfig, episode_id: str, *, content: bytes = b"data") -> Path:
    ep = cfg.episodes_dir / episode_id
    ep.mkdir(parents=True, exist_ok=True)
    (ep / "meta.json").write_bytes(content)
    (ep / "events.jsonl").write_text("{}\n")
    (ep / "labels.jsonl").write_text("{}\n")
    (ep / "telemetry-proc.jsonl").write_text("{}\n")
    (ep / "done.marker").touch()
    return ep
# ---------------------------------------------------------------------------
# Ping
# ---------------------------------------------------------------------------
def test_ping_returns_ok_against_running_receiver(tmp_path: Path, receiver) -> None:
    url, _ = receiver
    _, transport, _ = _make_shipper(tmp_path, url)
    res = transport.ping()
    assert res.ok is True
    assert res.status_code == 200
    assert res.body is not None
    assert res.body["ok"] is True
    assert res.body["host_id"] == "lab1"
    assert res.body["schema_version"] == 1


def test_ping_writes_nothing_to_index(tmp_path: Path, receiver) -> None:
    url, store = receiver
    _, transport, _ = _make_shipper(tmp_path, url)
    transport.ping()
    transport.ping()
    transport.ping()
    assert store.index_path.read_text() == ""


def test_ping_fails_with_wrong_bearer(tmp_path: Path, receiver_with_bearer) -> None:
    url, _ = receiver_with_bearer
    _, transport, _ = _make_shipper(tmp_path, url, bearer="WRONG")
    res = transport.ping()
    assert res.ok is False
    assert res.status_code == 401


def test_ping_succeeds_with_right_bearer(tmp_path: Path, receiver_with_bearer) -> None:
    url, _ = receiver_with_bearer
    _, transport, _ = _make_shipper(tmp_path, url, bearer="s3cret")
    res = transport.ping()
    assert res.ok is True
    assert res.status_code == 200


def test_ping_fails_when_receiver_unreachable(tmp_path: Path) -> None:
    # Pick a free port and don't bind it — connect must fail.
    port = _free_port()
    _, transport, _ = _make_shipper(tmp_path, f"http://127.0.0.1:{port}")
    res = transport.ping()
    assert res.ok is False
    assert res.status_code == 0
    assert res.error is not None
# ---------------------------------------------------------------------------
# Tar + ship
# ---------------------------------------------------------------------------
def test_run_once_ships_one_done_episode(tmp_path: Path, receiver) -> None:
    url, store = receiver
    cfg, _, queue = _make_shipper(tmp_path, url)
    _make_episode(cfg, "01EPISODE")
    result = queue.run_once()
    assert result.scanned == 1
    assert result.shipped == 1
    assert result.transient_failures == 0
    # Episode dir moved to shipped/.
    assert not (cfg.episodes_dir / "01EPISODE").exists()
    assert (cfg.shipped_dir / "01EPISODE").exists()
    # Outbox tarball cleaned up.
    assert list(cfg.outbox_dir.iterdir()) == []
    # Receiver stored it and indexed it.
    assert store.final_path("lab1", "01EPISODE").exists()
    rows = [json.loads(l) for l in store.index_path.read_text().splitlines()]
    assert len(rows) == 1
    assert rows[0]["host_id"] == "lab1"
    assert rows[0]["episode_id"] == "01EPISODE"


def test_run_once_skips_episodes_without_done_marker(tmp_path: Path, receiver) -> None:
    url, store = receiver
    cfg, _, queue = _make_shipper(tmp_path, url)
    ep = cfg.episodes_dir / "01PARTIAL"
    ep.mkdir(parents=True)
    (ep / "meta.json").write_text("{}")
    # Note: NO done.marker.
    result = queue.run_once()
    assert result.scanned == 0
    assert result.shipped == 0
    assert ep.exists()  # untouched
    assert store.index_path.read_text() == ""
def test_run_once_idempotent_re_ship_returns_already_present(tmp_path: Path, receiver) -> None:
    """If a prior run shipped an episode but crashed before retiring it,
    the next run must re-ship the same bytes successfully (200) and
    retire the dir, not flag it as a conflict."""
    url, store = receiver
    cfg, _, queue = _make_shipper(tmp_path, url)
    _make_episode(cfg, "01REPLAY", content=b"same-bytes")
    queue.run_once()
    assert (cfg.shipped_dir / "01REPLAY").exists()
    # Simulate a crash: move it back as if retire never happened.
    (cfg.shipped_dir / "01REPLAY").rename(cfg.episodes_dir / "01REPLAY")
    result = queue.run_once()
    assert result.scanned == 1
    assert result.shipped == 1
    assert (cfg.shipped_dir / "01REPLAY").exists()
    # Index didn't double up.
    rows = store.index_path.read_text().splitlines()
    assert len(rows) == 1


def test_run_once_handles_409_conflict(tmp_path: Path, receiver) -> None:
    """If the same episode_id was previously shipped with *different*
    bytes, the receiver returns 409 and the shipper must NOT retire
    the local dir; operator triage is required."""
    url, _ = receiver
    cfg, _, queue = _make_shipper(tmp_path, url)
    _make_episode(cfg, "01CONFLICT", content=b"first")
    result = queue.run_once()
    assert result.shipped == 1
    # Simulate a re-do with different content but the same id (e.g., a
    # botched re-run on the lab host).
    (cfg.shipped_dir / "01CONFLICT").rename(cfg.episodes_dir / "01CONFLICT")
    (cfg.episodes_dir / "01CONFLICT" / "meta.json").write_bytes(b"tampered")
    result = queue.run_once()
    assert result.scanned == 1
    assert result.shipped == 0
    assert result.conflicts == 1
    # Local dir survives — operator can decide what to do.
    assert (cfg.episodes_dir / "01CONFLICT").exists()


def test_run_once_handles_transient_when_receiver_is_down(tmp_path: Path) -> None:
    port = _free_port()
    cfg, _, queue = _make_shipper(tmp_path, f"http://127.0.0.1:{port}")
    _make_episode(cfg, "01DOWN")
    result = queue.run_once()
    assert result.scanned == 1
    assert result.shipped == 0
    assert result.transient_failures == 1
    # Episode dir + tarball both stay in place for the next pass.
    assert (cfg.episodes_dir / "01DOWN").exists()
    assert (cfg.outbox_dir / "01DOWN.tar.zst").exists()
def test_tarball_round_trips_episode_dir(tmp_path: Path, receiver) -> None:
    """The receiver-side tarball must extract back to the original
    episode dir layout (modulo file order). Verifies the tar+zstd
    pipe is intact."""
    import subprocess
    import tarfile

    url, _ = receiver
    cfg, _, queue = _make_shipper(tmp_path, url)
    ep = _make_episode(cfg, "01ROUND", content=b"meta-bytes")
    expected_files = sorted(p.name for p in ep.iterdir())
    queue.run_once()
    # The receiver stored it; pull the bytes back, decompress + untar.
    rcv_path = next((tmp_path / "rcv-episodes" / "lab1").glob("01ROUND.tar.zst"))
    decompressed = tmp_path / "01ROUND.tar"
    subprocess.check_call(
        ["zstd", "-q", "-d", "-o", str(decompressed), str(rcv_path)],
    )
    extract_dir = tmp_path / "extracted"
    extract_dir.mkdir()
    with tarfile.open(decompressed) as tf:
        tf.extractall(extract_dir)
    got_files = sorted(p.name for p in (extract_dir / "01ROUND").iterdir())
    assert got_files == expected_files
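
Review note: the re-ship and conflict tests above rely on the receiver answering 201 for a new episode, 200 for a repeat of the same bytes, and 409 for the same id with a different sha256. The receiver code is not shown in this section, so the following is only an in-memory sketch of that decision, assuming it is keyed on (host_id, episode_id). `classify_put` and the `seen` dict are hypothetical and are not the real `EpisodeStore` API.

```python
from __future__ import annotations


def classify_put(
    seen: dict[tuple[str, str], str],
    host_id: str,
    episode_id: str,
    sha256_hex: str,
) -> int:
    """Return the HTTP status a PUT would get under the assumed protocol."""
    key = (host_id, episode_id)
    prior = seen.get(key)
    if prior is None:
        # First time this id is seen for this host: store and record the hash.
        seen[key] = sha256_hex
        return 201
    if prior == sha256_hex:
        # Same bytes again (e.g. the shipper crashed before retiring): idempotent.
        return 200
    # Same id, different bytes: refuse, leaving the stored copy untouched.
    return 409
```

Under this scheme a crash between upload and retire is harmless, because replaying the upload returns 200 and the shipper can retire the directory on the next pass.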