Tier-4 sample source: theZoo (no auth, no operator action)

Replaces MalwareBazaar with theZoo (https://github.com/ytisf/theZoo).
theZoo is a public security-research repo with hundreds of malware
samples organized by family, password-protected with the well-known
'infected'. No API key, no signup, nothing for an operator to do —
which is what zero-touch tier-4 actually means.

Changes:

- tools/auto_fetch_samples.py: rewrite. Clones theZoo (shallow, ~500 MB)
  to /var/lib/cis490/theZoo on first run, then for each manifest
  family without a sha256 it locates a matching Binaries/<Name>
  dir, extracts the .zip with password 'infected', picks the largest
  non-text payload as the binary, sha256s it, stages at
  samples/store/<sha256>, and rewrites manifest.toml in place
  (atomic tempfile + os.replace, stat preserved). Mandatory exit
  semantic: non-zero if no real samples landed.

- scripts/install-tier-3-4.sh: dropped the MB-key resolution chain
  (env var → local file → bootstrap.wg fetch). Now just runs
  auto_fetch_samples.py and dies if zero samples land. SKIP_TIER4
  remains as the explicit override but is documented as defeating
  the project.

- bootstrap/app.py + __main__.py + etc/cis490-bootstrap.service:
  removed the /v1/secret/<name> endpoint and the --secrets-root flag.
  Dead code now that no API key needs distributing. Live-rolled
  back on the Pi (404 verified post-restart, stale /etc/cis490/secrets
  dir removed).

- scripts/set-malwarebazaar-key.sh: deleted. No MB key means no
  one-time operator step.

- tests/test_bootstrap_secrets.py: deleted (route removed).

- AGENTS.md: rewrote tier-4 section to reflect zero-operator model.

148/148 tests pass. Bootstrap service rolled back live.
This commit is contained in:
max 2026-05-01 01:17:50 -05:00
parent 5d0e8e33a9
commit 265f3ad313
8 changed files with 208 additions and 371 deletions

View file

@ -110,44 +110,37 @@ disk, the next wave produces Tier-3 episodes (`meta.exploit.module_name`
populated). No orchestrator restart is required, but a restart speeds
up the switch.
### Tier-4 (real malware execution) is mandatory, push-button after one-time Pi setup
### Tier-4 (real malware execution) is mandatory, fully automated
**Real-binary episodes are the project's training target — Tier-4 is
NOT optional.** A lab-host deploy that lands without real samples
fails loudly; mimic-only data does not answer the research question.
**One-time, on the Pi (operator runs once, ever):**
There is **no operator step**. No API key, no signup, no manual
provisioning. `install-tier-3-4.sh` runs `tools/auto_fetch_samples.py`
which:
```sh
sudo MALWAREBAZAAR_API_KEY=<key> /opt/cis490/scripts/set-malwarebazaar-key.sh
```
1. Clones (or pulls) `theZoo` from
`https://github.com/ytisf/theZoo` to `/var/lib/cis490/theZoo`
(~500 MB shallow clone, public, GPL-3.0, security-research repo)
2. For each `[[sample]]` in `manifest.toml` without a sha256, locates
a directory in `theZoo/malware/Binaries/` whose name matches
the entry's `family` (case-insensitive substring + prefix priority)
3. Extracts the password-protected `.zip` (well-known password
`infected`)
4. Picks the largest non-text payload as the binary, computes its
sha256, copies to `/opt/cis490/samples/store/<sha256>`
5. Rewrites `manifest.toml` in place, atomically (tempfile +
`os.replace` preserving stat), adding `source = "theZoo"`,
`sha256 = "<hex>"`, and the upstream URL
Free signup at https://bazaar.abuse.ch/. The key lands at
`/etc/cis490/secrets/malwarebazaar.token` (mode 0640, root:cis490).
The bootstrap service's `/v1/secret/malwarebazaar` endpoint then
serves it to every lab host — same trust boundary as the cert
endpoint (WG mesh, iptmonads-gated).
If `auto_fetch_samples.py` lands zero binaries (theZoo layout drift,
git clone failure, or a family has no matching directory),
`install-tier-3-4.sh` exits non-zero. **No silent mimic-only fallback.**
**Per lab host (auto):** `install-tier-3-4.sh` resolves the MB key
in priority order:
1. `MALWAREBAZAAR_API_KEY` env var
2. `/opt/cis490/samples/.bazaar.token` (cached from a previous run)
3. `https://bootstrap.wg/v1/secret/malwarebazaar` (auto-distributed
from the Pi)
If all three fail, the deploy aborts with the exact remediation
command. Once the key resolves, `tools/auto_fetch_samples.py` walks
each manifest family, queries MB by signature, fetches the first
match, sha256-verifies on the way in, lands the binary at
`/opt/cis490/samples/store/<sha256>`, and rewrites `manifest.toml`
in place. The orchestrator's next selection that picks a sample
with `kind == "real"` runs the real binary via the chunked-upload
path.
If `auto_fetch_samples.py` lands zero binaries (zero successful MB
queries), `install-tier-3-4.sh` exits non-zero. **No silent
mimic-only fallback** — the project's data depends on real samples.
The orchestrator's next selection that picks a sample with
`kind == "real"` runs the real binary via the chunked-upload path
(`exploits.driver._resolve_workload`).
Set `MALWAREBAZAAR_API_KEY` (free signup at https://bazaar.abuse.ch/)
before running `install-tier-3-4.sh` and step 5 runs

View file

@ -33,14 +33,6 @@ def main(argv: list[str] | None = None) -> int:
default=Path("/home/max/.env/wg-pki/issued"),
help="Where minted tarballs are cached.",
)
p.add_argument(
"--secrets-root",
type=Path,
default=Path("/etc/cis490/secrets"),
help="Directory holding shared secrets distributed to lab hosts. "
"Currently used for malwarebazaar.token; provisioned by "
"scripts/set-malwarebazaar-key.sh.",
)
p.add_argument("--log-level", default="info")
args = p.parse_args(argv)
@ -57,7 +49,6 @@ def main(argv: list[str] | None = None) -> int:
app = make_app(
issuer_script=args.issuer_script,
issued_root=args.issued_root,
secrets_root=args.secrets_root,
)
log.info("listening on %s:%d", args.listen_host, args.listen_port)
uvicorn.run(

View file

@ -61,7 +61,6 @@ def make_app(
*,
issuer_script: Path,
issued_root: Path,
secrets_root: Path = Path("/etc/cis490/secrets"),
rate_limit_window_s: float = 5.0,
) -> Starlette:
"""Build the Starlette app. Wired by the production launcher in
@ -140,45 +139,8 @@ def make_app(
},
)
async def get_secret(request: Request) -> Response:
"""Serve a named secret from `secrets_root`. Currently only
`malwarebazaar` is allowed the MB API key Tier-4 needs to
fetch real malware samples. Same trust boundary as the cert
endpoint: anything reaching bootstrap.wg has cleared
iptmonads' WG-membership check."""
name: str = request.path_params["name"]
# Strict allow-list to keep this from turning into a generic
# secrets API.
if name != "malwarebazaar":
return JSONResponse({"error": "unknown secret"}, status_code=404)
path = secrets_root / "malwarebazaar.token"
if not path.exists():
return JSONResponse(
{"error": "secret not provisioned",
"hint": "run scripts/set-malwarebazaar-key.sh on the receiver"},
status_code=404,
)
try:
data = path.read_text().strip()
except OSError as e:
return JSONResponse({"error": f"read failed: {e}"}, status_code=500)
if not data:
return JSONResponse({"error": "empty secret"}, status_code=500)
src = (
request.headers.get("x-real-ip")
or (request.headers.get("x-forwarded-for") or "").split(",")[0].strip()
or (request.client.host if request.client else "?")
)
log.info("served secret=%s to src=%s", name, src)
return Response(
content=data,
media_type="text/plain",
headers={"Cache-Control": "no-store"},
)
routes = [
Route("/v1/health", health, methods=["GET"]),
Route("/v1/cert/{host_id}", get_cert, methods=["GET"]),
Route("/v1/secret/{name}", get_secret, methods=["GET"]),
]
return Starlette(routes=routes)

View file

@ -16,8 +16,7 @@ ExecStart=/opt/cis490/.venv/bin/python -m bootstrap \
--listen-host 127.0.0.1 \
--listen-port 8446 \
--issuer-script /opt/wg-pki/scripts/issue-cis490-client-cert-wrapper.sh \
--issued-root /var/lib/wg-pki/issued \
--secrets-root /etc/cis490/secrets
--issued-root /var/lib/wg-pki/issued
Restart=on-failure
RestartSec=5

View file

@ -1,8 +1,6 @@
#!/usr/bin/env bash
# Tier-3 + Tier-4 deploy orchestrator. Idempotent. Zero operator
# interaction on the lab host (operator provisions the
# MalwareBazaar API key ONCE on the Pi via
# scripts/set-malwarebazaar-key.sh; from there it's auto-distributed).
# Tier-3 + Tier-4 deploy orchestrator. Idempotent. ZERO operator
# interaction — including no API key, no signup, no manual upload.
#
# Steps (each idempotent on its own):
# 1. install-msfrpcd.sh — auto-install metasploit-framework via
@ -14,22 +12,18 @@
# 4. Tier-3 verify — fire vsftpd_234_backdoor against the
# freshly-fetched VM, confirm session
# lands and an episode is recorded
# 5. Tier-4 deploy — fetch MalwareBazaar API key (env >
# local file > bootstrap.wg), then run
# auto_fetch_samples.py to pull one real
# binary per sample family. THIS IS NOT
# OPTIONAL — real-binary episodes are
# the actual training target. Deploy
# fails if zero samples land.
# 5. Tier-4 deploy — clone theZoo (public security-research
# repo, no auth), extract one real
# binary per manifest family, stage at
# samples/store/<sha256>, rewrite
# manifest.toml in place. MANDATORY:
# the deploy fails if zero samples land.
#
# Inputs (env, all optional):
# SKIP_VERIFY — set to skip the live Tier-3 fire test
# SKIP_BRIDGE — set to skip bridge setup (limits to non-callback modules)
# SKIP_TIER4 — set to skip Tier-4 deploy entirely (DEPRECATED;
# leaves you with mimic-only data, defeats the project)
# MALWAREBAZAAR_API_KEY — preferred input path; otherwise pulled
# from /opt/cis490/samples/.bazaar.token, then
# from https://bootstrap.wg/v1/secret/malwarebazaar
#
# Run as root from anywhere on the lab host. Sub-scripts handle their
# own root checks.
@ -123,55 +117,27 @@ else
log "[4/5] SKIP_VERIFY set"
fi
# --- 5. Tier-4 deploy (MANDATORY) --------------------------------------
# --- 5. Tier-4 deploy (MANDATORY, no auth required) --------------------
if [[ -n "${SKIP_TIER4:-}" ]]; then
log "[5/5] SKIP_TIER4 set — leaving this host on Tier 2/3 mimic-only."
log " This is NOT the recommended configuration; the project's"
log " training target is real-binary episodes."
else
log "[5/5] Tier-4 deploy (real malware fetch — mandatory)"
log "[5/5] Tier-4 deploy (real malware fetch from theZoo — mandatory)"
# Resolve the MalwareBazaar API key, in priority order:
# 1. MALWAREBAZAAR_API_KEY env (preferred for one-shot ops)
# 2. /opt/cis490/samples/.bazaar.token (already on disk)
# 3. https://bootstrap.wg/v1/secret/malwarebazaar (auto-distributed
# from the Pi after the operator runs set-malwarebazaar-key.sh)
MB_KEY="${MALWAREBAZAAR_API_KEY:-}"
TOKEN_FILE="$INSTALL_ROOT/samples/.bazaar.token"
command -v git >/dev/null || die "git not installed; need it to clone theZoo"
if [[ -z "$MB_KEY" && -f "$TOKEN_FILE" ]]; then
MB_KEY="$(cat "$TOKEN_FILE" | tr -d '[:space:]')"
log "using MB key from $TOKEN_FILE"
fi
if [[ -z "$MB_KEY" ]]; then
log "no local MB key — fetching from https://bootstrap.wg/v1/secret/malwarebazaar"
# Use the same Caddy root the cert auto-fetch trusts.
CADDY_ROOT="$INSTALL_ROOT/etc/caddy-root.crt"
[[ -f "$CADDY_ROOT" ]] || CADDY_ROOT="$REPO_ROOT/etc/caddy-root.crt"
if MB_KEY="$(curl -fsS \
--cacert "$CADDY_ROOT" \
--connect-timeout 10 --max-time 30 \
https://bootstrap.wg/v1/secret/malwarebazaar 2>/dev/null)"; then
MB_KEY="$(echo -n "$MB_KEY" | tr -d '[:space:]')"
install -d -o cis490 -g cis490 -m 0750 "$INSTALL_ROOT/samples"
install -m 0600 -o cis490 -g cis490 /dev/stdin "$TOKEN_FILE" <<<"$MB_KEY"
log "fetched MB key from bootstrap.wg + cached at $TOKEN_FILE"
else
die "could not fetch MB key from bootstrap.wg. Either:
- run on the Pi: sudo MALWAREBAZAAR_API_KEY=<key> /opt/cis490/scripts/set-malwarebazaar-key.sh
(one-time per fleet; lab hosts auto-fetch after that), OR
- run on this host: MALWAREBAZAAR_API_KEY=<key> sudo $0
Get a free key at https://bazaar.abuse.ch/"
fi
fi
[[ -n "$MB_KEY" ]] || die "MB key still empty after all resolution paths"
log "running auto_fetch_samples.py — fetches one real binary per family"
PY="$INSTALL_ROOT/.venv/bin/python"
[[ -x "$PY" ]] || PY="$(command -v python3)"
if ! sudo -E MALWAREBAZAAR_API_KEY="$MB_KEY" -u cis490 "$PY" \
# theZoo clone lives on shared persistent storage so re-runs don't
# re-download. cis490 user owns it for periodic git pull.
THEZOO_DIR="${THEZOO_DIR:-/var/lib/cis490/theZoo}"
install -d -o cis490 -g cis490 -m 0755 "$(dirname "$THEZOO_DIR")"
if ! sudo -E -u cis490 "$PY" \
"$INSTALL_ROOT/tools/auto_fetch_samples.py" \
--thezoo-clone-dir "$THEZOO_DIR" \
> /tmp/cis490-tier4-deploy.log 2>&1; then
log "Tier-4 fetch failed — last 30 lines of /tmp/cis490-tier4-deploy.log:"
tail -30 /tmp/cis490-tier4-deploy.log >&2 || true

View file

@ -1,56 +0,0 @@
#!/usr/bin/env bash
# One-time operator step on the receiver Pi.
#
# Provisions the MalwareBazaar API key at /etc/cis490/secrets/malwarebazaar.token
# with mode 0640, owned by root:cis490 (the bootstrap service runs as root and
# reads this file directly; the cis490 user is included in the group so future
# rotations can be done without root).
#
# Once provisioned, every lab host that runs install-tier-3-4.sh fetches the
# key from https://bootstrap.wg/v1/secret/malwarebazaar (over WG, gated by
# iptmonads at L4) — operator does NOT need to repeat this on each lab host.
#
# Usage:
# sudo MALWAREBAZAAR_API_KEY=<key> /opt/cis490/scripts/set-malwarebazaar-key.sh
# or:
# echo $key | sudo /opt/cis490/scripts/set-malwarebazaar-key.sh
set -euo pipefail
SECRETS_DIR="${SECRETS_DIR:-/etc/cis490/secrets}"
KEY_FILE="$SECRETS_DIR/malwarebazaar.token"
log() { printf '[set-malwarebazaar-key] %s\n' "$*" >&2; }
die() { log "FATAL: $*"; exit 1; }
[[ $EUID -eq 0 ]] || die "must run as root"
# Accept the key via env var first, stdin second.
KEY="${MALWAREBAZAAR_API_KEY:-}"
if [[ -z "$KEY" ]] && [[ ! -t 0 ]]; then
KEY="$(cat)"
fi
KEY="$(echo -n "$KEY" | tr -d '[:space:]')"
[[ -n "$KEY" ]] || die "no key provided. Set MALWAREBAZAAR_API_KEY or pipe via stdin."
# Free signup at https://bazaar.abuse.ch/ — the key is a 64-char
# alphanumeric string. Loose sanity check.
[[ ${#KEY} -ge 32 ]] || die "key looks too short (${#KEY} chars). Get a real one from https://bazaar.abuse.ch/"
if ! id -u cis490 >/dev/null 2>&1; then
die "cis490 user not present — run install-receiver.sh first"
fi
install -d -o root -g cis490 -m 0750 "$SECRETS_DIR"
install -m 0640 -o root -g cis490 /dev/stdin "$KEY_FILE" <<<"$KEY"
log "key installed at $KEY_FILE (${#KEY} chars)"
log ""
log "Next step: each lab host's install-tier-3-4.sh will now fetch it"
log "automatically from https://bootstrap.wg/v1/secret/malwarebazaar"
log "during deploy. To force a re-fetch on an already-deployed host:"
log " ssh <lab-host> sudo rm /opt/cis490/samples/.bazaar.token"
log " ssh <lab-host> sudo /opt/cis490/scripts/install-tier-3-4.sh"
log ""
log "If the bootstrap service was running already, no restart needed —"
log "the secret endpoint reads the file fresh on each request."

View file

@ -1,80 +0,0 @@
"""Tests for the bootstrap.wg /v1/secret/<name> endpoint.
Tier 4 needs the MalwareBazaar API key on each lab host. We
distribute the key from the Pi via this endpoint instead of forcing
the operator to copy it manually to every host. Trust boundary is
identical to /v1/cert/<host>: a caller that reaches bootstrap.wg
is already a WG-mesh peer (iptmonads gate).
"""
from __future__ import annotations
from pathlib import Path
import pytest
from starlette.testclient import TestClient
from bootstrap.app import make_app
@pytest.fixture
def bootstrap_app(tmp_path: Path):
issued_root = tmp_path / "issued"
issued_root.mkdir()
secrets_root = tmp_path / "secrets"
secrets_root.mkdir()
# Issuer script doesn't matter for these tests — make a no-op stub
# so make_app doesn't barf on a missing path.
stub = tmp_path / "stub.sh"
stub.write_text("#!/bin/sh\nexit 0\n")
stub.chmod(0o755)
app = make_app(
issuer_script=stub,
issued_root=issued_root,
secrets_root=secrets_root,
)
return app, secrets_root
def test_secret_404_when_not_provisioned(bootstrap_app):
app, _ = bootstrap_app
with TestClient(app) as client:
r = client.get("/v1/secret/malwarebazaar")
assert r.status_code == 404
assert "secret not provisioned" in r.json()["error"]
def test_secret_returns_provisioned_token(bootstrap_app):
app, secrets_root = bootstrap_app
token = "a" * 64
(secrets_root / "malwarebazaar.token").write_text(token + "\n")
with TestClient(app) as client:
r = client.get("/v1/secret/malwarebazaar")
assert r.status_code == 200
# Response is the bare token, no JSON wrapping (lab-host curls
# this and pipes straight into the install flow).
assert r.text.strip() == token
# Don't cache the secret in any intermediate proxy.
assert r.headers.get("cache-control") == "no-store"
def test_unknown_secret_name_404(bootstrap_app):
app, secrets_root = bootstrap_app
# Even if a file with that name existed on disk, the route's
# allow-list rejects anything but `malwarebazaar`.
(secrets_root / "anything-else.token").write_text("x")
with TestClient(app) as client:
r = client.get("/v1/secret/anything-else")
assert r.status_code == 404
assert "unknown secret" in r.json()["error"]
def test_empty_secret_500(bootstrap_app):
"""An empty token file is operator error — fail loudly so the
lab-host install doesn't end up calling MB with no key."""
app, secrets_root = bootstrap_app
(secrets_root / "malwarebazaar.token").write_text("")
with TestClient(app) as client:
r = client.get("/v1/secret/malwarebazaar")
assert r.status_code == 500
assert "empty" in r.json()["error"]

View file

@ -1,111 +1,151 @@
"""``cis490-auto-fetch-samples`` — pull one real binary per manifest
family from MalwareBazaar and update ``samples/manifest.toml``.
family from theZoo and update ``samples/manifest.toml``.
The selection is automatic: for each entry in ``samples/manifest.toml``
that doesn't already have a sha256, we query MalwareBazaar for a
recent sample whose ``signature`` field matches the entry's ``family``
(e.g. ``family = "XMRig"`` MB signature ``XMRig``). The first
result is downloaded via ``tools.fetch_sample.fetch_sample``, the
sha256 lands in ``samples/store/<sha256>``, and the manifest entry
gains ``source``, ``sha256``, and ``url`` fields.
No API key, no signup, no operator interaction. theZoo is a public
security-research repository (https://github.com/ytisf/theZoo)
maintained for malware analysis. Each sample is a password-protected
zip; the password is the well-known ``infected``. We clone the repo
once (~500 MB shallow), then for each manifest entry without a
sha256 we:
Idempotent: entries that already have a sha256 are skipped. Manifest
edits are atomic (tempfile + os.replace) and preserve the file's
ownership and mode.
1. Locate a directory in ``theZoo/malware/Binaries/`` matching
the entry's ``family`` (case-insensitive substring)
2. Find the .zip in that directory
3. Extract with password ``infected``
4. Pick the largest non-text payload as the binary
5. Compute its sha256, copy to ``samples/store/<sha256>``
6. Rewrite ``manifest.toml`` in place adding source/sha256/url
Run on the lab host as root (or as the cis490 service user, if it
has write permission to ``samples/``):
MALWAREBAZAAR_API_KEY=<key> \\
sudo -E -u cis490 /opt/cis490/.venv/bin/python \\
/opt/cis490/tools/auto_fetch_samples.py
Without an API key, exits 0 with no work done keeps the install
script's call site uncomplicated.
Idempotent: entries with sha256 already set are skipped. Manifest
edits are atomic (tempfile + os.replace, stat preserved). Families
that don't match anything in theZoo fail loudly so the deploy
script can decide whether to abort.
"""
from __future__ import annotations
import argparse
import hashlib
import json
import logging
import os
import shutil
import subprocess
import sys
import urllib.parse
import urllib.request
import zipfile
from pathlib import Path
REPO_ROOT = Path(__file__).resolve().parent.parent
sys.path.insert(0, str(REPO_ROOT))
sys.path.insert(0, str(REPO_ROOT / "tools"))
from samples.manifest import SampleManifest # noqa: E402
# fetch_sample is a sibling tool — load via its module path.
import importlib.util # noqa: E402
_spec = importlib.util.spec_from_file_location(
"fetch_sample", REPO_ROOT / "tools" / "fetch_sample.py"
)
_fetch_sample = importlib.util.module_from_spec(_spec)
_spec.loader.exec_module(_fetch_sample)
log = logging.getLogger("cis490.auto_fetch_samples")
MB_ENDPOINT = "https://mb-api.abuse.ch/api/v1/"
THEZOO_URL = "https://github.com/ytisf/theZoo.git"
THEZOO_PASSWORD = b"infected"
def query_mb_by_signature(signature: str, api_key: str, *, limit: int = 5,
timeout_s: float = 30.0) -> list[dict]:
"""Return up to ``limit`` recent MB samples whose signature matches.
Uses the ``get_siginfo`` query, which returns the latest samples
for a given Yara/community signature. Falls back to an empty list
on any error so the caller can move on to the next family."""
body = urllib.parse.urlencode({
"query": "get_siginfo",
"signature": signature,
"limit": str(limit),
}).encode()
req = urllib.request.Request(
MB_ENDPOINT, data=body,
headers={"Auth-Key": api_key},
def _ensure_thezoo(clone_dir: Path) -> Path:
"""Clone theZoo if missing; pull if present. Returns the clone path."""
if (clone_dir / ".git").exists():
log.info("theZoo already cloned at %s; pulling latest", clone_dir)
try:
subprocess.run(
["git", "-C", str(clone_dir), "pull", "--ff-only"],
check=True, capture_output=True, text=True, timeout=120,
)
except subprocess.CalledProcessError as e:
log.warning("git pull failed (using existing clone): %s",
e.stderr[:200])
return clone_dir
log.info("cloning %s%s (~500 MB shallow)", THEZOO_URL, clone_dir)
clone_dir.parent.mkdir(parents=True, exist_ok=True)
subprocess.run(
["git", "clone", "--depth", "1", THEZOO_URL, str(clone_dir)],
check=True, timeout=600,
)
try:
with urllib.request.urlopen(req, timeout=timeout_s) as r:
payload = json.loads(r.read().decode("utf-8"))
except Exception as e:
log.warning("MB get_siginfo(%r) failed: %s", signature, e)
return []
if payload.get("query_status") != "ok":
log.warning("MB returned %r for signature %r",
payload.get("query_status"), signature)
return []
rows = payload.get("data") or []
return rows if isinstance(rows, list) else []
return clone_dir
def _find_family_dir(thezoo: Path, family: str) -> Path | None:
"""Locate a Binaries subdir whose name contains ``family``
(case-insensitive). theZoo's layout is
``malware/Binaries/<Family-Specific-Name>/``."""
binaries = thezoo / "malware" / "Binaries"
if not binaries.is_dir():
log.warning("theZoo layout missing %s — pull broke?", binaries)
return None
needle = family.lower()
matches: list[Path] = []
for child in sorted(binaries.iterdir()):
if not child.is_dir():
continue
if needle in child.name.lower():
matches.append(child)
if not matches:
return None
# Prefer exact-match prefix (e.g. "Mirai" before "MirageFox").
for m in matches:
if m.name.lower().startswith(needle):
return m
return matches[0]
def _extract_largest_binary(zip_path: Path, work_dir: Path) -> Path | None:
"""Extract the password-protected zip and return the path to the
largest payload that isn't an obvious text artifact (md5/sha256
sidecars, READMEs, license files)."""
work_dir.mkdir(parents=True, exist_ok=True)
candidates: list[tuple[int, Path]] = []
with zipfile.ZipFile(zip_path) as z:
try:
z.extractall(path=work_dir, pwd=THEZOO_PASSWORD)
except RuntimeError as e:
log.warning("extract %s failed: %s", zip_path.name, e)
return None
for f in work_dir.rglob("*"):
if not f.is_file():
continue
name = f.name.lower()
if any(name.endswith(suf) for suf in (".md5", ".sha256", ".sha1",
".txt", ".md", ".pass")):
continue
if name in {"readme", "license", "metadata.txt"}:
continue
candidates.append((f.stat().st_size, f))
if not candidates:
return None
candidates.sort(reverse=True)
return candidates[0][1]
def _sha256_of(path: Path) -> str:
h = hashlib.sha256()
with path.open("rb") as f:
for chunk in iter(lambda: f.read(1024 * 1024), b""):
h.update(chunk)
return h.hexdigest()
def update_manifest_entry(manifest_path: Path, name: str,
source: str, sha256: str, url: str) -> None:
"""In-place add ``source`` / ``sha256`` / ``url`` to the entry
whose ``name`` matches. Preserves ownership and mode across the
tempfile-replace dance."""
"""Add `source`, `sha256`, `url` lines to the manifest entry whose
`name` matches. Atomic + stat-preserving."""
text = manifest_path.read_text()
needle = f'name = "{name}"'
idx = text.find(needle)
if idx < 0:
raise ValueError(f"name = {name!r} not found in {manifest_path}")
# Find the end of this [[sample]] block (next "[[" or EOF).
next_block = text.find("[[", idx + len(needle))
end = next_block if next_block != -1 else len(text)
block = text[idx:end]
# Skip if already has sha256.
if "sha256 =" in block and "TBD" not in block:
log.info("entry %s already has sha256; skipping", name)
if "sha256 =" in block:
log.info("entry %s already has sha256; skipping in-place edit", name)
return
# Insert the three new lines before the description (or at end).
insert = (
f'source = "{source}"\n'
f'sha256 = "{sha256}"\n'
@ -129,28 +169,66 @@ def update_manifest_entry(manifest_path: Path, name: str,
os.chmod(manifest_path, st.st_mode & 0o7777)
def fetch_one(thezoo: Path, sample_family: str, sample_name: str,
store_root: Path, work_root: Path) -> tuple[str, Path] | None:
"""Locate, extract, and stage one binary for a manifest family.
Returns (sha256, store_path) or None if the family wasn't found."""
fam_dir = _find_family_dir(thezoo, sample_family)
if fam_dir is None:
log.warning("%s: no theZoo dir matching family=%r", sample_name, sample_family)
return None
zips = sorted(fam_dir.rglob("*.zip"))
if not zips:
log.warning("%s: %s has no .zip — theZoo layout drift?",
sample_name, fam_dir)
return None
work_dir = work_root / sample_name
if work_dir.exists():
shutil.rmtree(work_dir)
binary = _extract_largest_binary(zips[0], work_dir)
if binary is None:
log.warning("%s: %s extraction yielded no payload",
sample_name, zips[0])
return None
sha = _sha256_of(binary)
store_root.mkdir(parents=True, exist_ok=True)
target = store_root / sha
if not target.exists():
shutil.copy2(binary, target)
log.info("%s: staged %s (%d bytes, sha256=%s)",
sample_name, target.name, target.stat().st_size, sha[:12])
# Best-effort: clean the per-sample work dir so disk doesn't grow.
shutil.rmtree(work_dir, ignore_errors=True)
return sha, target
def main(argv: list[str] | None = None) -> int:
p = argparse.ArgumentParser(prog="cis490-auto-fetch-samples")
p.add_argument("--manifest",
default=str(REPO_ROOT / "samples" / "manifest.toml"))
p.add_argument("--store-root",
default=str(REPO_ROOT / "samples" / "store"))
p.add_argument("--limit-per-family", type=int, default=1,
help="how many real binaries to fetch per family")
p.add_argument("--thezoo-clone-dir",
default="/var/lib/cis490/theZoo",
help="Where to (re)clone theZoo. Cached across runs.")
p.add_argument("--work-root",
default="/tmp/cis490-thezoo-extract",
help="Per-run extraction scratch dir.")
p.add_argument("--dry-run", action="store_true")
args = p.parse_args(argv)
logging.basicConfig(level=logging.INFO,
format="%(asctime)s %(levelname)s %(message)s")
api_key = _fetch_sample._read_api_key(REPO_ROOT)
if not api_key:
log.warning("MALWAREBAZAAR_API_KEY not set — nothing to do")
return 0
if shutil.which("git") is None:
log.error("git not on PATH; install git first")
return 2
manifest_path = Path(args.manifest)
store_root = Path(args.store_root)
work_root = Path(args.work_root)
manifest = SampleManifest.load(manifest_path)
thezoo = _ensure_thezoo(Path(args.thezoo_clone_dir))
fetched = 0
skipped = 0
@ -161,42 +239,26 @@ def main(argv: list[str] | None = None) -> int:
sample.name, sample.sha256[:12])
skipped += 1
continue
log.info("%s: querying MB for family=%r", sample.name, sample.family)
rows = query_mb_by_signature(sample.family, api_key,
limit=args.limit_per_family)
if not rows:
log.warning("%s: no MB matches for family=%r — leaving as mimic",
sample.name, sample.family)
failed += 1
continue
# Pick the first non-corrupt-looking row that has a sha256.
chosen = next((r for r in rows if r.get("sha256_hash")), None)
if not chosen:
log.warning("%s: MB rows had no sha256_hash — skipping", sample.name)
failed += 1
continue
sha = chosen["sha256_hash"].lower()
url = f"https://bazaar.abuse.ch/sample/{sha}/"
if args.dry_run:
log.info("%s [dry-run]: would fetch %s", sample.name, sha)
fam = _find_family_dir(thezoo, sample.family)
log.info("%s [dry-run]: family=%s match=%s",
sample.name, sample.family, fam.name if fam else "<none>")
continue
try:
_fetch_sample.fetch_sample(sha, store_root, api_key)
update_manifest_entry(manifest_path, sample.name,
source="MalwareBazaar", sha256=sha, url=url)
log.info("%s: fetched + manifest updated (sha256=%s)",
sample.name, sha[:12])
fetched += 1
except Exception as e:
log.warning("%s: fetch failed: %s — leaving as mimic", sample.name, e)
result = fetch_one(thezoo, sample.family, sample.name,
store_root, work_root)
if result is None:
failed += 1
continue
sha, _ = result
url = f"https://github.com/ytisf/theZoo/tree/master/malware/Binaries"
update_manifest_entry(manifest_path, sample.name,
source="theZoo", sha256=sha, url=url)
fetched += 1
log.info("done: fetched=%d skipped=%d failed=%d", fetched, skipped, failed)
# Tier 4 is mandatory — exit non-zero unless at least one real
# binary landed (or all entries were already real, i.e. nothing
# to do). The deploy script depends on this exit semantic.
# Tier 4 is mandatory — non-zero exit if no real samples staged.
if fetched == 0 and skipped == 0:
log.error("zero samples fetched and zero already-real — Tier 4 not viable")
log.error("zero samples staged — check theZoo clone + family-name mapping")
return 1
return 0