# Transport: Centralized Episode Collection over WG

The dataset lives wherever it is convenient to train from. In our setup that is the Pi5 (or whatever the team designates as the central collector), reachable over the WG overlay at `.wg`. This document describes how episodes get from a lab host to the central collector.

## Design goals

1. **Easy to deploy.** One config file, one systemd unit per side. No DB required to start collecting.
2. **WG-native.** Sender and receiver both live on the WG overlay; transport is just HTTPS over WG. We use the existing wg-pki CA for mTLS.
3. **Idempotent.** Re-shipping the same episode is safe and cheap; the receiver responds 200 if the bytes already match.
4. **Crash-safe.** Lab host crash mid-episode does not corrupt the central store. Receiver crash mid-upload leaves no partial visible.
5. **Schema-free.** The receiver does not parse JSONL; it stores tarballs and an append-only index. The schema lives only at training time.

## What gets shipped

A complete episode directory is tarred and zstd-compressed:

```
data/episodes// → .tar.zst
```

The orchestrator marks an episode complete by writing a `done.marker` file at the *end* of the directory after `meta.json` is finalized. The shipper only considers directories that contain `done.marker`; partially-written episodes are invisible to it.

## Wire protocol

```
PUT https://.wg/v1/episodes//.tar.zst
Content-Type: application/zstd
Content-Length:
X-Content-SHA256:
X-Schema-Version: 1
X-Lab-Host:
X-Episode-Id:

body:
```

Auth: mTLS using a leaf certificate issued by the wg-pki CA. The receiver trusts only certs issued by that CA.

Responses:

| Status | Meaning |
|---|---|
| 201 | Stored; new |
| 200 | Already present with matching sha256; nothing to do |
| 409 | Already present with **different** sha256; receiver refuses to overwrite |
| 4xx | Bad request (missing header, malformed id, etc.) |
| 5xx | Server error; sender retries with backoff |

There is no DELETE.
Episodes are immutable once shipped.

## Sender (`shipper`) state machine

```
scan data/episodes/
        |
        v
for each /done.marker:
        |
        v
  tar+zstd → data/outbox/.tar.zst.partial
        |
        v
  rename → data/outbox/.tar.zst   (atomic; visible to retry loop)
        |
        v
  PUT to receiver
        |
        +-- 200/201 → mv data/episodes/ data/shipped/;
        |             rm data/outbox/.tar.zst
        |
        +-- 409 → log mismatch, leave files in place, alert (manual triage)
        |
        +-- 5xx/network → backoff (1s, 2s, 4s, 8s, ... cap 5min); retry
```

The shipper does the same scan on every wake-up, so a crash mid-tar or mid-PUT is harmless: the next pass picks up wherever it left off.

## Receiver state machine

```
PUT body received
        |
        v
stream into /var/lib/cis490/incoming//.tar.zst.partial
        |
        v
compute sha256 while streaming
        |
        +-- mismatch with header → 400, delete partial
        |
        +-- match:
              |
              v
        if final path exists:
              |
              +-- existing sha256 == new sha256 → 200, delete partial
              |
              +-- existing sha256 != new sha256 → 409, delete partial
        else:
              |
              v
        atomic rename → /var/lib/cis490/episodes//.tar.zst
              |
              v
        append index.jsonl row
              |
              v
        201
```

`index.jsonl` row:

```json
{
  "received_at_wall": "2026-04-28T22:31:43Z",
  "host_id": "lab-host-1",
  "episode_id": "01HW9GZJ7K8QF5W3X2Y6N1A4B0",
  "sha256": "...",
  "size_bytes": 8412331,
  "schema_version": 1
}
```

That index is the closest thing to a database we have until we decide on one. A trainer can stream it to know what episodes exist, then untar on demand.

## Why not just rsync?

`rsync` works, but:

- No schema-version tagging at the protocol layer.
- No clean way to enforce "immutable once written".
- mTLS via WG-issued certs is more uniform with the rest of the overlay than ssh-key juggling.
- A tiny FastAPI receiver is also a natural place to add ingest-time hooks later (e.g. emit a Matrix notification on successful receipt, kick off a training run when N new episodes arrive).

We may switch to rsync if the FastAPI receiver becomes a bottleneck. For a class project that is unlikely.
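
As a rough illustration of how small the receiver's core logic can stay, the decision tree from the receiver state machine can be sketched in plain Python with no web framework attached. This is a sketch under assumptions, not the actual receiver: the function name `store_episode`, the `<host_id>/<episode_id>` directory layout under `incoming/` and `episodes/`, and the location of `index.jsonl` at the store root are all hypothetical choices made for the example.

```python
import hashlib
import json
import os
from datetime import datetime, timezone
from pathlib import Path
from typing import Iterable


def _sha256_file(path: Path) -> str:
    """Hash an already-stored file in 1 MiB chunks."""
    h = hashlib.sha256()
    with path.open("rb") as f:
        for chunk in iter(lambda: f.read(1 << 20), b""):
            h.update(chunk)
    return h.hexdigest()


def store_episode(root: Path, host_id: str, episode_id: str,
                  chunks: Iterable[bytes], claimed_sha256: str) -> int:
    """Apply the receiver decision tree and return the HTTP status to send."""
    # Assumed layout: <root>/incoming/<host_id>/<episode_id>.tar.zst.partial
    partial = root / "incoming" / host_id / f"{episode_id}.tar.zst.partial"
    final = root / "episodes" / host_id / f"{episode_id}.tar.zst"
    partial.parent.mkdir(parents=True, exist_ok=True)
    final.parent.mkdir(parents=True, exist_ok=True)

    # Stream the body to the partial file, hashing as we go.
    h = hashlib.sha256()
    size = 0
    with partial.open("wb") as f:
        for chunk in chunks:
            h.update(chunk)
            f.write(chunk)
            size += len(chunk)
        f.flush()
        os.fsync(f.fileno())
    digest = h.hexdigest()

    if digest != claimed_sha256.lower():
        partial.unlink()
        return 400  # body does not match X-Content-SHA256
    if final.exists():
        partial.unlink()
        return 200 if _sha256_file(final) == digest else 409

    # Atomic as long as incoming/ and episodes/ share a filesystem.
    os.replace(partial, final)
    row = {
        "received_at_wall": datetime.now(timezone.utc).strftime("%Y-%m-%dT%H:%M:%SZ"),
        "host_id": host_id,
        "episode_id": episode_id,
        "sha256": digest,
        "size_bytes": size,
        "schema_version": 1,
    }
    # Assumed index location: <root>/index.jsonl, append-only.
    with (root / "index.jsonl").open("a") as idx:
        idx.write(json.dumps(row) + "\n")
    return 201
```

A real handler would also need to validate the ids before using them as path components and serialize concurrent uploads of the same episode; both are left out here to keep the sketch short.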
## Operational notes

- **Disk on lab host.** The shipper keeps episodes locally in `data/shipped//` until a retention pass prunes them. Default retention: 7 days *or* 80% disk usage, whichever comes first.
- **Disk on receiver.** No retention enforced by default; the central store is the dataset.
- **Backpressure.** If the receiver is unreachable (WG down, Pi rebooting), the shipper accumulates tarballs in `data/outbox/`. No data is lost.
- **Multiple lab hosts.** Each writes under its own `/` prefix. No coordination needed; episode ids are globally unique (ULID).
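
While the receiver is unreachable, the shipper retries on the schedule given in the sender state machine (1s, 2s, 4s, 8s, ... capped at 5 minutes). A minimal sketch of that schedule follows; the generator name `backoff_delays` and its parameters are hypothetical, and it adds no jitter, which the document does not mention either way.

```python
import itertools
from typing import Iterator


def backoff_delays(base: float = 1.0, cap: float = 300.0) -> Iterator[float]:
    """Yield retry delays in seconds: base, 2*base, 4*base, ... clamped at cap."""
    delay = base
    while True:
        yield min(delay, cap)
        delay = min(delay * 2, cap)


# First eleven delays: doubling from 1s until the 5-minute cap takes over.
first = list(itertools.islice(backoff_delays(), 11))
print(first)
```

A shipper loop would sleep for each yielded delay between PUT attempts and start a fresh generator after any 200 or 201 response.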