"""Source 3 (oracle): ``perf stat -p `` sampler. Spawns ``perf stat`` in interval-JSON mode against the qemu pid and aggregates the per-event counter values into per-interval telemetry rows. Unlike the /proc and QMP collectors, perf needs CAP_SYS_ADMIN or ``kernel.perf_event_paranoid <= 1`` to read counters for a process the collector doesn't own — typically true on a lab host running QEMU under the cis490 service user. Source 3 is **oracle-only** — perf counters are not available on a deployed device. Every row carries ``available_in_deployment: false``. The events we ask for are the small canonical set named in docs/data-model.md: cycles, instructions, cache-references, cache-misses, branches, branch-misses, page-faults, context-switches Anything perf can't enable on the host (e.g. cache-misses without hardware support) is silently dropped from the row. """ from __future__ import annotations import json import logging import shutil import subprocess import threading import time from pathlib import Path log = logging.getLogger("cis490.collectors.perf_qemu") SOURCE = "host_perf" AVAILABLE_IN_DEPLOYMENT = False DEFAULT_EVENTS = ( "cycles", "instructions", "cache-references", "cache-misses", "branches", "branch-misses", "page-faults", "context-switches", ) def perf_available() -> bool: return shutil.which("perf") is not None def _coerce_int(s: str | int | None) -> int | None: if s is None: return None if isinstance(s, int): return s s = s.strip() if not s or s in ("", ""): return None # perf prints comma-separated thousands by default; we asked -j so # we usually get plain numbers, but guard for both shapes. s = s.replace(",", "") try: return int(s) except ValueError: try: return int(float(s)) except ValueError: return None def _build_row(t_mono_origin_ns: int, interval_s: float, agg: dict[str, int]) -> dict: cycles = agg.get("cycles") insns = agg.get("instructions") cache_refs = agg.get("cache-references") cache_miss = agg.get("cache-misses") ipc = (insns / cycles) if (cycles and insns) else None miss_rate = (cache_miss / cache_refs) if (cache_refs and cache_miss is not None) else None return { "t_mono_ns": time.monotonic_ns() - t_mono_origin_ns, "t_wall_ns": time.time_ns(), "source": SOURCE, "available_in_deployment": AVAILABLE_IN_DEPLOYMENT, "interval_s": interval_s, "cycles": cycles, "instructions": insns, "cache_references": cache_refs, "cache_misses": cache_miss, "branches": agg.get("branches"), "branch_misses": agg.get("branch-misses"), "page_faults": agg.get("page-faults"), "context_switches": agg.get("context-switches"), "ipc": ipc, "cache_miss_rate": miss_rate, } def parse_perf_event_line(line: str) -> dict | None: """Parse one ``perf stat -j`` event line. Returns None for blanks or status messages perf occasionally interleaves on stderr-ish paths but stdout-on-error in practice.""" line = line.strip() if not line.startswith("{"): return None try: return json.loads(line) except json.JSONDecodeError: return None def run_loop( pid: int, output_path: Path, t_mono_origin_ns: int, interval_ms: int, stop_event: threading.Event, *, events: tuple[str, ...] = DEFAULT_EVENTS, emit_event: "callable | None" = None, ) -> int: """Spawn perf stat -j against ``pid`` and stream rows until stop. Returns the number of rows written. When ``emit_event`` is provided, perf lifecycle markers (perf_unavailable / perf_started / perf_first_row / perf_no_counters) are written into the orchestrator's events.jsonl. Without this, a silent perf produces only a `rows_perf=0` count in meta.json which is indistinguishable from "perf was never enabled" — the §1 silent-downgrade pattern.""" if not perf_available(): log.warning("perf binary not on PATH — perf collector disabled") if emit_event is not None: emit_event("perf_unavailable", reason="binary_not_on_path") return 0 # perf stat writes its output (including -j JSON) to stderr by # default when -p / --pid is in use; only when perf forks the # workload itself does it go to stdout. --log-fd 1 forces output # onto fd 1 so we can stream it through proc.stdout. Without this # the collector silently writes 0 rows on every episode. cmd = [ "perf", "stat", "-p", str(pid), "-I", str(interval_ms), "-j", "--log-fd", "1", "-e", ",".join(events), ] log.info("starting perf: %s", " ".join(cmd)) try: proc = subprocess.Popen( cmd, stdout=subprocess.PIPE, stderr=subprocess.PIPE, bufsize=1, text=True, ) except (FileNotFoundError, PermissionError) as e: log.warning("perf launch failed: %s", e) if emit_event is not None: emit_event("perf_unavailable", reason=f"launch_failed: {e}") return 0 if emit_event is not None: emit_event("perf_started", pid=pid, events=list(events), interval_ms=interval_ms) rows = 0 output_path.parent.mkdir(parents=True, exist_ok=True) cur_interval: float | None = None agg: dict[str, int] = {} intervals_seen = 0 intervals_with_values = 0 def _flush() -> None: nonlocal rows, intervals_seen, intervals_with_values if cur_interval is None: return intervals_seen += 1 # Always emit a row when an interval is observed, even if every # event came back . agg-empty means perf saw the # interval but couldn't measure anything (paranoid level too # high, qemu PID owned by different user, hardware counters # unavailable, etc). Emitting None-valued rows distinguishes # "perf was running but counters were unavailable" from "perf # never ran at all" — which is the §1 visibility requirement # the silent return-without-write was violating. if agg: intervals_with_values += 1 row = _build_row(t_mono_origin_ns, cur_interval, agg) out_f.write(json.dumps(row) + "\n") rows += 1 if (rows == 1) and emit_event is not None: emit_event("perf_first_row", counters_populated=bool(agg), counters=list(agg.keys())) try: with output_path.open("a", buffering=1) as out_f: # perf interleaves events and writes to stdout in -j mode. # We read line by line until the process exits (which # happens when we kill it on stop, or when the target pid # disappears and perf's internal -p polling notices). assert proc.stdout is not None for line in proc.stdout: if stop_event.is_set(): break evt = parse_perf_event_line(line) if evt is None: continue interval = evt.get("interval") event_name = evt.get("event") value = _coerce_int(evt.get("counter-value")) if interval is None or event_name is None: continue # perf annotates event names with the privilege scope it # was actually able to measure (e.g. "cycles:u" when only # userspace is permitted under perf_event_paranoid=2). # Strip the suffix so _build_row's plain-name lookups # ("cycles", "instructions", ...) hit. event_name = event_name.split(":", 1)[0] # perf emits one JSON per (event, interval); a new # interval value means we should flush the previous row. if cur_interval is not None and interval != cur_interval: _flush() agg = {} cur_interval = interval if value is not None: agg[event_name] = value # End of stream — flush the last partial row. _flush() if emit_event is not None: emit_event( "perf_finished", rows=rows, intervals_seen=intervals_seen, intervals_with_values=intervals_with_values, ) if intervals_seen > 0 and intervals_with_values == 0: # Loud signal: perf was running, intervals ticked, but # every event came back . Likely cause: # perf_event_paranoid > 2, or qemu PID owned by a # different user than the perf collector. emit_event( "perf_no_counters", intervals_seen=intervals_seen, likely_cause=("perf_event_paranoid > 2 or " "qemu PID owned by different user"), ) finally: if proc.poll() is None: proc.terminate() try: proc.wait(timeout=3.0) except subprocess.TimeoutExpired: proc.kill() proc.wait(timeout=2.0) return rows