Multi-signal prune classifier: rescue valid episodes /proc misses

A laptop-class lab host (elliott-thinkpad) running 14 parallel fleet
slots can't deliver host /proc CPU% signal for the bursty profiles —
the per-VM share gets buried under contention. But the workloads ARE
running: qmp blockstats record 90+ MB written during infected_running
for io-walk episodes, netflow shows real packet bursts for
scan-and-dial, and the in-guest agent (when alive) shows load_1m
deltas the host can't see.

The classifier now cross-checks four sources before flagging an
episode:
  - /proc CPU% medians (host-side qemu)
  - netflow byte totals (bridge_pcap)
  - qmp blockstats per-phase DELTA (cumulative counters; deltas
    matter, not raw values)
  - guest-agent load_1m

An episode is flagged only if every available source shows no
inter-phase signal. Missing sources count as "unknown", not "flat".

Time-base bug also fixed: phase mapping now uses t_wall_ns (which
all sources stamp from CLOCK_REALTIME) rather than t_mono_ns. The
netflow t_mono_ns is qemu boot-monotonic and the /proc t_mono_ns is
orchestrator-relative, so the two don't share a number line.

Result on the live receiver:
  - 1067 active episodes, 100% kept under the new logic
  - 143 episodes rescued from a previous false-positive archive
  - Only the 9 genuinely-broken pre-Sample-propagation elliott-lab
    episodes remain archived (no-sample + no-workload-events)

Two new tests (test_flat_proc_rescued_by_netflow,
test_flat_everywhere_still_flags) pin the boundary so a future
regression surfaces immediately.

AGENTS.md gains a "classifier is multi-source" section explaining
the cross-check and the t_wall_ns invariant.
max 2026-04-30 19:10:01 -05:00
parent 2707709299
commit 321ea63803
3 changed files with 240 additions and 31 deletions

@ -201,6 +201,32 @@ older clone:
**If you hit any of these on a fresh install, pull main first** before
filing an issue — the issue is probably already closed.
### The classifier is multi-source — don't gut episodes on /proc alone
`tools/prune_episodes.py` cross-checks four telemetry sources before
flagging an episode as flat:
- `telemetry-proc.jsonl` — host qemu-system /proc CPU%
- `netflow.jsonl` — bridge_pcap byte counters (network profiles)
- `telemetry-qmp.jsonl` — virtio blockstats per-phase delta (io-walk,
ransomware-shape)
- `telemetry-guest.jsonl` — in-guest agent load_1m (low-and-slow,
any host with a working agent)
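
As a rough illustration of why the qmp check works on per-phase deltas
rather than raw counter values, here is a minimal sketch. The
`(phase, cumulative_bytes)` tuples, the `per_phase_deltas` name, and the
byte counts are simplifications made up for this example; they are not
the row shape or helper names used in `tools/prune_episodes.py`.

```python
def per_phase_deltas(samples: list[tuple[str, int]]) -> dict[str, int]:
    """samples: (phase, cumulative_bytes) pairs in time order.

    Returns the bytes accumulated between the first and the last
    sample seen in each phase.
    """
    first: dict[str, int] = {}
    last: dict[str, int] = {}
    for phase, total in samples:
        first.setdefault(phase, total)  # remember the first sample only
        last[phase] = total             # keep overwriting with the latest
    return {p: last[p] - first[p] for p in last}


# Illustrative numbers only: a cumulative counter that jumps during
# infected_running and barely moves elsewhere.
samples = [
    ("baseline", 1_000), ("baseline", 1_200),
    ("infected_running", 1_200), ("infected_running", 95_000_000),
    ("post", 95_000_000), ("post", 95_000_100),
]
deltas = per_phase_deltas(samples)
spread = max(deltas.values()) - min(deltas.values())
```

Comparing the last raw value per phase would always look like signal,
because the counter only grows. The deltas show that the activity is
concentrated in `infected_running`.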
An episode flags as `flat-cpu` only when EVERY available source
shows no inter-phase variation. If `/proc` is flat but qmp blockstats
show 90 MB written during `infected_running`, the episode is kept —
the host /proc collector loses signal under contention but qmp sees
through. This is essential on laptop-class lab hosts (e.g.
elliott-thinkpad) where the guest is co-scheduled with 13 other VMs
and the per-VM /proc CPU% gets buried.
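
The flagging rule can be sketched as three-valued logic, assuming each
source check reports `True` (signal), `False` (flat), or `None` (no
data). `is_flat` is a hypothetical name used only for this example.

```python
from typing import Optional


def is_flat(sources: dict[str, Optional[bool]]) -> bool:
    """Flat only if at least one source has data and none shows signal."""
    # Drop sources with no data: they are "unknown", not "flat".
    available = {k: v for k, v in sources.items() if v is not None}
    return bool(available) and not any(available.values())


# /proc is flat, but qmp sees disk writes: keep the episode.
rescued = is_flat({"proc": False, "netflow": None, "qmp": True, "guest": None})

# Every source that reported anything is flat: flag it.
flagged = is_flat({"proc": False, "netflow": False, "qmp": None, "guest": None})

# Nothing reported at all: unknown, so do not flag.
no_data = is_flat({"proc": None, "netflow": None, "qmp": None, "guest": None})
```

One design consequence worth noting: an episode with no telemetry at
all is never flagged by this rule, so other reasons have to catch it.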
All four sources stamp `t_wall_ns`; phase mapping uses that, not
`t_mono_ns`, because `t_mono_ns` values do not share a time base:
/proc and labels are orchestrator-relative while netflow/guest are
wall-clock-anchored. If you add a new
collector, emit `t_wall_ns` from CLOCK_REALTIME on every row or your
data will silently bucket into "(pre)".
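
A minimal sketch of what that means for a new collector, assuming
Python and JSONL output. `make_row` is a hypothetical helper, and
`phase_at` here is a simplified stand-in for the lookup in
`tools/prune_episodes.py`. `time.time_ns()` reads the wall clock, which
on Linux is expected to correspond to CLOCK_REALTIME.

```python
import json
import time


def make_row(**fields) -> dict:
    """Hypothetical helper: stamp both clocks on every emitted row."""
    return {
        "t_mono_ns": time.monotonic_ns(),  # only comparable within this collector
        "t_wall_ns": time.time_ns(),       # comparable across collectors
        **fields,
    }


def phase_at(row: dict, labels: list[dict]) -> str:
    """Return the last phase whose wall-clock boundary is <= the row's."""
    tw = row.get("t_wall_ns")
    if tw is None:
        return "(pre)"  # rows without t_wall_ns cannot be placed
    cur = "(pre)"
    for lab in labels:
        if lab["t_wall_ns"] <= tw:
            cur = lab["phase"]
        else:
            break
    return cur


now = time.time_ns()
labels = [
    {"t_wall_ns": now - 2_000_000_000, "phase": "baseline"},
    {"t_wall_ns": now - 1_000_000_000, "phase": "infected_running"},
]
row = make_row(bytes_in=0, bytes_out=0)
line = json.dumps(row)                       # one JSONL line
placed = phase_at(row, labels)               # lands in the current phase
unplaced = phase_at({"t_mono_ns": 5}, labels)  # no t_wall_ns: "(pre)"
```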
### Don't trust the in-guest probe alone — cross-check host CPU
The `pre_kill_probe.yes` / `pre_kill_probe.sh` fields in

@ -69,7 +69,9 @@ def _events(rows: list[dict]) -> bytes:
def _proc_rows(*, flat: bool, n: int = 80) -> bytes:
"""Synthesize /proc rows with either flat-CPU (no phase signal)
or sharply-spiking CPU (clear phase boundaries). The test labels
file pairs with these."""
file pairs with these. Both t_mono_ns and t_wall_ns are emitted;
the classifier uses t_wall_ns for phase mapping (consistent across
sources whose t_mono_ns time-bases differ)."""
out: list[dict] = []
for i in range(n):
t = i * 100_000_000
@ -83,6 +85,7 @@ def _proc_rows(*, flat: bool, n: int = 80) -> bytes:
)
out.append({
"t_mono_ns": t,
"t_wall_ns": t, # synthetic: identity to t_mono_ns for tests
"cpu_user_jiffies": jiff,
"cpu_sys_jiffies": 0,
"rss_bytes": 1024 * 1024,
@ -92,7 +95,8 @@ def _proc_rows(*, flat: bool, n: int = 80) -> bytes:
def _labels(boundary_ns: list[int], names: list[str]) -> bytes:
rows = [
{"t_mono_ns": t, "phase": p, "prev": names[i - 1] if i else None}
{"t_mono_ns": t, "t_wall_ns": t, "phase": p,
"prev": names[i - 1] if i else None}
for i, (t, p) in enumerate(zip(boundary_ns, names))
]
return ("\n".join(json.dumps(r) for r in rows) + "\n").encode()
@ -202,6 +206,58 @@ def test_workload_silent_flag(tmp_path: Path) -> None:
assert "workload-silent" in q.reasons
def test_flat_proc_rescued_by_netflow(tmp_path: Path) -> None:
"""A scan-and-dial / bursty-c2 episode leaves /proc nearly idle but
netflow shows clear inter-phase traffic deltas. Multi-signal
classifier must not flag this episode as flat."""
n = 60
netflow_rows = []
# phase boundaries match _make_episode default
for i in range(n * 5): # 5 netflow rows per 100 ms /proc sample
t = i * 20_000_000 # 20 ms per row
# heavy traffic only during infected_running (middle third)
in_burst = (n // 3 * 100_000_000) <= t < (2 * n // 3 * 100_000_000)
netflow_rows.append({
"t_mono_ns": t,
"t_wall_ns": t,
"bytes_in": 80_000 if in_burst else 0,
"bytes_out": 60_000 if in_burst else 0,
})
netflow_jsonl = ("\n".join(json.dumps(r) for r in netflow_rows) + "\n").encode()
tar = _make_episode(
tmp_path,
**{
"01TEST/telemetry-proc.jsonl": _proc_rows(flat=True, n=n),
"01TEST/netflow.jsonl": netflow_jsonl,
},
)
q = pe.classify_episode(tar, host_id="lab1", episode_id="01TEST")
assert "flat-cpu" not in q.reasons, (
f"netflow burst should rescue this episode; got reasons={q.reasons}"
)
def test_flat_everywhere_still_flags(tmp_path: Path) -> None:
"""If every available source (here /proc and netflow; qmp and guest
are absent, so unknown) shows no inter-phase variation, the episode
is genuinely silent and must still flag."""
n = 60
netflow_rows = [
{"t_mono_ns": i * 20_000_000, "t_wall_ns": i * 20_000_000,
"bytes_in": 100, "bytes_out": 50}
for i in range(n * 5)
]
netflow_jsonl = ("\n".join(json.dumps(r) for r in netflow_rows) + "\n").encode()
tar = _make_episode(
tmp_path,
**{
"01TEST/telemetry-proc.jsonl": _proc_rows(flat=True, n=n),
"01TEST/netflow.jsonl": netflow_jsonl,
},
)
q = pe.classify_episode(tar, host_id="lab1", episode_id="01TEST")
assert "flat-cpu" in q.reasons
def test_workload_silent_suppressed_when_host_cpu_real(tmp_path: Path) -> None:
"""CIS490#15 regression: busybox pgrep -c is unsupported, so the
in-guest probe always reports yes=0 on Alpine guests even when the

@ -141,6 +141,13 @@ def classify_episode(tar_zst: Path, host_id: str, episode_id: str) -> EpisodeQua
events = _read_jsonl_from_tar(tar, "events.jsonl")
proc = _read_jsonl_from_tar(tar, "telemetry-proc.jsonl")
labels = _read_jsonl_from_tar(tar, "labels.jsonl")
# Optional secondary telemetry sources — used to rescue
# episodes whose /proc CPU% is flat but whose signal lives in
# network bytes (scan-and-dial, bursty-c2, shell-resident),
# disk I/O (io-walk), or guest-side load (low-and-slow).
netflow = _read_jsonl_from_tar(tar, "netflow.jsonl")
qmp_rows = _read_jsonl_from_tar(tar, "telemetry-qmp.jsonl")
guest_rows = _read_jsonl_from_tar(tar, "telemetry-guest.jsonl")
sample = meta.get("sample")
if sample is None:
@ -175,45 +182,165 @@ def classify_episode(tar_zst: Path, host_id: str, episode_id: str) -> EpisodeQua
probe_says_silent = True
break
# flat-cpu: bucket /proc CPU% by phase, check inter-phase spread.
if proc and labels:
clk_tck = os.sysconf("SC_CLK_TCK")
# Multi-signal flatness: an episode is "flat" only if EVERY
# available telemetry source shows no inter-phase variation. A
# bursty network workload (scan-and-dial, bursty-c2) leaves /proc
# nearly idle but spikes netflow bytes — keeping such an episode
# in the dataset is the whole point. Similarly, io-walk's signal
# lives in qmp blockstats (virtio writes), and low-and-slow's
# lives in guest-side load_1m. Each helper returns True if its
# source DOES distinguish phases (i.e. has signal).
if not labels:
# No labels means no phase boundaries to compare across — skip
# the flatness analysis entirely. Episode is uncategorizable
# but not necessarily bad.
return q
def phase_at(t_ns: int) -> str:
cur = "(pre)"
for l in labels:
if l["t_mono_ns"] <= t_ns:
cur = l["phase"]
else:
break
return cur
# Use t_wall_ns rather than t_mono_ns for phase mapping. The host
# /proc collector and labels use orchestrator-relative t_mono_ns,
# but the bridge_pcap netflow rows use wall-clock-like t_mono_ns
# (qemu boot-monotonic seen from outside) — using a single
# numerical t_mono_ns silently buckets every netflow row into
# whichever phase happens to be last. t_wall_ns is consistent
# across sources because every collector stamps it from
# CLOCK_REALTIME at sample time.
def phase_at(row: dict) -> str:
tw = row.get("t_wall_ns")
if tw is None:
return "(pre)"
cur = "(pre)"
for lab in labels:
if lab.get("t_wall_ns", 0) <= tw:
cur = lab["phase"]
else:
break
return cur
per_phase: dict[str, list[float]] = {}
prev = None
for r in proc:
if prev is not None:
dt = (r["t_mono_ns"] - prev["t_mono_ns"]) / 1e9
if dt > 0:
djiff = (r["cpu_user_jiffies"] + r["cpu_sys_jiffies"]) - \
(prev["cpu_user_jiffies"] + prev["cpu_sys_jiffies"])
pct = 100.0 * (djiff / clk_tck) / dt
per_phase.setdefault(phase_at(r["t_mono_ns"]), []).append(pct)
prev = r
if per_phase:
medians = [statistics.median(v) for v in per_phase.values() if v]
if medians and (max(medians) - min(medians)) < 5.0:
q.reasons.append("flat-cpu")
proc_has_signal = _proc_cpu_has_signal(proc, phase_at)
netflow_has_signal = _netflow_has_signal(netflow, phase_at)
qmp_has_signal = _qmp_block_has_signal(qmp_rows, phase_at)
guest_has_signal = _guest_load_has_signal(guest_rows, phase_at)
# `flat-cpu` retains its name (existing reason) but now means "no
# available telemetry source distinguishes phases". `proc_has_signal`
# is None when /proc data is missing entirely — treat that as
# "unknown", not "flat".
sources = {
"proc": proc_has_signal,
"netflow": netflow_has_signal,
"qmp": qmp_has_signal,
"guest": guest_has_signal,
}
available = {k: v for k, v in sources.items() if v is not None}
if available and not any(available.values()):
q.reasons.append("flat-cpu")
# Confirm workload-silent only when host-side telemetry agrees.
# If the probe said silent but /proc CPU% shows a real inter-phase
# delta (i.e. NOT flat-cpu), trust the host-side ground truth and
# discard the probe result — the probe is busybox-pgrep-broken.
# If the probe said silent but ANY source shows real signal, trust
# the host-side ground truth and discard the probe result — the
# probe was busybox-pgrep-broken on Alpine until 2707709.
if probe_says_silent and "flat-cpu" in q.reasons:
q.reasons.append("workload-silent")
return q
# ---------------------------------------------------------------------------
# Per-source signal detection. Each returns:
# True → source has rows AND distinguishes phases (signal present)
# False → source has rows but every phase looks the same (flat)
# None → source is missing or empty (unknown — don't count it)
# ---------------------------------------------------------------------------
def _proc_cpu_has_signal(proc: list[dict], phase_at) -> bool | None:
"""/proc CPU%: spread of per-phase medians >= 5 percentage points."""
if not proc:
return None
clk_tck = os.sysconf("SC_CLK_TCK")
per_phase: dict[str, list[float]] = {}
prev = None
for r in proc:
if prev is not None:
dt = (r["t_mono_ns"] - prev["t_mono_ns"]) / 1e9
if dt > 0:
djiff = (r["cpu_user_jiffies"] + r["cpu_sys_jiffies"]) - \
(prev["cpu_user_jiffies"] + prev["cpu_sys_jiffies"])
pct = 100.0 * (djiff / clk_tck) / dt
per_phase.setdefault(phase_at(r), []).append(pct)
prev = r
if not per_phase:
return None
medians = [statistics.median(v) for v in per_phase.values() if v]
if not medians:
return None
return (max(medians) - min(medians)) >= 5.0
def _netflow_has_signal(netflow: list[dict], phase_at) -> bool | None:
"""netflow bytes: total bytes_in+bytes_out per phase. Signal means
the busiest phase has at least 50 KiB more total traffic than the
quietest phase. Catches scan-and-dial, bursty-c2, shell-resident."""
if not netflow:
return None
per_phase_bytes: dict[str, int] = {}
for r in netflow:
ph = phase_at(r)
per_phase_bytes[ph] = per_phase_bytes.get(ph, 0) + \
int(r.get("bytes_in", 0)) + int(r.get("bytes_out", 0))
if not per_phase_bytes:
return None
return (max(per_phase_bytes.values()) - min(per_phase_bytes.values())) >= 50 * 1024
def _qmp_block_has_signal(qmp: list[dict], phase_at) -> bool | None:
"""QMP blockstats wr_bytes+rd_bytes per-phase DELTA. blockstats
are cumulative counters, so comparing last-values across phases
always shows signal (counters increase monotonically). The
correct metric is bytes transferred DURING each phase: subtract
each phase's first sample from its last sample, then check
inter-phase spread. A spread of >= 100 KiB between phase deltas
means real disk activity concentrated in one phase. Catches io-walk."""
if not qmp:
return None
per_phase_first: dict[str, int] = {}
per_phase_last: dict[str, int] = {}
for r in qmp:
bs = r.get("blockstats") or {}
total = 0
for dev, stats in bs.items():
if isinstance(stats, dict):
total += int(stats.get("wr_bytes", 0)) + int(stats.get("rd_bytes", 0))
ph = phase_at(r)
if ph not in per_phase_first:
per_phase_first[ph] = total
per_phase_last[ph] = total
deltas = [per_phase_last[p] - per_phase_first[p] for p in per_phase_last]
if len(deltas) < 2:
return None
return (max(deltas) - min(deltas)) >= 100 * 1024
def _guest_load_has_signal(guest: list[dict], phase_at) -> bool | None:
"""Guest agent load_1m: spread of per-phase medians >= 0.10. Catches
low-and-slow (memory churn shows up as load even with idle /proc),
and works on any host where the guest agent is alive."""
if not guest:
return None
per_phase: dict[str, list[float]] = {}
for r in guest:
load = r.get("load_1m_5m_15m")
if not (isinstance(load, list) and load):
continue
per_phase.setdefault(phase_at(r), []).append(float(load[0]))
if not per_phase:
return None
medians = [statistics.median(v) for v in per_phase.values() if v]
if len(medians) < 2:
return None
return (max(medians) - min(medians)) >= 0.10
# ---------------------------------------------------------------------------
# Index walking + actions
# ---------------------------------------------------------------------------