User caught it: I shipped the theZoo path without running it
end-to-end. A real fetch on the Pi exposed two bugs:
1. Family-name matcher was substring-strict. "Cryptolocker-class"
wouldn't match the dir "CryptoLocker_22Jan2014" because "-class"
isn't in the dir name. Now expands to a sequence of tokens
(full, head-of-dash, head-of-dot, head-of-underscore) and tries
each. First match wins.
2. Extraction picker was "largest non-text" — a bad heuristic for
theZoo, where each Linux.* zip often contains MULTIPLE binaries
for different platforms (Linux i386, x86-64, ARM, FreeBSD, sometimes
even Windows PE). The largest is rarely the i386 Linux ELF that
would actually run on Metasploitable2. Now sniffs ELF magic bytes
in stdlib and tiers:
1. Linux i386 ELF (largest first)
2. any other ELF (best-effort, may not execute)
3. largest non-text (Wine fallback)
Verified end-to-end on the Pi against a real theZoo clone (~500 MB,
263 family dirs, 2026-05-01 fresh pull):
linux-encoder-ransomware → ELF 32-bit Intel i386 SYSV (278 KB)
linux-wirenet-rat → ELF 32-bit Intel i386 SYSV (64 KB)
linux-rex-ransomware → ELF 32-bit Intel i386 SYSV Go (7.6 MB)
linux-neurevt-bot → ELF 32-bit Intel i386 SYSV (3.0 MB)
linux-earthkrahang-apt → ELF 32-bit Intel i386 GNU/Linux (5.8 MB)
5/5 picks are runnable Linux i386 ELFs. Manifest rewrites in place
add source/sha256/url; meta.sample.kind goes to "real" automatically.
Manifest rewritten:
- Old families (XMRig, Mirai, Cryptolocker-class, Dridex, Kovter,
Reverse-Shell) → mostly absent from theZoo's Linux catalog or
matched the wrong arch.
- New families chosen against a verified theZoo presence list:
Linux.Encoder, Linux.Wirenet, Ransomware.Rex, Neurevt,
EarthKrahang.
- XMRig + Kovter remain as mimic-only fallbacks (theZoo lacks a
runnable Linux i386 binary for these; orchestrator falls back
to the mimic profile).
Tests added (tests/test_auto_fetch_samples.py): 13 cases covering
ELF magic detection (i386 accepted, FreeBSD/x86-64/ARM/PE32/text
all rejected), family-token expansion (the "-class" suffix bug),
extraction picker (prefers Linux i386 over larger non-Linux ELFs),
manifest in-place rewrite preserves mode + skips entries that
already have sha256.
What's still NOT verified end-to-end (requires a lab host with
KVM x86):
- Metasploitable2 boot under QEMU
- vsftpd_234_backdoor exploit fire via msfrpcd
- chunked binary upload through a real shell session
- real binary executing inside a Metasploitable2 guest
The Pi is ARM64 — can't run Metasploitable2. install-tier-3-4.sh's
verify step (run_tier3_demo.py) covers all four on a real lab host;
deploy verifies on first run there.
171/171 tests pass.
354 lines
13 KiB
Python
354 lines
13 KiB
Python
"""``cis490-auto-fetch-samples`` — pull one real binary per manifest
|
|
family from theZoo and update ``samples/manifest.toml``.
|
|
|
|
No API key, no signup, no operator interaction. theZoo is a public
|
|
security-research repository (https://github.com/ytisf/theZoo)
|
|
maintained for malware analysis. Each sample is a password-protected
|
|
zip; the password is the well-known ``infected``. We clone the repo
|
|
once (~500 MB shallow), then for each manifest entry without a
|
|
sha256 we:
|
|
|
|
1. Locate a directory in ``theZoo/malware/Binaries/`` matching
|
|
the entry's ``family`` (case-insensitive substring)
|
|
2. Find the .zip in that directory
|
|
3. Extract with password ``infected``
|
|
4. Pick the largest non-text payload as the binary
|
|
5. Compute its sha256, copy to ``samples/store/<sha256>``
|
|
6. Rewrite ``manifest.toml`` in place adding source/sha256/url
|
|
|
|
Idempotent: entries with sha256 already set are skipped. Manifest
|
|
edits are atomic (tempfile + os.replace, stat preserved). Families
|
|
that don't match anything in theZoo fail loudly so the deploy
|
|
script can decide whether to abort.
|
|
"""
|
|
|
|
from __future__ import annotations
|
|
|
|
import argparse
|
|
import hashlib
|
|
import json
|
|
import logging
|
|
import os
|
|
import shutil
|
|
import subprocess
|
|
import sys
|
|
import zipfile
|
|
from pathlib import Path
|
|
|
|
|
|
REPO_ROOT = Path(__file__).resolve().parent.parent
|
|
sys.path.insert(0, str(REPO_ROOT))
|
|
|
|
from samples.manifest import SampleManifest # noqa: E402
|
|
|
|
|
|
log = logging.getLogger("cis490.auto_fetch_samples")
|
|
|
|
|
|
THEZOO_URL = "https://github.com/ytisf/theZoo.git"
|
|
THEZOO_PASSWORD = b"infected"
|
|
|
|
|
|
def _ensure_thezoo(clone_dir: Path) -> Path:
|
|
"""Clone theZoo if missing; pull if present. Returns the clone path."""
|
|
if (clone_dir / ".git").exists():
|
|
log.info("theZoo already cloned at %s; pulling latest", clone_dir)
|
|
try:
|
|
subprocess.run(
|
|
["git", "-C", str(clone_dir), "pull", "--ff-only"],
|
|
check=True, capture_output=True, text=True, timeout=120,
|
|
)
|
|
except subprocess.CalledProcessError as e:
|
|
log.warning("git pull failed (using existing clone): %s",
|
|
e.stderr[:200])
|
|
return clone_dir
|
|
log.info("cloning %s → %s (~500 MB shallow)", THEZOO_URL, clone_dir)
|
|
clone_dir.parent.mkdir(parents=True, exist_ok=True)
|
|
subprocess.run(
|
|
["git", "clone", "--depth", "1", THEZOO_URL, str(clone_dir)],
|
|
check=True, timeout=600,
|
|
)
|
|
return clone_dir
|
|
|
|
|
|
def _family_tokens(family: str) -> list[str]:
|
|
"""Split a manifest family name into search tokens. ``Cryptolocker-class``
|
|
→ ``["cryptolocker-class", "cryptolocker"]`` so the search hits theZoo
|
|
dirs like ``CryptoLocker_22Jan2014`` (which contain "cryptolocker"
|
|
but not "-class"). ``Linux.Mirai.B`` → ``["linux.mirai.b", "linux"]``
|
|
— the literal-first-token will match the exact dir, the second is a
|
|
fallback. Tokens are tried in order; the first matching dir wins."""
|
|
f = family.lower().strip()
|
|
out: list[str] = [f]
|
|
# Strip any "-suffix" / "_suffix" / ".suffix" the manifest uses for
|
|
# clarity (e.g. "Cryptolocker-class" → also try "cryptolocker";
|
|
# "Linux.Mirai" → also try "linux.mirai" then fall back to
|
|
# "linux"). All tokens are tried in order; first match wins.
|
|
head_dash = f.split("-")[0]
|
|
if head_dash != f:
|
|
out.append(head_dash)
|
|
head_dot = f.split(".")[0]
|
|
if head_dot != f and head_dot not in out:
|
|
out.append(head_dot)
|
|
head_underscore = f.split("_")[0]
|
|
if head_underscore != f and head_underscore not in out:
|
|
out.append(head_underscore)
|
|
return out
|
|
|
|
|
|
def _find_family_dir(thezoo: Path, family: str) -> Path | None:
|
|
"""Locate a Binaries subdir matching ``family`` (case-insensitive
|
|
substring). theZoo's layout is ``malware/Binaries/<Family-Name>/``.
|
|
|
|
Two-pass match: first try the full lower-cased family, then strip
|
|
suffixes like ``-class``/``-mimic`` and try the head token. We pick
|
|
the prefix-match if there is one (so ``Mirai`` lands on
|
|
``Linux.Mirai.B`` rather than ``MirageFox``), otherwise the first
|
|
substring match in alphabetical order."""
|
|
binaries = thezoo / "malware" / "Binaries"
|
|
if not binaries.is_dir():
|
|
log.warning("theZoo layout missing %s — pull broke?", binaries)
|
|
return None
|
|
children = [c for c in sorted(binaries.iterdir()) if c.is_dir()]
|
|
for needle in _family_tokens(family):
|
|
matches = [c for c in children if needle in c.name.lower()]
|
|
if not matches:
|
|
continue
|
|
# Prefer prefix match.
|
|
for m in matches:
|
|
if m.name.lower().startswith(needle):
|
|
return m
|
|
return matches[0]
|
|
return None
|
|
|
|
|
|
def _is_linux_i386_elf(path: Path) -> bool:
|
|
"""Check magic bytes for ELF 32-bit Intel 80386 (Metasploitable2's
|
|
native arch). Pure stdlib so we don't depend on `file`."""
|
|
try:
|
|
with path.open("rb") as f:
|
|
head = f.read(20)
|
|
except OSError:
|
|
return False
|
|
if len(head) < 20 or head[:4] != b"\x7fELF":
|
|
return False
|
|
# ei_class = 1 (32-bit), ei_data = 1 (little-endian), e_machine
|
|
# at offset 18 = 0x03 for i386. ei_osabi at offset 7 == 0 (SYSV)
|
|
# OR 3 (Linux). FreeBSD is 9 — exclude.
|
|
if head[4] != 1 or head[5] != 1:
|
|
return False
|
|
if head[7] not in (0, 3): # SYSV or Linux
|
|
return False
|
|
e_machine = int.from_bytes(head[18:20], "little")
|
|
return e_machine == 0x03 # EM_386
|
|
|
|
|
|
def _extract_largest_binary(zip_path: Path, work_dir: Path) -> Path | None:
|
|
"""Extract the password-protected zip and return the best payload:
|
|
|
|
1. The largest **Linux i386 ELF** in the archive (prefers binaries
|
|
that will actually execute inside Metasploitable2).
|
|
2. Any other ELF (some samples are ARM/x86-64; the chunked
|
|
uploader will land them but execution is best-effort).
|
|
3. The largest non-text file (last-resort fallback for
|
|
Windows-PE-only archives, in case Wine is on the target).
|
|
|
|
Filters out obvious sidecars (md5/sha256/passwords/readmes)."""
|
|
work_dir.mkdir(parents=True, exist_ok=True)
|
|
with zipfile.ZipFile(zip_path) as z:
|
|
try:
|
|
z.extractall(path=work_dir, pwd=THEZOO_PASSWORD)
|
|
except RuntimeError as e:
|
|
log.warning("extract %s failed: %s", zip_path.name, e)
|
|
return None
|
|
|
|
payloads: list[Path] = []
|
|
for f in work_dir.rglob("*"):
|
|
if not f.is_file():
|
|
continue
|
|
name = f.name.lower()
|
|
if any(name.endswith(suf) for suf in (".md5", ".sha256", ".sha1",
|
|
".txt", ".md", ".pass", ".c",
|
|
".bat", ".sln", ".vcproj")):
|
|
continue
|
|
if name in {"readme", "license", "metadata.txt"}:
|
|
continue
|
|
payloads.append(f)
|
|
if not payloads:
|
|
return None
|
|
|
|
# Tier 1: Linux i386 ELF, largest first.
|
|
linux_i386 = sorted(
|
|
(p for p in payloads if _is_linux_i386_elf(p)),
|
|
key=lambda p: p.stat().st_size, reverse=True,
|
|
)
|
|
if linux_i386:
|
|
return linux_i386[0]
|
|
|
|
# Tier 2: any ELF (best-effort — chunked upload still works,
|
|
# the binary may fail to execute inside the target VM but the
|
|
# episode records the attempt).
|
|
def _is_elf(p: Path) -> bool:
|
|
try:
|
|
with p.open("rb") as f:
|
|
return f.read(4) == b"\x7fELF"
|
|
except OSError:
|
|
return False
|
|
other_elf = sorted(
|
|
(p for p in payloads if _is_elf(p)),
|
|
key=lambda p: p.stat().st_size, reverse=True,
|
|
)
|
|
if other_elf:
|
|
log.warning("%s: no Linux i386 ELF found; falling back to %s "
|
|
"(may not execute on Metasploitable2)",
|
|
zip_path.name, other_elf[0].name)
|
|
return other_elf[0]
|
|
|
|
# Tier 3: largest non-text payload (Windows PE etc.).
|
|
log.warning("%s: no ELF found; falling back to largest non-text payload",
|
|
zip_path.name)
|
|
return max(payloads, key=lambda p: p.stat().st_size)
|
|
|
|
|
|
def _sha256_of(path: Path) -> str:
|
|
h = hashlib.sha256()
|
|
with path.open("rb") as f:
|
|
for chunk in iter(lambda: f.read(1024 * 1024), b""):
|
|
h.update(chunk)
|
|
return h.hexdigest()
|
|
|
|
|
|
def update_manifest_entry(manifest_path: Path, name: str,
|
|
source: str, sha256: str, url: str) -> None:
|
|
"""Add `source`, `sha256`, `url` lines to the manifest entry whose
|
|
`name` matches. Atomic + stat-preserving."""
|
|
text = manifest_path.read_text()
|
|
needle = f'name = "{name}"'
|
|
idx = text.find(needle)
|
|
if idx < 0:
|
|
raise ValueError(f"name = {name!r} not found in {manifest_path}")
|
|
next_block = text.find("[[", idx + len(needle))
|
|
end = next_block if next_block != -1 else len(text)
|
|
block = text[idx:end]
|
|
if "sha256 =" in block:
|
|
log.info("entry %s already has sha256; skipping in-place edit", name)
|
|
return
|
|
insert = (
|
|
f'source = "{source}"\n'
|
|
f'sha256 = "{sha256}"\n'
|
|
f'url = "{url}"\n'
|
|
)
|
|
desc_idx = block.find("description = ")
|
|
if desc_idx >= 0:
|
|
new_block = block[:desc_idx] + insert + block[desc_idx:]
|
|
else:
|
|
new_block = block.rstrip() + "\n" + insert + "\n"
|
|
new_text = text[:idx] + new_block + text[end:]
|
|
|
|
st = manifest_path.stat()
|
|
tmp = manifest_path.with_suffix(".toml.partial")
|
|
tmp.write_text(new_text)
|
|
os.replace(tmp, manifest_path)
|
|
try:
|
|
os.chown(manifest_path, st.st_uid, st.st_gid)
|
|
except (PermissionError, OSError):
|
|
pass
|
|
os.chmod(manifest_path, st.st_mode & 0o7777)
|
|
|
|
|
|
def fetch_one(thezoo: Path, sample_family: str, sample_name: str,
|
|
store_root: Path, work_root: Path) -> tuple[str, Path] | None:
|
|
"""Locate, extract, and stage one binary for a manifest family.
|
|
Returns (sha256, store_path) or None if the family wasn't found."""
|
|
fam_dir = _find_family_dir(thezoo, sample_family)
|
|
if fam_dir is None:
|
|
log.warning("%s: no theZoo dir matching family=%r", sample_name, sample_family)
|
|
return None
|
|
zips = sorted(fam_dir.rglob("*.zip"))
|
|
if not zips:
|
|
log.warning("%s: %s has no .zip — theZoo layout drift?",
|
|
sample_name, fam_dir)
|
|
return None
|
|
work_dir = work_root / sample_name
|
|
if work_dir.exists():
|
|
shutil.rmtree(work_dir)
|
|
binary = _extract_largest_binary(zips[0], work_dir)
|
|
if binary is None:
|
|
log.warning("%s: %s extraction yielded no payload",
|
|
sample_name, zips[0])
|
|
return None
|
|
sha = _sha256_of(binary)
|
|
store_root.mkdir(parents=True, exist_ok=True)
|
|
target = store_root / sha
|
|
if not target.exists():
|
|
shutil.copy2(binary, target)
|
|
log.info("%s: staged %s (%d bytes, sha256=%s)",
|
|
sample_name, target.name, target.stat().st_size, sha[:12])
|
|
# Best-effort: clean the per-sample work dir so disk doesn't grow.
|
|
shutil.rmtree(work_dir, ignore_errors=True)
|
|
return sha, target
|
|
|
|
|
|
def main(argv: list[str] | None = None) -> int:
|
|
p = argparse.ArgumentParser(prog="cis490-auto-fetch-samples")
|
|
p.add_argument("--manifest",
|
|
default=str(REPO_ROOT / "samples" / "manifest.toml"))
|
|
p.add_argument("--store-root",
|
|
default=str(REPO_ROOT / "samples" / "store"))
|
|
p.add_argument("--thezoo-clone-dir",
|
|
default="/var/lib/cis490/theZoo",
|
|
help="Where to (re)clone theZoo. Cached across runs.")
|
|
p.add_argument("--work-root",
|
|
default="/tmp/cis490-thezoo-extract",
|
|
help="Per-run extraction scratch dir.")
|
|
p.add_argument("--dry-run", action="store_true")
|
|
args = p.parse_args(argv)
|
|
|
|
logging.basicConfig(level=logging.INFO,
|
|
format="%(asctime)s %(levelname)s %(message)s")
|
|
|
|
if shutil.which("git") is None:
|
|
log.error("git not on PATH; install git first")
|
|
return 2
|
|
|
|
manifest_path = Path(args.manifest)
|
|
store_root = Path(args.store_root)
|
|
work_root = Path(args.work_root)
|
|
manifest = SampleManifest.load(manifest_path)
|
|
thezoo = _ensure_thezoo(Path(args.thezoo_clone_dir))
|
|
|
|
fetched = 0
|
|
skipped = 0
|
|
failed = 0
|
|
for sample in manifest.samples:
|
|
if sample.sha256:
|
|
log.info("%s: already real (sha256=%s); skipping",
|
|
sample.name, sample.sha256[:12])
|
|
skipped += 1
|
|
continue
|
|
if args.dry_run:
|
|
fam = _find_family_dir(thezoo, sample.family)
|
|
log.info("%s [dry-run]: family=%s match=%s",
|
|
sample.name, sample.family, fam.name if fam else "<none>")
|
|
continue
|
|
result = fetch_one(thezoo, sample.family, sample.name,
|
|
store_root, work_root)
|
|
if result is None:
|
|
failed += 1
|
|
continue
|
|
sha, _ = result
|
|
url = f"https://github.com/ytisf/theZoo/tree/master/malware/Binaries"
|
|
update_manifest_entry(manifest_path, sample.name,
|
|
source="theZoo", sha256=sha, url=url)
|
|
fetched += 1
|
|
|
|
log.info("done: fetched=%d skipped=%d failed=%d", fetched, skipped, failed)
|
|
# Tier 4 is mandatory — non-zero exit if no real samples staged.
|
|
if fetched == 0 and skipped == 0:
|
|
log.error("zero samples staged — check theZoo clone + family-name mapping")
|
|
return 1
|
|
return 0
|
|
|
|
|
|
if __name__ == "__main__":
|
|
sys.exit(main())
|