"""Tests for tools/auto_fetch_samples.py.

Exercises the parts that can be tested without a real theZoo clone:
  - ELF magic-byte sniffing for Linux i386 detection
  - family-name → directory matching (substring + token fallback)
  - manifest in-place rewrite (atomic, stat-preserving)
"""
from __future__ import annotations

import importlib.util
import stat
import sys
import zipfile
from pathlib import Path

import pytest

REPO_ROOT = Path(__file__).resolve().parent.parent

# The tool lives under tools/ and is loaded here by file path under the
# module name "auto_fetch_samples".
_AFS_PATH = REPO_ROOT / "tools" / "auto_fetch_samples.py"
spec = importlib.util.spec_from_file_location("auto_fetch_samples", _AFS_PATH)
if spec is None or spec.loader is None:
    # Fail with a message that names the path instead of an opaque
    # AttributeError on `None` further down.
    raise ImportError(f"cannot build an import spec for {_AFS_PATH}")
afs = importlib.util.module_from_spec(spec)
sys.modules["auto_fetch_samples"] = afs
spec.loader.exec_module(afs)


# ---------------------------------------------------------------------------
# ELF magic detection
# ---------------------------------------------------------------------------


def _write(p: Path, data: bytes) -> Path:
    """Write ``data`` to ``p`` (creating parent directories) and return ``p``."""
    p.parent.mkdir(parents=True, exist_ok=True)
    p.write_bytes(data)
    return p


def _elf_header(
    *,
    ei_class: int = 1,
    ei_data: int = 1,
    ei_osabi: int = 0,
    e_machine: int = 0x03,
) -> bytes:
    """Synthesise a minimal ELF header. Default = Linux i386.

    Only the first 20 bytes are produced: the magic, the identification
    bytes set below, and ``e_machine`` at offset 18 (written little-endian
    regardless of ``ei_data``). Every other byte is zero.
    """
    h = bytearray(20)
    h[:4] = b"\x7fELF"
    h[4] = ei_class   # 1=32, 2=64
    h[5] = ei_data    # 1=little, 2=big
    h[6] = 1          # ei_version
    h[7] = ei_osabi   # 0=SYSV, 3=Linux, 9=FreeBSD
    h[18:20] = e_machine.to_bytes(2, "little")
    return bytes(h)


def test_is_linux_i386_elf_accepts_sysv(tmp_path: Path) -> None:
    """The default header (OSABI 0 = SYSV, machine 0x03) is accepted."""
    p = _write(tmp_path / "x", _elf_header())
    assert afs._is_linux_i386_elf(p)


def test_is_linux_i386_elf_accepts_linux_osabi(tmp_path: Path) -> None:
    """An explicit Linux OSABI (3) is accepted as well."""
    p = _write(tmp_path / "x", _elf_header(ei_osabi=3))
    assert afs._is_linux_i386_elf(p)


def test_is_linux_i386_elf_rejects_freebsd(tmp_path: Path) -> None:
    """Snoopy.A in theZoo is FreeBSD/i386 — looks similar but won't run
    on Metasploitable2."""
    p = _write(tmp_path / "x", _elf_header(ei_osabi=9))
    assert not afs._is_linux_i386_elf(p)


def test_is_linux_i386_elf_rejects_x86_64(tmp_path: Path) -> None:
    """A 64-bit header (class 2, machine 0x3E) is rejected."""
    p = _write(tmp_path / "x", _elf_header(ei_class=2, e_machine=0x3E))
    assert not afs._is_linux_i386_elf(p)


def test_is_linux_i386_elf_rejects_arm(tmp_path: Path) -> None:
    """Mirai.B in theZoo is ARM — won't run on x86 Metasploitable2."""
    p = _write(tmp_path / "x", _elf_header(e_machine=0x28))
    assert not afs._is_linux_i386_elf(p)


def test_is_linux_i386_elf_rejects_pe32(tmp_path: Path) -> None:
    """Windows PE32 starts with MZ, not \\x7fELF."""
    p = _write(tmp_path / "x", b"MZ" + b"\x00" * 18)
    assert not afs._is_linux_i386_elf(p)


def test_is_linux_i386_elf_rejects_text(tmp_path: Path) -> None:
    """A short plain-text file (shorter than the header) is rejected."""
    p = _write(tmp_path / "x", b"hello\n")
    assert not afs._is_linux_i386_elf(p)


# ---------------------------------------------------------------------------
# Family-token expansion (the bug that broke v1: "Cryptolocker-class"
# wouldn't match "CryptoLocker_22Jan2014" because the suffix "-class"
# isn't in the dir name)
# ---------------------------------------------------------------------------


def test_family_tokens_strips_suffix() -> None:
    """The full lowercased name comes first, then the name without "-class"."""
    assert afs._family_tokens("Cryptolocker-class") == [
        "cryptolocker-class",
        "cryptolocker",
    ]


def test_family_tokens_dot_namespaces_kept() -> None:
    """Linux.Mirai stays as `linux.mirai` so it lands on the right dir
    rather than matching every Linux.* entry by the head token."""
    out = afs._family_tokens("Linux.Mirai")
    assert out[0] == "linux.mirai"
    # Head token "linux" is appended as a fallback.
    assert "linux" in out


# ---------------------------------------------------------------------------
# Extraction picker prefers Linux i386 ELF
# ---------------------------------------------------------------------------


def test_extract_largest_binary_prefers_linux_i386(tmp_path: Path) -> None:
    """Mimics theZoo's Linux.Encoder.1 layout: multiple binaries in the
    same zip, only one of which is Linux i386. The picker must return
    that one even though it isn't the largest."""
    zip_path = tmp_path / "test.zip"
    big_x86_64 = _elf_header(ei_class=2, e_machine=0x3E) + b"\x00" * 5000
    small_i386 = _elf_header() + b"\x00" * 100
    freebsd_i386 = _elf_header(ei_osabi=9) + b"\x00" * 8000
    with zipfile.ZipFile(zip_path, "w") as z:
        z.writestr("big-x86-64", big_x86_64)
        z.writestr("small-i386", small_i386)
        z.writestr("freebsd-i386", freebsd_i386)

    work = tmp_path / "extract"
    chosen = afs._extract_largest_binary(zip_path, work)
    assert chosen is not None
    assert chosen.name == "small-i386", (
        "picker should prefer Linux i386 over larger non-Linux ELFs, "
        f"got {chosen.name}"
    )


def test_extract_largest_binary_falls_back_to_other_elf(tmp_path: Path) -> None:
    """Mimics theZoo's Linux.Mirai.B (ARM ELF only). Picker should still
    return something even though it won't run on Metasploitable2."""
    zip_path = tmp_path / "test.zip"
    arm_elf = _elf_header(e_machine=0x28) + b"\x00" * 200
    text = b"placeholder text\n"
    with zipfile.ZipFile(zip_path, "w") as z:
        z.writestr("arm-binary", arm_elf)
        z.writestr("readme.txt", text)

    work = tmp_path / "extract"
    chosen = afs._extract_largest_binary(zip_path, work)
    assert chosen is not None
    assert chosen.name == "arm-binary"


# ---------------------------------------------------------------------------
# Manifest rewrite preserves stat
# ---------------------------------------------------------------------------


def test_update_manifest_entry_preserves_mode(tmp_path: Path) -> None:
    """Rewriting an entry keeps the file's permission bits and records the
    new ``sha256`` and ``source`` values."""
    m = tmp_path / "manifest.toml"
    m.write_text(
        '[[sample]]\n'
        'name = "x"\n'
        'family = "F"\n'
        'category = "rat"\n'
        'profile = "shell-resident"\n'
        'description = "d"\n',
        encoding="utf-8",
    )
    # NOTE(review): 0o644 is also what a freshly created file gets under the
    # common 022 umask; a less common mode (e.g. 0o640) would make this check
    # stricter — confirm against the tool's rewrite strategy before changing.
    m.chmod(0o644)
    before = stat.S_IMODE(m.stat().st_mode)

    afs.update_manifest_entry(
        m, "x", source="theZoo", sha256="a" * 64, url="https://example.invalid/"
    )

    after = stat.S_IMODE(m.stat().st_mode)
    assert before == after
    text = m.read_text(encoding="utf-8")
    assert 'sha256 = "' + ("a" * 64) + '"' in text
    assert 'source = "theZoo"' in text


def test_update_manifest_entry_skips_when_sha256_already_set(tmp_path: Path) -> None:
    """Re-running auto_fetch on an already-staged sample is a no-op."""
    m = tmp_path / "manifest.toml"
    m.write_text(
        '[[sample]]\n'
        'name = "x"\n'
        'family = "F"\n'
        'category = "rat"\n'
        'profile = "shell-resident"\n'
        'sha256 = "' + ("a" * 64) + '"\n'
        'description = "d"\n',
        encoding="utf-8",
    )
    before = m.read_text(encoding="utf-8")

    afs.update_manifest_entry(
        m, "x", source="theZoo", sha256="b" * 64, url="https://example.invalid/"
    )

    after = m.read_text(encoding="utf-8")
    assert before == after, "should not overwrite an existing sha256"