CIS490/tests/test_prune.py
max 321ea63803 Multi-signal prune classifier: rescue valid episodes /proc misses
A laptop-class lab host (elliott-thinkpad) running 14 parallel fleet
slots can't deliver host /proc CPU% signal for the bursty profiles —
the per-VM share gets buried under contention. But the workloads ARE
running: qmp blockstats record 90+ MB written during infected_running
for io-walk episodes, netflow shows real packet bursts for
scan-and-dial, and the in-guest agent (when alive) shows load_1m
deltas the host can't see.

The classifier now cross-checks four sources before flagging an
episode:
  - /proc CPU% medians (host-side qemu)
  - netflow byte totals (bridge_pcap)
  - qmp blockstats per-phase DELTA (cumulative counters; deltas
    matter, not raw values)
  - guest-agent load_1m

An episode is flagged only if every available source agrees that there
is no inter-phase signal. Missing sources count as "unknown", not "flat".

Time-base bug also fixed: phase mapping now uses t_wall_ns (which
all sources stamp from CLOCK_REALTIME) rather than t_mono_ns. Netflow
stamps t_mono_ns from qemu boot-monotonic time and /proc stamps it
relative to the orchestrator, so the two don't share a number line.

Result on the live receiver:
  - 1067 active episodes, 100% kept under the new logic
  - 143 episodes rescued from a previous false-positive archive
  - Only the 9 genuinely-broken pre-Sample-propagation elliott-lab
    episodes remain archived (no-sample + no-workload-events)

Two new tests (test_flat_proc_rescued_by_netflow,
test_flat_everywhere_still_flags) pin the boundary so a future
regression surfaces immediately.

AGENTS.md gains a "classifier is multi-source" section explaining
the cross-check and the t_wall_ns invariant.
2026-04-30 19:10:01 -05:00

422 lines
16 KiB
Python

"""Tests for cis490-prune. Builds synthetic episode tarballs (each
flagged with a specific quality issue) and confirms the classifier
catches them. Then exercises the index-walk + dry-run / archive /
delete actions on a temp tree so we don't touch real data."""
from __future__ import annotations
import io
import json
import shutil
import subprocess
import tarfile
from pathlib import Path
import pytest
# Skip the whole module if zstd isn't on PATH (the prune tool shells
# out for decompression, mirroring the shipper).
zstd_available = shutil.which("zstd") is not None
pytestmark = pytest.mark.skipif(not zstd_available, reason="needs system zstd")
import sys
# ROOT is the directory two levels above this file; its tools/
# sub-directory is put first on sys.path so the import below resolves.
ROOT = Path(__file__).resolve().parent.parent
sys.path.insert(0, str(ROOT / "tools"))
import prune_episodes as pe # noqa: E402
# ---------------------------------------------------------------------------
# tar+zstd builder
# ---------------------------------------------------------------------------
def _make_tar_zst(out_path: Path, files: dict[str, bytes]) -> None:
    """Write ``files`` as a zstd-compressed tar archive at ``out_path``.

    The tar is assembled in memory, spilled to a temporary ``.tar`` file
    next to ``out_path`` and compressed by shelling out to ``zstd``. The
    temporary file is removed whether or not compression succeeds.

    Args:
        out_path: Destination of the compressed archive. Missing parent
            directories are created.
        files: Maps archive member name (callers use
            ``{episode_id}/<file>``) to that member's raw bytes.

    Raises:
        subprocess.CalledProcessError: ``zstd`` exited with a non-zero
            status.
    """
    raw_tar = io.BytesIO()
    with tarfile.open(fileobj=raw_tar, mode="w") as archive:
        for name, data in files.items():
            info = tarfile.TarInfo(name=name)
            info.size = len(data)
            archive.addfile(info, io.BytesIO(data))
    out_path.parent.mkdir(parents=True, exist_ok=True)
    raw_tmp = out_path.with_suffix(".tar")
    raw_tmp.write_bytes(raw_tar.getvalue())
    try:
        # Hold the destination in a ``with`` block so the handle is
        # closed (and flushed) deterministically; passing a bare
        # ``open()`` result as ``stdout=`` left it to the garbage
        # collector.
        with out_path.open("wb") as compressed:
            subprocess.check_call(
                ["zstd", "-q", "-19", "--stdout", str(raw_tmp)],
                stdout=compressed,
            )
    finally:
        raw_tmp.unlink(missing_ok=True)
def _meta(*, sample: dict | None = None, exploit: dict | None = None) -> bytes:
return json.dumps({
"episode_id": "01TEST",
"schema_version": 1,
"sample": sample,
"exploit": exploit,
"result": {"phases_observed": ["clean", "infected_running", "dormant"]},
}, sort_keys=True).encode()
def _events(rows: list[dict]) -> bytes:
return ("\n".join(json.dumps(r, sort_keys=True) for r in rows) + "\n").encode()
def _proc_rows(*, flat: bool, n: int = 80) -> bytes:
"""Synthesize /proc rows with either flat-CPU (no phase signal)
or sharply-spiking CPU (clear phase boundaries). The test labels
file pairs with these. Both t_mono_ns and t_wall_ns are emitted —
the classifier uses t_wall_ns for phase mapping (consistent across
sources whose t_mono_ns time-bases differ)."""
out: list[dict] = []
for i in range(n):
t = i * 100_000_000
if flat:
jiff = 100 + i * 20 # uniform increment → flat CPU%
else:
# First third clean (low), middle infected (high), last third dormant (low).
jiff = (
100 + i * 20 if i < n // 3 or i >= 2 * n // 3
else 100 + i * 1000 # huge jump for "infected"
)
out.append({
"t_mono_ns": t,
"t_wall_ns": t, # synthetic: identity to t_mono_ns for tests
"cpu_user_jiffies": jiff,
"cpu_sys_jiffies": 0,
"rss_bytes": 1024 * 1024,
})
return ("\n".join(json.dumps(r) for r in out) + "\n").encode()
def _labels(boundary_ns: list[int], names: list[str]) -> bytes:
rows = [
{"t_mono_ns": t, "t_wall_ns": t, "phase": p,
"prev": names[i - 1] if i else None}
for i, (t, p) in enumerate(zip(boundary_ns, names))
]
return ("\n".join(json.dumps(r) for r in rows) + "\n").encode()
# ---------------------------------------------------------------------------
# Per-reason classifier tests
# ---------------------------------------------------------------------------
def _make_episode(tmp_path: Path, **member_overrides) -> Path:
    """Build ``tmp_path / "01TEST.tar.zst"`` holding one synthetic episode.

    The default members are: a ``meta.json`` with both a sample and an
    exploit, an events log that includes ``workload_started`` and
    ``workload_killed``, three phase labels (clean, infected_running,
    dormant) starting at rows 0, ``n // 3`` and ``2 * n // 3`` of a
    100 ms grid, and non-flat /proc telemetry over ``n = 60`` rows.

    Args:
        tmp_path: Directory that receives the tarball.
        **member_overrides: Archive member name -> bytes. Each entry
            replaces the default member of that name or adds a new one.
            Member names contain ``/``, so callers pass them as
            ``**{"01TEST/...": data}``.

    Returns:
        Path of the written tarball.
    """
    n = 60
    members = {
        "01TEST/meta.json": _meta(
            sample={"name": "xmrig", "kind": "real", "family": "XMRig",
                    "category": "cryptominer", "profile": "cpu-saturate",
                    "sha256": "a" * 64},
            exploit={"module_name": "vsftpd_234_backdoor", "module": "x"},
        ),
        "01TEST/events.jsonl": _events([
            {"event": "snapshot_load"},
            {"event": "workload_setup"},
            {"event": "workload_started", "phase": "infected_running"},
            {"event": "workload_killed", "phase": "dormant",
             "pre_kill_probe": {"yes": "2", "loadavg": "1.4"}},
            {"event": "episode_end"},
        ]),
        # Boundaries line up with the thirds used by _proc_rows.
        "01TEST/labels.jsonl": _labels(
            [0, n // 3 * 100_000_000, 2 * n // 3 * 100_000_000],
            ["clean", "infected_running", "dormant"],
        ),
        "01TEST/telemetry-proc.jsonl": _proc_rows(flat=False, n=n),
    }
    members.update(member_overrides)
    out = tmp_path / "01TEST.tar.zst"
    _make_tar_zst(out, members)
    return out
def test_healthy_episode_has_no_reasons(tmp_path: Path) -> None:
    """The unmodified fixture episode classifies with no reasons and
    exposes the sample and exploit-module names from its meta.json."""
    archive = _make_episode(tmp_path)
    verdict = pe.classify_episode(archive, host_id="lab1", episode_id="01TEST")
    assert verdict.reasons == [], f"unexpected reasons: {verdict.reasons}"
    assert verdict.sample_name == "xmrig"
    assert verdict.module_name == "vsftpd_234_backdoor"
def test_no_sample_flag(tmp_path: Path) -> None:
    """A meta.json whose sample and exploit are both null must yield
    the ``no-sample`` reason."""
    overrides = {"01TEST/meta.json": _meta(sample=None, exploit=None)}
    archive = _make_episode(tmp_path, **overrides)
    verdict = pe.classify_episode(archive, host_id="lab1", episode_id="01TEST")
    assert "no-sample" in verdict.reasons
def test_no_workload_events_flag(tmp_path: Path) -> None:
    """An events log with no ``workload_*`` entries must yield the
    ``no-workload-events`` reason."""
    event_log = _events([
        {"event": "snapshot_load"},
        {"event": "phase_transition", "to": "clean"},
        {"event": "episode_end"},
    ])
    archive = _make_episode(tmp_path, **{"01TEST/events.jsonl": event_log})
    verdict = pe.classify_episode(archive, host_id="lab1", episode_id="01TEST")
    assert "no-workload-events" in verdict.reasons
def test_workload_failed_flag(tmp_path: Path) -> None:
    """A ``workload_failed`` event in the log must yield the
    ``workload-failed`` reason."""
    event_log = _events([
        {"event": "workload_setup"},
        {"event": "workload_failed", "phase": "infected_running",
         "error": "EOF on serial"},
        {"event": "episode_end"},
    ])
    archive = _make_episode(tmp_path, **{"01TEST/events.jsonl": event_log})
    verdict = pe.classify_episode(archive, host_id="lab1", episode_id="01TEST")
    assert "workload-failed" in verdict.reasons
def test_workload_silent_flag(tmp_path: Path) -> None:
    """The elliott-lab fingerprint: the pre-kill probe reports ``yes=0``
    and host-side CPU is flat, so ``workload-silent`` must be flagged.
    The flat /proc member is supplied deliberately — per CIS490#15 the
    in-guest probe alone was unreliable on busybox, so this case has
    both signals agreeing."""
    silent_events = _events([
        {"event": "workload_setup"},
        {"event": "workload_started", "phase": "infected_running"},
        {"event": "workload_killed", "phase": "dormant",
         "pre_kill_probe": {"yes": "0", "loadavg": "0.18"}},
    ])
    # Flat host CPU to corroborate the probe's "nothing ran" reading.
    flat_proc = _proc_rows(flat=True, n=60)
    overrides = {
        "01TEST/events.jsonl": silent_events,
        "01TEST/telemetry-proc.jsonl": flat_proc,
    }
    archive = _make_episode(tmp_path, **overrides)
    verdict = pe.classify_episode(archive, host_id="lab1", episode_id="01TEST")
    assert "workload-silent" in verdict.reasons
def test_flat_proc_rescued_by_netflow(tmp_path: Path) -> None:
    """Flat /proc telemetry combined with netflow that carries traffic
    only during ``infected_running`` must NOT be flagged ``flat-cpu``
    (the scan-and-dial / bursty-c2 shape)."""
    n = 60
    # Same phase boundaries as the labels _make_episode writes by default.
    burst_from_ns = n // 3 * 100_000_000
    burst_until_ns = 2 * n // 3 * 100_000_000
    netflow_rows = []
    # Five 20 ms netflow buckets per 100 ms /proc row, so both series
    # span the same n * 100 ms window.
    for bucket in range(n * 5):
        stamp_ns = bucket * 20_000_000
        bursting = burst_from_ns <= stamp_ns < burst_until_ns
        if bursting:
            bytes_in, bytes_out = 80_000, 60_000
        else:
            bytes_in, bytes_out = 0, 0
        netflow_rows.append({
            "t_mono_ns": stamp_ns,
            "t_wall_ns": stamp_ns,
            "bytes_in": bytes_in,
            "bytes_out": bytes_out,
        })
    netflow_jsonl = ("\n".join(json.dumps(row) for row in netflow_rows) + "\n").encode()
    overrides = {
        "01TEST/telemetry-proc.jsonl": _proc_rows(flat=True, n=n),
        "01TEST/netflow.jsonl": netflow_jsonl,
    }
    archive = _make_episode(tmp_path, **overrides)
    verdict = pe.classify_episode(archive, host_id="lab1", episode_id="01TEST")
    assert "flat-cpu" not in verdict.reasons, (
        f"netflow burst should rescue this episode; got reasons={verdict.reasons}"
    )
def test_flat_everywhere_still_flags(tmp_path: Path) -> None:
    """When every source present in the tarball is flat — here /proc and
    netflow; no qmp or guest-agent member is supplied — the episode
    must still be flagged ``flat-cpu``."""
    n = 60
    netflow_rows = []
    for bucket in range(n * 5):
        stamp_ns = bucket * 20_000_000
        # Identical byte counts in every bucket: no inter-phase delta.
        netflow_rows.append({
            "t_mono_ns": stamp_ns,
            "t_wall_ns": stamp_ns,
            "bytes_in": 100,
            "bytes_out": 50,
        })
    netflow_jsonl = ("\n".join(json.dumps(row) for row in netflow_rows) + "\n").encode()
    overrides = {
        "01TEST/telemetry-proc.jsonl": _proc_rows(flat=True, n=n),
        "01TEST/netflow.jsonl": netflow_jsonl,
    }
    archive = _make_episode(tmp_path, **overrides)
    verdict = pe.classify_episode(archive, host_id="lab1", episode_id="01TEST")
    assert "flat-cpu" in verdict.reasons
def test_workload_silent_suppressed_when_host_cpu_real(tmp_path: Path) -> None:
    """CIS490#15 regression. The original note explains that busybox
    ``pgrep -c`` is unsupported, so the in-guest probe reports ``yes=0``
    on Alpine guests even while the workload saturates the vCPU. With a
    real inter-phase CPU envelope in host-side /proc telemetry, the
    probe-based ``workload-silent`` reason must be dropped; otherwise
    every Alpine episode would be a false positive."""
    probe_says_idle = _events([
        {"event": "workload_setup"},
        {"event": "workload_started", "phase": "infected_running"},
        {"event": "workload_killed", "phase": "dormant",
         "pre_kill_probe": {"yes": "0", "loadavg": "0.18"}},
    ])
    # Only the events log is overridden: the default fixture already
    # supplies _proc_rows(flat=False), i.e. the sharp CPU envelope.
    archive = _make_episode(tmp_path, **{"01TEST/events.jsonl": probe_says_idle})
    verdict = pe.classify_episode(archive, host_id="lab1", episode_id="01TEST")
    assert "workload-silent" not in verdict.reasons, (
        f"probe-only signal must not flag silent when host CPU is real; "
        f"got reasons={verdict.reasons}"
    )
def test_flat_cpu_flag(tmp_path: Path) -> None:
    """Flat /proc telemetry with no other telemetry member must yield
    ``flat-cpu``. The original note gives the rule as a CPU% spread
    between phases below 5pp, which leaves the trainer nothing to learn
    from (threshold lives in the classifier — not visible here)."""
    flat_proc = _proc_rows(flat=True, n=60)
    archive = _make_episode(tmp_path, **{"01TEST/telemetry-proc.jsonl": flat_proc})
    verdict = pe.classify_episode(archive, host_id="lab1", episode_id="01TEST")
    assert "flat-cpu" in verdict.reasons
# ---------------------------------------------------------------------------
# Walk + actions
# ---------------------------------------------------------------------------
def _stage_receiver_tree(tmp_path: Path) -> tuple[Path, Path]:
    """Lay out a fake receiver tree under ``tmp_path`` with two episodes
    for host ``lab1``: ``01OK`` (default healthy fixture) and ``01FAKE``
    (meta.json with a null sample).

    Returns:
        ``(episodes_root, index_path)`` where the index is a JSON Lines
        file with one ``{"host_id", "episode_id"}`` row per episode.
    """
    episodes = tmp_path / "episodes"
    host_dir = episodes / "lab1"
    host_dir.mkdir(parents=True)
    # _make_episode always writes <dir>/01TEST.tar.zst, so each tarball
    # is built in its own staging directory and then renamed into place
    # as <host_dir>/<episode_id>.tar.zst. The staging directory itself
    # is left behind, empty.
    healthy = _make_episode(host_dir / "01OK")
    healthy.rename(host_dir / "01OK.tar.zst")
    sampleless_meta = {"01TEST/meta.json": _meta(sample=None)}
    bad = _make_episode(host_dir / "01FAKE", **sampleless_meta)
    bad.rename(host_dir / "01FAKE.tar.zst")
    index = tmp_path / "index.jsonl"
    index_lines = [
        json.dumps({"host_id": "lab1", "episode_id": episode_id})
        for episode_id in ("01OK", "01FAKE")
    ]
    index.write_text("\n".join(index_lines) + "\n")
    return episodes, index
def test_dry_run_does_not_modify_anything(tmp_path: Path, capsys) -> None:
    """Without ``--archive`` or ``--delete`` the tool only reports: both
    tarballs and both index rows must survive the run."""
    episodes, index = _stage_receiver_tree(tmp_path)
    argv = [
        "--episodes-root", str(episodes),
        "--index", str(index),
        "--reason", "no-sample",
    ]
    exit_code = pe.main(argv)
    # Exit status 1 signals that flagged episodes exist (CLI semantics).
    assert exit_code == 1
    host_dir = episodes / "lab1"
    # Neither tarball was touched.
    assert (host_dir / "01OK.tar.zst").exists()
    assert (host_dir / "01FAKE.tar.zst").exists()
    # The index still has both rows.
    assert len(index.read_text().splitlines()) == 2
def test_archive_moves_flagged_and_rewrites_index(tmp_path: Path) -> None:
    """``--archive`` moves the flagged tarball under the archive root
    (keeping the host sub-directory) and drops its row from the index."""
    episodes, index = _stage_receiver_tree(tmp_path)
    archive = tmp_path / "archive"
    argv = [
        "--episodes-root", str(episodes),
        "--index", str(index),
        "--archive-root", str(archive),
        "--reason", "no-sample",
        "--archive",
    ]
    exit_code = pe.main(argv)
    assert exit_code == 1
    host_dir = episodes / "lab1"
    # The healthy episode stays where it was.
    assert (host_dir / "01OK.tar.zst").exists()
    # The flagged one left the live tree and landed in the archive.
    assert not (host_dir / "01FAKE.tar.zst").exists()
    assert (archive / "lab1" / "01FAKE.tar.zst").exists()
    # Only the healthy row remains in the index.
    rows = []
    for line in index.read_text().splitlines():
        if line.strip():
            rows.append(json.loads(line))
    assert len(rows) == 1
    assert rows[0]["episode_id"] == "01OK"
def test_delete_removes_flagged_and_rewrites_index(tmp_path: Path) -> None:
    """``--delete`` removes the flagged tarball from disk and leaves a
    single row in the index."""
    episodes, index = _stage_receiver_tree(tmp_path)
    argv = [
        "--episodes-root", str(episodes),
        "--index", str(index),
        "--reason", "no-sample",
        "--delete",
    ]
    exit_code = pe.main(argv)
    assert exit_code == 1
    assert not (episodes / "lab1" / "01FAKE.tar.zst").exists()
    rows = []
    for line in index.read_text().splitlines():
        if line.strip():
            rows.append(json.loads(line))
    assert len(rows) == 1
def test_host_filter_scopes_to_one_lab_host(tmp_path: Path) -> None:
    """``--host`` restricts the walk: filtering on a host with no
    episodes flags nothing, exits 0 and leaves lab1's bad tarball alone."""
    episodes, index = _stage_receiver_tree(tmp_path)
    argv = [
        "--episodes-root", str(episodes),
        "--index", str(index),
        "--reason", "no-sample",
        # The staged tree only contains lab1, so lab2 matches nothing.
        "--host", "lab2",
    ]
    exit_code = pe.main(argv)
    # Nothing flagged -> exit status 0.
    assert exit_code == 0
    assert (episodes / "lab1" / "01FAKE.tar.zst").exists()
def test_archive_preserves_index_mode(tmp_path: Path) -> None:
    """Regression: rewriting the index during ``--archive`` must keep
    the file's permission bits. The original incident: a sudo'd prune
    run replaced the receiver's index with a root-owned file the service
    could not append to, and every PUT returned 500 in _append_index."""
    import stat as _stat
    episodes, index = _stage_receiver_tree(tmp_path)
    # Pick a mode other than the default so any drift is observable.
    index.chmod(0o664)
    before_mode = _stat.S_IMODE(index.stat().st_mode)
    argv = [
        "--episodes-root", str(episodes),
        "--index", str(index),
        "--archive-root", str(tmp_path / "archive"),
        "--reason", "no-sample",
        "--archive",
    ]
    pe.main(argv)
    after_mode = _stat.S_IMODE(index.stat().st_mode)
    assert after_mode == before_mode, (
        f"prune mutated index mode: {oct(before_mode)} -> {oct(after_mode)}"
    )
def test_multiple_reasons_combine(tmp_path: Path) -> None:
    """An episode that trips more than one check carries every reason
    on a single result, and that result is marked ``fake``."""
    overrides = {
        "01TEST/meta.json": _meta(sample=None),
        "01TEST/events.jsonl": _events([{"event": "snapshot_load"}]),
    }
    archive = _make_episode(tmp_path, **overrides)
    verdict = pe.classify_episode(archive, host_id="x", episode_id="01TEST")
    assert "no-sample" in verdict.reasons
    assert "no-workload-events" in verdict.reasons
    assert verdict.fake