"""§4.4 collector emit tests — each collector MUST produce >=1 row when run for a few seconds against a synthesized busy workload. A collector that fails this is removed from the active set (PIPELINE.md §4.4) — no silent zero-row inclusion. These tests intentionally invoke the real collector binaries (perf, tcpdump) against real subprocesses. They will skip on environments where the binary or capability is unavailable, but they will fail — not skip — when the binary IS present and the collector still emits zero rows. The whole point is to catch the "collector silently disabled" failure mode. """ from __future__ import annotations import json import os import shutil import socket import subprocess import threading import time from pathlib import Path import pytest from collectors import perf_qemu # --------------------------------------------------------------------------- # Helpers # --------------------------------------------------------------------------- def _spawn_busy_loop() -> subprocess.Popen: """Spawn a CPU-burning child whose PID we can hand to a collector. `exec yes` so the captured PID IS the busy process — without exec, the captured PID is the wrapping shell that sits parked waiting on its child, and perf samples an idle process.""" return subprocess.Popen( ["sh", "-c", "exec yes >/dev/null"], stdout=subprocess.DEVNULL, stderr=subprocess.DEVNULL, ) def _run_collector_briefly(target, *, seconds: float, **kw) -> int: """Spin a collector run_loop in a thread for `seconds`, then stop it. Returns the row count the collector reports.""" stop = threading.Event() result: dict[str, int] = {} def _go() -> None: result["rows"] = target(stop_event=stop, **kw) th = threading.Thread(target=_go, daemon=True) th.start() time.sleep(seconds) stop.set() th.join(timeout=seconds + 5.0) return result.get("rows", 0) # --------------------------------------------------------------------------- # perf # --------------------------------------------------------------------------- @pytest.mark.skipif( shutil.which("perf") is None, reason="perf binary not on PATH; this host can't host the perf collector", ) def test_perf_emits_rows_against_busy_pid(tmp_path: Path) -> None: """The perf collector must emit at least one row when pointed at a busy PID for a few seconds. Software events (page-faults, context-switches, cpu-clock) are used so the test is portable across CPUs that lack hardware performance counters; the production DEFAULT_EVENTS adds hardware events on top, which is fine where they're available and degrades gracefully where they're not. Regression for: perf stat -j writes to stderr by default with -p, so reading proc.stdout silently gives 0 lines and 0 rows. Fixed by passing --log-fd 1 in the perf invocation. """ busy = _spawn_busy_loop() try: out = tmp_path / "telemetry-perf.jsonl" rows = _run_collector_briefly( perf_qemu.run_loop, seconds=2.0, pid=busy.pid, output_path=out, t_mono_origin_ns=0, interval_ms=200, events=("page-faults", "context-switches", "cpu-clock"), ) finally: busy.terminate() try: busy.wait(timeout=2.0) except subprocess.TimeoutExpired: busy.kill() busy.wait(timeout=1.0) assert rows >= 1, ( f"perf collector wrote 0 rows against a busy PID — see " f"PIPELINE.md §4.4. File: {out}, exists={out.exists()}, " f"size={out.stat().st_size if out.exists() else 'n/a'}" ) # Sanity-check the on-disk file matches what run_loop reported. on_disk = out.read_text().splitlines() if out.exists() else [] assert len(on_disk) == rows, ( f"row count mismatch: run_loop returned {rows} but " f"{len(on_disk)} lines on disk" ) # Spot-check the row shape — one parsed row should have the # expected schema. sample = json.loads(on_disk[0]) assert sample["source"] == "host_perf" assert sample["available_in_deployment"] is False assert "t_mono_ns" in sample and "interval_s" in sample # At least one row must have a populated metric — if every metric # is None on every row, the parser is dropping values. Regression # for: event names come back as "cycles:u" / "instructions:u" # under perf_event_paranoid=2 (userspace-only), but `_build_row` # looks up plain "cycles" / "instructions" — so every metric was # silently null even when perf reported real numbers. The mapped # fields in the row schema are cycles, instructions, page_faults, # context_switches, branches, branch_misses, cache_references, # cache_misses; we only need ANY of them populated to confirm the # parser is wiring values into the row. parsed = [json.loads(l) for l in on_disk] metric_keys = ("cycles", "instructions", "page_faults", "context_switches", "branches") assert any(r.get(k) is not None for r in parsed for k in metric_keys), ( f"every metric is None on every row — perf parser is dropping " f"values. Sample row: {parsed[0]}" ) # --------------------------------------------------------------------------- # Tier-3 demo wiring regression # --------------------------------------------------------------------------- # --------------------------------------------------------------------------- # proc — must emit ≥1 row when run against a live PID # --------------------------------------------------------------------------- def test_proc_emits_rows_against_self_pid(tmp_path: Path) -> None: """The /proc collector samples a live PID. Anyone running this test has a /proc//stat for their own process, so this works in any environment that has /proc — Linux, including CI containers.""" from collectors import proc_qemu out = tmp_path / "telemetry-proc.jsonl" rows = _run_collector_briefly( proc_qemu.run_loop, seconds=0.6, pid=os.getpid(), output_path=out, t_mono_origin_ns=0, interval_ms=100, ) assert rows >= 3, ( f"proc collector wrote {rows} rows for ~5 expected ticks " f"in 600ms. PIPELINE.md §4.4." ) parsed = [json.loads(l) for l in out.read_text().splitlines()] assert all(r["source"] == "host_proc" for r in parsed) # Every row must have non-None values for the core fields — those # are direct /proc reads that always succeed for a live PID. for r in parsed: assert r["cpu_user_jiffies"] is not None assert r["rss_bytes"] is not None assert r["vsize_bytes"] is not None # --------------------------------------------------------------------------- # pcap + netflow — bucketize a synthesized minimal pcap # --------------------------------------------------------------------------- def _build_minimal_pcap(path: Path) -> None: """Write a tiny pcap with two TCP packets so bucketize() has something to count. Avoids needing tcpdump in CI.""" import struct # pcap global header — microsec resolution, link-type 1 (Ethernet). hdr = struct.pack("BBHHHBBHII", 0x45, 0, 40, # ver/ihl, tos, total len 0x1234, 0, # id, flags+frag 64, 6, 0, # ttl, proto=tcp, checksum (skip) int.from_bytes(b"\x0a\xc8\x00\x01", "big"), # 10.200.0.1 int.from_bytes(b"\x0a\xc8\x00\x0a", "big"), # 10.200.0.10 ) # 20B TCP — sport 1234, dport 80, ACK flag tcp = struct.pack( ">HHIIBBHHH", 1234, 80, 0, 0, 0x50, 0x10, 65535, 0, 0, ) payload = eth + ipv4 + tcp rec_hdr = struct.pack( " None: """bucketize is the netflow collector. With a synthesized Ethernet+IPv4+TCP pcap, it MUST emit at least one bucket row. Refs: PIPELINE.md §4.4 — netflow must produce ≥1 row against a realistic packet capture. Synthesizing the pcap keeps the test hermetic (no tcpdump or live interface needed in CI).""" from collectors import pcap as pcap_collector pcap_path = tmp_path / "network.pcap" _build_minimal_pcap(pcap_path) netflow_path = tmp_path / "netflow.jsonl" rows = pcap_collector.bucketize( pcap_path, netflow_path, bucket_ms=100, t_mono_origin_ns=0, bridge_ip="10.200.0.1", ) assert rows >= 1, ( f"netflow bucketize wrote {rows} rows for a 2-packet pcap. " f"PIPELINE.md §4.4." ) parsed = [json.loads(l) for l in netflow_path.read_text().splitlines()] assert parsed # The aggregator must have counted both packets — over the 50ms # span of our synthetic capture, packet count should be > 0. total_pkts = sum(r.get("pkts_in", 0) + r.get("pkts_out", 0) for r in parsed) assert total_pkts == 2, f"expected 2 packets across all buckets, got {total_pkts}" # --------------------------------------------------------------------------- # Cross-cutting: every name in KNOWN_COLLECTORS must be exercised by # either an emit-test here or a documented carve-out below. # --------------------------------------------------------------------------- # Carve-outs: collectors that need a running guest VM to exercise # end-to-end. Each entry MUST cite the unit-level test that exercises # its parser/protocol code and the specific reason a real-binary emit # test isn't tractable in CI. COLLECTOR_TEST_CARVE_OUTS = { "qmp": ( "tests/test_qmp.py — exercises the QMP wire parser against " "captured query-status / query-blockstats fixtures. Real-binary " "emit needs a running QEMU; covered in production by the " "Tier-3 emit-test on lab hosts." ), "guest_agent": ( "tests/test_guest_agent.py — exercises run_loop with a fake " "agent unix-socket server feeding JSONL. Real-VM emit needs " "a guest with the agent baked in (cidata path, covered by " "production data on lab hosts)." ), } def test_every_known_collector_has_emit_coverage() -> None: """For each collector name in KNOWN_COLLECTORS, either there's a test_collectors_emit.py emit test against a real binary, or there's an explicit carve-out in COLLECTOR_TEST_CARVE_OUTS naming the parser-level test that covers it. Adding a new collector without either fails this assertion.""" from orchestrator.manifest import KNOWN_COLLECTORS test_src = Path(__file__).read_text() expected_emit_test_names = { "proc": "test_proc_emits_rows_against", "perf": "test_perf_emits_rows_against", "pcap": "test_pcap_bucketize_emits_rows", "netflow": "test_pcap_bucketize_emits_rows", } for collector in KNOWN_COLLECTORS: if collector in COLLECTOR_TEST_CARVE_OUTS: continue needle = expected_emit_test_names.get(collector) assert needle is not None, ( f"collector {collector!r} in KNOWN_COLLECTORS has no entry in " f"expected_emit_test_names and no COLLECTOR_TEST_CARVE_OUTS " f"entry. Either add a test or document the carve-out." ) assert needle in test_src, ( f"collector {collector!r} expected an emit-test name " f"containing {needle!r} in this file. PIPELINE.md §4.4." ) def test_run_tier3_demo_wires_collector_sockets_into_episode_config() -> None: """`run_tier3_demo.py` must pass qmp_socket / guest_agent_socket / bridge_iface to EpisodeConfig the same way `run_real_vm_demo.py` does. Without these, those collectors silently emit zero rows on every Tier-3 episode even though launch_target.sh creates the underlying chardevs. Regression for: bug found 2026-05-03 against elliott-thinkpad + k-gamingcom (rows_qmp=0 / rows_guest=0 / pcap=0 on 100% of Tier-3 episodes). This is a source-grep test rather than an exec test because run_tier3_demo.py boots qemu + msfrpcd, neither of which is available in CI. The grep keeps the wiring honest with no runtime cost.""" src = (Path(__file__).resolve().parent.parent / "tools" / "run_tier3_demo.py").read_text() # The exact fragments that, if absent, mean the collectors will # silently never start. Each must appear as a keyword arg of the # EpisodeConfig(...) constructor call site. for needle in ( "qmp_socket=qmp_sock", "guest_agent_socket=agent_sock", "bridge_iface=os.environ.get(\"BRIDGE\")", ): assert needle in src, ( f"run_tier3_demo.py is missing `{needle}` on its " f"EpisodeConfig — see PIPELINE.md §4.4. Tier-3 episodes " f"will silently produce 0 rows for the corresponding " f"collector." )