CIS490/tools/prune_episodes.py
max 2707709299 Fix workload-silent false-positive on Alpine busybox guests (closes #15)
On-device agent (k-gamingcom) ran the diagnostic probe sequence and
proved the workload IS running on Alpine — yes saturating the vCPU,
loadavg=1.05, three yes PIDs visible — but two busybox incompatibilities
made every episode look silent:

1. _probe() used `pgrep -c yes`. The -c flag is a procps-ng feature
   that busybox pgrep lacks. busybox pgrep exits 1 with a usage banner; the
   `|| echo 0` fallback then reported yes=0 every time. Switched to
   `pgrep yes | wc -l` which both pgrep variants support.

2. _wrap_loop appended `disown` after the nohup-backgrounded script.
   busybox sh / ash have no disown builtin, so each infected_running
   phase printed `sh: disown: not found` into run()'s captured output.
   The script kept running (nohup gives SIGHUP immunity, which is
   what disown was for), but the spurious error is now gone.
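
To make the first fix concrete, here is a minimal Python sketch of what `pgrep yes | wc -l` computes. The helper name `count_pids` is hypothetical and is not part of this repository; it only assumes that plain `pgrep NAME` prints one PID per line, which both the procps-ng and busybox variants do.

```python
def count_pids(pgrep_stdout: str) -> int:
    """Count PIDs in plain `pgrep NAME` output (one PID per line).

    Hypothetical helper for illustration only: it mirrors piping the
    output through `wc -l`, so no pgrep-specific flag is needed.
    """
    return sum(1 for line in pgrep_stdout.splitlines() if line.strip())


# Three `yes` processes, as seen in the on-device diagnosis.
print(count_pids("101\n102\n103\n"))
# No match: pgrep prints nothing, so the count is 0 without a fallback.
print(count_pids(""))
```

Counting lines keeps the "nothing running" case and the "tool rejected the flag" case apart, which the old `|| echo 0` fallback conflated.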

Cross-validation in the classifier:
- prune_episodes.py: workload-silent now requires the probe AND
  host-side /proc CPU envelope (flat-cpu) to AGREE. A probe-only zero
  is treated as the busybox false-positive and dropped. This means
  the 244 already-on-disk episodes from elliott-thinkpad and
  k-gamingcom are correctly classified without re-collecting.
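
The cross-validation rule can be sketched as a two-input truth table. This is an illustrative reduction, not the classifier itself: `confirm_workload_silent` is a hypothetical name, while `probe_says_silent` and the flat-cpu signal correspond to the values used in `classify_episode`.

```python
def confirm_workload_silent(probe_says_silent: bool, flat_cpu: bool) -> bool:
    """Flag workload-silent only when the in-guest probe and the
    host-side /proc CPU envelope agree (hypothetical helper)."""
    return probe_says_silent and flat_cpu


# Enumerate every combination of the two signals.
for probe in (False, True):
    for flat in (False, True):
        verdict = confirm_workload_silent(probe, flat)
        print(f"probe_silent={probe!s:<5} flat_cpu={flat!s:<5} -> {verdict}")
```

The row with `probe_silent=True` and `flat_cpu=False` is the busybox false-positive: the probe reported zero, but host-side CPU shows a real inter-phase delta, so the episode is not flagged.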

Test coverage:
- test_workload_silent_flag updated to require both signals
- test_workload_silent_suppressed_when_host_cpu_real new regression
  for the busybox false-positive

AGENTS.md gains a "Don't trust the in-guest probe alone" section with
the busybox-vs-procps gotcha + a list of busybox-incompatible patterns
to avoid in any new in-guest diagnostic.
2026-04-30 17:28:48 -05:00


"""``cis490-prune``: retroactively filter low-quality episodes from
the receiver's dataset.

The signals that mark an episode as low-quality:

  no-sample           meta.sample is null. Pre-Sample-propagation code
                      (commit a193d17 or earlier) ran the v1 yes-loop
                      fallback regardless of what the fleet picked, so
                      post-infection variety isn't recorded in meta.

  no-workload-events  events.jsonl has zero workload_* rows.
                      Pre-audit-trail code (commit d86502d or earlier)
                      ran with no event emission from VMLoadController,
                      so we can't tell whether the workload actually
                      fired.

  workload-failed     events.jsonl contains a workload_failed row. The
                      SerialClient.run() raised mid-phase; the labels
                      and telemetry don't match what the orchestrator
                      was supposed to be doing.

  workload-silent     workload_killed event during the dormant phase
                      has pre_kill_probe.yes == "0", meaning no
                      ``yes``-loop process was found when we tried to
                      kill it, AND the episode is also flat-cpu. The
                      probe alone is not trusted: busybox pgrep rejects
                      -c, so pre-fix episodes report yes=0 even when
                      the workload ran (see #15). This is the
                      elliott-lab fingerprint: the schedule walked but
                      nothing fired in-guest.

  flat-cpu            /proc CPU% delta between phases is under 5
                      percentage points across all phase boundaries.
                      A model trained on these episodes can't
                      distinguish phases.

Usage:

  cis490-prune                      # dry-run summary, no changes
  cis490-prune --reason no-sample   # filter to one signal
  cis490-prune --archive            # mv flagged episodes to
                                    # /var/lib/cis490/episodes-archive/
  cis490-prune --delete             # rm flagged episodes + index rows

Run on the receiver host, where /var/lib/cis490/ lives. The operator
runs it as root because the episode store is owned by the cis490 user
with mode 0640.
"""

from __future__ import annotations

import argparse
import io
import json
import os
import shutil
import statistics
import subprocess
import sys
import tarfile
import tempfile
from dataclasses import dataclass, field
from pathlib import Path
from typing import Iterator

_REASONS = (
    "no-sample",
    "no-workload-events",
    "workload-failed",
    "workload-silent",
    "flat-cpu",
)


@dataclass
class EpisodeQuality:
    host_id: str
    episode_id: str
    tar_path: Path
    size_bytes: int
    reasons: list[str] = field(default_factory=list)
    sample_name: str | None = None
    module_name: str | None = None

    @property
    def fake(self) -> bool:
        return bool(self.reasons)


# ---------------------------------------------------------------------------
# tarball introspection
# ---------------------------------------------------------------------------


def _read_jsonl_from_tar(tar: tarfile.TarFile, name_suffix: str) -> list[dict]:
    """Extract a JSONL member by name suffix (e.g. 'events.jsonl')."""
    for m in tar.getmembers():
        if m.name.endswith(name_suffix) and m.isfile():
            f = tar.extractfile(m)
            if f is None:
                return []
            text = f.read().decode("utf-8", errors="replace")
            return [json.loads(line) for line in text.splitlines() if line.strip()]
    return []


def _read_meta_from_tar(tar: tarfile.TarFile) -> dict:
    for m in tar.getmembers():
        if m.name.endswith("meta.json") and m.isfile():
            f = tar.extractfile(m)
            if f is None:
                return {}
            return json.loads(f.read().decode("utf-8"))
    return {}


def _decompress_zstd(zst_path: Path) -> bytes:
    """Pure stdlib doesn't have zstd; shell out (already a project dep:
    install scripts require it)."""
    p = subprocess.run(
        ["zstd", "-q", "-d", "--stdout", str(zst_path)],
        check=True, capture_output=True,
    )
    return p.stdout


def classify_episode(tar_zst: Path, host_id: str, episode_id: str) -> EpisodeQuality:
    """Open the tarball, scan meta + events + telemetry, return a
    quality verdict. An episode can hit multiple reasons (e.g.
    no-sample + flat-cpu). Signals are evaluated independently, except
    workload-silent, which is only confirmed alongside flat-cpu."""
    q = EpisodeQuality(
        host_id=host_id,
        episode_id=episode_id,
        tar_path=tar_zst,
        size_bytes=tar_zst.stat().st_size,
    )
    try:
        raw = _decompress_zstd(tar_zst)
    except (subprocess.CalledProcessError, OSError) as e:
        q.reasons.append(f"unreadable: {e}"[:80])
        return q

    with tarfile.open(fileobj=io.BytesIO(raw)) as tar:
        meta = _read_meta_from_tar(tar)
        events = _read_jsonl_from_tar(tar, "events.jsonl")
        proc = _read_jsonl_from_tar(tar, "telemetry-proc.jsonl")
        labels = _read_jsonl_from_tar(tar, "labels.jsonl")

    sample = meta.get("sample")
    if sample is None:
        q.reasons.append("no-sample")
    else:
        q.sample_name = sample.get("name")
    exploit = meta.get("exploit")
    if exploit is not None:
        q.module_name = exploit.get("module_name")

    workload_events = [e for e in events if str(e.get("event", "")).startswith("workload_")]
    if not workload_events:
        q.reasons.append("no-workload-events")

    if any(e.get("event") == "workload_failed" for e in events):
        q.reasons.append("workload-failed")

    # workload-silent (provisional): dormant transition's probe shows
    # no `yes` proc. This is a weak signal on its own (see CIS490#15):
    # busybox pgrep -c is unsupported, so pre-fix episodes always
    # report yes=0 even when the workload is saturating the vCPU. We
    # only confirm workload-silent when host-side /proc telemetry
    # (computed below) AGREES that no signal is present (flat-cpu).
    probe_says_silent = False
    for e in events:
        if e.get("event") != "workload_killed":
            continue
        if e.get("phase") != "dormant":
            continue
        probe = e.get("pre_kill_probe")
        if isinstance(probe, dict) and probe.get("yes") == "0":
            probe_says_silent = True
            break

    # flat-cpu: bucket /proc CPU% by phase, check inter-phase spread.
    if proc and labels:
        clk_tck = os.sysconf("SC_CLK_TCK")

        def phase_at(t_ns: int) -> str:
            # Labels are assumed sorted by t_mono_ns; return the phase
            # of the last label at or before t_ns.
            cur = "(pre)"
            for label in labels:
                if label["t_mono_ns"] <= t_ns:
                    cur = label["phase"]
                else:
                    break
            return cur

        per_phase: dict[str, list[float]] = {}
        prev = None
        for r in proc:
            if prev is not None:
                dt = (r["t_mono_ns"] - prev["t_mono_ns"]) / 1e9
                if dt > 0:
                    djiff = (r["cpu_user_jiffies"] + r["cpu_sys_jiffies"]) - \
                            (prev["cpu_user_jiffies"] + prev["cpu_sys_jiffies"])
                    pct = 100.0 * (djiff / clk_tck) / dt
                    per_phase.setdefault(phase_at(r["t_mono_ns"]), []).append(pct)
            prev = r
        if per_phase:
            medians = [statistics.median(v) for v in per_phase.values() if v]
            if medians and (max(medians) - min(medians)) < 5.0:
                q.reasons.append("flat-cpu")

    # Confirm workload-silent only when host-side telemetry agrees.
    # If the probe said silent but /proc CPU% shows a real inter-phase
    # delta (i.e. NOT flat-cpu), trust the host-side ground truth and
    # discard the probe result: the probe is busybox-pgrep-broken.
    if probe_says_silent and "flat-cpu" in q.reasons:
        q.reasons.append("workload-silent")

    return q


# ---------------------------------------------------------------------------
# Index walking + actions
# ---------------------------------------------------------------------------


def walk_index(index_path: Path, episodes_root: Path) -> Iterator[tuple[dict, Path]]:
    if not index_path.exists():
        return
    for line in index_path.read_text().splitlines():
        if not line.strip():
            continue
        try:
            row = json.loads(line)
        except json.JSONDecodeError:
            continue
        host = row.get("host_id", "")
        ep = row.get("episode_id", "")
        if not host or not ep:
            continue
        tar = episodes_root / host / f"{ep}.tar.zst"
        if not tar.exists():
            continue
        yield row, tar


def apply_action(
    quals: list[EpisodeQuality],
    *,
    action: str,
    archive_root: Path,
    index_path: Path,
) -> None:
    """Carry out --delete or --archive on flagged episodes + drop
    matching rows from index.jsonl. Atomic-ish: index rewrite is
    single-shot after all tarballs are handled."""
    if action not in ("delete", "archive"):
        return
    flagged_ids = {q.episode_id for q in quals if q.fake}
    if not flagged_ids:
        return

    if action == "archive":
        archive_root.mkdir(parents=True, exist_ok=True)

    for q in quals:
        if not q.fake:
            continue
        if action == "archive":
            target = archive_root / q.host_id
            target.mkdir(parents=True, exist_ok=True)
            shutil.move(str(q.tar_path), target / q.tar_path.name)
        elif action == "delete":
            q.tar_path.unlink(missing_ok=True)

    if index_path.exists():
        kept = []
        for line in index_path.read_text().splitlines():
            try:
                row = json.loads(line)
            except json.JSONDecodeError:
                kept.append(line)
                continue
            if row.get("episode_id") in flagged_ids:
                continue
            kept.append(line)
        # Rewrite via tempfile + replace so a crash mid-write doesn't
        # corrupt the live index. os.replace drops ownership/mode from
        # the original: when prune runs as root that leaves the new
        # file root:root and locks out the cis490 receiver service
        # (every PUT then 500s on _append_index). Snapshot stat before
        # the rename, restore after.
        st = index_path.stat()
        tmp = index_path.with_suffix(".jsonl.partial")
        tmp.write_text("\n".join(kept) + ("\n" if kept else ""))
        os.replace(tmp, index_path)
        try:
            os.chown(index_path, st.st_uid, st.st_gid)
        except (PermissionError, OSError):
            # Best-effort: chown requires root, but if we got here as a
            # non-root user the original ownership matched ours anyway.
            pass
        os.chmod(index_path, st.st_mode & 0o7777)


# ---------------------------------------------------------------------------
# CLI
# ---------------------------------------------------------------------------


def main(argv: list[str] | None = None) -> int:
    p = argparse.ArgumentParser(prog="cis490-prune")
    p.add_argument("--episodes-root", type=Path,
                   default=Path("/var/lib/cis490/episodes"))
    p.add_argument("--index", type=Path,
                   default=Path("/var/lib/cis490/index.jsonl"))
    p.add_argument("--archive-root", type=Path,
                   default=Path("/var/lib/cis490/episodes-archive"))
    p.add_argument("--reason", action="append", choices=_REASONS,
                   help="Only flag episodes matching this reason. Repeat "
                        "to OR multiple. Default: all reasons.")
    p.add_argument("--host", help="Only consider episodes from this host_id")
    action = p.add_mutually_exclusive_group()
    action.add_argument("--delete", action="store_true",
                        help="Remove flagged tarballs + drop their index rows")
    action.add_argument("--archive", action="store_true",
                        help="Move flagged tarballs to --archive-root + drop index rows")
    p.add_argument("--json", action="store_true",
                   help="Machine-readable output instead of summary")
    args = p.parse_args(argv)

    if not args.episodes_root.exists():
        print(f"no episodes dir at {args.episodes_root}", file=sys.stderr)
        return 2

    selected_reasons = set(args.reason or _REASONS)
    quals: list[EpisodeQuality] = []
    for row, tar in walk_index(args.index, args.episodes_root):
        if args.host and row["host_id"] != args.host:
            continue
        q = classify_episode(tar, row["host_id"], row["episode_id"])
        # Only mark "fake" if at least one of the selected reasons hits.
        q.reasons = [r for r in q.reasons if r in selected_reasons]
        quals.append(q)

    flagged = [q for q in quals if q.fake]
    kept = [q for q in quals if not q.fake]

    if args.json:
        print(json.dumps({
            "scanned": len(quals),
            "flagged": len(flagged),
            "kept": len(kept),
            "by_reason": {
                r: sum(1 for q in flagged if r in q.reasons) for r in _REASONS
            },
            "flagged_episodes": [
                {
                    "host": q.host_id,
                    "episode": q.episode_id,
                    "size_bytes": q.size_bytes,
                    "reasons": q.reasons,
                    "sample": q.sample_name,
                    "module": q.module_name,
                } for q in flagged
            ],
        }, indent=2))
    else:
        print(f"scanned: {len(quals)} flagged: {len(flagged)} kept: {len(kept)}")
        if flagged:
            print()
            print(f"{'host':<14} {'episode':<28} {'size':>9} reasons")
            for q in flagged:
                print(f"{q.host_id:<14} {q.episode_id:<28} {q.size_bytes:>9} "
                      f"{','.join(q.reasons)}")
        if not (args.delete or args.archive):
            print()
            print("dry-run only. Re-run with --archive (safer) or --delete.")

    if args.delete or args.archive:
        action = "delete" if args.delete else "archive"
        apply_action(
            quals,
            action=action,
            archive_root=args.archive_root,
            index_path=args.index,
        )
        print(f"\n{action}d {len(flagged)} episodes")

    return 0 if not flagged else 1


if __name__ == "__main__":
    sys.exit(main())