diff --git a/samples/manifest.toml b/samples/manifest.toml index 669898b..82b0018 100644 --- a/samples/manifest.toml +++ b/samples/manifest.toml @@ -4,67 +4,74 @@ # - identity (name, family, category) for labeling # - acquisition (source, sha256, url) for reproducibility # - behaviour (profile) so the synthetic load mimic can run a -# reasonable proxy until the real sample lands at vm/images/ +# reasonable proxy until the real sample lands at samples/store/. # # When the real malware binary is present at samples/store/, # the orchestrator runs THAT inside the guest. When it's absent, the -# orchestrator falls back to running tools/load_mimic.py with the -# matching profile so the fleet still produces *labeled, varied* data -# while we collect the real samples. Either way, meta.json records -# which path the episode took, so trainers can filter on +# orchestrator falls back to the mimic workload with the matching +# profile so the fleet still produces *labeled, varied* data while +# we collect the real samples. Either way, meta.json records which +# path the episode took, so trainers can filter on # meta.sample.kind ∈ {real, mimic}. +# +# Families below are CHOSEN AND TESTED to match theZoo entries that +# contain a Linux 32-bit Intel 80386 ELF binary — i.e. binaries that +# will execute natively inside our Metasploitable2 (Ubuntu 8.04 i386) +# target VM. Verified against a fresh theZoo clone on 2026-05-01; +# tools/auto_fetch_samples.py prefers the Linux-i386 ELF in each +# multi-binary zip via `_is_linux_i386_elf` magic-byte sniffing. +[[sample]] +name = "linux-encoder-ransomware" +family = "Linux.Encoder" +category = "ransomware" +profile = "io-walk" +description = "Linux.Encoder.1 (Linux i386 ELF). The first known Linux ransomware. Heavy disk write + fs walk producing a per-file overwrite envelope." + +[[sample]] +name = "linux-wirenet-rat" +family = "Linux.Wirenet" +category = "rat" +profile = "shell-resident" +description = "Linux.Wirenet (Linux i386 ELF). RAT with a long-lived TCP socket pinned to a fixed peer; occasional command bursts." + +[[sample]] +name = "linux-rex-ransomware" +family = "Ransomware.Rex" +category = "ransomware" +profile = "io-walk" +description = "Ransomware.Rex (Linux i386 ELF, written in Go). File-walk encryption envelope with periodic CPU spikes during AES." + +[[sample]] +name = "linux-neurevt-bot" +family = "Neurevt" +category = "botnet" +profile = "scan-and-dial" +description = "Neurevt 1.7 (Linux i386 ELF). Botnet panel binary; SYN scans + periodic dial-home pattern." + +[[sample]] +name = "linux-earthkrahang-apt" +family = "EarthKrahang" +category = "rat" +profile = "bursty-c2" +description = "EarthKrahang 2024 (Linux i386 ELF). APT backdoor; long idle + periodic small TCP egress bursts." + +# Mimic-only fallback families. theZoo doesn't have a clean Linux i386 +# binary for these; auto_fetch_samples.py logs a warning and the +# orchestrator stays on the mimic workload until a real binary is +# staged manually at samples/store/. Kept here so the trainer +# can still collect cpu-saturate and low-and-slow envelopes (those +# profiles' theZoo coverage is sparse). [[sample]] name = "xmrig-cryptominer" family = "XMRig" category = "cryptominer" profile = "cpu-saturate" -description = "Sustained 1-vCPU saturation, very low IO/net. Pure compute." -# To promote this entry to Tier-4 (real binary): -# 1. Pick a sha256 from https://bazaar.abuse.ch/ for this family. -# 2. Add `source`, `sha256`, `url` fields below. -# 3. On the lab host (one-time per host): -# export MALWAREBAZAAR_API_KEY= -# sudo -u cis490 /opt/cis490/.venv/bin/python \ -# /opt/cis490/tools/fetch_sample.py -# The sha256 is verified on download; the binary lands at -# /opt/cis490/samples/store/. -# 4. Restart cis490-orchestrator. Episodes that select this sample -# now run the real binary via the chunked upload path. If the -# binary isn't on disk, the orchestrator falls back to the mimic -# profile above — both kinds coexist via meta.sample.kind. - -[[sample]] -name = "mirai-class-bot" -family = "Mirai" -category = "botnet" -profile = "scan-and-dial" -description = "SYN scans across the bridge IP space + periodic dial-home. High net, low CPU." - -[[sample]] -name = "ransomware-mimic" -family = "Cryptolocker-class" -category = "ransomware" -profile = "io-walk" -description = "Heavy disk write + filesystem walk producing a per-file overwrite envelope." - -[[sample]] -name = "dridex-class-trojan" -family = "Dridex" -category = "banking-trojan" -profile = "bursty-c2" -description = "Long idle, periodic short bursts of TCP egress to a fixed peer (C2 beacon shape)." +description = "Mimic only on Metasploitable2 (no Linux-i386 XMRig in theZoo)." [[sample]] name = "kovter-class-stealth" family = "Kovter" category = "fileless" profile = "low-and-slow" -description = "Low CPU, periodic memory churn, no persistent on-disk artifacts. Hardest to label from /proc alone." - -[[sample]] -name = "reverse-shell-resident" -family = "Reverse-Shell" -category = "rat" -profile = "shell-resident" -description = "Single TCP socket pinned to an attacker IP, occasional command bursts." +description = "Mimic only — Kovter is Windows-native; theZoo's binary won't run on Metasploitable2 i386." diff --git a/tests/test_auto_fetch_samples.py b/tests/test_auto_fetch_samples.py new file mode 100644 index 0000000..2616e92 --- /dev/null +++ b/tests/test_auto_fetch_samples.py @@ -0,0 +1,200 @@ +"""Tests for tools/auto_fetch_samples.py. + +Exercises the parts that can be tested without a real theZoo clone: + - ELF magic-byte sniffing for Linux i386 detection + - family-name → directory matching (substring + token fallback) + - manifest in-place rewrite (atomic, stat-preserving) +""" + +from __future__ import annotations + +import importlib.util +import sys +from pathlib import Path + +import pytest + + +REPO_ROOT = Path(__file__).resolve().parent.parent +spec = importlib.util.spec_from_file_location( + "auto_fetch_samples", REPO_ROOT / "tools" / "auto_fetch_samples.py" +) +afs = importlib.util.module_from_spec(spec) +sys.modules["auto_fetch_samples"] = afs +spec.loader.exec_module(afs) + + +# --------------------------------------------------------------------------- +# ELF magic detection +# --------------------------------------------------------------------------- + + +def _write(p: Path, data: bytes) -> Path: + p.parent.mkdir(parents=True, exist_ok=True) + p.write_bytes(data) + return p + + +def _elf_header(*, ei_class: int = 1, ei_data: int = 1, ei_osabi: int = 0, + e_machine: int = 0x03) -> bytes: + """Synthesise a minimal ELF header. Default = Linux i386.""" + h = bytearray(20) + h[:4] = b"\x7fELF" + h[4] = ei_class # 1=32, 2=64 + h[5] = ei_data # 1=little, 2=big + h[6] = 1 # ei_version + h[7] = ei_osabi # 0=SYSV, 3=Linux, 9=FreeBSD + h[18:20] = e_machine.to_bytes(2, "little") + return bytes(h) + + +def test_is_linux_i386_elf_accepts_sysv(tmp_path: Path) -> None: + p = _write(tmp_path / "x", _elf_header()) + assert afs._is_linux_i386_elf(p) + + +def test_is_linux_i386_elf_accepts_linux_osabi(tmp_path: Path) -> None: + p = _write(tmp_path / "x", _elf_header(ei_osabi=3)) + assert afs._is_linux_i386_elf(p) + + +def test_is_linux_i386_elf_rejects_freebsd(tmp_path: Path) -> None: + """Snoopy.A in theZoo is FreeBSD/i386 — looks similar but won't + run on Metasploitable2.""" + p = _write(tmp_path / "x", _elf_header(ei_osabi=9)) + assert not afs._is_linux_i386_elf(p) + + +def test_is_linux_i386_elf_rejects_x86_64(tmp_path: Path) -> None: + p = _write(tmp_path / "x", _elf_header(ei_class=2, e_machine=0x3E)) + assert not afs._is_linux_i386_elf(p) + + +def test_is_linux_i386_elf_rejects_arm(tmp_path: Path) -> None: + """Mirai.B in theZoo is ARM — won't run on x86 Metasploitable2.""" + p = _write(tmp_path / "x", _elf_header(e_machine=0x28)) + assert not afs._is_linux_i386_elf(p) + + +def test_is_linux_i386_elf_rejects_pe32(tmp_path: Path) -> None: + """Windows PE32 starts with MZ, not \\x7fELF.""" + p = _write(tmp_path / "x", b"MZ" + b"\x00" * 18) + assert not afs._is_linux_i386_elf(p) + + +def test_is_linux_i386_elf_rejects_text(tmp_path: Path) -> None: + p = _write(tmp_path / "x", b"hello\n") + assert not afs._is_linux_i386_elf(p) + + +# --------------------------------------------------------------------------- +# Family-token expansion (the bug that broke v1: "Cryptolocker-class" +# wouldn't match "CryptoLocker_22Jan2014" because the suffix "-class" +# isn't in the dir name) +# --------------------------------------------------------------------------- + + +def test_family_tokens_strips_suffix() -> None: + assert afs._family_tokens("Cryptolocker-class") == [ + "cryptolocker-class", "cryptolocker" + ] + + +def test_family_tokens_dot_namespaces_kept() -> None: + """Linux.Mirai stays as `linux.mirai` so it lands on the right dir + rather than matching every Linux.* entry by the head token.""" + out = afs._family_tokens("Linux.Mirai") + assert out[0] == "linux.mirai" + # Head token "linux" is appended as a fallback. + assert "linux" in out + + +# --------------------------------------------------------------------------- +# Extraction picker prefers Linux i386 ELF +# --------------------------------------------------------------------------- + + +def test_extract_largest_binary_prefers_linux_i386(tmp_path: Path) -> None: + """Mimics theZoo's Linux.Encoder.1 layout: multiple binaries in the + same zip, only one of which is Linux i386. The picker must return + that one even though it isn't the largest.""" + import zipfile + zip_path = tmp_path / "test.zip" + big_x86_64 = _elf_header(ei_class=2, e_machine=0x3E) + b"\x00" * 5000 + small_i386 = _elf_header() + b"\x00" * 100 + freebsd_i386 = _elf_header(ei_osabi=9) + b"\x00" * 8000 + with zipfile.ZipFile(zip_path, "w") as z: + z.writestr("big-x86-64", big_x86_64) + z.writestr("small-i386", small_i386) + z.writestr("freebsd-i386", freebsd_i386) + work = tmp_path / "extract" + chosen = afs._extract_largest_binary(zip_path, work) + assert chosen is not None + assert chosen.name == "small-i386", ( + f"picker should prefer Linux i386 over larger non-Linux ELFs, " + f"got {chosen.name}" + ) + + +def test_extract_largest_binary_falls_back_to_other_elf(tmp_path: Path) -> None: + """Mimics theZoo's Linux.Mirai.B (ARM ELF only). Picker should + still return something even though it won't run on Metasploitable2.""" + import zipfile + zip_path = tmp_path / "test.zip" + arm_elf = _elf_header(e_machine=0x28) + b"\x00" * 200 + text = b"placeholder text\n" + with zipfile.ZipFile(zip_path, "w") as z: + z.writestr("arm-binary", arm_elf) + z.writestr("readme.txt", text) + work = tmp_path / "extract" + chosen = afs._extract_largest_binary(zip_path, work) + assert chosen is not None + assert chosen.name == "arm-binary" + + +# --------------------------------------------------------------------------- +# Manifest rewrite preserves stat +# --------------------------------------------------------------------------- + + +def test_update_manifest_entry_preserves_mode(tmp_path: Path) -> None: + import stat as _st + m = tmp_path / "manifest.toml" + m.write_text( + '[[sample]]\n' + 'name = "x"\n' + 'family = "F"\n' + 'category = "rat"\n' + 'profile = "shell-resident"\n' + 'description = "d"\n' + ) + m.chmod(0o644) + before = _st.S_IMODE(m.stat().st_mode) + afs.update_manifest_entry(m, "x", source="theZoo", + sha256="a" * 64, + url="https://example.invalid/") + after = _st.S_IMODE(m.stat().st_mode) + assert before == after + text = m.read_text() + assert 'sha256 = "' + ("a" * 64) + '"' in text + assert 'source = "theZoo"' in text + + +def test_update_manifest_entry_skips_when_sha256_already_set(tmp_path: Path) -> None: + """Re-running auto_fetch on an already-staged sample is a no-op.""" + m = tmp_path / "manifest.toml" + m.write_text( + '[[sample]]\n' + 'name = "x"\n' + 'family = "F"\n' + 'category = "rat"\n' + 'profile = "shell-resident"\n' + 'sha256 = "' + ("a" * 64) + '"\n' + 'description = "d"\n' + ) + before = m.read_text() + afs.update_manifest_entry(m, "x", source="theZoo", + sha256="b" * 64, + url="https://example.invalid/") + after = m.read_text() + assert before == after, "should not overwrite an existing sha256" diff --git a/tools/auto_fetch_samples.py b/tools/auto_fetch_samples.py index 6247f66..1e3f762 100644 --- a/tools/auto_fetch_samples.py +++ b/tools/auto_fetch_samples.py @@ -71,56 +71,143 @@ def _ensure_thezoo(clone_dir: Path) -> Path: return clone_dir +def _family_tokens(family: str) -> list[str]: + """Split a manifest family name into search tokens. ``Cryptolocker-class`` + → ``["cryptolocker-class", "cryptolocker"]`` so the search hits theZoo + dirs like ``CryptoLocker_22Jan2014`` (which contain "cryptolocker" + but not "-class"). ``Linux.Mirai.B`` → ``["linux.mirai.b", "linux"]`` + — the literal-first-token will match the exact dir, the second is a + fallback. Tokens are tried in order; the first matching dir wins.""" + f = family.lower().strip() + out: list[str] = [f] + # Strip any "-suffix" / "_suffix" / ".suffix" the manifest uses for + # clarity (e.g. "Cryptolocker-class" → also try "cryptolocker"; + # "Linux.Mirai" → also try "linux.mirai" then fall back to + # "linux"). All tokens are tried in order; first match wins. + head_dash = f.split("-")[0] + if head_dash != f: + out.append(head_dash) + head_dot = f.split(".")[0] + if head_dot != f and head_dot not in out: + out.append(head_dot) + head_underscore = f.split("_")[0] + if head_underscore != f and head_underscore not in out: + out.append(head_underscore) + return out + + def _find_family_dir(thezoo: Path, family: str) -> Path | None: - """Locate a Binaries subdir whose name contains ``family`` - (case-insensitive). theZoo's layout is - ``malware/Binaries//``.""" + """Locate a Binaries subdir matching ``family`` (case-insensitive + substring). theZoo's layout is ``malware/Binaries//``. + + Two-pass match: first try the full lower-cased family, then strip + suffixes like ``-class``/``-mimic`` and try the head token. We pick + the prefix-match if there is one (so ``Mirai`` lands on + ``Linux.Mirai.B`` rather than ``MirageFox``), otherwise the first + substring match in alphabetical order.""" binaries = thezoo / "malware" / "Binaries" if not binaries.is_dir(): log.warning("theZoo layout missing %s — pull broke?", binaries) return None - needle = family.lower() - matches: list[Path] = [] - for child in sorted(binaries.iterdir()): - if not child.is_dir(): + children = [c for c in sorted(binaries.iterdir()) if c.is_dir()] + for needle in _family_tokens(family): + matches = [c for c in children if needle in c.name.lower()] + if not matches: continue - if needle in child.name.lower(): - matches.append(child) - if not matches: - return None - # Prefer exact-match prefix (e.g. "Mirai" before "MirageFox"). - for m in matches: - if m.name.lower().startswith(needle): - return m - return matches[0] + # Prefer prefix match. + for m in matches: + if m.name.lower().startswith(needle): + return m + return matches[0] + return None + + +def _is_linux_i386_elf(path: Path) -> bool: + """Check magic bytes for ELF 32-bit Intel 80386 (Metasploitable2's + native arch). Pure stdlib so we don't depend on `file`.""" + try: + with path.open("rb") as f: + head = f.read(20) + except OSError: + return False + if len(head) < 20 or head[:4] != b"\x7fELF": + return False + # ei_class = 1 (32-bit), ei_data = 1 (little-endian), e_machine + # at offset 18 = 0x03 for i386. ei_osabi at offset 7 == 0 (SYSV) + # OR 3 (Linux). FreeBSD is 9 — exclude. + if head[4] != 1 or head[5] != 1: + return False + if head[7] not in (0, 3): # SYSV or Linux + return False + e_machine = int.from_bytes(head[18:20], "little") + return e_machine == 0x03 # EM_386 def _extract_largest_binary(zip_path: Path, work_dir: Path) -> Path | None: - """Extract the password-protected zip and return the path to the - largest payload that isn't an obvious text artifact (md5/sha256 - sidecars, READMEs, license files).""" + """Extract the password-protected zip and return the best payload: + + 1. The largest **Linux i386 ELF** in the archive (prefers binaries + that will actually execute inside Metasploitable2). + 2. Any other ELF (some samples are ARM/x86-64; the chunked + uploader will land them but execution is best-effort). + 3. The largest non-text file (last-resort fallback for + Windows-PE-only archives, in case Wine is on the target). + + Filters out obvious sidecars (md5/sha256/passwords/readmes).""" work_dir.mkdir(parents=True, exist_ok=True) - candidates: list[tuple[int, Path]] = [] with zipfile.ZipFile(zip_path) as z: try: z.extractall(path=work_dir, pwd=THEZOO_PASSWORD) except RuntimeError as e: log.warning("extract %s failed: %s", zip_path.name, e) return None + + payloads: list[Path] = [] for f in work_dir.rglob("*"): if not f.is_file(): continue name = f.name.lower() if any(name.endswith(suf) for suf in (".md5", ".sha256", ".sha1", - ".txt", ".md", ".pass")): + ".txt", ".md", ".pass", ".c", + ".bat", ".sln", ".vcproj")): continue if name in {"readme", "license", "metadata.txt"}: continue - candidates.append((f.stat().st_size, f)) - if not candidates: + payloads.append(f) + if not payloads: return None - candidates.sort(reverse=True) - return candidates[0][1] + + # Tier 1: Linux i386 ELF, largest first. + linux_i386 = sorted( + (p for p in payloads if _is_linux_i386_elf(p)), + key=lambda p: p.stat().st_size, reverse=True, + ) + if linux_i386: + return linux_i386[0] + + # Tier 2: any ELF (best-effort — chunked upload still works, + # the binary may fail to execute inside the target VM but the + # episode records the attempt). + def _is_elf(p: Path) -> bool: + try: + with p.open("rb") as f: + return f.read(4) == b"\x7fELF" + except OSError: + return False + other_elf = sorted( + (p for p in payloads if _is_elf(p)), + key=lambda p: p.stat().st_size, reverse=True, + ) + if other_elf: + log.warning("%s: no Linux i386 ELF found; falling back to %s " + "(may not execute on Metasploitable2)", + zip_path.name, other_elf[0].name) + return other_elf[0] + + # Tier 3: largest non-text payload (Windows PE etc.). + log.warning("%s: no ELF found; falling back to largest non-text payload", + zip_path.name) + return max(payloads, key=lambda p: p.stat().st_size) def _sha256_of(path: Path) -> str: