Validation on k-gamingcom (commit ac7b85f) showed perf enabled in
production but rows_perf=0 on every episode. Without lifecycle events
the failure mode is indistinguishable from "perf wasn't enabled" — §1
silent-downgrade. The events now surface the actual cause:
- perf_unavailable — binary missing OR launch failed (with reason)
- perf_started — perf is running (pid, events, interval)
- perf_first_row — first row written; counters_populated tells
whether any event was actually counted
- perf_finished — final tally (intervals_seen,
intervals_with_values)
- perf_no_counters — perf was alive but every interval came back
<not counted> (likely paranoid > 2 or PID
ownership mismatch)
`_flush()` now writes a row whenever an interval is observed, even
when every event was <not counted>. The all-None row is honest data
("perf observed this interval and counted nothing"), and the rows
become a count of observed intervals rather than a count of
successful measurements — distinct from rows_proc / rows_qmp which
do count successful measurements. Trainers filter on
`cycles is not None` etc. when they need only populated rows.
Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
264 lines
9.3 KiB
Python
264 lines
9.3 KiB
Python
"""Source 3 (oracle): ``perf stat -p <qemu_pid>`` sampler.
|
|
|
|
Spawns ``perf stat`` in interval-JSON mode against the qemu pid and
|
|
aggregates the per-event counter values into per-interval telemetry
|
|
rows. Unlike the /proc and QMP collectors, perf needs CAP_SYS_ADMIN
|
|
or ``kernel.perf_event_paranoid <= 1`` to read counters for a process
|
|
the collector doesn't own — typically true on a lab host running
|
|
QEMU under the cis490 service user.
|
|
|
|
Source 3 is **oracle-only** — perf counters are not available on a
|
|
deployed device. Every row carries ``available_in_deployment: false``.
|
|
|
|
The events we ask for are the small canonical set named in
|
|
docs/data-model.md:
|
|
|
|
cycles, instructions, cache-references, cache-misses,
|
|
branches, branch-misses, page-faults, context-switches
|
|
|
|
Anything perf can't enable on the host (e.g. cache-misses without
|
|
hardware support) is silently dropped from the row.
|
|
"""
|
|
|
|
from __future__ import annotations
|
|
|
|
import json
|
|
import logging
|
|
import shutil
|
|
import subprocess
|
|
import threading
|
|
import time
|
|
from pathlib import Path
|
|
|
|
|
|
log = logging.getLogger("cis490.collectors.perf_qemu")
|
|
|
|
SOURCE = "host_perf"
|
|
AVAILABLE_IN_DEPLOYMENT = False
|
|
|
|
DEFAULT_EVENTS = (
|
|
"cycles",
|
|
"instructions",
|
|
"cache-references",
|
|
"cache-misses",
|
|
"branches",
|
|
"branch-misses",
|
|
"page-faults",
|
|
"context-switches",
|
|
)
|
|
|
|
|
|
def perf_available() -> bool:
|
|
return shutil.which("perf") is not None
|
|
|
|
|
|
def _coerce_int(s: str | int | None) -> int | None:
|
|
if s is None:
|
|
return None
|
|
if isinstance(s, int):
|
|
return s
|
|
s = s.strip()
|
|
if not s or s in ("<not counted>", "<not supported>"):
|
|
return None
|
|
# perf prints comma-separated thousands by default; we asked -j so
|
|
# we usually get plain numbers, but guard for both shapes.
|
|
s = s.replace(",", "")
|
|
try:
|
|
return int(s)
|
|
except ValueError:
|
|
try:
|
|
return int(float(s))
|
|
except ValueError:
|
|
return None
|
|
|
|
|
|
def _build_row(t_mono_origin_ns: int, interval_s: float, agg: dict[str, int]) -> dict:
|
|
cycles = agg.get("cycles")
|
|
insns = agg.get("instructions")
|
|
cache_refs = agg.get("cache-references")
|
|
cache_miss = agg.get("cache-misses")
|
|
ipc = (insns / cycles) if (cycles and insns) else None
|
|
miss_rate = (cache_miss / cache_refs) if (cache_refs and cache_miss is not None) else None
|
|
|
|
return {
|
|
"t_mono_ns": time.monotonic_ns() - t_mono_origin_ns,
|
|
"t_wall_ns": time.time_ns(),
|
|
"source": SOURCE,
|
|
"available_in_deployment": AVAILABLE_IN_DEPLOYMENT,
|
|
"interval_s": interval_s,
|
|
"cycles": cycles,
|
|
"instructions": insns,
|
|
"cache_references": cache_refs,
|
|
"cache_misses": cache_miss,
|
|
"branches": agg.get("branches"),
|
|
"branch_misses": agg.get("branch-misses"),
|
|
"page_faults": agg.get("page-faults"),
|
|
"context_switches": agg.get("context-switches"),
|
|
"ipc": ipc,
|
|
"cache_miss_rate": miss_rate,
|
|
}
|
|
|
|
|
|
def parse_perf_event_line(line: str) -> dict | None:
|
|
"""Parse one ``perf stat -j`` event line. Returns None for blanks
|
|
or status messages perf occasionally interleaves on stderr-ish
|
|
paths but stdout-on-error in practice."""
|
|
line = line.strip()
|
|
if not line.startswith("{"):
|
|
return None
|
|
try:
|
|
return json.loads(line)
|
|
except json.JSONDecodeError:
|
|
return None
|
|
|
|
|
|
def run_loop(
|
|
pid: int,
|
|
output_path: Path,
|
|
t_mono_origin_ns: int,
|
|
interval_ms: int,
|
|
stop_event: threading.Event,
|
|
*,
|
|
events: tuple[str, ...] = DEFAULT_EVENTS,
|
|
emit_event: "callable | None" = None,
|
|
) -> int:
|
|
"""Spawn perf stat -j against ``pid`` and stream rows until stop.
|
|
Returns the number of rows written.
|
|
|
|
When ``emit_event`` is provided, perf lifecycle markers
|
|
(perf_unavailable / perf_started / perf_first_row / perf_no_counters)
|
|
are written into the orchestrator's events.jsonl. Without this, a
|
|
silent perf produces only a `rows_perf=0` count in meta.json which
|
|
is indistinguishable from "perf was never enabled" — the §1
|
|
silent-downgrade pattern."""
|
|
if not perf_available():
|
|
log.warning("perf binary not on PATH — perf collector disabled")
|
|
if emit_event is not None:
|
|
emit_event("perf_unavailable", reason="binary_not_on_path")
|
|
return 0
|
|
|
|
# perf stat writes its output (including -j JSON) to stderr by
|
|
# default when -p / --pid is in use; only when perf forks the
|
|
# workload itself does it go to stdout. --log-fd 1 forces output
|
|
# onto fd 1 so we can stream it through proc.stdout. Without this
|
|
# the collector silently writes 0 rows on every episode.
|
|
cmd = [
|
|
"perf", "stat",
|
|
"-p", str(pid),
|
|
"-I", str(interval_ms),
|
|
"-j",
|
|
"--log-fd", "1",
|
|
"-e", ",".join(events),
|
|
]
|
|
log.info("starting perf: %s", " ".join(cmd))
|
|
|
|
try:
|
|
proc = subprocess.Popen(
|
|
cmd,
|
|
stdout=subprocess.PIPE,
|
|
stderr=subprocess.PIPE,
|
|
bufsize=1,
|
|
text=True,
|
|
)
|
|
except (FileNotFoundError, PermissionError) as e:
|
|
log.warning("perf launch failed: %s", e)
|
|
if emit_event is not None:
|
|
emit_event("perf_unavailable", reason=f"launch_failed: {e}")
|
|
return 0
|
|
|
|
if emit_event is not None:
|
|
emit_event("perf_started", pid=pid, events=list(events),
|
|
interval_ms=interval_ms)
|
|
|
|
rows = 0
|
|
output_path.parent.mkdir(parents=True, exist_ok=True)
|
|
cur_interval: float | None = None
|
|
agg: dict[str, int] = {}
|
|
intervals_seen = 0
|
|
intervals_with_values = 0
|
|
|
|
def _flush() -> None:
|
|
nonlocal rows, intervals_seen, intervals_with_values
|
|
if cur_interval is None:
|
|
return
|
|
intervals_seen += 1
|
|
# Always emit a row when an interval is observed, even if every
|
|
# event came back <not counted>. agg-empty means perf saw the
|
|
# interval but couldn't measure anything (paranoid level too
|
|
# high, qemu PID owned by different user, hardware counters
|
|
# unavailable, etc). Emitting None-valued rows distinguishes
|
|
# "perf was running but counters were unavailable" from "perf
|
|
# never ran at all" — which is the §1 visibility requirement
|
|
# the silent return-without-write was violating.
|
|
if agg:
|
|
intervals_with_values += 1
|
|
row = _build_row(t_mono_origin_ns, cur_interval, agg)
|
|
out_f.write(json.dumps(row) + "\n")
|
|
rows += 1
|
|
if (rows == 1) and emit_event is not None:
|
|
emit_event("perf_first_row",
|
|
counters_populated=bool(agg),
|
|
counters=list(agg.keys()))
|
|
|
|
try:
|
|
with output_path.open("a", buffering=1) as out_f:
|
|
# perf interleaves events and writes to stdout in -j mode.
|
|
# We read line by line until the process exits (which
|
|
# happens when we kill it on stop, or when the target pid
|
|
# disappears and perf's internal -p polling notices).
|
|
assert proc.stdout is not None
|
|
for line in proc.stdout:
|
|
if stop_event.is_set():
|
|
break
|
|
evt = parse_perf_event_line(line)
|
|
if evt is None:
|
|
continue
|
|
interval = evt.get("interval")
|
|
event_name = evt.get("event")
|
|
value = _coerce_int(evt.get("counter-value"))
|
|
if interval is None or event_name is None:
|
|
continue
|
|
# perf annotates event names with the privilege scope it
|
|
# was actually able to measure (e.g. "cycles:u" when only
|
|
# userspace is permitted under perf_event_paranoid=2).
|
|
# Strip the suffix so _build_row's plain-name lookups
|
|
# ("cycles", "instructions", ...) hit.
|
|
event_name = event_name.split(":", 1)[0]
|
|
# perf emits one JSON per (event, interval); a new
|
|
# interval value means we should flush the previous row.
|
|
if cur_interval is not None and interval != cur_interval:
|
|
_flush()
|
|
agg = {}
|
|
cur_interval = interval
|
|
if value is not None:
|
|
agg[event_name] = value
|
|
# End of stream — flush the last partial row.
|
|
_flush()
|
|
if emit_event is not None:
|
|
emit_event(
|
|
"perf_finished",
|
|
rows=rows,
|
|
intervals_seen=intervals_seen,
|
|
intervals_with_values=intervals_with_values,
|
|
)
|
|
if intervals_seen > 0 and intervals_with_values == 0:
|
|
# Loud signal: perf was running, intervals ticked, but
|
|
# every event came back <not counted>. Likely cause:
|
|
# perf_event_paranoid > 2, or qemu PID owned by a
|
|
# different user than the perf collector.
|
|
emit_event(
|
|
"perf_no_counters",
|
|
intervals_seen=intervals_seen,
|
|
likely_cause=("perf_event_paranoid > 2 or "
|
|
"qemu PID owned by different user"),
|
|
)
|
|
finally:
|
|
if proc.poll() is None:
|
|
proc.terminate()
|
|
try:
|
|
proc.wait(timeout=3.0)
|
|
except subprocess.TimeoutExpired:
|
|
proc.kill()
|
|
proc.wait(timeout=2.0)
|
|
|
|
return rows
|