CIS490/tests/test_collectors_emit.py
Max Gorog 0d51b9b253 PIPELINE §5 step 5: collector admission emit tests (§4.4)
Adds the missing emit-tests so every collector in KNOWN_COLLECTORS
has end-to-end coverage:

  * test_proc_emits_rows_against_self_pid
      Samples /proc/<own pid> for ~0.6s. Asserts ≥3 rows + populated
      core fields (cpu_user_jiffies, rss_bytes, vsize_bytes). Works
      anywhere with /proc.

  * test_pcap_bucketize_emits_rows_from_synthetic_capture
      Builds a 2-packet Ethernet+IPv4+TCP pcap in-memory, feeds it
      to pcap.bucketize, asserts ≥1 row written + total packet count
      across buckets matches input. Covers BOTH the pcap and netflow
      collectors (netflow IS the bucketized pcap output).

  * test_every_known_collector_has_emit_coverage
      Cross-cutting tripwire: for every name in KNOWN_COLLECTORS,
      either there's a test_collectors_emit.py test or there's an
      explicit COLLECTOR_TEST_CARVE_OUTS entry. Adding a collector
      to KNOWN_COLLECTORS without an emit test fails this. Carve-outs
      today: qmp (covered by tests/test_qmp.py — needs running QEMU
      for real-binary emit) and guest_agent (covered by
      tests/test_guest_agent.py — needs a real VM with the agent
      baked in).

The carve-outs are explicit, not implicit. A drift where someone
adds a new collector without a real-binary emit test fails CI before
the manifest can include it.

272 tests passing.

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
2026-05-04 01:37:40 -05:00

343 lines
14 KiB
Python

"""§4.4 collector emit tests — each collector MUST produce >=1 row when
run for a few seconds against a synthesized busy workload. A collector
that fails this is removed from the active set (PIPELINE.md §4.4) — no
silent zero-row inclusion.
Where a collector wraps a real binary (perf), the test intentionally
invokes that binary against a real subprocess. The proc test samples
this process's own /proc entry, and the pcap/netflow test bucketizes a
synthesized capture so that no tcpdump or live interface is needed.
Tests will skip on environments where the binary or capability is
unavailable, but they will fail — not skip — when the binary IS
present and the collector still emits zero rows. The whole point is to
catch the "collector silently disabled" failure mode.
"""
from __future__ import annotations
import json
import os
import shutil
import socket
import subprocess
import threading
import time
from pathlib import Path
import pytest
from collectors import perf_qemu
# ---------------------------------------------------------------------------
# Helpers
# ---------------------------------------------------------------------------
def _spawn_busy_loop() -> subprocess.Popen:
    """Start a CPU-burning child process and return its Popen handle.

    The shell command uses `exec yes`, so the shell is replaced by `yes`
    and the PID on the returned handle IS the busy process. Without the
    `exec`, that PID would belong to the wrapping shell, which just sits
    parked waiting on its child — and perf would sample an idle process.

    The caller owns the child and is responsible for terminating it.
    """
    burn_cmd = ["sh", "-c", "exec yes >/dev/null"]
    sink = subprocess.DEVNULL
    return subprocess.Popen(burn_cmd, stdout=sink, stderr=sink)
def _run_collector_briefly(target, *, seconds: float, **kw) -> int:
    """Run a collector's run_loop in a thread for `seconds`, then stop it.

    Args:
        target: Callable invoked as ``target(stop_event=<Event>, **kw)``;
            its return value is treated as the collector's row count.
        seconds: How long to let the collector run before the stop
            event is set.
        **kw: Extra keyword arguments forwarded verbatim to `target`.

    Returns:
        Whatever `target` returned (the row count the collector reports).

    Raises:
        Exception: Any exception raised by `target` is re-raised here, in
            the calling thread, so a crashing collector surfaces with its
            real traceback instead of being reported as "0 rows".
        TimeoutError: If `target` is still running ``seconds + 5`` seconds
            after the stop event was set.
    """
    stop = threading.Event()
    outcome: dict[str, object] = {}

    def _go() -> None:
        # Capture failures so they can be re-raised in the caller; an
        # exception that escapes a thread is otherwise only printed.
        try:
            outcome["rows"] = target(stop_event=stop, **kw)
        except Exception as exc:
            outcome["error"] = exc

    th = threading.Thread(target=_go, daemon=True)
    th.start()
    time.sleep(seconds)
    stop.set()
    th.join(timeout=seconds + 5.0)

    if "error" in outcome:
        raise outcome["error"]
    if th.is_alive():
        # A hung collector is a different failure from "emitted nothing";
        # don't let it masquerade as a zero row count.
        raise TimeoutError(
            f"collector did not stop within {seconds + 5.0:.1f}s of "
            f"stop_event being set"
        )
    return outcome["rows"]
# ---------------------------------------------------------------------------
# perf
# ---------------------------------------------------------------------------
@pytest.mark.skipif(
    not shutil.which("perf"),
    reason="perf binary not on PATH; this host can't host the perf collector",
)
def test_perf_emits_rows_against_busy_pid(tmp_path: Path) -> None:
    """perf collector emits >=1 well-formed row for a busy PID.

    Only software events (page-faults, context-switches, cpu-clock) are
    requested so the test stays portable across CPUs that lack hardware
    performance counters; the production DEFAULT_EVENTS adds hardware
    events on top, which is fine where they're available and degrades
    gracefully where they're not.

    Regression for: perf stat -j writes to stderr by default with -p,
    so reading proc.stdout silently gives 0 lines and 0 rows. Fixed
    by passing --log-fd 1 in the perf invocation.
    """
    out = tmp_path / "telemetry-perf.jsonl"
    burner = _spawn_busy_loop()
    try:
        rows = _run_collector_briefly(
            perf_qemu.run_loop,
            seconds=2.0,
            pid=burner.pid,
            output_path=out,
            t_mono_origin_ns=0,
            interval_ms=200,
            events=("page-faults", "context-switches", "cpu-clock"),
        )
    finally:
        # Always reap the busy child: polite terminate first, then a
        # hard kill if it has not exited within the grace period.
        burner.terminate()
        try:
            burner.wait(timeout=2.0)
        except subprocess.TimeoutExpired:
            burner.kill()
            burner.wait(timeout=1.0)

    assert rows >= 1, (
        f"perf collector wrote 0 rows against a busy PID — see "
        f"PIPELINE.md §4.4. File: {out}, exists={out.exists()}, "
        f"size={out.stat().st_size if out.exists() else 'n/a'}"
    )

    # The reported row count must agree with what actually hit the disk.
    on_disk = []
    if out.exists():
        on_disk = out.read_text().splitlines()
    assert len(on_disk) == rows, (
        f"row count mismatch: run_loop returned {rows} but "
        f"{len(on_disk)} lines on disk"
    )

    # Schema spot-check on the first row.
    first = json.loads(on_disk[0])
    assert first["source"] == "host_perf"
    assert first["available_in_deployment"] is False
    assert "t_mono_ns" in first
    assert "interval_s" in first

    # At least one metric must be populated somewhere. If every metric
    # is None on every row, the parser is dropping values. Regression
    # for: event names come back as "cycles:u" / "instructions:u" under
    # perf_event_paranoid=2 (userspace-only), but `_build_row` looks up
    # plain "cycles" / "instructions" — so every metric was silently
    # null even when perf reported real numbers. Any one populated
    # field is enough to confirm values are wired into the row.
    parsed = [json.loads(line) for line in on_disk]
    metric_keys = (
        "cycles",
        "instructions",
        "page_faults",
        "context_switches",
        "branches",
    )
    populated = [
        key
        for row in parsed
        for key in metric_keys
        if row.get(key) is not None
    ]
    assert populated, (
        f"every metric is None on every row — perf parser is dropping "
        f"values. Sample row: {parsed[0]}"
    )
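# ---------------------------------------------------------------------------
# Tier-3 demo wiring regression — the test for this section,
# test_run_tier3_demo_wires_collector_sockets_into_episode_config, is
# defined at the end of this file.
# ---------------------------------------------------------------------------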
# ---------------------------------------------------------------------------
# proc — must emit ≥1 row when run against a live PID
# ---------------------------------------------------------------------------
def test_proc_emits_rows_against_self_pid(tmp_path: Path) -> None:
    """The /proc collector must emit rows when sampling a live PID.

    The test samples its own process, so it runs in any environment
    that exposes /proc/<pid>/stat — Linux, including CI containers.
    Hosts without /proc skip rather than fail, matching how the perf
    test skips when its binary is unavailable.
    """
    if not Path(f"/proc/{os.getpid()}/stat").exists():
        pytest.skip("no /proc/<pid>/stat on this host; proc collector needs /proc")

    from collectors import proc_qemu

    out = tmp_path / "telemetry-proc.jsonl"
    rows = _run_collector_briefly(
        proc_qemu.run_loop,
        seconds=0.6,
        pid=os.getpid(),
        output_path=out,
        t_mono_origin_ns=0,
        interval_ms=100,
    )
    assert rows >= 3, (
        f"proc collector wrote {rows} rows for ~5 expected ticks "
        f"in 600ms. PIPELINE.md §4.4."
    )

    parsed = [json.loads(line) for line in out.read_text().splitlines()]
    assert all(r["source"] == "host_proc" for r in parsed)

    # Every row must have non-None values for the core fields — those
    # are direct /proc reads that always succeed for a live PID.
    for r in parsed:
        assert r["cpu_user_jiffies"] is not None
        assert r["rss_bytes"] is not None
        assert r["vsize_bytes"] is not None
# ---------------------------------------------------------------------------
# pcap + netflow — bucketize a synthesized minimal pcap
# ---------------------------------------------------------------------------
def _build_minimal_pcap(path: Path) -> None:
    """Write a two-packet classic pcap file to `path`.

    Both records carry the same 54-byte Ethernet + IPv4 + TCP frame,
    timestamped 1.000000s and 1.050000s, so bucketize() has something
    to count without tcpdump being available. Parent directories of
    `path` are created if missing.
    """
    import struct

    # Global header: little-endian magic (microsecond timestamps),
    # version 2.4, thiszone=0, sigfigs=0, snaplen=65535,
    # linktype=1 (Ethernet).
    file_header = struct.pack("<IHHiIII", 0xA1B2C3D4, 2, 4, 0, 0, 65535, 1)

    # 14B Ethernet header: destination MAC, source MAC, ethertype
    # 0x0800 (IPv4).
    ethernet = bytes.fromhex("001122334455" "66778899aabb" "0800")

    # 20B IPv4 header: version 4 / IHL 5, total length 40, TTL 64,
    # protocol 6 (TCP), checksum left at zero,
    # 10.200.0.1 -> 10.200.0.10.
    ip_header = struct.pack(
        ">BBHHHBBHII",
        0x45, 0, 40,
        0x1234, 0,
        64, 6, 0,
        0x0AC80001,
        0x0AC8000A,
    )

    # 20B TCP header: sport 1234, dport 80, data offset 5 words,
    # ACK flag, window 65535.
    tcp_header = struct.pack(">HHIIBBHHH", 1234, 80, 0, 0, 0x50, 0x10, 65535, 0, 0)

    frame = ethernet + ip_header + tcp_header
    frame_len = len(frame)

    # One record per timestamp: 16B record header (seconds, microseconds,
    # captured length, original length) followed by the frame bytes.
    records = []
    for stamp_us in (1_000_000, 1_050_000):
        secs, micros = divmod(stamp_us, 1_000_000)
        records.append(struct.pack("<IIII", secs, micros, frame_len, frame_len) + frame)

    path.parent.mkdir(parents=True, exist_ok=True)
    path.write_bytes(file_header + b"".join(records))
def test_pcap_bucketize_emits_rows_from_synthetic_capture(tmp_path: Path) -> None:
    """bucketize (the netflow collector) emits rows for a synthetic pcap.

    Given a synthesized two-packet Ethernet+IPv4+TCP capture, it MUST
    write at least one bucket row, and the packet counts summed across
    all buckets must equal the two packets that went in.

    Refs: PIPELINE.md §4.4 — netflow must produce ≥1 row against a
    realistic packet capture. Synthesizing the pcap keeps the test
    hermetic (no tcpdump or live interface needed in CI).
    """
    from collectors import pcap as pcap_collector

    capture = tmp_path / "network.pcap"
    flows = tmp_path / "netflow.jsonl"
    _build_minimal_pcap(capture)

    rows = pcap_collector.bucketize(
        capture,
        flows,
        bucket_ms=100,
        t_mono_origin_ns=0,
        bridge_ip="10.200.0.1",
    )
    assert rows >= 1, (
        f"netflow bucketize wrote {rows} rows for a 2-packet pcap. "
        f"PIPELINE.md §4.4."
    )

    buckets = [json.loads(line) for line in flows.read_text().splitlines()]
    assert buckets

    # Both packets must be accounted for, whichever direction and
    # whichever bucket(s) the aggregator assigned them to.
    total_pkts = 0
    for bucket in buckets:
        total_pkts += bucket.get("pkts_in", 0) + bucket.get("pkts_out", 0)
    assert total_pkts == 2, f"expected 2 packets across all buckets, got {total_pkts}"
# ---------------------------------------------------------------------------
# Cross-cutting: every name in KNOWN_COLLECTORS must be exercised by
# either an emit-test here or a documented carve-out below.
# ---------------------------------------------------------------------------
# Carve-outs: collectors that need a running guest VM to exercise
# end-to-end. Each entry MUST cite the unit-level test that exercises
# its parser/protocol code and the specific reason a real-binary emit
# test isn't tractable in CI.
# Maps a collector name to the written justification for exempting it
# from a real-binary emit test in this file. Collectors listed here are
# skipped by test_every_known_collector_has_emit_coverage.
COLLECTOR_TEST_CARVE_OUTS = {
    # Needs a running QEMU for a real-binary emit.
    "qmp": (
        "tests/test_qmp.py — exercises the QMP wire parser "
        "against captured query-status / query-blockstats fixtures. "
        "Real-binary emit needs a running QEMU; "
        "covered in production by the Tier-3 emit-test on lab hosts."
    ),
    # Needs a guest VM with the agent baked in.
    "guest_agent": (
        "tests/test_guest_agent.py — exercises run_loop "
        "with a fake agent unix-socket server feeding JSONL. "
        "Real-VM emit needs a guest with the agent baked in "
        "(cidata path, covered by production data on lab hosts)."
    ),
}
def test_every_known_collector_has_emit_coverage() -> None:
    """Every collector in KNOWN_COLLECTORS has an emit test or a carve-out.

    For each collector name, either a test function whose name contains
    the expected fragment is defined in this module, or the collector
    has an explicit entry in COLLECTOR_TEST_CARVE_OUTS naming the
    parser-level test that covers it. Adding a new collector without
    either fails this assertion.
    """
    from orchestrator.manifest import KNOWN_COLLECTORS

    expected_emit_test_names = {
        "proc": "test_proc_emits_rows_against",
        "perf": "test_perf_emits_rows_against",
        "pcap": "test_pcap_bucketize_emits_rows",
        "netflow": "test_pcap_bucketize_emits_rows",
    }

    # Look at the test callables actually defined in this module rather
    # than grepping the file's text: every fragment above is itself a
    # string literal in this file, so a plain substring search of the
    # source would always succeed, even after the test was deleted.
    defined_tests = [
        name
        for name, obj in globals().items()
        if name.startswith("test_") and callable(obj)
    ]

    for collector in KNOWN_COLLECTORS:
        if collector in COLLECTOR_TEST_CARVE_OUTS:
            continue
        needle = expected_emit_test_names.get(collector)
        assert needle is not None, (
            f"collector {collector!r} in KNOWN_COLLECTORS has no entry in "
            f"expected_emit_test_names and no COLLECTOR_TEST_CARVE_OUTS "
            f"entry. Either add a test or document the carve-out."
        )
        assert any(needle in name for name in defined_tests), (
            f"collector {collector!r} expected an emit-test name "
            f"containing {needle!r} in this file. PIPELINE.md §4.4."
        )
def test_run_tier3_demo_wires_collector_sockets_into_episode_config() -> None:
    """`run_tier3_demo.py` must hand the collector endpoints to EpisodeConfig.

    The script has to pass qmp_socket / guest_agent_socket /
    bridge_iface to EpisodeConfig the same way `run_real_vm_demo.py`
    does. Without these, those collectors silently emit zero rows on
    every Tier-3 episode even though launch_target.sh creates the
    underlying chardevs. Regression for: bug found 2026-05-03 against
    elliott-thinkpad + k-gamingcom (rows_qmp=0 / rows_guest=0 / pcap=0
    on 100% of Tier-3 episodes).

    This is a source-grep test rather than an exec test because
    run_tier3_demo.py boots qemu + msfrpcd, neither of which is
    available in CI. The grep keeps the wiring honest with no
    runtime cost.
    """
    repo_root = Path(__file__).resolve().parents[1]
    src = (repo_root / "tools" / "run_tier3_demo.py").read_text()

    # Keyword-argument fragments expected at the EpisodeConfig(...)
    # call site; if any one is absent, the matching collector will
    # silently never start.
    required_kwargs = (
        "qmp_socket=qmp_sock",
        "guest_agent_socket=agent_sock",
        'bridge_iface=os.environ.get("BRIDGE")',
    )
    for needle in required_kwargs:
        assert needle in src, (
            f"run_tier3_demo.py is missing `{needle}` on its "
            f"EpisodeConfig — see PIPELINE.md §4.4. Tier-3 episodes "
            f"will silently produce 0 rows for the corresponding "
            f"collector."
        )