"""``cis490-bootstrap`` — auto-issue mTLS leaf certs to enrolled lab hosts. This is the chicken-and-egg fix for first-time lab-host setup. A freshly wg-enrolled device has WG access (and trusts the wg-pki CA) but has no client cert yet, so it can't authenticate to the mTLS-protected ``collector.wg``. This service exposes a *plain-TLS* (no client-auth) endpoint that the lab host can call once during ``install-lab-host.sh`` to retrieve its leaf cert tarball. Trust boundary: anything that reaches ``bootstrap.wg`` has already passed iptmonads' WG-membership check at L4. No further authentication is required for the bootstrap pull — by the time a caller can connect at all they're a peer the operator authorized. The privilege boundary, on the other hand, is real: minting certs requires the wg-pki CA private key (root-only at ``/var/lib/wg-pki/cis490-client-ca/ca.key``). This service therefore runs as root in a tight sandbox (see ``etc/cis490-bootstrap.service``) and shells out to ``issue-cis490-client-cert.sh`` for each mint. Endpoints: GET /v1/cert/{host_id} — return tarball of {ca.crt, leaf.pem, leaf.key} for ``host_id``. Cached — successive calls return the same bytes. GET /v1/health — liveness probe (no auth needed). Each mint is logged with the source IP (after Caddy's X-Real-IP forward) so the operator has an audit trail of which devices have fetched which certs. """ from __future__ import annotations import logging import re import subprocess import time from pathlib import Path from typing import Awaitable, Callable from starlette.applications import Starlette from starlette.requests import Request from starlette.responses import FileResponse, JSONResponse, Response from starlette.routing import Route log = logging.getLogger("cis490.bootstrap") # Sane host_id charset — same rules the receiver enforces, mirrored # here so mint requests can't smuggle path traversal in. 
# ``\Z`` rather than ``$``: ``$`` also matches just before a trailing
# newline, which would let ``"host\n"`` through to the filesystem path,
# the issuer script's argv and a response header.
_HOST_ID_RE = re.compile(r"^[A-Za-z0-9_.-]{1,64}\Z")

# Once the rate limiter tracks this many sources, expired entries are swept
# on the next accepted request so the table cannot grow without bound.
_RATE_LIMIT_SWEEP_THRESHOLD = 1024


def _is_valid_host_id(s: str) -> bool:
    """Return True if ``s`` is safe to use as a host_id.

    Beyond the charset/length check, two classes of value are rejected
    even though every character in them is allowed:

    * ``.`` and ``..`` — joined onto ``issued_root`` they would resolve to
      the issuance root itself or its parent (path traversal).
    * anything starting with ``-`` — host_id is passed as a positional
      argument to the issuer script and could be parsed as an option.
    """
    if s in (".", "..") or s.startswith("-"):
        return False
    return _HOST_ID_RE.fullmatch(s) is not None


def _source_ip(request: Request) -> str:
    """Best-effort caller address for logging and rate limiting.

    Preference order: ``X-Real-IP``, then the first hop of
    ``X-Forwarded-For``, then the direct peer address, then ``"?"`` when
    no peer information is available.
    """
    # Caddy forwards the original WG-side IP via X-Real-IP /
    # X-Forwarded-For; fall back to the direct peer if running
    # without Caddy in front (tests).
    real_ip = request.headers.get("x-real-ip")
    if real_ip:
        return real_ip
    first_hop = (request.headers.get("x-forwarded-for") or "").split(",")[0].strip()
    if first_hop:
        return first_hop
    return request.client.host if request.client else "?"


def make_app(
    *,
    issuer_script: Path,
    issued_root: Path,
    rate_limit_window_s: float = 5.0,
) -> Starlette:
    """Build the Starlette app.

    Wired by the production launcher in ``bootstrap/__main__.py``; tests
    can pass synthetic paths.

    Args:
        issuer_script: Executable invoked as
            ``<issuer_script> <host_id> --out-dir <issued_root>/<host_id>``
            to mint a cert bundle.
        issued_root: Directory holding one sub-directory per host_id; it is
            created if missing. A bundle is served from
            ``<issued_root>/<host_id>/<host_id>.tar``.
        rate_limit_window_s: Minimum number of seconds between two accepted
            ``/v1/cert`` requests from the same source address.

    Returns:
        A Starlette application exposing ``GET /v1/health`` and
        ``GET /v1/cert/{host_id}``.
    """
    # Function-scope imports: neither name is imported at module level.
    import threading

    from starlette.concurrency import run_in_threadpool

    issued_root.mkdir(parents=True, exist_ok=True)

    # Coarse per-IP rate limiter to make a casual scan annoying. Not
    # a real defense — the WG mesh is the actual perimeter.
    last_request: dict[str, float] = {}

    # Mints run in worker threads so the event loop stays responsive; this
    # lock keeps them strictly one-at-a-time, as they were when the
    # subprocess call blocked the loop.
    mint_lock = threading.Lock()

    def _rate_limited(src: str, now: float) -> bool:
        """Record an accepted request from ``src``; return True to reject it."""
        prev = last_request.get(src)
        # ``None`` (not 0.0) marks "never seen": time.monotonic() has an
        # undefined reference point, so a numeric sentinel could throttle a
        # first-time caller whenever the clock value is below the window.
        if prev is not None and (now - prev) < rate_limit_window_s:
            return True
        if len(last_request) >= _RATE_LIMIT_SWEEP_THRESHOLD:
            # Entries at least one window old no longer affect any decision,
            # so dropping them is behavior-neutral.
            expired = [
                ip
                for ip, seen in last_request.items()
                if (now - seen) >= rate_limit_window_s
            ]
            for ip in expired:
                del last_request[ip]
        last_request[src] = now
        return False

    def _mint(host_id: str, out_dir: Path, tar_path: Path) -> None:
        """Run the issuer script for ``host_id`` (blocking; worker thread).

        Raises:
            subprocess.CalledProcessError: The script exited non-zero.
            subprocess.TimeoutExpired: The script ran longer than 30 s.
            OSError: The script could not be executed.
        """
        with mint_lock:
            # Another request may have minted this host while we waited.
            if tar_path.exists():
                return
            subprocess.run(
                [str(issuer_script), host_id, "--out-dir", str(out_dir)],
                check=True,
                capture_output=True,
                text=True,
                timeout=30,
            )

    async def health(request: Request) -> Response:
        """Liveness probe; always answers ``{"status": "ok"}``."""
        return JSONResponse({"status": "ok"})

    async def get_cert(request: Request) -> Response:
        """Serve the cert tarball for ``host_id``, minting it on first use.

        Responds 400 for an invalid host_id, 429 when the caller is inside
        the rate-limit window, 500 when minting fails or yields no tarball,
        and otherwise streams the tar file.
        """
        host_id: str = request.path_params["host_id"]
        if not _is_valid_host_id(host_id):
            return JSONResponse({"error": "bad host_id"}, status_code=400)

        src = _source_ip(request)
        if _rate_limited(src, time.monotonic()):
            return JSONResponse(
                {"error": "rate limited; back off"},
                status_code=429,
            )

        out_dir = issued_root / host_id
        tar_path = out_dir / f"{host_id}.tar"

        if tar_path.exists():
            log.info("cache hit for host_id=%s src=%s", host_id, src)
        else:
            log.info("minting cert for host_id=%s src=%s", host_id, src)
            try:
                # Off the event loop: the script may take up to 30 s and
                # would otherwise stall /v1/health and every other request.
                await run_in_threadpool(_mint, host_id, out_dir, tar_path)
            except subprocess.CalledProcessError as e:
                stderr = (e.stderr or "")[:500]
                log.error(
                    "issue script failed for %s: rc=%d stderr=%s",
                    host_id, e.returncode, stderr,
                )
                return JSONResponse(
                    {"error": "mint failed", "detail": stderr},
                    status_code=500,
                )
            except (OSError, subprocess.TimeoutExpired) as e:
                log.exception("issue script transport error for %s", host_id)
                return JSONResponse(
                    {"error": f"transport: {e}"},
                    status_code=500,
                )

        # The script can exit 0 without having written the expected file.
        if not tar_path.exists():
            return JSONResponse({"error": "tarball not produced"}, status_code=500)

        return FileResponse(
            tar_path,
            media_type="application/x-tar",
            filename=f"{host_id}.tar",
            headers={
                "X-Cis490-Host-Id": host_id,
                "X-Cis490-Cert-Source-IP": src,
            },
        )

    routes = [
        Route("/v1/health", health, methods=["GET"]),
        Route("/v1/cert/{host_id}", get_cert, methods=["GET"]),
    ]
    return Starlette(routes=routes)