CIS490/bootstrap/app.py
max 265f3ad313 Tier-4 sample source: theZoo (no auth, no operator action)
Replaces MalwareBazaar with theZoo (https://github.com/ytisf/theZoo).
theZoo is a public security-research repo with hundreds of malware
samples organized by family, password-protected with the well-known
'infected'. No API key, no signup, nothing for an operator to do —
which is what zero-touch tier-4 actually means.

Changes:

- tools/auto_fetch_samples.py: rewrite. Clones theZoo (shallow, ~500 MB)
  to /var/lib/cis490/theZoo on first run. Then, for each manifest
  family without a sha256, it locates a matching Binaries/<Name>
  dir, extracts the .zip with password 'infected', picks the largest
  non-text payload as the binary, sha256s it, stages it at
  samples/store/<sha256>, and rewrites manifest.toml in place
  (atomic tempfile + os.replace, stat preserved). The exit status is
  mandatory: non-zero if no real samples landed.

- scripts/install-tier-3-4.sh: dropped the MB-key resolution chain
  (env var → local file → bootstrap.wg fetch). Now just runs
  auto_fetch_samples.py and dies if zero samples land. SKIP_TIER4
  remains as the explicit override but is documented as defeating
  the project.

- bootstrap/app.py + __main__.py + etc/cis490-bootstrap.service:
  removed the /v1/secret/<name> endpoint and the --secrets-root flag.
  Dead code now that no API key needs distributing. Live-rolled
  back on the Pi (404 verified post-restart, stale /etc/cis490/secrets
  dir removed).

- scripts/set-malwarebazaar-key.sh: deleted. No MB key means no
  one-time operator step.

- tests/test_bootstrap_secrets.py: deleted (route removed).

- AGENTS.md: rewrote tier-4 section to reflect zero-operator model.
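
The in-place manifest rewrite mentioned for tools/auto_fetch_samples.py
(atomic tempfile + os.replace, stat preserved) could look roughly like the
sketch below. It is not the script's actual code: the helper name
atomic_rewrite is hypothetical, and using shutil.copystat to carry over
mode bits and timestamps is an assumption about what "stat preserved" means.

```python
import os
import shutil
import tempfile
from pathlib import Path


def atomic_rewrite(path: Path, new_text: str) -> None:
    """Hypothetical helper: replace ``path`` with ``new_text`` in one step."""
    # The temp file lives in the same directory as the target so that
    # os.replace is a same-filesystem rename (atomic on POSIX).
    fd, tmp_name = tempfile.mkstemp(dir=path.parent, prefix=path.name + ".")
    try:
        with os.fdopen(fd, "w", encoding="utf-8") as tmp:
            tmp.write(new_text)
            tmp.flush()
            os.fsync(tmp.fileno())
        # Copy permission bits and timestamps from the original file.
        shutil.copystat(path, tmp_name)
        os.replace(tmp_name, path)
    except BaseException:
        # On any failure the original stays untouched; drop the partial file.
        if os.path.exists(tmp_name):
            os.unlink(tmp_name)
        raise
```

Readers of manifest.toml then see either the old file or the new one, never
a half-written mix.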

148/148 tests pass. Bootstrap service rolled back live.
2026-05-01 01:17:50 -05:00


"""``cis490-bootstrap`` — auto-issue mTLS leaf certs to enrolled lab hosts.

This is the chicken-and-egg fix for first-time lab-host setup. A
freshly wg-enrolled device has WG access (and trusts the wg-pki CA)
but has no client cert yet, so it can't authenticate to the
mTLS-protected ``collector.wg``. This service exposes a *plain-TLS*
(no client-auth) endpoint that the lab host can call once during
``install-lab-host.sh`` to retrieve its leaf cert tarball.

Trust boundary: anything that reaches ``bootstrap.wg`` has already
passed iptmonads' WG-membership check at L4. No further
authentication is required for the bootstrap pull; by the time a
caller can connect at all they're a peer the operator authorized.

The privilege boundary, on the other hand, is real: minting certs
requires the wg-pki CA private key (root-only at
``/var/lib/wg-pki/cis490-client-ca/ca.key``). This service therefore
runs as root in a tight sandbox (see ``etc/cis490-bootstrap.service``)
and shells out to ``issue-cis490-client-cert.sh`` for each mint.

Endpoints:

  GET /v1/cert/{host_id}  - return tarball of {ca.crt, leaf.pem, leaf.key}
                            for ``host_id``. Cached: successive calls
                            return the same bytes.
  GET /v1/health          - liveness probe (no auth needed).

Each mint is logged with the source IP (after Caddy's X-Real-IP
forward) so the operator has an audit trail of which devices have
fetched which certs.
"""

from __future__ import annotations

import logging
import re
import subprocess
import time
from pathlib import Path
from typing import Awaitable, Callable

from starlette.applications import Starlette
from starlette.requests import Request
from starlette.responses import FileResponse, JSONResponse, Response
from starlette.routing import Route

log = logging.getLogger("cis490.bootstrap")

# Sane host_id charset: same rules the receiver enforces, mirrored
# here so mint requests can't smuggle path traversal in.
_HOST_ID_RE = re.compile(r"^[A-Za-z0-9_.-]{1,64}$")


def _is_valid_host_id(s: str) -> bool:
    # fullmatch, not match: ``$`` also matches just before a trailing
    # newline. "." and ".." fit the charset but are directory
    # references, so they are rejected explicitly.
    return s not in {".", ".."} and _HOST_ID_RE.fullmatch(s) is not None


def make_app(
    *,
    issuer_script: Path,
    issued_root: Path,
    rate_limit_window_s: float = 5.0,
) -> Starlette:
    """Build the Starlette app. Wired by the production launcher in
    ``bootstrap/__main__.py``; tests can pass synthetic paths."""
    issued_root.mkdir(parents=True, exist_ok=True)

    # Coarse per-IP rate limiter to make a casual scan annoying. Not
    # a real defense; the WG mesh is the actual perimeter.
    last_request: dict[str, float] = {}

    async def health(request: Request) -> Response:
        return JSONResponse({"status": "ok"})

    async def get_cert(request: Request) -> Response:
        host_id: str = request.path_params["host_id"]
        if not _is_valid_host_id(host_id):
            return JSONResponse({"error": "bad host_id"}, status_code=400)

        # Caddy forwards the original WG-side IP via X-Real-IP /
        # X-Forwarded-For; fall back to the direct peer if running
        # without Caddy in front (tests).
        src = (
            request.headers.get("x-real-ip")
            or (request.headers.get("x-forwarded-for") or "").split(",")[0].strip()
            or (request.client.host if request.client else "?")
        )

        now = time.monotonic()
        # None means "never seen". A 0.0 default would rate-limit the
        # first request whenever the monotonic clock reads less than
        # the window (e.g. shortly after boot).
        prev = last_request.get(src)
        if prev is not None and (now - prev) < rate_limit_window_s:
            return JSONResponse(
                {"error": "rate limited; back off"},
                status_code=429,
            )
        last_request[src] = now

        tar_path = issued_root / host_id / f"{host_id}.tar"
        if not tar_path.exists():
            log.info("minting cert for host_id=%s src=%s", host_id, src)
            try:
                subprocess.run(
                    [
                        str(issuer_script), host_id,
                        "--out-dir", str(issued_root / host_id),
                    ],
                    check=True,
                    capture_output=True,
                    text=True,
                    timeout=30,
                )
            except subprocess.CalledProcessError as e:
                log.error("issue script failed for %s: rc=%d stderr=%s",
                          host_id, e.returncode, e.stderr[:500])
                return JSONResponse(
                    {"error": "mint failed", "detail": e.stderr[:500]},
                    status_code=500,
                )
            except (OSError, subprocess.TimeoutExpired) as e:
                log.exception("issue script transport error for %s", host_id)
                return JSONResponse(
                    {"error": f"transport: {e}"},
                    status_code=500,
                )
        else:
            log.info("cache hit for host_id=%s src=%s", host_id, src)

        # The issuer can exit 0 without writing the tarball; treat that
        # as a server error rather than letting FileResponse fail.
        if not tar_path.exists():
            return JSONResponse({"error": "tarball not produced"}, status_code=500)

        return FileResponse(
            tar_path,
            media_type="application/x-tar",
            filename=f"{host_id}.tar",
            headers={
                "X-Cis490-Host-Id": host_id,
                "X-Cis490-Cert-Source-IP": src,
            },
        )

    routes = [
        Route("/v1/health", health, methods=["GET"]),
        Route("/v1/cert/{host_id}", get_cert, methods=["GET"]),
    ]
    return Starlette(routes=routes)
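
The two guards in front of the mint path (host_id validation and the coarse per-source window) can be exercised without Starlette or the issuer script. The sketch below reproduces that logic with the standard library only. It is illustrative, not part of this module: `RateWindow` and its injectable `clock` parameter are hypothetical names, and the explicit rejection of "." and ".." is an assumption about how strict the check should be.

```python
import re
import time
from typing import Callable

# Same charset as the module's _HOST_ID_RE.
HOST_ID_RE = re.compile(r"^[A-Za-z0-9_.-]{1,64}$")


def is_valid_host_id(s: str) -> bool:
    # fullmatch refuses a trailing newline that match() with ``$`` would
    # accept; "." and ".." are refused because they name directories.
    return s not in {".", ".."} and HOST_ID_RE.fullmatch(s) is not None


class RateWindow:
    """Hypothetical helper: allow one request per source per window."""

    def __init__(
        self,
        window_s: float,
        clock: Callable[[], float] = time.monotonic,
    ) -> None:
        self.window_s = window_s
        self.clock = clock
        self.last_seen: dict[str, float] = {}

    def allow(self, src: str) -> bool:
        now = self.clock()
        prev = self.last_seen.get(src)
        if prev is not None and (now - prev) < self.window_s:
            # Rejected requests do not extend the window.
            return False
        self.last_seen[src] = now
        return True
```

Passing a fake clock (for example `RateWindow(5.0, clock=lambda: t[0])` with a mutable `t`) lets a test step time forward deterministically instead of sleeping.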