CIS490/samples/manifest.py
max bdcd2ecbef Close out the open issues: bridge pcap wiring, perf collector, Tier-4
Wraps the three remaining 🚧 items from the README so every collector
the threat-model promises is actually live, and the Tier-4 path
(real-malware fetch + upload + exec) works end-to-end as soon as a
sha256 lands in samples/store/.

Closes spectral/CIS490#4, #5, #6.

== #6 — Bridge pcap wiring ==
EpisodeConfig grows three optional fields:
  bridge_iface: str | None        # e.g. "br-malware"
  bridge_ip:    str = "10.200.0.1"
  pcap_snaplen: int = 256
When bridge_iface is set, EpisodeRunner spawns tcpdump for the duration
of the schedule (network.pcap), stops it cleanly on episode end, and
runs collectors.pcap.bucketize() to produce netflow.jsonl per the
100-ms schema in docs/data-model.md. EpisodeResult + meta.result
gain rows_netflow + pcap_bytes counters.

vm/launch_demo.sh + launch_target.sh now switch between SLIRP usermode
and tap+bridge based on $BRIDGE — operator pre-creates the tap as a
bridge member, no sudo from the launcher.

run_real_vm_demo.py picks BRIDGE up from env so the fleet runner can
opt entire waves into pcap mode by exporting BRIDGE before invocation.

== #5 — Source 3 perf collector ==
collectors/perf_qemu.py shells out to ``perf stat -p <pid> -I 100 -j``
and parses the per-event JSON stream. Aggregates one row per interval
across the canonical event set (cycles/instructions/cache-{refs,misses}/
branches/branch-misses/page-faults/context-switches), computes IPC +
cache-miss rate. Tolerates missing events (``<not counted>`` /
``<not supported>``) without dropping the row, and skips cleanly when
``perf`` isn't on PATH or the process can't be attached.

EpisodeConfig.enable_perf=True opts into the collector — off by default
because perf needs CAP_SYS_ADMIN or perf_event_paranoid <= 1. When
enabled, runs as a parallel thread alongside the other collectors;
EpisodeResult.rows_perf records the count.

== #4 — Tier 4 (real-malware fetch + upload + exec) ==
tools/fetch_sample.py: pulls a sample by sha256 from MalwareBazaar
(API key from env or samples/.bazaar.token), unzips with the standard
"infected" password, verifies the resulting binary's sha256, lands at
samples/store/<sha256>. Idempotent — already-staged correct binaries
return immediately.

samples/manifest.py: Sample.binary_path(store_root) resolves to the
staged binary path, or None for mimics / not-yet-fetched real samples.

exploits/workloads.py: real_binary_workload(bytes, sample) builds a
Workload that base64-uploads the binary into the shell session via a
heredoc, decodes + chmods + execs it in the background, captures the
PID for clean stop on dormant. Per-profile pid/bin paths so concurrent
samples in the same guest don't collide.

exploits/driver.py: dispatch order is now:
  1) sample.kind == "real" + binary staged at sample_store_root
     → real_binary_workload (Tier 4)
  2) profile mimic from workloads.workload_for() (Tier 3 v2)
  3) None → driver v1 fallback yes-loop
DriverConfig.sample_store_root is the new field; run_tier3_demo.py
wires it to repo_root/samples/store. driver_setup event records
sample_sha256 so trainers can join Tier-4 episodes against the
manifest by hash.

samples/store/.gitkeep added (binaries themselves are gitignored).

Tests: 102 pass (was 86). New suites:
  tests/test_perf_qemu.py — parser + builder + perf-missing fallback
  tests/test_tier4.py     — real_binary_workload base64 round-trip,
                            stop-cmd kills pidfile, per-profile path
                            isolation, driver dispatch chooses real vs
                            mimic correctly, fetcher input validation
                            and cached-fast-path

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
2026-04-30 00:17:49 -05:00

113 lines
4.1 KiB
Python

"""Sample manifest loader + per-(host, slot) deterministic selection.
The manifest at ``samples/manifest.toml`` defines the catalog of
samples (real or mimic) the fleet draws from. Selection is
**deterministic** given ``(host_id, slot, episode_index)`` so two lab
hosts on the same fleet pick *different* samples for the same slot
index, and the same host repeats only after exhausting the catalog.
This gives us "all hosts on the network generating novel data" without
needing a coordinator: every host's `host_id` seeds its own
sample-rotation order, and the orderings spread across the catalog.
"""
from __future__ import annotations
import hashlib
import tomllib
from dataclasses import dataclass, field
from pathlib import Path
_VALID_CATEGORIES = {
"cryptominer", "botnet", "ransomware", "banking-trojan",
"fileless", "rat", "worm", "loader", "wiper", "other",
}
@dataclass(frozen=True)
class Sample:
name: str
family: str
category: str
profile: str
description: str = ""
source: str | None = None
sha256: str | None = None
url: str | None = None
@property
def kind(self) -> str:
"""``"real"`` if a sha256-pinned binary is expected, else ``"mimic"``.
Trainers filter on this so the realistic-model pipeline only
consumes real-malware episodes."""
return "real" if self.sha256 else "mimic"
def binary_path(self, store_root: Path) -> Path | None:
"""Resolved path of the staged binary, or None if this sample
has no sha256 (mimic) or the binary hasn't been fetched yet."""
if not self.sha256:
return None
p = Path(store_root) / self.sha256
return p if p.exists() else None
@dataclass(frozen=True)
class SampleManifest:
samples: list[Sample] = field(default_factory=list)
def __len__(self) -> int:
return len(self.samples)
def select(self, *, host_id: str, slot: int, episode_index: int = 0) -> Sample:
"""Deterministic selection. The host_id mixes into the seed so
different hosts visit the catalog in different orders; slot +
episode_index tick within a host. Same inputs always give the
same sample — replay-friendly for debugging."""
if not self.samples:
raise ValueError("manifest is empty")
# SHA-256 of the seed gives a uniformly distributed integer.
seed = f"{host_id}|{slot}|{episode_index}".encode()
h = hashlib.sha256(seed).digest()
idx = int.from_bytes(h[:8], "big") % len(self.samples)
return self.samples[idx]
@classmethod
def load(cls, path: str | Path) -> "SampleManifest":
with open(path, "rb") as f:
data = tomllib.load(f)
raw = data.get("sample") or []
if not isinstance(raw, list):
raise ValueError(f"{path}: 'sample' must be an array of tables")
samples: list[Sample] = []
for i, entry in enumerate(raw):
if not isinstance(entry, dict):
raise ValueError(f"{path}: sample[{i}] is not a table")
for key in ("name", "family", "category", "profile"):
if not isinstance(entry.get(key), str) or not entry[key]:
raise ValueError(f"{path}: sample[{i}] missing or empty '{key}'")
if entry["category"] not in _VALID_CATEGORIES:
raise ValueError(
f"{path}: sample[{i}] category {entry['category']!r} "
f"not in {sorted(_VALID_CATEGORIES)}"
)
samples.append(Sample(
name=entry["name"],
family=entry["family"],
category=entry["category"],
profile=entry["profile"],
description=entry.get("description", ""),
source=entry.get("source"),
sha256=entry.get("sha256"),
url=entry.get("url"),
))
# Reject duplicate names — trainers join on this.
seen: set[str] = set()
for s in samples:
if s.name in seen:
raise ValueError(f"{path}: duplicate sample name {s.name!r}")
seen.add(s.name)
return cls(samples=samples)