auto_fetch_samples: pick Linux i386 ELF; manifest matches theZoo

User caught it: I shipped the theZoo path without running it
end-to-end. A real fetch on the Pi exposed two bugs:

1. Family-name matcher was substring-strict. "Cryptolocker-class"
   wouldn't match the dir "CryptoLocker_22Jan2014" because "-class"
   isn't in the dir name. Now expands to a sequence of tokens
   (full, head-of-dash, head-of-dot, head-of-underscore) and tries
   each. First match wins.
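
The token expansion described in item 1 can be sketched as below. This is a minimal illustration, not the code from this commit: `family_tokens` and `first_match` are hypothetical names, and every directory name in `dirs` other than `CryptoLocker_22Jan2014` is made up for the demo.

```python
from __future__ import annotations


def family_tokens(family: str) -> list[str]:
    """Full lower-cased name first, then the head before '-', '.', '_'."""
    full = family.lower().strip()
    tokens = [full]
    for sep in ("-", ".", "_"):
        head = full.split(sep)[0]
        if head != full and head not in tokens:
            tokens.append(head)
    return tokens


def first_match(family: str, dir_names: list[str]) -> str | None:
    """Try each token in order; the first token with any hit wins."""
    for token in family_tokens(family):
        hits = [d for d in sorted(dir_names) if token in d.lower()]
        if hits:
            # Prefer a directory whose name starts with the token.
            return next((d for d in hits if d.lower().startswith(token)), hits[0])
    return None


# Illustrative directory listing (assumption, not a real theZoo listing).
dirs = ["CryptoLocker_22Jan2014", "Linux.Encoder.1", "Linux.Wirenet"]
print(family_tokens("Cryptolocker-class"))
print(first_match("Cryptolocker-class", dirs))
```

The full name is always tried first, so an exact hit is never shadowed by a shorter head token.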

2. Extraction picker was "largest non-text" — a bad heuristic for
   theZoo, where each Linux.* zip often contains MULTIPLE binaries
   for different platforms (Linux i386, x86-64, ARM, FreeBSD, sometimes
   even Windows PE). The largest is rarely the i386 Linux ELF that
   would actually run on Metasploitable2. Now reads the ELF header
   using only the stdlib and picks by tier:
     1. Linux i386 ELF (largest first)
     2. any other ELF (best-effort, may not execute)
     3. largest non-text (Wine fallback)
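
A rough sketch of the tiering in item 2, working on in-memory blobs rather than extracted files. The names `elf_header`, `is_linux_i386_elf`, `is_elf`, and `pick_payload` are hypothetical, and the sidecar filtering done by the real picker is left out here.

```python
from __future__ import annotations


def elf_header(ei_class: int = 1, ei_osabi: int = 0, e_machine: int = 0x03) -> bytes:
    """Build a 20-byte little-endian ELF header stub (demo helper only)."""
    h = bytearray(20)
    h[:4] = b"\x7fELF"
    h[4] = ei_class   # 1 = 32-bit, 2 = 64-bit
    h[5] = 1          # EI_DATA: little-endian
    h[6] = 1          # EI_VERSION
    h[7] = ei_osabi   # 0 = SYSV, 3 = Linux, 9 = FreeBSD
    h[18:20] = e_machine.to_bytes(2, "little")
    return bytes(h)


def is_linux_i386_elf(data: bytes) -> bool:
    """True for a 32-bit little-endian EM_386 ELF with SYSV or Linux OSABI."""
    head = data[:20]
    if len(head) < 20 or head[:4] != b"\x7fELF":
        return False
    if head[4] != 1 or head[5] != 1:
        return False
    if head[7] not in (0, 3):
        return False
    return int.from_bytes(head[18:20], "little") == 0x03


def is_elf(data: bytes) -> bool:
    return data[:4] == b"\x7fELF"


def pick_payload(blobs: dict[str, bytes]) -> str | None:
    """Tier 1: Linux i386 ELF; tier 2: any ELF; tier 3: largest blob."""
    by_size = sorted(blobs, key=lambda name: len(blobs[name]), reverse=True)
    for predicate in (is_linux_i386_elf, is_elf):
        for name in by_size:
            if predicate(blobs[name]):
                return name
    return by_size[0] if by_size else None


archive = {
    "big-x86-64": elf_header(ei_class=2, e_machine=0x3E) + b"\x00" * 5000,
    "freebsd-i386": elf_header(ei_osabi=9) + b"\x00" * 8000,
    "small-i386": elf_header() + b"\x00" * 100,
}
print(pick_payload(archive))  # small-i386
```

Sorting by size once and scanning per tier keeps "largest first" true inside every tier.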

Verified end-to-end on the Pi against a real theZoo clone (~500 MB,
263 family dirs, 2026-05-01 fresh pull):

  linux-encoder-ransomware  → ELF 32-bit Intel i386 SYSV (278 KB)
  linux-wirenet-rat         → ELF 32-bit Intel i386 SYSV (64 KB)
  linux-rex-ransomware      → ELF 32-bit Intel i386 SYSV Go (7.6 MB)
  linux-neurevt-bot         → ELF 32-bit Intel i386 SYSV (3.0 MB)
  linux-earthkrahang-apt    → ELF 32-bit Intel i386 GNU/Linux (5.8 MB)

5/5 picks are Linux i386 ELFs; execution inside the guest is still
unverified (see below). The in-place manifest rewrite adds
source/sha256/url; meta.sample.kind goes to "real" automatically.

Manifest rewritten:
  - Old families (XMRig, Mirai, Cryptolocker-class, Dridex, Kovter,
    Reverse-Shell) → mostly absent from theZoo's Linux catalog or
    matched the wrong arch.
  - New families chosen against a verified theZoo presence list:
    Linux.Encoder, Linux.Wirenet, Ransomware.Rex, Neurevt,
    EarthKrahang.
  - XMRig + Kovter remain as mimic-only fallbacks (theZoo lacks a
    runnable Linux i386 binary for these; orchestrator falls back
    to the mimic profile).

Tests added (tests/test_auto_fetch_samples.py): 13 cases covering
ELF magic detection (i386 accepted, FreeBSD/x86-64/ARM/PE32/text
all rejected), family-token expansion (the "-class" suffix bug),
extraction picker (prefers Linux i386 over larger non-Linux ELFs),
manifest in-place rewrite preserves mode + skips entries that
already have sha256.
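
The two manifest properties tested above (mode preserved, existing sha256 left alone) could be achieved roughly as follows. This is a sketch under assumptions: the body of `update_manifest_entry` is not shown in this commit view, so `add_acquisition` and `rewrite_preserving_mode` are hypothetical helpers, and the line-based TOML edit assumes unindented `name = "..."` lines.

```python
from __future__ import annotations

import os
import stat
import tempfile
from pathlib import Path


def add_acquisition(text: str, name: str, **fields: str) -> str:
    """Insert key = "value" lines after the named entry's name line.

    Returns the text unchanged if the entry is missing or already
    carries a sha256 line.
    """
    lines = text.splitlines()
    try:
        start = lines.index(f'name = "{name}"')
    except ValueError:
        return text
    # The entry ends at the next [[sample]] header or at end of file.
    end = next(
        (i for i in range(start + 1, len(lines)) if lines[i].strip() == "[[sample]]"),
        len(lines),
    )
    if any(line.startswith("sha256") for line in lines[start:end]):
        return text
    extra = [f'{key} = "{value}"' for key, value in fields.items()]
    return "\n".join(lines[: start + 1] + extra + lines[start + 1 :]) + "\n"


def rewrite_preserving_mode(path: Path, new_text: str) -> None:
    """Write via a temp file in the same directory, then swap it in."""
    mode = stat.S_IMODE(path.stat().st_mode)
    fd, tmp = tempfile.mkstemp(dir=path.parent, prefix=path.name + ".")
    try:
        with os.fdopen(fd, "w", encoding="utf-8") as f:
            f.write(new_text)
        os.chmod(tmp, mode)    # mkstemp creates the file as 0o600
        os.replace(tmp, path)  # atomic when both paths share a filesystem
    except BaseException:
        if os.path.exists(tmp):
            os.unlink(tmp)
        raise
```

Creating the temp file next to the manifest keeps `os.replace` on one filesystem, which is what makes the swap atomic.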

What's still NOT verified end-to-end (requires a lab host with
KVM x86):
  - Metasploitable2 boot under QEMU
  - vsftpd_234_backdoor exploit fire via msfrpcd
  - chunked binary upload through a real shell session
  - real binary executing inside a Metasploitable2 guest

The Pi is ARM64 — can't run Metasploitable2. install-tier-3-4.sh's
verify step (run_tier3_demo.py) covers all four on a real lab host;
deploy verifies on first run there.

171/171 tests pass.
max 2026-05-01 03:28:26 -05:00
parent cc0c96953e
commit b809e1e26e
3 changed files with 367 additions and 73 deletions


@@ -4,67 +4,74 @@
 # - identity (name, family, category) for labeling
 # - acquisition (source, sha256, url) for reproducibility
 # - behaviour (profile) so the synthetic load mimic can run a
-# reasonable proxy until the real sample lands at vm/images/
+# reasonable proxy until the real sample lands at samples/store/.
 #
 # When the real malware binary is present at samples/store/<sha256>,
 # the orchestrator runs THAT inside the guest. When it's absent, the
-# orchestrator falls back to running tools/load_mimic.py with the
-# matching profile so the fleet still produces *labeled, varied* data
-# while we collect the real samples. Either way, meta.json records
-# which path the episode took, so trainers can filter on
+# orchestrator falls back to the mimic workload with the matching
+# profile so the fleet still produces *labeled, varied* data while
+# we collect the real samples. Either way, meta.json records which
+# path the episode took, so trainers can filter on
 # meta.sample.kind ∈ {real, mimic}.
+#
+# Families below are CHOSEN AND TESTED to match theZoo entries that
+# contain a Linux 32-bit Intel 80386 ELF binary — i.e. binaries that
+# will execute natively inside our Metasploitable2 (Ubuntu 8.04 i386)
+# target VM. Verified against a fresh theZoo clone on 2026-05-01;
+# tools/auto_fetch_samples.py prefers the Linux-i386 ELF in each
+# multi-binary zip via `_is_linux_i386_elf` magic-byte sniffing.

+[[sample]]
+name = "linux-encoder-ransomware"
+family = "Linux.Encoder"
+category = "ransomware"
+profile = "io-walk"
+description = "Linux.Encoder.1 (Linux i386 ELF). The first known Linux ransomware. Heavy disk write + fs walk producing a per-file overwrite envelope."
+
+[[sample]]
+name = "linux-wirenet-rat"
+family = "Linux.Wirenet"
+category = "rat"
+profile = "shell-resident"
+description = "Linux.Wirenet (Linux i386 ELF). RAT with a long-lived TCP socket pinned to a fixed peer; occasional command bursts."
+
+[[sample]]
+name = "linux-rex-ransomware"
+family = "Ransomware.Rex"
+category = "ransomware"
+profile = "io-walk"
+description = "Ransomware.Rex (Linux i386 ELF, written in Go). File-walk encryption envelope with periodic CPU spikes during AES."
+
+[[sample]]
+name = "linux-neurevt-bot"
+family = "Neurevt"
+category = "botnet"
+profile = "scan-and-dial"
+description = "Neurevt 1.7 (Linux i386 ELF). Botnet panel binary; SYN scans + periodic dial-home pattern."
+
+[[sample]]
+name = "linux-earthkrahang-apt"
+family = "EarthKrahang"
+category = "rat"
+profile = "bursty-c2"
+description = "EarthKrahang 2024 (Linux i386 ELF). APT backdoor; long idle + periodic small TCP egress bursts."
+
+# Mimic-only fallback families. theZoo doesn't have a clean Linux i386
+# binary for these; auto_fetch_samples.py logs a warning and the
+# orchestrator stays on the mimic workload until a real binary is
+# staged manually at samples/store/<sha256>. Kept here so the trainer
+# can still collect cpu-saturate and low-and-slow envelopes (those
+# profiles' theZoo coverage is sparse).
 [[sample]]
 name = "xmrig-cryptominer"
 family = "XMRig"
 category = "cryptominer"
 profile = "cpu-saturate"
-description = "Sustained 1-vCPU saturation, very low IO/net. Pure compute."
-# To promote this entry to Tier-4 (real binary):
-# 1. Pick a sha256 from https://bazaar.abuse.ch/ for this family.
-# 2. Add `source`, `sha256`, `url` fields below.
-# 3. On the lab host (one-time per host):
-# export MALWAREBAZAAR_API_KEY=<key>
-# sudo -u cis490 /opt/cis490/.venv/bin/python \
-# /opt/cis490/tools/fetch_sample.py <sha256>
-# The sha256 is verified on download; the binary lands at
-# /opt/cis490/samples/store/<sha256>.
-# 4. Restart cis490-orchestrator. Episodes that select this sample
-# now run the real binary via the chunked upload path. If the
-# binary isn't on disk, the orchestrator falls back to the mimic
-# profile above — both kinds coexist via meta.sample.kind.
-
-[[sample]]
-name = "mirai-class-bot"
-family = "Mirai"
-category = "botnet"
-profile = "scan-and-dial"
-description = "SYN scans across the bridge IP space + periodic dial-home. High net, low CPU."
-
-[[sample]]
-name = "ransomware-mimic"
-family = "Cryptolocker-class"
-category = "ransomware"
-profile = "io-walk"
-description = "Heavy disk write + filesystem walk producing a per-file overwrite envelope."
-
-[[sample]]
-name = "dridex-class-trojan"
-family = "Dridex"
-category = "banking-trojan"
-profile = "bursty-c2"
-description = "Long idle, periodic short bursts of TCP egress to a fixed peer (C2 beacon shape)."
+description = "Mimic only on Metasploitable2 (no Linux-i386 XMRig in theZoo)."

 [[sample]]
 name = "kovter-class-stealth"
 family = "Kovter"
 category = "fileless"
 profile = "low-and-slow"
-description = "Low CPU, periodic memory churn, no persistent on-disk artifacts. Hardest to label from /proc alone."
-
-[[sample]]
-name = "reverse-shell-resident"
-family = "Reverse-Shell"
-category = "rat"
-profile = "shell-resident"
-description = "Single TCP socket pinned to an attacker IP, occasional command bursts."
+description = "Mimic only — Kovter is Windows-native; theZoo's binary won't run on Metasploitable2 i386."


@@ -0,0 +1,200 @@
+"""Tests for tools/auto_fetch_samples.py.
+
+Exercises the parts that can be tested without a real theZoo clone:
+  - ELF magic-byte sniffing for Linux i386 detection
+  - family-name directory matching (substring + token fallback)
+  - manifest in-place rewrite (atomic, stat-preserving)
+"""
+from __future__ import annotations
+
+import importlib.util
+import sys
+from pathlib import Path
+
+import pytest
+
+REPO_ROOT = Path(__file__).resolve().parent.parent
+spec = importlib.util.spec_from_file_location(
+    "auto_fetch_samples", REPO_ROOT / "tools" / "auto_fetch_samples.py"
+)
+afs = importlib.util.module_from_spec(spec)
+sys.modules["auto_fetch_samples"] = afs
+spec.loader.exec_module(afs)
+
+
+# ---------------------------------------------------------------------------
+# ELF magic detection
+# ---------------------------------------------------------------------------
+
+
+def _write(p: Path, data: bytes) -> Path:
+    p.parent.mkdir(parents=True, exist_ok=True)
+    p.write_bytes(data)
+    return p
+
+
+def _elf_header(*, ei_class: int = 1, ei_data: int = 1, ei_osabi: int = 0,
+                e_machine: int = 0x03) -> bytes:
+    """Synthesise a minimal ELF header. Default = Linux i386."""
+    h = bytearray(20)
+    h[:4] = b"\x7fELF"
+    h[4] = ei_class    # 1=32, 2=64
+    h[5] = ei_data     # 1=little, 2=big
+    h[6] = 1           # ei_version
+    h[7] = ei_osabi    # 0=SYSV, 3=Linux, 9=FreeBSD
+    h[18:20] = e_machine.to_bytes(2, "little")
+    return bytes(h)
+
+
+def test_is_linux_i386_elf_accepts_sysv(tmp_path: Path) -> None:
+    p = _write(tmp_path / "x", _elf_header())
+    assert afs._is_linux_i386_elf(p)
+
+
+def test_is_linux_i386_elf_accepts_linux_osabi(tmp_path: Path) -> None:
+    p = _write(tmp_path / "x", _elf_header(ei_osabi=3))
+    assert afs._is_linux_i386_elf(p)
+
+
+def test_is_linux_i386_elf_rejects_freebsd(tmp_path: Path) -> None:
+    """Snoopy.A in theZoo is FreeBSD/i386 — looks similar but won't
+    run on Metasploitable2."""
+    p = _write(tmp_path / "x", _elf_header(ei_osabi=9))
+    assert not afs._is_linux_i386_elf(p)
+
+
+def test_is_linux_i386_elf_rejects_x86_64(tmp_path: Path) -> None:
+    p = _write(tmp_path / "x", _elf_header(ei_class=2, e_machine=0x3E))
+    assert not afs._is_linux_i386_elf(p)
+
+
+def test_is_linux_i386_elf_rejects_arm(tmp_path: Path) -> None:
+    """Mirai.B in theZoo is ARM — won't run on x86 Metasploitable2."""
+    p = _write(tmp_path / "x", _elf_header(e_machine=0x28))
+    assert not afs._is_linux_i386_elf(p)
+
+
+def test_is_linux_i386_elf_rejects_pe32(tmp_path: Path) -> None:
+    """Windows PE32 starts with MZ, not \\x7fELF."""
+    p = _write(tmp_path / "x", b"MZ" + b"\x00" * 18)
+    assert not afs._is_linux_i386_elf(p)
+
+
+def test_is_linux_i386_elf_rejects_text(tmp_path: Path) -> None:
+    p = _write(tmp_path / "x", b"hello\n")
+    assert not afs._is_linux_i386_elf(p)
+
+
+# ---------------------------------------------------------------------------
+# Family-token expansion (the bug that broke v1: "Cryptolocker-class"
+# wouldn't match "CryptoLocker_22Jan2014" because the suffix "-class"
+# isn't in the dir name)
+# ---------------------------------------------------------------------------
+
+
+def test_family_tokens_strips_suffix() -> None:
+    assert afs._family_tokens("Cryptolocker-class") == [
+        "cryptolocker-class", "cryptolocker"
+    ]
+
+
+def test_family_tokens_dot_namespaces_kept() -> None:
+    """Linux.Mirai stays as `linux.mirai` so it lands on the right dir
+    rather than matching every Linux.* entry by the head token."""
+    out = afs._family_tokens("Linux.Mirai")
+    assert out[0] == "linux.mirai"
+    # Head token "linux" is appended as a fallback.
+    assert "linux" in out
+
+
+# ---------------------------------------------------------------------------
+# Extraction picker prefers Linux i386 ELF
+# ---------------------------------------------------------------------------
+
+
+def test_extract_largest_binary_prefers_linux_i386(tmp_path: Path) -> None:
+    """Mimics theZoo's Linux.Encoder.1 layout: multiple binaries in the
+    same zip, only one of which is Linux i386. The picker must return
+    that one even though it isn't the largest."""
+    import zipfile
+    zip_path = tmp_path / "test.zip"
+    big_x86_64 = _elf_header(ei_class=2, e_machine=0x3E) + b"\x00" * 5000
+    small_i386 = _elf_header() + b"\x00" * 100
+    freebsd_i386 = _elf_header(ei_osabi=9) + b"\x00" * 8000
+    with zipfile.ZipFile(zip_path, "w") as z:
+        z.writestr("big-x86-64", big_x86_64)
+        z.writestr("small-i386", small_i386)
+        z.writestr("freebsd-i386", freebsd_i386)
+    work = tmp_path / "extract"
+    chosen = afs._extract_largest_binary(zip_path, work)
+    assert chosen is not None
+    assert chosen.name == "small-i386", (
+        f"picker should prefer Linux i386 over larger non-Linux ELFs, "
+        f"got {chosen.name}"
+    )
+
+
+def test_extract_largest_binary_falls_back_to_other_elf(tmp_path: Path) -> None:
+    """Mimics theZoo's Linux.Mirai.B (ARM ELF only). Picker should
+    still return something even though it won't run on Metasploitable2."""
+    import zipfile
+    zip_path = tmp_path / "test.zip"
+    arm_elf = _elf_header(e_machine=0x28) + b"\x00" * 200
+    text = b"placeholder text\n"
+    with zipfile.ZipFile(zip_path, "w") as z:
+        z.writestr("arm-binary", arm_elf)
+        z.writestr("readme.txt", text)
+    work = tmp_path / "extract"
+    chosen = afs._extract_largest_binary(zip_path, work)
+    assert chosen is not None
+    assert chosen.name == "arm-binary"
+
+
+# ---------------------------------------------------------------------------
+# Manifest rewrite preserves stat
+# ---------------------------------------------------------------------------
+
+
+def test_update_manifest_entry_preserves_mode(tmp_path: Path) -> None:
+    import stat as _st
+    m = tmp_path / "manifest.toml"
+    m.write_text(
+        '[[sample]]\n'
+        'name = "x"\n'
+        'family = "F"\n'
+        'category = "rat"\n'
+        'profile = "shell-resident"\n'
+        'description = "d"\n'
+    )
+    m.chmod(0o644)
+    before = _st.S_IMODE(m.stat().st_mode)
+    afs.update_manifest_entry(m, "x", source="theZoo",
+                              sha256="a" * 64,
+                              url="https://example.invalid/")
+    after = _st.S_IMODE(m.stat().st_mode)
+    assert before == after
+    text = m.read_text()
+    assert 'sha256 = "' + ("a" * 64) + '"' in text
+    assert 'source = "theZoo"' in text
+
+
+def test_update_manifest_entry_skips_when_sha256_already_set(tmp_path: Path) -> None:
+    """Re-running auto_fetch on an already-staged sample is a no-op."""
+    m = tmp_path / "manifest.toml"
+    m.write_text(
+        '[[sample]]\n'
+        'name = "x"\n'
+        'family = "F"\n'
+        'category = "rat"\n'
+        'profile = "shell-resident"\n'
+        'sha256 = "' + ("a" * 64) + '"\n'
+        'description = "d"\n'
+    )
+    before = m.read_text()
+    afs.update_manifest_entry(m, "x", source="theZoo",
+                              sha256="b" * 64,
+                              url="https://example.invalid/")
+    after = m.read_text()
+    assert before == after, "should not overwrite an existing sha256"


@@ -71,56 +71,143 @@ def _ensure_thezoo(clone_dir: Path) -> Path:
     return clone_dir


+def _family_tokens(family: str) -> list[str]:
+    """Split a manifest family name into search tokens. ``Cryptolocker-class``
+    → ``["cryptolocker-class", "cryptolocker"]`` so the search hits theZoo
+    dirs like ``CryptoLocker_22Jan2014`` (which contain "cryptolocker"
+    but not "-class"). ``Linux.Mirai.B`` → ``["linux.mirai.b", "linux"]``:
+    the literal first token will match the exact dir, the second is a
+    fallback. Tokens are tried in order; the first matching dir wins."""
+    f = family.lower().strip()
+    out: list[str] = [f]
+    # Strip any "-suffix" / "_suffix" / ".suffix" the manifest uses for
+    # clarity (e.g. "Cryptolocker-class" → also try "cryptolocker";
+    # "Linux.Mirai" → also try "linux.mirai" then fall back to
+    # "linux"). All tokens are tried in order; first match wins.
+    head_dash = f.split("-")[0]
+    if head_dash != f:
+        out.append(head_dash)
+    head_dot = f.split(".")[0]
+    if head_dot != f and head_dot not in out:
+        out.append(head_dot)
+    head_underscore = f.split("_")[0]
+    if head_underscore != f and head_underscore not in out:
+        out.append(head_underscore)
+    return out
+
+
 def _find_family_dir(thezoo: Path, family: str) -> Path | None:
-    """Locate a Binaries subdir whose name contains ``family``
-    (case-insensitive). theZoo's layout is
-    ``malware/Binaries/<Family-Specific-Name>/``."""
+    """Locate a Binaries subdir matching ``family`` (case-insensitive
+    substring). theZoo's layout is ``malware/Binaries/<Family-Name>/``.
+
+    Two-pass match: first try the full lower-cased family, then strip
+    suffixes like ``-class``/``-mimic`` and try the head token. We pick
+    the prefix-match if there is one (so ``Mirai`` lands on
+    ``Linux.Mirai.B`` rather than ``MirageFox``), otherwise the first
+    substring match in alphabetical order."""
     binaries = thezoo / "malware" / "Binaries"
     if not binaries.is_dir():
         log.warning("theZoo layout missing %s — pull broke?", binaries)
         return None
-    needle = family.lower()
-    matches: list[Path] = []
-    for child in sorted(binaries.iterdir()):
-        if not child.is_dir():
-            continue
-        if needle in child.name.lower():
-            matches.append(child)
-    if not matches:
-        return None
-    # Prefer exact-match prefix (e.g. "Mirai" before "MirageFox").
-    for m in matches:
-        if m.name.lower().startswith(needle):
-            return m
-    return matches[0]
+    children = [c for c in sorted(binaries.iterdir()) if c.is_dir()]
+    for needle in _family_tokens(family):
+        matches = [c for c in children if needle in c.name.lower()]
+        if not matches:
+            continue
+        # Prefer prefix match.
+        for m in matches:
+            if m.name.lower().startswith(needle):
+                return m
+        return matches[0]
+    return None
+
+
+def _is_linux_i386_elf(path: Path) -> bool:
+    """Check magic bytes for ELF 32-bit Intel 80386 (Metasploitable2's
+    native arch). Pure stdlib so we don't depend on `file`."""
+    try:
+        with path.open("rb") as f:
+            head = f.read(20)
+    except OSError:
+        return False
+    if len(head) < 20 or head[:4] != b"\x7fELF":
+        return False
+    # ei_class = 1 (32-bit), ei_data = 1 (little-endian), e_machine
+    # at offset 18 = 0x03 for i386. ei_osabi at offset 7 == 0 (SYSV)
+    # OR 3 (Linux). FreeBSD is 9 — exclude.
+    if head[4] != 1 or head[5] != 1:
+        return False
+    if head[7] not in (0, 3):  # SYSV or Linux
+        return False
+    e_machine = int.from_bytes(head[18:20], "little")
+    return e_machine == 0x03  # EM_386


 def _extract_largest_binary(zip_path: Path, work_dir: Path) -> Path | None:
-    """Extract the password-protected zip and return the path to the
-    largest payload that isn't an obvious text artifact (md5/sha256
-    sidecars, READMEs, license files)."""
+    """Extract the password-protected zip and return the best payload:
+
+      1. The largest **Linux i386 ELF** in the archive (prefers binaries
+         that will actually execute inside Metasploitable2).
+      2. Any other ELF (some samples are ARM/x86-64; the chunked
+         uploader will land them but execution is best-effort).
+      3. The largest non-text file (last-resort fallback for
+         Windows-PE-only archives, in case Wine is on the target).
+
+    Filters out obvious sidecars (md5/sha256/passwords/readmes)."""
     work_dir.mkdir(parents=True, exist_ok=True)
-    candidates: list[tuple[int, Path]] = []
     with zipfile.ZipFile(zip_path) as z:
         try:
             z.extractall(path=work_dir, pwd=THEZOO_PASSWORD)
         except RuntimeError as e:
             log.warning("extract %s failed: %s", zip_path.name, e)
             return None
+    payloads: list[Path] = []
     for f in work_dir.rglob("*"):
         if not f.is_file():
             continue
         name = f.name.lower()
         if any(name.endswith(suf) for suf in (".md5", ".sha256", ".sha1",
-                                              ".txt", ".md", ".pass")):
+                                              ".txt", ".md", ".pass", ".c",
+                                              ".bat", ".sln", ".vcproj")):
             continue
         if name in {"readme", "license", "metadata.txt"}:
             continue
-        candidates.append((f.stat().st_size, f))
-    if not candidates:
+        payloads.append(f)
+    if not payloads:
         return None
-    candidates.sort(reverse=True)
-    return candidates[0][1]
+
+    # Tier 1: Linux i386 ELF, largest first.
+    linux_i386 = sorted(
+        (p for p in payloads if _is_linux_i386_elf(p)),
+        key=lambda p: p.stat().st_size, reverse=True,
+    )
+    if linux_i386:
+        return linux_i386[0]
+
+    # Tier 2: any ELF (best-effort — chunked upload still works,
+    # the binary may fail to execute inside the target VM but the
+    # episode records the attempt).
+    def _is_elf(p: Path) -> bool:
+        try:
+            with p.open("rb") as f:
+                return f.read(4) == b"\x7fELF"
+        except OSError:
+            return False
+
+    other_elf = sorted(
+        (p for p in payloads if _is_elf(p)),
+        key=lambda p: p.stat().st_size, reverse=True,
+    )
+    if other_elf:
+        log.warning("%s: no Linux i386 ELF found; falling back to %s "
+                    "(may not execute on Metasploitable2)",
+                    zip_path.name, other_elf[0].name)
+        return other_elf[0]
+
+    # Tier 3: largest non-text payload (Windows PE etc.).
+    log.warning("%s: no ELF found; falling back to largest non-text payload",
+                zip_path.name)
+    return max(payloads, key=lambda p: p.stat().st_size)


 def _sha256_of(path: Path) -> str: