CIS490/receiver/app.py
Max Gorog 4ab5477226 PIPELINE §5 step 1: fix four root-cause defects
Diagnoses + fixes for the silent-collector / never-lands-session
failures that the 200-episode quality probe surfaced (§3 evidence).
All four address the producer; no compensating layers added.

perf collector (rows_perf=0 on 100% of episodes):
  - perf stat -j writes to stderr by default with -p; we read stdout.
    Add --log-fd 1 so JSON reaches stdout where the parser sees it.
  - Event names come back annotated with the privilege scope perf
    actually measured ("cycles:u" under perf_event_paranoid=2). Strip
    the suffix so _build_row's plain-name lookups hit. Without this
    every metric was None even when perf reported real numbers.
  - tests/test_collectors_emit.py covers the regression with a real
    busy-loop fixture; emit-test discipline per §4.4.
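
A minimal sketch of the suffix stripping described above, not the collector's actual code. The helper names, the modifier-letter set, and the JSON field names ("event", "counter-value") are assumptions made for illustration; tracepoint-style names such as "sched:sched_switch" are left alone because their tail is not made of modifier letters.

```python
from __future__ import annotations

import json
import re

# Assumption: a trailing ":<letters>" made only of perf event-modifier letters
# (u, k, h, ...) is a scope annotation and not part of the event name.
_MODIFIER_SUFFIX = re.compile(r":[ukhIGHpPSDWe]+$")


def normalize_event_name(name: str) -> str:
    """Drop a trailing modifier suffix such as ':u' from a perf event name."""
    return _MODIFIER_SUFFIX.sub("", name)


def parse_perf_json_line(line: str) -> tuple[str, float] | None:
    """Parse one JSON line into (plain_event_name, value), or None if unusable."""
    try:
        record = json.loads(line)
    except json.JSONDecodeError:
        return None
    if not isinstance(record, dict) or "event" not in record:
        return None
    try:
        value = float(record["counter-value"])
    except (KeyError, TypeError, ValueError):
        return None  # missing or non-numeric counter value
    return normalize_event_name(record["event"]), value


# Hypothetical sample line, shaped like one JSON record per counter.
sample = '{"counter-value": "1234.000000", "unit": "", "event": "cycles:u"}'
```

With this in place, a plain-name lookup such as `row["cycles"]` can hit even when the reported name carries a `:u` suffix.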

guest-agent collector (rows_guest=0 on 100% of episodes):
  - Alpine cloud image doesn't ship python3, so the in-guest agent's
    `#!/usr/bin/env python3` shebang silently fails. Add packages:
    [python3] to cidata user-data so cloud-init installs it before
    the OpenRC service starts.
  - Guest agent now exits nonzero (was: silent stdout fallback) when
    /dev/virtio-ports/cis490.guest.agent is missing, so OpenRC
    reports the failure to /var/log/cis490-agent.log instead of the
    bytes vanishing into the void. Refs §1.
  - Host-side collector emits guest_agent_connected /
    guest_agent_first_byte / guest_agent_silent_window into the
    orchestrator's events.jsonl. Future episodes show the in-guest
    failure mode per-episode instead of inferring from rows_guest=0.
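
A rough sketch of the fail-loud behaviour described above, assuming the agent opens the port itself at startup. `open_agent_port` is a hypothetical name and the message text is illustrative; only the device path comes from this commit message.

```python
from __future__ import annotations

import os
import sys

# Device path as named in the commit message above.
PORT_PATH = "/dev/virtio-ports/cis490.guest.agent"


def open_agent_port(path: str = PORT_PATH):
    """Open the virtio-serial port for writing, or exit nonzero if it is missing."""
    if not os.path.exists(path):
        # Report on stderr and exit 1 so the service supervisor records a
        # failure, instead of silently writing the rows to stdout.
        print(f"guest-agent: {path} is missing, not falling back to stdout",
              file=sys.stderr)
        raise SystemExit(1)
    # Unbuffered binary writes so each row leaves the guest immediately.
    return open(path, "wb", buffering=0)
```

The point of the design is that a missing port becomes a visible service failure rather than an episode with rows_guest=0 and no explanation.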

k-gamingcom missing qmp/netflow/pcap (also affected elliott on
  Tier-3 episodes — was misclassified as host divergence):
  - tools/run_tier3_demo.py was building EpisodeConfig WITHOUT
    qmp_socket / guest_agent_socket / bridge_iface — even though
    launch_target.sh creates the underlying chardevs and BRIDGE
    supplies the iface. tools/run_real_vm_demo.py wires them
    correctly; Tier-3 had a copy-paste gap.
  - tests/test_collectors_emit.py adds a source-grep regression so
    the wiring stays honest.

samba_usermap_script never lands session (0/67 in §3 probe):
  - Bind handler default WfsDelay (~5s) gives up before bind_perl on
    Metasploitable2 has finished forking + binding LPORT under
    SLIRP+hostfwd. Bump to 30s; matches session_open_timeout_s in
    exploits/driver.py so framework + driver agree on the wait
    budget. Add ConnectTimeout=15 so the handler's bind connect has
    retry budget instead of one-shot.

orchestrator/fleet.py: usable_modules + BRIDGE handling were both
  unconditional, so:
  - With BRIDGE set, requires_bridge modules were still being
    dropped — picker only ever returned samba_usermap_script across
    every slot/episode (the test_fleet_uses_all_modules_when_bridge_set
    failure on HEAD).
  - env.pop("BRIDGE") fired even when BRIDGE was the operator's
    explicit setup, breaking modules that need bridge mode (vsftpd
    backdoor on hardcoded port 6200, distccd, etc.).
  Both made conditional on bridge_set so the picker walks the full
  catalog under bridge mode and SLIRP-only modules still get a
  clean SLIRP env when BRIDGE is unset.
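
The conditional behaviour can be sketched roughly as follows. This is an illustration under assumptions, not the code in orchestrator/fleet.py: the `Module` dataclass, `episode_env`, and the module name "vsftpd_backdoor" are hypothetical, and `bridge_set` is assumed to mean "BRIDGE is present and non-empty".

```python
from __future__ import annotations

from dataclasses import dataclass


@dataclass(frozen=True)
class Module:
    name: str
    requires_bridge: bool = False


def usable_modules(catalog: list[Module], bridge_set: bool) -> list[Module]:
    """With BRIDGE set, offer the full catalog; otherwise only SLIRP-capable modules."""
    if bridge_set:
        return list(catalog)
    return [m for m in catalog if not m.requires_bridge]


def episode_env(base_env: dict[str, str], bridge_set: bool) -> dict[str, str]:
    """Copy the environment, dropping BRIDGE only when the operator did not set it."""
    env = dict(base_env)
    if not bridge_set:
        env.pop("BRIDGE", None)  # guarantees a clean SLIRP environment
    return env


catalog = [
    Module("samba_usermap_script"),
    Module("vsftpd_backdoor", requires_bridge=True),  # hypothetical name
]
operator_env = {"BRIDGE": "br0", "PATH": "/usr/bin"}
bridge_set = bool(operator_env.get("BRIDGE"))
```

Here `usable_modules(catalog, bridge_set)` returns both modules and `episode_env(operator_env, bridge_set)` keeps BRIDGE intact; with BRIDGE unset, only the SLIRP-capable module remains.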

receiver/app.py: HEAD was in a half-pregnant v2 schema state. app.py
  called store.ingest_stream(episode_type=..., benign_profile=...)
  with kwargs that store.py did not accept yet, because the matching
  store.py change was still in the WIP stash. Removed v2 awareness
  from app.py so v1 episodes (what the producer ships today) get
  accepted again. SCHEMA_VERSION default reset to 1 to match.

229 passed, 0 failed. (HEAD had 15 failures, all linked to the
half-pregnant v2 state above.)

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
2026-05-03 17:05:25 -05:00


from __future__ import annotations
import json
import logging
import secrets
import time
from pathlib import Path
from typing import Awaitable, Callable
from starlette.applications import Starlette
from starlette.requests import Request
from starlette.responses import JSONResponse, Response
from starlette.routing import Route
from .store import EpisodeStore, is_valid_id
from .version_gate import VersionGate
log = logging.getLogger("cis490.receiver")
SUFFIX = ".tar.zst"
SCHEMA_VERSION = 1


def _bearer_check(request: Request, expected: str | None) -> Response | None:
    if expected is None:
        return None
    auth = request.headers.get("authorization", "")
    prefix = "Bearer "
    if not auth.startswith(prefix):
        return JSONResponse({"error": "missing bearer token"}, status_code=401)
    presented = auth[len(prefix):]
    if not secrets.compare_digest(presented, expected):
        return JSONResponse({"error": "bad bearer token"}, status_code=401)
    return None


def make_app(
    store: EpisodeStore,
    max_episode_bytes: int,
    bearer_token: str | None = None,
    version_gate: VersionGate | None = None,
    health_root: "Path | None" = None,
) -> Starlette:
    async def health(request: Request) -> JSONResponse:
        return JSONResponse({"status": "ok"})

    async def ping(request: Request) -> JSONResponse:
        """Smoke-test endpoint. Verifies that the auth layer and the
        WG/Caddy/receiver pipe are alive end-to-end without persisting
        anything — index.jsonl is untouched. Used by ``cis490-shipper
        --ping`` during initial bring-up of a new lab host."""
        guard = _bearer_check(request, bearer_token)
        if guard is not None:
            return guard
        return JSONResponse(
            {
                "ok": True,
                "host_id": request.headers.get("x-lab-host"),
                "t_wall_ns": time.time_ns(),
                "schema_version": SCHEMA_VERSION,
            }
        )

    async def put_episode(request: Request) -> JSONResponse:
        guard = _bearer_check(request, bearer_token)
        if guard is not None:
            return guard
        host_id: str = request.path_params["host_id"]
        filename: str = request.path_params["filename"]
        if not filename.endswith(SUFFIX):
            return JSONResponse(
                {"error": f"expected {SUFFIX} suffix"}, status_code=400
            )
        episode_id = filename[: -len(SUFFIX)]
        if not is_valid_id(host_id) or not is_valid_id(episode_id):
            return JSONResponse({"error": "bad host_id or episode_id"}, status_code=400)
        expected_sha = request.headers.get("x-content-sha256")
        if not expected_sha or len(expected_sha) != 64:
            return JSONResponse(
                {"error": "X-Content-SHA256 header (64 hex chars) required"},
                status_code=400,
            )
        expected_sha = expected_sha.lower()
        try:
            schema_version = int(request.headers.get("x-schema-version", "1"))
        except ValueError:
            return JSONResponse({"error": "bad X-Schema-Version"}, status_code=400)
        # Code-version gate. Every PUT must carry the orchestrator's
        # commit hash and that hash must be in the receiver's current
        # allow-list (last N commits on the maintainer's working clone).
        # Missing → 400 (client bug); not-in-window → 412 (out-of-date
        # lab host, must pull main).
        commit = request.headers.get("x-cis490-code-commit", "").strip().lower()
        if version_gate is not None:
            ok, reason = version_gate.check(commit) if commit else (False, "missing")
            if not ok:
                head = version_gate.head()
                if reason == "missing":
                    body = {
                        "error": "missing X-Cis490-Code-Commit header",
                        "remediation": (
                            "Lab-host is shipping with no code_version stamp. "
                            "On the lab host:\n"
                            "  cd /opt/cis490 && sudo -u cis490 git pull origin main && "
                            "sudo /opt/cis490/scripts/install-lab-host.sh\n"
                            "If that errors out, read FIXYOURSELF.md at the repo "
                            "root — it's a six-branch decision tree for stuck "
                            "states the auto-update timer can't recover from."
                        ),
                        "see_also": "FIXYOURSELF.md",
                    }
                    return JSONResponse(body, status_code=400)
                if reason == "bad-format":
                    return JSONResponse(
                        {"error": "X-Cis490-Code-Commit must be 40 lowercase hex"},
                        status_code=400,
                    )
                # not-in-window: out-of-date lab host OR diverged-HEAD
                body = {
                    "error": "code commit rejected: not in receiver's allow-list",
                    "your_commit": commit,
                    "valid_window_size": version_gate.valid_count(),
                    "head_commit": head,
                    "remediation": (
                        "Your commit isn't on origin/main. Two cases:\n"
                        "\n"
                        "(1) You're just behind. Run on the lab host:\n"
                        "    cd /opt/cis490 && sudo -u cis490 git pull --ff-only "
                        "origin main && sudo /opt/cis490/scripts/install-lab-host.sh\n"
                        "\n"
                        "(2) You have a LOCAL commit that's not on origin/main "
                        "(git pull --ff-only fails). This is the diverged-HEAD "
                        "case — the auto-update timer will refuse to fix it. "
                        "Read FIXYOURSELF.md §B at the repo root: three options "
                        "(push your commit, reset --hard origin/main, or file an "
                        "issue and wait). Pick one.\n"
                        "\n"
                        "Do NOT bypass this check by faking code_version in "
                        "meta.json — the gate exists to keep buggy data out of "
                        "the training set."
                    ),
                }
                log.warning(
                    "rejected episode host=%s id=%s commit=%s reason=%s",
                    host_id, episode_id, commit[:12], reason,
                )
                return JSONResponse(body, status_code=412)
        cl = request.headers.get("content-length")
        if cl is not None:
            try:
                if int(cl) > max_episode_bytes:
                    return JSONResponse(
                        {"error": "episode exceeds max size"}, status_code=413
                    )
            except ValueError:
                return JSONResponse({"error": "bad Content-Length"}, status_code=400)
        result = await store.ingest_stream(
            host_id=host_id,
            episode_id=episode_id,
            expected_sha256=expected_sha,
            schema_version=schema_version,
            commit=commit or None,
            body=request.stream(),
            max_bytes=max_episode_bytes,
        )
        if result.status == "stored":
            log.info(
                "stored episode host=%s id=%s sha=%s size=%d",
                host_id, episode_id, result.sha256, result.size_bytes,
            )
            return JSONResponse(
                {"status": "stored", "sha256": result.sha256, "size_bytes": result.size_bytes},
                status_code=201,
            )
        if result.status == "already-present":
            return JSONResponse(
                {"status": "already-present", "sha256": result.sha256},
                status_code=200,
            )
        if result.status == "conflict":
            log.warning(
                "conflict host=%s id=%s existing=%s",
                host_id, episode_id, result.existing_sha256,
            )
            return JSONResponse(
                {"status": "conflict", "existing_sha256": result.existing_sha256},
                status_code=409,
            )
        if result.status == "sha-mismatch":
            return JSONResponse(
                {"status": "sha-mismatch", "actual_sha256": result.sha256},
                status_code=400,
            )
        if result.status == "too-large":
            return JSONResponse({"error": "episode exceeds max size"}, status_code=413)
        return JSONResponse({"error": "unknown ingest result"}, status_code=500)

    async def put_host_health(request: Request) -> JSONResponse:
        """Lab hosts PUT their cis490-doctor.py JSON output here once
        a day. We persist the latest snapshot per host at
        <health_root>/<host_id>.json. Not gated by version_gate —
        this is metadata about the host itself, not training data,
        and we want sick hosts to be ABLE to report sickness even if
        their code is out-of-date.
        No size cap on the body beyond the global Starlette/uvicorn
        max — doctor JSON is small (<10 KiB)."""
        guard = _bearer_check(request, bearer_token)
        if guard is not None:
            return guard
        if health_root is None:
            return JSONResponse(
                {"error": "health endpoint disabled (no health_root configured)"},
                status_code=404,
            )
        host_id = request.path_params["host_id"]
        if not is_valid_id(host_id):
            return JSONResponse({"error": "bad host_id"}, status_code=400)
        body_bytes = await request.body()
        try:
            body = json.loads(body_bytes)
        except (ValueError, json.JSONDecodeError):
            return JSONResponse({"error": "body must be JSON"}, status_code=400)
        if not isinstance(body, dict):
            return JSONResponse({"error": "body must be a JSON object"},
                                status_code=400)
        # Stamp the receiver's view of when this arrived. Lab-host
        # clocks are coarse so we don't trust them for "freshness."
        record = {
            "host_id": host_id,
            "received_at_wall": time.strftime("%Y-%m-%dT%H:%M:%SZ",
                                              time.gmtime()),
            "doctor": body,
        }
        health_root.mkdir(parents=True, exist_ok=True)
        target = health_root / f"{host_id}.json"
        # Write to a sibling tmp + atomic rename so a concurrent
        # reader never sees a half-written file.
        tmp = health_root / f".{host_id}.json.tmp"
        tmp.write_text(json.dumps(record, indent=2))
        tmp.replace(target)
        return JSONResponse({"status": "stored", "host_id": host_id},
                            status_code=200)

    async def get_fleet_health(request: Request) -> JSONResponse:
        """Aggregate view across all hosts that have reported. Used by
        the maintainer / fleet-health monitor to spot sick hosts
        without grepping per-host files."""
        guard = _bearer_check(request, bearer_token)
        if guard is not None:
            return guard
        if health_root is None or not health_root.exists():
            return JSONResponse({"hosts": []})
        out: list[dict] = []
        for f in sorted(health_root.glob("*.json")):
            if f.name.startswith("."):  # tmp files
                continue
            try:
                out.append(json.loads(f.read_text()))
            except (OSError, json.JSONDecodeError):
                continue
        return JSONResponse({"hosts": out})

    routes = [
        Route("/v1/health", health, methods=["GET"]),
        Route("/v1/ping", ping, methods=["POST"]),
        Route(
            "/v1/episodes/{host_id}/{filename}",
            put_episode,
            methods=["PUT"],
        ),
        Route(
            "/v1/host-health/{host_id}",
            put_host_health,
            methods=["PUT"],
        ),
        Route(
            "/v1/host-health",
            get_fleet_health,
            methods=["GET"],
        ),
    ]
    return Starlette(routes=routes)
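
For orientation, here is a hedged sketch of what a client has to send to satisfy the checks in `put_episode` above. It only builds the request path and headers and does not reproduce the real shipper. The function name `build_episode_put` and the example host and episode ids are hypothetical, and whether a given id passes `is_valid_id` depends on store.py, which is not shown here.

```python
from __future__ import annotations

import hashlib

SUFFIX = ".tar.zst"  # same suffix the receiver requires


def build_episode_put(
    host_id: str,
    episode_id: str,
    payload: bytes,
    commit: str,
    token: str | None = None,
    schema_version: int = 1,
) -> tuple[str, dict[str, str]]:
    """Return (path, headers) for a PUT to /v1/episodes/{host_id}/{filename}."""
    headers = {
        # 64 hex chars; the receiver lowercases it and compares with the body hash.
        "X-Content-SHA256": hashlib.sha256(payload).hexdigest(),
        "X-Schema-Version": str(schema_version),
        # Checked against the version gate when one is configured.
        "X-Cis490-Code-Commit": commit.strip().lower(),
    }
    if token is not None:
        headers["Authorization"] = f"Bearer {token}"
    return f"/v1/episodes/{host_id}/{episode_id}{SUFFIX}", headers
```

A caller would pass the returned path and headers to whatever HTTP client it uses, with the payload as the request body; by the handler above, 201 means stored, 200 means already present, and 409 means a different archive already exists under that id.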