"""Tests for cis490-prune. Builds synthetic episode tarballs (each flagged with a specific quality issue) and confirms the classifier catches them. Then exercises the index-walk + dry-run / archive / delete actions on a temp tree so we don't touch real data.""" from __future__ import annotations import io import json import shutil import subprocess import tarfile from pathlib import Path import pytest # Skip the whole module if zstd isn't on PATH (the prune tool shells # out for decompression, mirroring the shipper). zstd_available = shutil.which("zstd") is not None pytestmark = pytest.mark.skipif(not zstd_available, reason="needs system zstd") import sys ROOT = Path(__file__).resolve().parent.parent sys.path.insert(0, str(ROOT / "tools")) import prune_episodes as pe # noqa: E402 # --------------------------------------------------------------------------- # tar+zstd builder # --------------------------------------------------------------------------- def _make_tar_zst(out_path: Path, files: dict[str, bytes]) -> None: """Build a {episode_id}/ layout, tar it, zstd it.""" raw_tar = io.BytesIO() with tarfile.open(fileobj=raw_tar, mode="w") as t: for name, data in files.items(): info = tarfile.TarInfo(name=name) info.size = len(data) t.addfile(info, io.BytesIO(data)) out_path.parent.mkdir(parents=True, exist_ok=True) raw_tmp = out_path.with_suffix(".tar") raw_tmp.write_bytes(raw_tar.getvalue()) try: subprocess.check_call( ["zstd", "-q", "-19", "--stdout", str(raw_tmp)], stdout=out_path.open("wb"), ) finally: raw_tmp.unlink(missing_ok=True) def _meta(*, sample: dict | None = None, exploit: dict | None = None) -> bytes: return json.dumps({ "episode_id": "01TEST", "schema_version": 1, "sample": sample, "exploit": exploit, "result": {"phases_observed": ["clean", "infected_running", "dormant"]}, }, sort_keys=True).encode() def _events(rows: list[dict]) -> bytes: return ("\n".join(json.dumps(r, sort_keys=True) for r in rows) + "\n").encode() def _proc_rows(*, flat: bool, n: int = 80) -> bytes: """Synthesize /proc rows with either flat-CPU (no phase signal) or sharply-spiking CPU (clear phase boundaries). The test labels file pairs with these. Both t_mono_ns and t_wall_ns are emitted — the classifier uses t_wall_ns for phase mapping (consistent across sources whose t_mono_ns time-bases differ).""" out: list[dict] = [] for i in range(n): t = i * 100_000_000 if flat: jiff = 100 + i * 20 # uniform increment → flat CPU% else: # First third clean (low), middle infected (high), last third dormant (low). jiff = ( 100 + i * 20 if i < n // 3 or i >= 2 * n // 3 else 100 + i * 1000 # huge jump for "infected" ) out.append({ "t_mono_ns": t, "t_wall_ns": t, # synthetic: identity to t_mono_ns for tests "cpu_user_jiffies": jiff, "cpu_sys_jiffies": 0, "rss_bytes": 1024 * 1024, }) return ("\n".join(json.dumps(r) for r in out) + "\n").encode() def _labels(boundary_ns: list[int], names: list[str]) -> bytes: rows = [ {"t_mono_ns": t, "t_wall_ns": t, "phase": p, "prev": names[i - 1] if i else None} for i, (t, p) in enumerate(zip(boundary_ns, names)) ] return ("\n".join(json.dumps(r) for r in rows) + "\n").encode() # --------------------------------------------------------------------------- # Per-reason classifier tests # --------------------------------------------------------------------------- def _make_episode(tmp_path: Path, **member_overrides) -> Path: """Default = a healthy episode with sample, exploit, workload events, sharp CPU envelope. Overrides replace specific members.""" n = 60 end_ns = n * 100_000_000 members = { "01TEST/meta.json": _meta( sample={"name": "xmrig", "kind": "real", "family": "XMRig", "category": "cryptominer", "profile": "cpu-saturate", "sha256": "a" * 64}, exploit={"module_name": "vsftpd_234_backdoor", "module": "x"}, ), "01TEST/events.jsonl": _events([ {"event": "snapshot_load"}, {"event": "workload_setup"}, {"event": "workload_started", "phase": "infected_running"}, {"event": "workload_killed", "phase": "dormant", "pre_kill_probe": {"yes": "2", "loadavg": "1.4"}}, {"event": "episode_end"}, ]), "01TEST/labels.jsonl": _labels( [0, n // 3 * 100_000_000, 2 * n // 3 * 100_000_000], ["clean", "infected_running", "dormant"], ), "01TEST/telemetry-proc.jsonl": _proc_rows(flat=False, n=n), } members.update(member_overrides) out = tmp_path / "01TEST.tar.zst" _make_tar_zst(out, members) return out def test_healthy_episode_has_no_reasons(tmp_path: Path) -> None: tar = _make_episode(tmp_path) q = pe.classify_episode(tar, host_id="lab1", episode_id="01TEST") assert q.reasons == [], f"unexpected reasons: {q.reasons}" assert q.sample_name == "xmrig" assert q.module_name == "vsftpd_234_backdoor" def test_no_sample_flag(tmp_path: Path) -> None: tar = _make_episode( tmp_path, **{"01TEST/meta.json": _meta(sample=None, exploit=None)}, ) q = pe.classify_episode(tar, host_id="lab1", episode_id="01TEST") assert "no-sample" in q.reasons def test_no_workload_events_flag(tmp_path: Path) -> None: tar = _make_episode( tmp_path, **{"01TEST/events.jsonl": _events([ {"event": "snapshot_load"}, {"event": "phase_transition", "to": "clean"}, {"event": "episode_end"}, ])}, ) q = pe.classify_episode(tar, host_id="lab1", episode_id="01TEST") assert "no-workload-events" in q.reasons def test_workload_failed_flag(tmp_path: Path) -> None: tar = _make_episode( tmp_path, **{"01TEST/events.jsonl": _events([ {"event": "workload_setup"}, {"event": "workload_failed", "phase": "infected_running", "error": "EOF on serial"}, {"event": "episode_end"}, ])}, ) q = pe.classify_episode(tar, host_id="lab1", episode_id="01TEST") assert "workload-failed" in q.reasons def test_workload_silent_flag(tmp_path: Path) -> None: """The elliott-lab fingerprint: dormant probe AND host-side CPU both confirm the workload never fired. Both signals must agree before we flag workload-silent (see CIS490#15 — the in-guest probe alone was unreliable on busybox).""" tar = _make_episode( tmp_path, **{ "01TEST/events.jsonl": _events([ {"event": "workload_setup"}, {"event": "workload_started", "phase": "infected_running"}, {"event": "workload_killed", "phase": "dormant", "pre_kill_probe": {"yes": "0", "loadavg": "0.18"}}, ]), # Flat host CPU corroborates the probe — both agree no # signal → workload-silent legitimately flags. "01TEST/telemetry-proc.jsonl": _proc_rows(flat=True, n=60), }, ) q = pe.classify_episode(tar, host_id="lab1", episode_id="01TEST") assert "workload-silent" in q.reasons def test_flat_proc_rescued_by_netflow(tmp_path: Path) -> None: """A scan-and-dial / bursty-c2 episode leaves /proc nearly idle but netflow shows clear inter-phase traffic deltas. Multi-signal classifier must not flag this episode as flat.""" n = 60 netflow_rows = [] # phase boundaries match _make_episode default for i in range(n * 5): # 100ms buckets t = i * 20_000_000 # 20 ms per bucket # heavy traffic only during infected_running (middle third) in_burst = (n // 3 * 100_000_000) <= t < (2 * n // 3 * 100_000_000) netflow_rows.append({ "t_mono_ns": t, "t_wall_ns": t, "bytes_in": 80_000 if in_burst else 0, "bytes_out": 60_000 if in_burst else 0, }) netflow_jsonl = ("\n".join(json.dumps(r) for r in netflow_rows) + "\n").encode() tar = _make_episode( tmp_path, **{ "01TEST/telemetry-proc.jsonl": _proc_rows(flat=True, n=n), "01TEST/netflow.jsonl": netflow_jsonl, }, ) q = pe.classify_episode(tar, host_id="lab1", episode_id="01TEST") assert "flat-cpu" not in q.reasons, ( f"netflow burst should rescue this episode; got reasons={q.reasons}" ) def test_flat_everywhere_still_flags(tmp_path: Path) -> None: """If /proc AND netflow AND qmp all show no inter-phase variation, the episode is genuinely silent and must still flag.""" n = 60 netflow_rows = [ {"t_mono_ns": i * 20_000_000, "t_wall_ns": i * 20_000_000, "bytes_in": 100, "bytes_out": 50} for i in range(n * 5) ] netflow_jsonl = ("\n".join(json.dumps(r) for r in netflow_rows) + "\n").encode() tar = _make_episode( tmp_path, **{ "01TEST/telemetry-proc.jsonl": _proc_rows(flat=True, n=n), "01TEST/netflow.jsonl": netflow_jsonl, }, ) q = pe.classify_episode(tar, host_id="lab1", episode_id="01TEST") assert "flat-cpu" in q.reasons def test_workload_silent_suppressed_when_host_cpu_real(tmp_path: Path) -> None: """CIS490#15 regression: busybox pgrep -c is unsupported, so the in-guest probe always reports yes=0 on Alpine guests even when the workload is saturating the vCPU. If host-side /proc telemetry shows a real inter-phase CPU envelope, trust the host and DROP the probe-based workload-silent reason — otherwise we false-positive every Alpine episode.""" tar = _make_episode( tmp_path, **{ "01TEST/events.jsonl": _events([ {"event": "workload_setup"}, {"event": "workload_started", "phase": "infected_running"}, {"event": "workload_killed", "phase": "dormant", "pre_kill_probe": {"yes": "0", "loadavg": "0.18"}}, ]), # Sharp host CPU envelope — workload IS running. Default # _make_episode already supplies _proc_rows(flat=False). }, ) q = pe.classify_episode(tar, host_id="lab1", episode_id="01TEST") assert "workload-silent" not in q.reasons, ( f"probe-only signal must not flag silent when host CPU is real; " f"got reasons={q.reasons}" ) def test_flat_cpu_flag(tmp_path: Path) -> None: """When the proc CPU% spread between phases is < 5pp, the episode has no signal for the trainer to learn from.""" tar = _make_episode( tmp_path, **{"01TEST/telemetry-proc.jsonl": _proc_rows(flat=True, n=60)}, ) q = pe.classify_episode(tar, host_id="lab1", episode_id="01TEST") assert "flat-cpu" in q.reasons # --------------------------------------------------------------------------- # Walk + actions # --------------------------------------------------------------------------- def _stage_receiver_tree(tmp_path: Path) -> tuple[Path, Path]: """Build a fake /var/lib/cis490 layout with two episodes: one healthy, one flagged for no-sample. Returns (episodes_root, index_path).""" episodes = tmp_path / "episodes" (episodes / "lab1").mkdir(parents=True) healthy = _make_episode(episodes / "lab1" / "01OK") healthy.rename(episodes / "lab1" / "01OK.tar.zst") bad = _make_episode( episodes / "lab1" / "01FAKE", **{"01TEST/meta.json": _meta(sample=None)}, ) bad.rename(episodes / "lab1" / "01FAKE.tar.zst") index = tmp_path / "index.jsonl" rows = [ {"host_id": "lab1", "episode_id": "01OK"}, {"host_id": "lab1", "episode_id": "01FAKE"}, ] index.write_text("\n".join(json.dumps(r) for r in rows) + "\n") return episodes, index def test_dry_run_does_not_modify_anything(tmp_path: Path, capsys) -> None: episodes, index = _stage_receiver_tree(tmp_path) rc = pe.main([ "--episodes-root", str(episodes), "--index", str(index), "--reason", "no-sample", ]) # Returns 1 because flagged episodes exist (matches CLI exit semantics). assert rc == 1 # Both tarballs still on disk. assert (episodes / "lab1" / "01OK.tar.zst").exists() assert (episodes / "lab1" / "01FAKE.tar.zst").exists() # Index unchanged. assert len(index.read_text().splitlines()) == 2 def test_archive_moves_flagged_and_rewrites_index(tmp_path: Path) -> None: episodes, index = _stage_receiver_tree(tmp_path) archive = tmp_path / "archive" rc = pe.main([ "--episodes-root", str(episodes), "--index", str(index), "--archive-root", str(archive), "--reason", "no-sample", "--archive", ]) assert rc == 1 # 01OK kept. assert (episodes / "lab1" / "01OK.tar.zst").exists() # 01FAKE moved. assert not (episodes / "lab1" / "01FAKE.tar.zst").exists() assert (archive / "lab1" / "01FAKE.tar.zst").exists() # Index dropped the bad row. rows = [json.loads(l) for l in index.read_text().splitlines() if l.strip()] assert len(rows) == 1 assert rows[0]["episode_id"] == "01OK" def test_delete_removes_flagged_and_rewrites_index(tmp_path: Path) -> None: episodes, index = _stage_receiver_tree(tmp_path) rc = pe.main([ "--episodes-root", str(episodes), "--index", str(index), "--reason", "no-sample", "--delete", ]) assert rc == 1 assert not (episodes / "lab1" / "01FAKE.tar.zst").exists() rows = [json.loads(l) for l in index.read_text().splitlines() if l.strip()] assert len(rows) == 1 def test_host_filter_scopes_to_one_lab_host(tmp_path: Path) -> None: episodes, index = _stage_receiver_tree(tmp_path) rc = pe.main([ "--episodes-root", str(episodes), "--index", str(index), "--reason", "no-sample", "--host", "lab2", # nothing matches ]) assert rc == 0 # zero flagged → exit 0 assert (episodes / "lab1" / "01FAKE.tar.zst").exists() def test_archive_preserves_index_mode(tmp_path: Path) -> None: """Regression: the prune tool's index rewrite must not change the file's mode bits. Real-world failure: a sudo'd prune run replaced the receiver's index with a root-owned file the service couldn't append to, every PUT 500'd on _append_index.""" import stat as _stat episodes, index = _stage_receiver_tree(tmp_path) # Set a non-default mode so we can detect drift. index.chmod(0o664) before_mode = _stat.S_IMODE(index.stat().st_mode) pe.main([ "--episodes-root", str(episodes), "--index", str(index), "--archive-root", str(tmp_path / "archive"), "--reason", "no-sample", "--archive", ]) after_mode = _stat.S_IMODE(index.stat().st_mode) assert after_mode == before_mode, ( f"prune mutated index mode: {oct(before_mode)} -> {oct(after_mode)}" ) def test_multiple_reasons_combine(tmp_path: Path) -> None: """An episode failing >1 signal is flagged once, all reasons listed.""" tar = _make_episode( tmp_path, **{"01TEST/meta.json": _meta(sample=None), "01TEST/events.jsonl": _events([{"event": "snapshot_load"}])}, ) q = pe.classify_episode(tar, host_id="x", episode_id="01TEST") assert "no-sample" in q.reasons assert "no-workload-events" in q.reasons assert q.fake