"""``cis490-auto-fetch-samples`` — pull one real binary per manifest family from theZoo and update ``samples/manifest.toml``. No API key, no signup, no operator interaction. theZoo is a public security-research repository (https://github.com/ytisf/theZoo) maintained for malware analysis. Each sample is a password-protected zip; the password is the well-known ``infected``. We clone the repo once (~500 MB shallow), then for each manifest entry without a sha256 we: 1. Locate a directory in ``theZoo/malware/Binaries/`` matching the entry's ``family`` (case-insensitive substring) 2. Find the .zip in that directory 3. Extract with password ``infected`` 4. Pick the largest non-text payload as the binary 5. Compute its sha256, copy to ``samples/store/`` 6. Rewrite ``manifest.toml`` in place adding source/sha256/url Idempotent: entries with sha256 already set are skipped. Manifest edits are atomic (tempfile + os.replace, stat preserved). Families that don't match anything in theZoo fail loudly so the deploy script can decide whether to abort. """ from __future__ import annotations import argparse import hashlib import json import logging import os import shutil import subprocess import sys import zipfile from pathlib import Path REPO_ROOT = Path(__file__).resolve().parent.parent sys.path.insert(0, str(REPO_ROOT)) from samples.manifest import SampleManifest # noqa: E402 log = logging.getLogger("cis490.auto_fetch_samples") THEZOO_URL = "https://github.com/ytisf/theZoo.git" THEZOO_PASSWORD = b"infected" def _ensure_thezoo(clone_dir: Path) -> Path: """Clone theZoo if missing; pull if present. Returns the clone path.""" if (clone_dir / ".git").exists(): log.info("theZoo already cloned at %s; pulling latest", clone_dir) try: subprocess.run( ["git", "-C", str(clone_dir), "pull", "--ff-only"], check=True, capture_output=True, text=True, timeout=120, ) except subprocess.CalledProcessError as e: log.warning("git pull failed (using existing clone): %s", e.stderr[:200]) return clone_dir log.info("cloning %s → %s (~500 MB shallow)", THEZOO_URL, clone_dir) clone_dir.parent.mkdir(parents=True, exist_ok=True) subprocess.run( ["git", "clone", "--depth", "1", THEZOO_URL, str(clone_dir)], check=True, timeout=600, ) return clone_dir def _family_tokens(family: str) -> list[str]: """Split a manifest family name into search tokens. ``Cryptolocker-class`` → ``["cryptolocker-class", "cryptolocker"]`` so the search hits theZoo dirs like ``CryptoLocker_22Jan2014`` (which contain "cryptolocker" but not "-class"). ``Linux.Mirai.B`` → ``["linux.mirai.b", "linux"]`` — the literal-first-token will match the exact dir, the second is a fallback. Tokens are tried in order; the first matching dir wins.""" f = family.lower().strip() out: list[str] = [f] # Strip any "-suffix" / "_suffix" / ".suffix" the manifest uses for # clarity (e.g. "Cryptolocker-class" → also try "cryptolocker"; # "Linux.Mirai" → also try "linux.mirai" then fall back to # "linux"). All tokens are tried in order; first match wins. head_dash = f.split("-")[0] if head_dash != f: out.append(head_dash) head_dot = f.split(".")[0] if head_dot != f and head_dot not in out: out.append(head_dot) head_underscore = f.split("_")[0] if head_underscore != f and head_underscore not in out: out.append(head_underscore) return out def _find_family_dir(thezoo: Path, family: str) -> Path | None: """Locate a Binaries subdir matching ``family`` (case-insensitive substring). theZoo's layout is ``malware/Binaries//``. Two-pass match: first try the full lower-cased family, then strip suffixes like ``-class``/``-mimic`` and try the head token. We pick the prefix-match if there is one (so ``Mirai`` lands on ``Linux.Mirai.B`` rather than ``MirageFox``), otherwise the first substring match in alphabetical order.""" binaries = thezoo / "malware" / "Binaries" if not binaries.is_dir(): log.warning("theZoo layout missing %s — pull broke?", binaries) return None children = [c for c in sorted(binaries.iterdir()) if c.is_dir()] for needle in _family_tokens(family): matches = [c for c in children if needle in c.name.lower()] if not matches: continue # Prefer prefix match. for m in matches: if m.name.lower().startswith(needle): return m return matches[0] return None def _is_linux_i386_elf(path: Path) -> bool: """Check magic bytes for ELF 32-bit Intel 80386 (Metasploitable2's native arch). Pure stdlib so we don't depend on `file`.""" try: with path.open("rb") as f: head = f.read(20) except OSError: return False if len(head) < 20 or head[:4] != b"\x7fELF": return False # ei_class = 1 (32-bit), ei_data = 1 (little-endian), e_machine # at offset 18 = 0x03 for i386. ei_osabi at offset 7 == 0 (SYSV) # OR 3 (Linux). FreeBSD is 9 — exclude. if head[4] != 1 or head[5] != 1: return False if head[7] not in (0, 3): # SYSV or Linux return False e_machine = int.from_bytes(head[18:20], "little") return e_machine == 0x03 # EM_386 def _extract_largest_binary(zip_path: Path, work_dir: Path) -> Path | None: """Extract the password-protected zip and return the best payload: 1. The largest **Linux i386 ELF** in the archive (prefers binaries that will actually execute inside Metasploitable2). 2. Any other ELF (some samples are ARM/x86-64; the chunked uploader will land them but execution is best-effort). 3. The largest non-text file (last-resort fallback for Windows-PE-only archives, in case Wine is on the target). Filters out obvious sidecars (md5/sha256/passwords/readmes).""" work_dir.mkdir(parents=True, exist_ok=True) with zipfile.ZipFile(zip_path) as z: try: z.extractall(path=work_dir, pwd=THEZOO_PASSWORD) except RuntimeError as e: log.warning("extract %s failed: %s", zip_path.name, e) return None payloads: list[Path] = [] for f in work_dir.rglob("*"): if not f.is_file(): continue name = f.name.lower() if any(name.endswith(suf) for suf in (".md5", ".sha256", ".sha1", ".txt", ".md", ".pass", ".c", ".bat", ".sln", ".vcproj")): continue if name in {"readme", "license", "metadata.txt"}: continue payloads.append(f) if not payloads: return None # Tier 1: Linux i386 ELF, largest first. linux_i386 = sorted( (p for p in payloads if _is_linux_i386_elf(p)), key=lambda p: p.stat().st_size, reverse=True, ) if linux_i386: return linux_i386[0] # Tier 2: any ELF (best-effort — chunked upload still works, # the binary may fail to execute inside the target VM but the # episode records the attempt). def _is_elf(p: Path) -> bool: try: with p.open("rb") as f: return f.read(4) == b"\x7fELF" except OSError: return False other_elf = sorted( (p for p in payloads if _is_elf(p)), key=lambda p: p.stat().st_size, reverse=True, ) if other_elf: log.warning("%s: no Linux i386 ELF found; falling back to %s " "(may not execute on Metasploitable2)", zip_path.name, other_elf[0].name) return other_elf[0] # Tier 3: largest non-text payload (Windows PE etc.). log.warning("%s: no ELF found; falling back to largest non-text payload", zip_path.name) return max(payloads, key=lambda p: p.stat().st_size) def _sha256_of(path: Path) -> str: h = hashlib.sha256() with path.open("rb") as f: for chunk in iter(lambda: f.read(1024 * 1024), b""): h.update(chunk) return h.hexdigest() def update_manifest_entry(manifest_path: Path, name: str, source: str, sha256: str, url: str) -> None: """Add `source`, `sha256`, `url` lines to the manifest entry whose `name` matches. Atomic + stat-preserving.""" text = manifest_path.read_text() needle = f'name = "{name}"' idx = text.find(needle) if idx < 0: raise ValueError(f"name = {name!r} not found in {manifest_path}") next_block = text.find("[[", idx + len(needle)) end = next_block if next_block != -1 else len(text) block = text[idx:end] if "sha256 =" in block: log.info("entry %s already has sha256; skipping in-place edit", name) return insert = ( f'source = "{source}"\n' f'sha256 = "{sha256}"\n' f'url = "{url}"\n' ) desc_idx = block.find("description = ") if desc_idx >= 0: new_block = block[:desc_idx] + insert + block[desc_idx:] else: new_block = block.rstrip() + "\n" + insert + "\n" new_text = text[:idx] + new_block + text[end:] st = manifest_path.stat() tmp = manifest_path.with_suffix(".toml.partial") tmp.write_text(new_text) os.replace(tmp, manifest_path) try: os.chown(manifest_path, st.st_uid, st.st_gid) except (PermissionError, OSError): pass os.chmod(manifest_path, st.st_mode & 0o7777) def fetch_one(thezoo: Path, sample_family: str, sample_name: str, store_root: Path, work_root: Path) -> tuple[str, Path] | None: """Locate, extract, and stage one binary for a manifest family. Returns (sha256, store_path) or None if the family wasn't found.""" fam_dir = _find_family_dir(thezoo, sample_family) if fam_dir is None: log.warning("%s: no theZoo dir matching family=%r", sample_name, sample_family) return None zips = sorted(fam_dir.rglob("*.zip")) if not zips: log.warning("%s: %s has no .zip — theZoo layout drift?", sample_name, fam_dir) return None work_dir = work_root / sample_name if work_dir.exists(): shutil.rmtree(work_dir) binary = _extract_largest_binary(zips[0], work_dir) if binary is None: log.warning("%s: %s extraction yielded no payload", sample_name, zips[0]) return None sha = _sha256_of(binary) store_root.mkdir(parents=True, exist_ok=True) target = store_root / sha if not target.exists(): shutil.copy2(binary, target) log.info("%s: staged %s (%d bytes, sha256=%s)", sample_name, target.name, target.stat().st_size, sha[:12]) # Best-effort: clean the per-sample work dir so disk doesn't grow. shutil.rmtree(work_dir, ignore_errors=True) return sha, target def main(argv: list[str] | None = None) -> int: p = argparse.ArgumentParser(prog="cis490-auto-fetch-samples") p.add_argument("--manifest", default=str(REPO_ROOT / "samples" / "manifest.toml")) p.add_argument("--store-root", default=str(REPO_ROOT / "samples" / "store")) p.add_argument("--thezoo-clone-dir", default="/var/lib/cis490/theZoo", help="Where to (re)clone theZoo. Cached across runs.") p.add_argument("--work-root", default="/tmp/cis490-thezoo-extract", help="Per-run extraction scratch dir.") p.add_argument("--dry-run", action="store_true") args = p.parse_args(argv) logging.basicConfig(level=logging.INFO, format="%(asctime)s %(levelname)s %(message)s") if shutil.which("git") is None: log.error("git not on PATH; install git first") return 2 manifest_path = Path(args.manifest) store_root = Path(args.store_root) work_root = Path(args.work_root) manifest = SampleManifest.load(manifest_path) thezoo = _ensure_thezoo(Path(args.thezoo_clone_dir)) fetched = 0 skipped = 0 failed = 0 for sample in manifest.samples: if sample.sha256: log.info("%s: already real (sha256=%s); skipping", sample.name, sample.sha256[:12]) skipped += 1 continue if args.dry_run: fam = _find_family_dir(thezoo, sample.family) log.info("%s [dry-run]: family=%s match=%s", sample.name, sample.family, fam.name if fam else "") continue result = fetch_one(thezoo, sample.family, sample.name, store_root, work_root) if result is None: failed += 1 continue sha, _ = result url = f"https://github.com/ytisf/theZoo/tree/master/malware/Binaries" update_manifest_entry(manifest_path, sample.name, source="theZoo", sha256=sha, url=url) fetched += 1 log.info("done: fetched=%d skipped=%d failed=%d", fetched, skipped, failed) # Tier 4 is mandatory — non-zero exit if no real samples staged. if fetched == 0 and skipped == 0: log.error("zero samples staged — check theZoo clone + family-name mapping") return 1 return 0 if __name__ == "__main__": sys.exit(main())