Wraps the three remaining 🚧 items from the README so every collector
the threat-model promises is actually live, and the Tier-4 path
(real-malware fetch + upload + exec) works end-to-end as soon as a
sha256 lands in samples/store/. Closes spectral/CIS490#4, #5, #6.

== #6: Bridge pcap wiring ==

EpisodeConfig grows three optional fields:

    bridge_iface: str | None        # e.g. "br-malware"
    bridge_ip:    str = "10.200.0.1"
    pcap_snaplen: int = 256

When bridge_iface is set, EpisodeRunner spawns tcpdump for the duration
of the schedule (network.pcap), stops it cleanly on episode end, and
runs collectors.pcap.bucketize() to produce netflow.jsonl per the
100-ms schema in docs/data-model.md. EpisodeResult + meta.result gain
rows_netflow + pcap_bytes counters.

vm/launch_demo.sh + launch_target.sh now switch between SLIRP usermode
and tap+bridge based on $BRIDGE (operator pre-creates the tap as a
bridge member, no sudo from the launcher). run_real_vm_demo.py picks
BRIDGE up from env so the fleet runner can opt entire waves into pcap
mode by exporting BRIDGE before invocation.

== #5: Source 3 perf collector ==

collectors/perf_qemu.py shells out to ``perf stat -p <pid> -I 100 -j``
and parses the per-event JSON stream. Aggregates one row per interval
across the canonical event set (cycles/instructions/cache-{refs,misses}/
branches/branch-misses/page-faults/context-switches), computes IPC +
cache-miss rate. Tolerates missing events (``<not counted>`` /
``<not supported>``) without dropping the row, and skips cleanly when
``perf`` isn't on PATH or the process can't be attached.

EpisodeConfig.enable_perf=True opts into the collector (off by default
because perf needs CAP_SYS_ADMIN or perf_event_paranoid <= 1). When
enabled, runs as a parallel thread alongside the other collectors;
EpisodeResult.rows_perf records the count.
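The per-interval aggregation described for the perf collector could look roughly like the sketch below. It is not the code in collectors/perf_qemu.py: the names `EVENTS`, `parse_count`, `ratio` and `build_row` are hypothetical, and the input is assumed to be one interval's events already collected into a dict of event name to raw counter string.

```python
from __future__ import annotations

# Event names assumed from the canonical set listed above (hypothetical constant).
EVENTS = (
    "cycles", "instructions", "cache-references", "cache-misses",
    "branches", "branch-misses", "page-faults", "context-switches",
)


def parse_count(raw: str | None) -> float | None:
    """Convert a raw counter string to a float.

    Non-numeric placeholders such as "<not counted>" or "<not supported>",
    and events that are absent altogether, become None.
    """
    try:
        return float(raw)
    except (TypeError, ValueError):
        return None


def ratio(num: float | None, den: float | None) -> float | None:
    """Safe division: None when either side is missing or the divisor is 0."""
    if num is None or den is None or den == 0:
        return None
    return num / den


def build_row(t: float, counts: dict[str, str]) -> dict[str, float | None]:
    """Build one row for one interval; missing events never drop the row."""
    row: dict[str, float | None] = {"t": t}
    for event in EVENTS:
        row[event] = parse_count(counts.get(event))
    row["ipc"] = ratio(row["instructions"], row["cycles"])
    row["cache_miss_rate"] = ratio(row["cache-misses"], row["cache-references"])
    return row


if __name__ == "__main__":
    print(build_row(0.1, {
        "cycles": "2000.000000",
        "instructions": "3000.000000",
        "cache-references": "<not counted>",
        "cache-misses": "10.000000",
    }))
```

Keeping the derived fields as None rather than 0 lets a trainer tell "event unavailable on this host" apart from "event measured as zero".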
== #4: Tier 4 (real-malware fetch + upload + exec) ==

tools/fetch_sample.py: pulls a sample by sha256 from MalwareBazaar (API
key from env or samples/.bazaar.token), unzips with the standard
"infected" password, verifies the resulting binary's sha256, lands at
samples/store/<sha256>. Idempotent: already-staged correct binaries
return immediately.

samples/manifest.py: Sample.binary_path(store_root) resolves to the
staged binary path, or None for mimics / not-yet-fetched real samples.

exploits/workloads.py: real_binary_workload(bytes, sample) builds a
Workload that base64-uploads the binary into the shell session via a
heredoc, decodes + chmods + execs it in the background, captures the
PID for clean stop on dormant. Per-profile pid/bin paths so concurrent
samples in the same guest don't collide.

exploits/driver.py: dispatch order is now:
  1) sample.kind == "real" + binary staged at sample_store_root
     → real_binary_workload (Tier 4)
  2) profile mimic from workloads.workload_for() (Tier 3 v2)
  3) None → driver v1 fallback yes-loop
DriverConfig.sample_store_root is the new field; run_tier3_demo.py
wires it to repo_root/samples/store. driver_setup event records
sample_sha256 so trainers can join Tier-4 episodes against the manifest
by hash.

samples/store/.gitkeep added (binaries themselves are gitignored).

Tests: 102 pass (was 86). New suites:
  tests/test_perf_qemu.py: parser + builder + perf-missing fallback
  tests/test_tier4.py: real_binary_workload base64 round-trip, stop-cmd
      kills pidfile, per-profile path isolation, driver dispatch
      chooses real vs mimic correctly, fetcher input validation and
      cached-fast-path

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
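The verify-then-stage step of the Tier-4 fetcher, including its idempotent fast path, might be sketched as follows. This is an illustration under assumptions, not the contents of tools/fetch_sample.py: `stage_binary` is a hypothetical name, the MalwareBazaar download and unzip are left out entirely, and the bytes are assumed to be already in memory.

```python
import hashlib
import re
from pathlib import Path

_SHA256_RE = re.compile(r"[0-9a-f]{64}")


def stage_binary(data: bytes, sha256: str, store_root: Path) -> Path:
    """Write ``data`` to ``store_root/<sha256>`` only if its hash matches."""
    sha256 = sha256.lower()
    if not _SHA256_RE.fullmatch(sha256):
        raise ValueError(f"not a sha256 hex digest: {sha256!r}")

    dest = Path(store_root) / sha256
    # Cached fast path: an already-staged file with the right hash is reused.
    if dest.is_file() and hashlib.sha256(dest.read_bytes()).hexdigest() == sha256:
        return dest

    actual = hashlib.sha256(data).hexdigest()
    if actual != sha256:
        raise ValueError(f"sha256 mismatch: expected {sha256}, got {actual}")

    dest.parent.mkdir(parents=True, exist_ok=True)
    # Write to a temporary name, then rename, so a crash mid-write never
    # leaves a truncated file under the final hash-named path.
    tmp = dest.with_name(dest.name + ".part")
    tmp.write_bytes(data)
    tmp.replace(dest)
    return dest
```

Re-hashing the cached file on the fast path costs one read per call, but it means a corrupted or truncated store entry is replaced instead of being trusted by name alone.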
113 lines
4.1 KiB
Python
"""Sample manifest loader + per-(host, slot) deterministic selection.

The manifest at ``samples/manifest.toml`` defines the catalog of
samples (real or mimic) the fleet draws from. Selection is
**deterministic** given ``(host_id, slot, episode_index)``: each pick is
a hash of those inputs, so two lab hosts on the same fleet usually pick
*different* samples for the same slot index. Picks are independent
draws, so a host can repeat a sample before it has visited the whole
catalog.

This gives us "all hosts on the network generating novel data" without
needing a coordinator: every host's ``host_id`` seeds its own
sample-rotation order, and the orderings spread across the catalog.
"""

from __future__ import annotations

import hashlib
import tomllib
from dataclasses import dataclass, field
from pathlib import Path


_VALID_CATEGORIES = {
    "cryptominer", "botnet", "ransomware", "banking-trojan",
    "fileless", "rat", "worm", "loader", "wiper", "other",
}


@dataclass(frozen=True)
class Sample:
    name: str
    family: str
    category: str
    profile: str
    description: str = ""
    source: str | None = None
    sha256: str | None = None
    url: str | None = None

    @property
    def kind(self) -> str:
        """``"real"`` if a sha256-pinned binary is expected, else ``"mimic"``.
        Trainers filter on this so the realistic-model pipeline only
        consumes real-malware episodes."""
        return "real" if self.sha256 else "mimic"

    def binary_path(self, store_root: Path) -> Path | None:
        """Resolved path of the staged binary, or None if this sample
        has no sha256 (mimic) or the binary hasn't been fetched yet."""
        if not self.sha256:
            return None
        p = Path(store_root) / self.sha256
        return p if p.exists() else None


@dataclass(frozen=True)
class SampleManifest:
    samples: list[Sample] = field(default_factory=list)

    def __len__(self) -> int:
        return len(self.samples)

    def select(self, *, host_id: str, slot: int, episode_index: int = 0) -> Sample:
        """Deterministic selection. The host_id mixes into the seed so
        different hosts visit the catalog in different orders; slot +
        episode_index tick within a host. Same inputs always give the
        same sample (replay-friendly for debugging)."""
        if not self.samples:
            raise ValueError("manifest is empty")
        # SHA-256 of the seed gives a uniformly distributed integer.
        seed = f"{host_id}|{slot}|{episode_index}".encode()
        h = hashlib.sha256(seed).digest()
        idx = int.from_bytes(h[:8], "big") % len(self.samples)
        return self.samples[idx]

    @classmethod
    def load(cls, path: str | Path) -> "SampleManifest":
        with open(path, "rb") as f:
            data = tomllib.load(f)
        raw = data.get("sample") or []
        if not isinstance(raw, list):
            raise ValueError(f"{path}: 'sample' must be an array of tables")

        samples: list[Sample] = []
        for i, entry in enumerate(raw):
            if not isinstance(entry, dict):
                raise ValueError(f"{path}: sample[{i}] is not a table")
            for key in ("name", "family", "category", "profile"):
                if not isinstance(entry.get(key), str) or not entry[key]:
                    raise ValueError(f"{path}: sample[{i}] missing or empty '{key}'")
            if entry["category"] not in _VALID_CATEGORIES:
                raise ValueError(
                    f"{path}: sample[{i}] category {entry['category']!r} "
                    f"not in {sorted(_VALID_CATEGORIES)}"
                )
            samples.append(Sample(
                name=entry["name"],
                family=entry["family"],
                category=entry["category"],
                profile=entry["profile"],
                description=entry.get("description", ""),
                source=entry.get("source"),
                sha256=entry.get("sha256"),
                url=entry.get("url"),
            ))

        # Reject duplicate names: trainers join on this.
        seen: set[str] = set()
        for s in samples:
            if s.name in seen:
                raise ValueError(f"{path}: duplicate sample name {s.name!r}")
            seen.add(s.name)

        return cls(samples=samples)
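
A standalone sketch of the hashing scheme used by ``select`` above, reduced to an index function so it runs without the manifest file. The helper name `pick_index`, the host ids, and the catalog entries are hypothetical and chosen only for illustration.

```python
import hashlib


def pick_index(host_id: str, slot: int, episode_index: int, n: int) -> int:
    """Map (host_id, slot, episode_index) to a stable index in range(n)."""
    if n <= 0:
        raise ValueError("catalog is empty")
    seed = f"{host_id}|{slot}|{episode_index}".encode()
    digest = hashlib.sha256(seed).digest()
    # First 8 bytes of the digest as a big-endian integer, reduced mod n.
    return int.from_bytes(digest[:8], "big") % n


# Hypothetical catalog names, standing in for entries of manifest.toml.
catalog = ["miner-mimic", "botnet-mimic", "ransom-mimic", "loader-mimic"]

for host in ("lab-01", "lab-02"):
    order = [catalog[pick_index(host, slot, 0, len(catalog))] for slot in range(6)]
    print(host, order)
```

Each pick is an independent hash, so the same host can land on the same sample twice before covering the catalog. If a strict no-repeat rotation were wanted, one option would be to seed a per-host shuffle of the catalog and index into that permutation instead.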