Tier-4 sample source: theZoo (no auth, no operator action)

Replaces MalwareBazaar with theZoo (https://github.com/ytisf/theZoo).
theZoo is a public security-research repo with hundreds of malware
samples organized by family, each in a zip protected with the
well-known password 'infected'. No API key, no signup, nothing for an
operator to do, which is what zero-touch tier-4 actually means.

Changes:

- tools/auto_fetch_samples.py: rewrite. Clones theZoo (shallow, ~500 MB)
  to /var/lib/cis490/theZoo on first run, then for each manifest
  family without a sha256 it locates a matching Binaries/<Name>
  dir, extracts the .zip with password 'infected', picks the largest
  non-text payload as the binary, sha256s it, stages at
  samples/store/<sha256>, and rewrites manifest.toml in place
  (atomic tempfile + os.replace, stat preserved). The exit status is
  mandatory: non-zero when no sample was staged and no entry already
  had a sha256.

- scripts/install-tier-3-4.sh: dropped the MB-key resolution chain
  (env var → local file → bootstrap.wg fetch). Now just runs
  auto_fetch_samples.py and dies if zero samples land. SKIP_TIER4
  remains as the explicit override but is documented as defeating
  the project.

- bootstrap/app.py + __main__.py + etc/cis490-bootstrap.service:
  removed the /v1/secret/<name> endpoint and the --secrets-root flag.
  Dead code now that no API key needs distributing. Rolled back live
  on the Pi (404 verified post-restart, stale /etc/cis490/secrets
  dir removed).

- scripts/set-malwarebazaar-key.sh: deleted. No MB key means no
  one-time operator step.

- tests/test_bootstrap_secrets.py: deleted (route removed).

- AGENTS.md: rewrote tier-4 section to reflect zero-operator model.
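
A minimal sketch of the atomic, stat-preserving manifest rewrite mentioned
in the auto_fetch_samples.py item above. The helper name `atomic_rewrite`
is hypothetical and POSIX is assumed; the real `update_manifest_entry` may
differ in detail (for example, in when it restores the mode).

```python
import os
import tempfile
from pathlib import Path


def atomic_rewrite(path: Path, new_text: str) -> None:
    """Replace ``path`` with ``new_text`` without exposing a partial file."""
    st = path.stat()
    # The temp file lives in the same directory so os.replace stays on
    # one filesystem, which is what makes the swap atomic.
    fd, tmp_name = tempfile.mkstemp(dir=path.parent, prefix=path.name + ".")
    try:
        with os.fdopen(fd, "w") as f:
            f.write(new_text)
            f.flush()
            os.fsync(f.fileno())
        # Restore the original permission bits on the new file.
        os.chmod(tmp_name, st.st_mode & 0o7777)
        try:
            # Restoring ownership can require privileges; best effort only.
            os.chown(tmp_name, st.st_uid, st.st_gid)
        except PermissionError:
            pass
        os.replace(tmp_name, path)
    except BaseException:
        # Remove the temp file if anything failed before the swap.
        if os.path.exists(tmp_name):
            os.unlink(tmp_name)
        raise
```

Readers of the manifest see either the old file or the new one, never a
half-written file, because the rename is the only step that touches the
final path.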

148/148 tests pass. Bootstrap service rolled back live.
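
The family-to-directory matching rule (case-insensitive substring, with
names that start with the family preferred) can be illustrated on plain
strings. This is a sketch only: `pick_family_dir` and the directory names
below are hypothetical and are not taken from theZoo's actual layout.

```python
from __future__ import annotations


def pick_family_dir(dir_names: list[str], family: str) -> str | None:
    """Return the best-matching directory name for ``family``, or None."""
    needle = family.lower()
    # Candidates are all names that contain the family, in sorted order.
    matches = [n for n in sorted(dir_names) if needle in n.lower()]
    if not matches:
        return None
    # A name that starts with the family beats one that merely contains it.
    for name in matches:
        if name.lower().startswith(needle):
            return name
    return matches[0]


# Hypothetical directory names, for illustration only.
names = ["Linux.Mirai.B", "Mirai.Source", "ZeusVM"]
print(pick_family_dir(names, "Mirai"))
print(pick_family_dir(names, "zeus"))
print(pick_family_dir(names, "XMRig"))
```

When no name starts with the family, the first substring match in sorted
order wins, so results stay deterministic across runs.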
max 2026-05-01 01:17:50 -05:00
parent 5d0e8e33a9
commit 265f3ad313
8 changed files with 208 additions and 371 deletions


@ -110,44 +110,37 @@ disk, the next wave produces Tier-3 episodes (`meta.exploit.module_name`
populated). No orchestrator restart is required, but a restart speeds populated). No orchestrator restart is required, but a restart speeds
up the switch. up the switch.
### Tier-4 (real malware execution) is mandatory, push-button after one-time Pi setup ### Tier-4 (real malware execution) is mandatory, fully automated
**Real-binary episodes are the project's training target — Tier-4 is **Real-binary episodes are the project's training target — Tier-4 is
NOT optional.** A lab-host deploy that lands without real samples NOT optional.** A lab-host deploy that lands without real samples
fails loudly; mimic-only data does not answer the research question. fails loudly; mimic-only data does not answer the research question.
**One-time, on the Pi (operator runs once, ever):** There is **no operator step**. No API key, no signup, no manual
provisioning. `install-tier-3-4.sh` runs `tools/auto_fetch_samples.py`
which:
```sh 1. Clones (or pulls) `theZoo` from
sudo MALWAREBAZAAR_API_KEY=<key> /opt/cis490/scripts/set-malwarebazaar-key.sh `https://github.com/ytisf/theZoo` to `/var/lib/cis490/theZoo`
``` (~500 MB shallow clone, public, GPL-3.0, security-research repo)
2. For each `[[sample]]` in `manifest.toml` without a sha256, locates
a directory in `theZoo/malware/Binaries/` whose name matches
the entry's `family` (case-insensitive substring + prefix priority)
3. Extracts the password-protected `.zip` (well-known password
`infected`)
4. Picks the largest non-text payload as the binary, computes its
sha256, copies to `/opt/cis490/samples/store/<sha256>`
5. Rewrites `manifest.toml` in place, atomically (tempfile +
`os.replace` preserving stat), adding `source = "theZoo"`,
`sha256 = "<hex>"`, and the upstream URL
Free signup at https://bazaar.abuse.ch/. The key lands at If `auto_fetch_samples.py` lands zero binaries (theZoo layout drift,
`/etc/cis490/secrets/malwarebazaar.token` (mode 0640, root:cis490). git clone failure, or a family has no matching directory),
The bootstrap service's `/v1/secret/malwarebazaar` endpoint then `install-tier-3-4.sh` exits non-zero. **No silent mimic-only fallback.**
serves it to every lab host — same trust boundary as the cert
endpoint (WG mesh, iptmonads-gated).
**Per lab host (auto):** `install-tier-3-4.sh` resolves the MB key The orchestrator's next selection that picks a sample with
in priority order: `kind == "real"` runs the real binary via the chunked-upload path
(`exploits.driver._resolve_workload`).
1. `MALWAREBAZAAR_API_KEY` env var
2. `/opt/cis490/samples/.bazaar.token` (cached from a previous run)
3. `https://bootstrap.wg/v1/secret/malwarebazaar` (auto-distributed
from the Pi)
If all three fail, the deploy aborts with the exact remediation
command. Once the key resolves, `tools/auto_fetch_samples.py` walks
each manifest family, queries MB by signature, fetches the first
match, sha256-verifies on the way in, lands the binary at
`/opt/cis490/samples/store/<sha256>`, and rewrites `manifest.toml`
in place. The orchestrator's next selection that picks a sample
with `kind == "real"` runs the real binary via the chunked-upload
path.
If `auto_fetch_samples.py` lands zero binaries (zero successful MB
queries), `install-tier-3-4.sh` exits non-zero. **No silent
mimic-only fallback** — the project's data depends on real samples.
Set `MALWAREBAZAAR_API_KEY` (free signup at https://bazaar.abuse.ch/) Set `MALWAREBAZAAR_API_KEY` (free signup at https://bazaar.abuse.ch/)
before running `install-tier-3-4.sh` and step 5 runs before running `install-tier-3-4.sh` and step 5 runs


@ -33,14 +33,6 @@ def main(argv: list[str] | None = None) -> int:
default=Path("/home/max/.env/wg-pki/issued"), default=Path("/home/max/.env/wg-pki/issued"),
help="Where minted tarballs are cached.", help="Where minted tarballs are cached.",
) )
p.add_argument(
"--secrets-root",
type=Path,
default=Path("/etc/cis490/secrets"),
help="Directory holding shared secrets distributed to lab hosts. "
"Currently used for malwarebazaar.token; provisioned by "
"scripts/set-malwarebazaar-key.sh.",
)
p.add_argument("--log-level", default="info") p.add_argument("--log-level", default="info")
args = p.parse_args(argv) args = p.parse_args(argv)
@ -57,7 +49,6 @@ def main(argv: list[str] | None = None) -> int:
app = make_app( app = make_app(
issuer_script=args.issuer_script, issuer_script=args.issuer_script,
issued_root=args.issued_root, issued_root=args.issued_root,
secrets_root=args.secrets_root,
) )
log.info("listening on %s:%d", args.listen_host, args.listen_port) log.info("listening on %s:%d", args.listen_host, args.listen_port)
uvicorn.run( uvicorn.run(


@ -61,7 +61,6 @@ def make_app(
*, *,
issuer_script: Path, issuer_script: Path,
issued_root: Path, issued_root: Path,
secrets_root: Path = Path("/etc/cis490/secrets"),
rate_limit_window_s: float = 5.0, rate_limit_window_s: float = 5.0,
) -> Starlette: ) -> Starlette:
"""Build the Starlette app. Wired by the production launcher in """Build the Starlette app. Wired by the production launcher in
@ -140,45 +139,8 @@ def make_app(
}, },
) )
async def get_secret(request: Request) -> Response:
"""Serve a named secret from `secrets_root`. Currently only
`malwarebazaar` is allowed: the MB API key Tier-4 needs to
fetch real malware samples. Same trust boundary as the cert
endpoint: anything reaching bootstrap.wg has cleared
iptmonads' WG-membership check."""
name: str = request.path_params["name"]
# Strict allow-list to keep this from turning into a generic
# secrets API.
if name != "malwarebazaar":
return JSONResponse({"error": "unknown secret"}, status_code=404)
path = secrets_root / "malwarebazaar.token"
if not path.exists():
return JSONResponse(
{"error": "secret not provisioned",
"hint": "run scripts/set-malwarebazaar-key.sh on the receiver"},
status_code=404,
)
try:
data = path.read_text().strip()
except OSError as e:
return JSONResponse({"error": f"read failed: {e}"}, status_code=500)
if not data:
return JSONResponse({"error": "empty secret"}, status_code=500)
src = (
request.headers.get("x-real-ip")
or (request.headers.get("x-forwarded-for") or "").split(",")[0].strip()
or (request.client.host if request.client else "?")
)
log.info("served secret=%s to src=%s", name, src)
return Response(
content=data,
media_type="text/plain",
headers={"Cache-Control": "no-store"},
)
routes = [ routes = [
Route("/v1/health", health, methods=["GET"]), Route("/v1/health", health, methods=["GET"]),
Route("/v1/cert/{host_id}", get_cert, methods=["GET"]), Route("/v1/cert/{host_id}", get_cert, methods=["GET"]),
Route("/v1/secret/{name}", get_secret, methods=["GET"]),
] ]
return Starlette(routes=routes) return Starlette(routes=routes)


@ -16,8 +16,7 @@ ExecStart=/opt/cis490/.venv/bin/python -m bootstrap \
--listen-host 127.0.0.1 \ --listen-host 127.0.0.1 \
--listen-port 8446 \ --listen-port 8446 \
--issuer-script /opt/wg-pki/scripts/issue-cis490-client-cert-wrapper.sh \ --issuer-script /opt/wg-pki/scripts/issue-cis490-client-cert-wrapper.sh \
--issued-root /var/lib/wg-pki/issued \ --issued-root /var/lib/wg-pki/issued
--secrets-root /etc/cis490/secrets
Restart=on-failure Restart=on-failure
RestartSec=5 RestartSec=5


@ -1,8 +1,6 @@
#!/usr/bin/env bash #!/usr/bin/env bash
# Tier-3 + Tier-4 deploy orchestrator. Idempotent. Zero operator # Tier-3 + Tier-4 deploy orchestrator. Idempotent. ZERO operator
# interaction on the lab host (operator provisions the # interaction — including no API key, no signup, no manual upload.
# MalwareBazaar API key ONCE on the Pi via
# scripts/set-malwarebazaar-key.sh; from there it's auto-distributed).
# #
# Steps (each idempotent on its own): # Steps (each idempotent on its own):
# 1. install-msfrpcd.sh — auto-install metasploit-framework via # 1. install-msfrpcd.sh — auto-install metasploit-framework via
@ -14,22 +12,18 @@
# 4. Tier-3 verify — fire vsftpd_234_backdoor against the # 4. Tier-3 verify — fire vsftpd_234_backdoor against the
# freshly-fetched VM, confirm session # freshly-fetched VM, confirm session
# lands and an episode is recorded # lands and an episode is recorded
# 5. Tier-4 deploy — fetch MalwareBazaar API key (env > # 5. Tier-4 deploy — clone theZoo (public security-research
# local file > bootstrap.wg), then run # repo, no auth), extract one real
# auto_fetch_samples.py to pull one real # binary per manifest family, stage at
# binary per sample family. THIS IS NOT # samples/store/<sha256>, rewrite
# OPTIONAL — real-binary episodes are # manifest.toml in place. MANDATORY:
# the actual training target. Deploy # the deploy fails if zero samples land.
# fails if zero samples land.
# #
# Inputs (env, all optional): # Inputs (env, all optional):
# SKIP_VERIFY — set to skip the live Tier-3 fire test # SKIP_VERIFY — set to skip the live Tier-3 fire test
# SKIP_BRIDGE — set to skip bridge setup (limits to non-callback modules) # SKIP_BRIDGE — set to skip bridge setup (limits to non-callback modules)
# SKIP_TIER4 — set to skip Tier-4 deploy entirely (DEPRECATED; # SKIP_TIER4 — set to skip Tier-4 deploy entirely (DEPRECATED;
# leaves you with mimic-only data, defeats the project) # leaves you with mimic-only data, defeats the project)
# MALWAREBAZAAR_API_KEY — preferred input path; otherwise pulled
# from /opt/cis490/samples/.bazaar.token, then
# from https://bootstrap.wg/v1/secret/malwarebazaar
# #
# Run as root from anywhere on the lab host. Sub-scripts handle their # Run as root from anywhere on the lab host. Sub-scripts handle their
# own root checks. # own root checks.
@ -123,55 +117,27 @@ else
log "[4/5] SKIP_VERIFY set" log "[4/5] SKIP_VERIFY set"
fi fi
# --- 5. Tier-4 deploy (MANDATORY) -------------------------------------- # --- 5. Tier-4 deploy (MANDATORY, no auth required) --------------------
if [[ -n "${SKIP_TIER4:-}" ]]; then if [[ -n "${SKIP_TIER4:-}" ]]; then
log "[5/5] SKIP_TIER4 set — leaving this host on Tier 2/3 mimic-only." log "[5/5] SKIP_TIER4 set — leaving this host on Tier 2/3 mimic-only."
log " This is NOT the recommended configuration; the project's" log " This is NOT the recommended configuration; the project's"
log " training target is real-binary episodes." log " training target is real-binary episodes."
else else
log "[5/5] Tier-4 deploy (real malware fetch — mandatory)" log "[5/5] Tier-4 deploy (real malware fetch from theZoo — mandatory)"
# Resolve the MalwareBazaar API key, in priority order: command -v git >/dev/null || die "git not installed; need it to clone theZoo"
# 1. MALWAREBAZAAR_API_KEY env (preferred for one-shot ops)
# 2. /opt/cis490/samples/.bazaar.token (already on disk)
# 3. https://bootstrap.wg/v1/secret/malwarebazaar (auto-distributed
# from the Pi after the operator runs set-malwarebazaar-key.sh)
MB_KEY="${MALWAREBAZAAR_API_KEY:-}"
TOKEN_FILE="$INSTALL_ROOT/samples/.bazaar.token"
if [[ -z "$MB_KEY" && -f "$TOKEN_FILE" ]]; then
MB_KEY="$(cat "$TOKEN_FILE" | tr -d '[:space:]')"
log "using MB key from $TOKEN_FILE"
fi
if [[ -z "$MB_KEY" ]]; then
log "no local MB key — fetching from https://bootstrap.wg/v1/secret/malwarebazaar"
# Use the same Caddy root the cert auto-fetch trusts.
CADDY_ROOT="$INSTALL_ROOT/etc/caddy-root.crt"
[[ -f "$CADDY_ROOT" ]] || CADDY_ROOT="$REPO_ROOT/etc/caddy-root.crt"
if MB_KEY="$(curl -fsS \
--cacert "$CADDY_ROOT" \
--connect-timeout 10 --max-time 30 \
https://bootstrap.wg/v1/secret/malwarebazaar 2>/dev/null)"; then
MB_KEY="$(echo -n "$MB_KEY" | tr -d '[:space:]')"
install -d -o cis490 -g cis490 -m 0750 "$INSTALL_ROOT/samples"
install -m 0600 -o cis490 -g cis490 /dev/stdin "$TOKEN_FILE" <<<"$MB_KEY"
log "fetched MB key from bootstrap.wg + cached at $TOKEN_FILE"
else
die "could not fetch MB key from bootstrap.wg. Either:
- run on the Pi: sudo MALWAREBAZAAR_API_KEY=<key> /opt/cis490/scripts/set-malwarebazaar-key.sh
(one-time per fleet; lab hosts auto-fetch after that), OR
- run on this host: MALWAREBAZAAR_API_KEY=<key> sudo $0
Get a free key at https://bazaar.abuse.ch/"
fi
fi
[[ -n "$MB_KEY" ]] || die "MB key still empty after all resolution paths"
log "running auto_fetch_samples.py — fetches one real binary per family"
PY="$INSTALL_ROOT/.venv/bin/python" PY="$INSTALL_ROOT/.venv/bin/python"
[[ -x "$PY" ]] || PY="$(command -v python3)" [[ -x "$PY" ]] || PY="$(command -v python3)"
if ! sudo -E MALWAREBAZAAR_API_KEY="$MB_KEY" -u cis490 "$PY" \
# theZoo clone lives on shared persistent storage so re-runs don't
# re-download. cis490 user owns it for periodic git pull.
THEZOO_DIR="${THEZOO_DIR:-/var/lib/cis490/theZoo}"
install -d -o cis490 -g cis490 -m 0755 "$(dirname "$THEZOO_DIR")"
if ! sudo -E -u cis490 "$PY" \
"$INSTALL_ROOT/tools/auto_fetch_samples.py" \ "$INSTALL_ROOT/tools/auto_fetch_samples.py" \
--thezoo-clone-dir "$THEZOO_DIR" \
> /tmp/cis490-tier4-deploy.log 2>&1; then > /tmp/cis490-tier4-deploy.log 2>&1; then
log "Tier-4 fetch failed — last 30 lines of /tmp/cis490-tier4-deploy.log:" log "Tier-4 fetch failed — last 30 lines of /tmp/cis490-tier4-deploy.log:"
tail -30 /tmp/cis490-tier4-deploy.log >&2 || true tail -30 /tmp/cis490-tier4-deploy.log >&2 || true


@ -1,56 +0,0 @@
#!/usr/bin/env bash
# One-time operator step on the receiver Pi.
#
# Provisions the MalwareBazaar API key at /etc/cis490/secrets/malwarebazaar.token
# with mode 0640, owned by root:cis490 (the bootstrap service runs as root and
# reads this file directly; the cis490 user is included in the group so future
# rotations can be done without root).
#
# Once provisioned, every lab host that runs install-tier-3-4.sh fetches the
# key from https://bootstrap.wg/v1/secret/malwarebazaar (over WG, gated by
# iptmonads at L4) — operator does NOT need to repeat this on each lab host.
#
# Usage:
# sudo MALWAREBAZAAR_API_KEY=<key> /opt/cis490/scripts/set-malwarebazaar-key.sh
# or:
# echo $key | sudo /opt/cis490/scripts/set-malwarebazaar-key.sh
set -euo pipefail
SECRETS_DIR="${SECRETS_DIR:-/etc/cis490/secrets}"
KEY_FILE="$SECRETS_DIR/malwarebazaar.token"
log() { printf '[set-malwarebazaar-key] %s\n' "$*" >&2; }
die() { log "FATAL: $*"; exit 1; }
[[ $EUID -eq 0 ]] || die "must run as root"
# Accept the key via env var first, stdin second.
KEY="${MALWAREBAZAAR_API_KEY:-}"
if [[ -z "$KEY" ]] && [[ ! -t 0 ]]; then
KEY="$(cat)"
fi
KEY="$(echo -n "$KEY" | tr -d '[:space:]')"
[[ -n "$KEY" ]] || die "no key provided. Set MALWAREBAZAAR_API_KEY or pipe via stdin."
# Free signup at https://bazaar.abuse.ch/ — the key is a 64-char
# alphanumeric string. Loose sanity check.
[[ ${#KEY} -ge 32 ]] || die "key looks too short (${#KEY} chars). Get a real one from https://bazaar.abuse.ch/"
if ! id -u cis490 >/dev/null 2>&1; then
die "cis490 user not present — run install-receiver.sh first"
fi
install -d -o root -g cis490 -m 0750 "$SECRETS_DIR"
install -m 0640 -o root -g cis490 /dev/stdin "$KEY_FILE" <<<"$KEY"
log "key installed at $KEY_FILE (${#KEY} chars)"
log ""
log "Next step: each lab host's install-tier-3-4.sh will now fetch it"
log "automatically from https://bootstrap.wg/v1/secret/malwarebazaar"
log "during deploy. To force a re-fetch on an already-deployed host:"
log " ssh <lab-host> sudo rm /opt/cis490/samples/.bazaar.token"
log " ssh <lab-host> sudo /opt/cis490/scripts/install-tier-3-4.sh"
log ""
log "If the bootstrap service was running already, no restart needed —"
log "the secret endpoint reads the file fresh on each request."


@ -1,80 +0,0 @@
"""Tests for the bootstrap.wg /v1/secret/<name> endpoint.
Tier 4 needs the MalwareBazaar API key on each lab host. We
distribute the key from the Pi via this endpoint instead of forcing
the operator to copy it manually to every host. Trust boundary is
identical to /v1/cert/<host>: a caller that reaches bootstrap.wg
is already a WG-mesh peer (iptmonads gate).
"""
from __future__ import annotations
from pathlib import Path
import pytest
from starlette.testclient import TestClient
from bootstrap.app import make_app
@pytest.fixture
def bootstrap_app(tmp_path: Path):
issued_root = tmp_path / "issued"
issued_root.mkdir()
secrets_root = tmp_path / "secrets"
secrets_root.mkdir()
# Issuer script doesn't matter for these tests — make a no-op stub
# so make_app doesn't barf on a missing path.
stub = tmp_path / "stub.sh"
stub.write_text("#!/bin/sh\nexit 0\n")
stub.chmod(0o755)
app = make_app(
issuer_script=stub,
issued_root=issued_root,
secrets_root=secrets_root,
)
return app, secrets_root
def test_secret_404_when_not_provisioned(bootstrap_app):
app, _ = bootstrap_app
with TestClient(app) as client:
r = client.get("/v1/secret/malwarebazaar")
assert r.status_code == 404
assert "secret not provisioned" in r.json()["error"]
def test_secret_returns_provisioned_token(bootstrap_app):
app, secrets_root = bootstrap_app
token = "a" * 64
(secrets_root / "malwarebazaar.token").write_text(token + "\n")
with TestClient(app) as client:
r = client.get("/v1/secret/malwarebazaar")
assert r.status_code == 200
# Response is the bare token, no JSON wrapping (lab-host curls
# this and pipes straight into the install flow).
assert r.text.strip() == token
# Don't cache the secret in any intermediate proxy.
assert r.headers.get("cache-control") == "no-store"
def test_unknown_secret_name_404(bootstrap_app):
app, secrets_root = bootstrap_app
# Even if a file with that name existed on disk, the route's
# allow-list rejects anything but `malwarebazaar`.
(secrets_root / "anything-else.token").write_text("x")
with TestClient(app) as client:
r = client.get("/v1/secret/anything-else")
assert r.status_code == 404
assert "unknown secret" in r.json()["error"]
def test_empty_secret_500(bootstrap_app):
"""An empty token file is operator error — fail loudly so the
lab-host install doesn't end up calling MB with no key."""
app, secrets_root = bootstrap_app
(secrets_root / "malwarebazaar.token").write_text("")
with TestClient(app) as client:
r = client.get("/v1/secret/malwarebazaar")
assert r.status_code == 500
assert "empty" in r.json()["error"]


@ -1,111 +1,151 @@
"""``cis490-auto-fetch-samples`` — pull one real binary per manifest """``cis490-auto-fetch-samples`` — pull one real binary per manifest
family from MalwareBazaar and update ``samples/manifest.toml``. family from theZoo and update ``samples/manifest.toml``.
The selection is automatic: for each entry in ``samples/manifest.toml`` No API key, no signup, no operator interaction. theZoo is a public
that doesn't already have a sha256, we query MalwareBazaar for a security-research repository (https://github.com/ytisf/theZoo)
recent sample whose ``signature`` field matches the entry's ``family`` maintained for malware analysis. Each sample is a password-protected
(e.g. ``family = "XMRig"`` -> MB signature ``XMRig``). The first zip; the password is the well-known ``infected``. We clone the repo
result is downloaded via ``tools.fetch_sample.fetch_sample``, the once (~500 MB shallow), then for each manifest entry without a
sha256 lands in ``samples/store/<sha256>``, and the manifest entry sha256 we:
gains ``source``, ``sha256``, and ``url`` fields.
Idempotent: entries that already have a sha256 are skipped. Manifest 1. Locate a directory in ``theZoo/malware/Binaries/`` matching
edits are atomic (tempfile + os.replace) and preserve the file's the entry's ``family`` (case-insensitive substring)
ownership and mode. 2. Find the .zip in that directory
3. Extract with password ``infected``
4. Pick the largest non-text payload as the binary
5. Compute its sha256, copy to ``samples/store/<sha256>``
6. Rewrite ``manifest.toml`` in place adding source/sha256/url
Run on the lab host as root (or as the cis490 service user, if it Idempotent: entries with sha256 already set are skipped. Manifest
has write permission to ``samples/``): edits are atomic (tempfile + os.replace, stat preserved). Families
that don't match anything in theZoo fail loudly so the deploy
MALWAREBAZAAR_API_KEY=<key> \\ script can decide whether to abort.
sudo -E -u cis490 /opt/cis490/.venv/bin/python \\
/opt/cis490/tools/auto_fetch_samples.py
Without an API key, exits 0 with no work done, which keeps the install
script's call site uncomplicated.
""" """
from __future__ import annotations from __future__ import annotations
import argparse import argparse
import hashlib
import json import json
import logging import logging
import os import os
import shutil
import subprocess
import sys import sys
import urllib.parse import zipfile
import urllib.request
from pathlib import Path from pathlib import Path
REPO_ROOT = Path(__file__).resolve().parent.parent REPO_ROOT = Path(__file__).resolve().parent.parent
sys.path.insert(0, str(REPO_ROOT)) sys.path.insert(0, str(REPO_ROOT))
sys.path.insert(0, str(REPO_ROOT / "tools"))
from samples.manifest import SampleManifest # noqa: E402 from samples.manifest import SampleManifest # noqa: E402
# fetch_sample is a sibling tool — load via its module path.
import importlib.util # noqa: E402
_spec = importlib.util.spec_from_file_location(
"fetch_sample", REPO_ROOT / "tools" / "fetch_sample.py"
)
_fetch_sample = importlib.util.module_from_spec(_spec)
_spec.loader.exec_module(_fetch_sample)
log = logging.getLogger("cis490.auto_fetch_samples") log = logging.getLogger("cis490.auto_fetch_samples")
MB_ENDPOINT = "https://mb-api.abuse.ch/api/v1/" THEZOO_URL = "https://github.com/ytisf/theZoo.git"
THEZOO_PASSWORD = b"infected"
def query_mb_by_signature(signature: str, api_key: str, *, limit: int = 5, def _ensure_thezoo(clone_dir: Path) -> Path:
timeout_s: float = 30.0) -> list[dict]: """Clone theZoo if missing; pull if present. Returns the clone path."""
"""Return up to ``limit`` recent MB samples whose signature matches. if (clone_dir / ".git").exists():
log.info("theZoo already cloned at %s; pulling latest", clone_dir)
Uses the ``get_siginfo`` query, which returns the latest samples try:
for a given Yara/community signature. Falls back to an empty list subprocess.run(
on any error so the caller can move on to the next family.""" ["git", "-C", str(clone_dir), "pull", "--ff-only"],
body = urllib.parse.urlencode({ check=True, capture_output=True, text=True, timeout=120,
"query": "get_siginfo", )
"signature": signature, except subprocess.CalledProcessError as e:
"limit": str(limit), log.warning("git pull failed (using existing clone): %s",
}).encode() e.stderr[:200])
req = urllib.request.Request( return clone_dir
MB_ENDPOINT, data=body, log.info("cloning %s -> %s (~500 MB shallow)", THEZOO_URL, clone_dir)
headers={"Auth-Key": api_key}, clone_dir.parent.mkdir(parents=True, exist_ok=True)
subprocess.run(
["git", "clone", "--depth", "1", THEZOO_URL, str(clone_dir)],
check=True, timeout=600,
) )
try: return clone_dir
with urllib.request.urlopen(req, timeout=timeout_s) as r:
payload = json.loads(r.read().decode("utf-8"))
except Exception as e: def _find_family_dir(thezoo: Path, family: str) -> Path | None:
log.warning("MB get_siginfo(%r) failed: %s", signature, e) """Locate a Binaries subdir whose name contains ``family``
return [] (case-insensitive). theZoo's layout is
if payload.get("query_status") != "ok": ``malware/Binaries/<Family-Specific-Name>/``."""
log.warning("MB returned %r for signature %r", binaries = thezoo / "malware" / "Binaries"
payload.get("query_status"), signature) if not binaries.is_dir():
return [] log.warning("theZoo layout missing %s — pull broke?", binaries)
rows = payload.get("data") or [] return None
return rows if isinstance(rows, list) else [] needle = family.lower()
matches: list[Path] = []
for child in sorted(binaries.iterdir()):
if not child.is_dir():
continue
if needle in child.name.lower():
matches.append(child)
if not matches:
return None
# Prefer a name that starts with the family over one that merely contains it.
for m in matches:
if m.name.lower().startswith(needle):
return m
return matches[0]
def _extract_largest_binary(zip_path: Path, work_dir: Path) -> Path | None:
"""Extract the password-protected zip and return the path to the
largest payload that isn't an obvious text artifact (md5/sha256
sidecars, READMEs, license files)."""
work_dir.mkdir(parents=True, exist_ok=True)
candidates: list[tuple[int, Path]] = []
with zipfile.ZipFile(zip_path) as z:
try:
z.extractall(path=work_dir, pwd=THEZOO_PASSWORD)
except RuntimeError as e:
log.warning("extract %s failed: %s", zip_path.name, e)
return None
for f in work_dir.rglob("*"):
if not f.is_file():
continue
name = f.name.lower()
if any(name.endswith(suf) for suf in (".md5", ".sha256", ".sha1",
".txt", ".md", ".pass")):
continue
if name in {"readme", "license", "metadata.txt"}:
continue
candidates.append((f.stat().st_size, f))
if not candidates:
return None
candidates.sort(reverse=True)
return candidates[0][1]
def _sha256_of(path: Path) -> str:
h = hashlib.sha256()
with path.open("rb") as f:
for chunk in iter(lambda: f.read(1024 * 1024), b""):
h.update(chunk)
return h.hexdigest()
def update_manifest_entry(manifest_path: Path, name: str, def update_manifest_entry(manifest_path: Path, name: str,
source: str, sha256: str, url: str) -> None: source: str, sha256: str, url: str) -> None:
"""In-place add ``source`` / ``sha256`` / ``url`` to the entry """Add `source`, `sha256`, `url` lines to the manifest entry whose
whose ``name`` matches. Preserves ownership and mode across the `name` matches. Atomic + stat-preserving."""
tempfile-replace dance."""
text = manifest_path.read_text() text = manifest_path.read_text()
needle = f'name = "{name}"' needle = f'name = "{name}"'
idx = text.find(needle) idx = text.find(needle)
if idx < 0: if idx < 0:
raise ValueError(f"name = {name!r} not found in {manifest_path}") raise ValueError(f"name = {name!r} not found in {manifest_path}")
# Find the end of this [[sample]] block (next "[[" or EOF).
next_block = text.find("[[", idx + len(needle)) next_block = text.find("[[", idx + len(needle))
end = next_block if next_block != -1 else len(text) end = next_block if next_block != -1 else len(text)
block = text[idx:end] block = text[idx:end]
# Skip if already has sha256. if "sha256 =" in block:
if "sha256 =" in block and "TBD" not in block: log.info("entry %s already has sha256; skipping in-place edit", name)
log.info("entry %s already has sha256; skipping", name)
return return
# Insert the three new lines before the description (or at end).
insert = ( insert = (
f'source = "{source}"\n' f'source = "{source}"\n'
f'sha256 = "{sha256}"\n' f'sha256 = "{sha256}"\n'
@ -129,28 +169,66 @@ def update_manifest_entry(manifest_path: Path, name: str,
os.chmod(manifest_path, st.st_mode & 0o7777) os.chmod(manifest_path, st.st_mode & 0o7777)
def fetch_one(thezoo: Path, sample_family: str, sample_name: str,
store_root: Path, work_root: Path) -> tuple[str, Path] | None:
"""Locate, extract, and stage one binary for a manifest family.
Returns (sha256, store_path) or None if the family wasn't found."""
fam_dir = _find_family_dir(thezoo, sample_family)
if fam_dir is None:
log.warning("%s: no theZoo dir matching family=%r", sample_name, sample_family)
return None
zips = sorted(fam_dir.rglob("*.zip"))
if not zips:
log.warning("%s: %s has no .zip — theZoo layout drift?",
sample_name, fam_dir)
return None
work_dir = work_root / sample_name
if work_dir.exists():
shutil.rmtree(work_dir)
binary = _extract_largest_binary(zips[0], work_dir)
if binary is None:
log.warning("%s: %s extraction yielded no payload",
sample_name, zips[0])
return None
sha = _sha256_of(binary)
store_root.mkdir(parents=True, exist_ok=True)
target = store_root / sha
if not target.exists():
shutil.copy2(binary, target)
log.info("%s: staged %s (%d bytes, sha256=%s)",
sample_name, target.name, target.stat().st_size, sha[:12])
# Best-effort: clean the per-sample work dir so disk doesn't grow.
shutil.rmtree(work_dir, ignore_errors=True)
return sha, target
def main(argv: list[str] | None = None) -> int: def main(argv: list[str] | None = None) -> int:
p = argparse.ArgumentParser(prog="cis490-auto-fetch-samples") p = argparse.ArgumentParser(prog="cis490-auto-fetch-samples")
p.add_argument("--manifest", p.add_argument("--manifest",
default=str(REPO_ROOT / "samples" / "manifest.toml")) default=str(REPO_ROOT / "samples" / "manifest.toml"))
p.add_argument("--store-root", p.add_argument("--store-root",
default=str(REPO_ROOT / "samples" / "store")) default=str(REPO_ROOT / "samples" / "store"))
p.add_argument("--limit-per-family", type=int, default=1, p.add_argument("--thezoo-clone-dir",
help="how many real binaries to fetch per family") default="/var/lib/cis490/theZoo",
help="Where to (re)clone theZoo. Cached across runs.")
p.add_argument("--work-root",
default="/tmp/cis490-thezoo-extract",
help="Per-run extraction scratch dir.")
p.add_argument("--dry-run", action="store_true") p.add_argument("--dry-run", action="store_true")
args = p.parse_args(argv) args = p.parse_args(argv)
logging.basicConfig(level=logging.INFO, logging.basicConfig(level=logging.INFO,
format="%(asctime)s %(levelname)s %(message)s") format="%(asctime)s %(levelname)s %(message)s")
api_key = _fetch_sample._read_api_key(REPO_ROOT) if shutil.which("git") is None:
if not api_key: log.error("git not on PATH; install git first")
log.warning("MALWAREBAZAAR_API_KEY not set — nothing to do") return 2
return 0
manifest_path = Path(args.manifest) manifest_path = Path(args.manifest)
store_root = Path(args.store_root) store_root = Path(args.store_root)
work_root = Path(args.work_root)
manifest = SampleManifest.load(manifest_path) manifest = SampleManifest.load(manifest_path)
thezoo = _ensure_thezoo(Path(args.thezoo_clone_dir))
fetched = 0 fetched = 0
skipped = 0 skipped = 0
@ -161,42 +239,26 @@ def main(argv: list[str] | None = None) -> int:
sample.name, sample.sha256[:12]) sample.name, sample.sha256[:12])
skipped += 1 skipped += 1
continue continue
log.info("%s: querying MB for family=%r", sample.name, sample.family)
rows = query_mb_by_signature(sample.family, api_key,
limit=args.limit_per_family)
if not rows:
log.warning("%s: no MB matches for family=%r — leaving as mimic",
sample.name, sample.family)
failed += 1
continue
# Pick the first non-corrupt-looking row that has a sha256.
chosen = next((r for r in rows if r.get("sha256_hash")), None)
if not chosen:
log.warning("%s: MB rows had no sha256_hash — skipping", sample.name)
failed += 1
continue
sha = chosen["sha256_hash"].lower()
url = f"https://bazaar.abuse.ch/sample/{sha}/"
if args.dry_run: if args.dry_run:
log.info("%s [dry-run]: would fetch %s", sample.name, sha) fam = _find_family_dir(thezoo, sample.family)
log.info("%s [dry-run]: family=%s match=%s",
sample.name, sample.family, fam.name if fam else "<none>")
continue continue
try: result = fetch_one(thezoo, sample.family, sample.name,
_fetch_sample.fetch_sample(sha, store_root, api_key) store_root, work_root)
update_manifest_entry(manifest_path, sample.name, if result is None:
source="MalwareBazaar", sha256=sha, url=url)
log.info("%s: fetched + manifest updated (sha256=%s)",
sample.name, sha[:12])
fetched += 1
except Exception as e:
log.warning("%s: fetch failed: %s — leaving as mimic", sample.name, e)
failed += 1 failed += 1
continue
sha, _ = result
url = "https://github.com/ytisf/theZoo/tree/master/malware/Binaries"
update_manifest_entry(manifest_path, sample.name,
source="theZoo", sha256=sha, url=url)
fetched += 1
log.info("done: fetched=%d skipped=%d failed=%d", fetched, skipped, failed) log.info("done: fetched=%d skipped=%d failed=%d", fetched, skipped, failed)
# Tier 4 is mandatory — exit non-zero unless at least one real # Tier 4 is mandatory — non-zero exit if no real samples staged.
# binary landed (or all entries were already real, i.e. nothing
# to do). The deploy script depends on this exit semantic.
if fetched == 0 and skipped == 0: if fetched == 0 and skipped == 0:
log.error("zero samples fetched and zero already-real — Tier 4 not viable") log.error("zero samples staged — check theZoo clone + family-name mapping")
return 1 return 1
return 0 return 0