"""Fetch a malware sample by sha256 from MalwareBazaar. Lands the binary at ``samples/store/`` (gitignored), verifies the hash on the way in, and prints the resulting path on stdout. Usage: MALWAREBAZAAR_API_KEY=... uv run python tools/fetch_sample.py MalwareBazaar requires a free API key as of late 2023; sign up at https://bazaar.abuse.ch and either pass via env or place in ``samples/.bazaar.token`` (mode 0600, gitignored). The downloaded zip is unencrypted by ``infected`` per the MB convention. The fetcher is intentionally read-only over the network — no upload, no metadata posted — so a lab host with a tightly-egress-firewalled WG mesh can run it once on a build host and rsync the resulting ``samples/store/`` directory across the fleet. """ from __future__ import annotations import argparse import hashlib import os import sys import urllib.parse import urllib.request import zipfile from pathlib import Path MB_ENDPOINT = "https://mb-api.abuse.ch/api/v1/" MB_ZIP_PASSWORD = b"infected" def _read_api_key(repo_root: Path) -> str | None: env = os.environ.get("MALWAREBAZAAR_API_KEY") if env: return env.strip() token = repo_root / "samples" / ".bazaar.token" if token.exists(): return token.read_text().strip() return None def fetch_sample( sha256: str, out_dir: Path, api_key: str, *, timeout_s: float = 60.0, ) -> Path: if len(sha256) != 64 or not all(c in "0123456789abcdef" for c in sha256.lower()): raise ValueError(f"sha256 must be 64 hex chars, got {sha256!r}") sha256 = sha256.lower() out_dir.mkdir(parents=True, exist_ok=True) target = out_dir / sha256 if target.exists(): actual = hashlib.sha256(target.read_bytes()).hexdigest() if actual == sha256: return target target.unlink() # tampered or partial; refetch. body = urllib.parse.urlencode({ "query": "get_file", "sha256_hash": sha256, }).encode("utf-8") req = urllib.request.Request( MB_ENDPOINT, data=body, headers={ "Auth-Key": api_key, "User-Agent": "cis490-fetcher/0", }, method="POST", ) with urllib.request.urlopen(req, timeout=timeout_s) as r: payload = r.read() if not payload.startswith(b"PK"): raise RuntimeError( f"MalwareBazaar returned non-zip response (first 200 bytes): " f"{payload[:200]!r}" ) zip_path = out_dir / f"{sha256}.zip" zip_path.write_bytes(payload) try: with zipfile.ZipFile(zip_path) as zf: zf.setpassword(MB_ZIP_PASSWORD) names = zf.namelist() if not names: raise RuntimeError(f"{sha256}: empty zip") with zf.open(names[0]) as src, target.open("wb") as dst: dst.write(src.read()) finally: zip_path.unlink(missing_ok=True) actual = hashlib.sha256(target.read_bytes()).hexdigest() if actual != sha256: target.unlink() raise RuntimeError(f"sha256 mismatch: expected {sha256}, got {actual}") return target def main(argv: list[str] | None = None) -> int: p = argparse.ArgumentParser(prog="fetch_sample") p.add_argument("sha256") p.add_argument( "--out-dir", type=Path, default=None, help="Where to drop (default: samples/store/ relative to repo)", ) args = p.parse_args(argv) repo_root = Path(__file__).resolve().parent.parent out_dir = args.out_dir or (repo_root / "samples" / "store") api_key = _read_api_key(repo_root) if not api_key: print( "no MalwareBazaar API key — set MALWAREBAZAAR_API_KEY or write " "samples/.bazaar.token (mode 0600). Register at " "https://bazaar.abuse.ch.", file=sys.stderr, ) return 2 try: path = fetch_sample(args.sha256, out_dir, api_key) except Exception as e: print(f"fetch failed: {e}", file=sys.stderr) return 1 print(path) return 0 if __name__ == "__main__": sys.exit(main())