"""Source 1 (oracle): host /proc/<pid> sampler.

Polls ``/proc/<pid>/{stat,io,status}`` at a fixed interval and emits one JSONL
row per tick into the episode telemetry file.

This source is **oracle-only** — it does not exist on a deployed device, so
every row is tagged ``available_in_deployment: false``. See
``docs/threat-model.md``.
"""

from __future__ import annotations

import json
import logging
import os
import threading
import time
from dataclasses import dataclass
from pathlib import Path

log = logging.getLogger("cis490.collectors.proc_qemu")

# Value of the "source" key on every emitted row.
SOURCE = "host_proc"
# Oracle-only source: stamped as "available_in_deployment" on every row.
AVAILABLE_IN_DEPLOYMENT = False
# Bytes per page; /proc/<pid>/stat reports rss in pages.
PAGE_SIZE = os.sysconf("SC_PAGESIZE")


@dataclass(frozen=True)
class ProcStat:
    """Subset of /proc/<pid>/stat fields used by the sampler."""

    minflt: int  # minor page faults
    majflt: int  # major page faults
    utime: int  # clock ticks (jiffies) in user mode
    stime: int  # clock ticks (jiffies) in kernel mode
    vsize: int  # bytes
    rss_pages: int  # in pages — multiply by PAGE_SIZE


@dataclass(frozen=True)
class ProcIO:
    """Byte counters taken from /proc/<pid>/io."""

    read_bytes: int
    write_bytes: int


@dataclass(frozen=True)
class ProcStatus:
    """Context-switch counters taken from /proc/<pid>/status."""

    voluntary_ctxsw: int
    involuntary_ctxsw: int


def parse_proc_stat(data: str) -> ProcStat:
    """Parse the contents of /proc/<pid>/stat.

    The ``comm`` field (field 2) can contain spaces and parens, so we anchor
    the split on the rightmost ')'. After that, fields are positional per
    ``man 5 proc``.

    Raises:
        ValueError: if ``data`` contains no ')' or a field is not an integer.
        IndexError: if ``data`` has fewer fields than expected.
    """
    rparen = data.rindex(")")
    # Skip ") " after the comm to land on the state field.
    fields = data[rparen + 2 :].split()
    # Index in `fields`:  man 5 proc field number (1-indexed)
    #  0 state    3
    #  1 ppid     4
    #  2 pgrp     5
    #  3 session  6
    #  4 tty_nr   7
    #  5 tpgid    8
    #  6 flags    9
    #  7 minflt   10
    #  8 cminflt  11
    #  9 majflt   12
    # 10 cmajflt  13
    # 11 utime    14
    # 12 stime    15
    # ...
    # 20 vsize    23
    # 21 rss (pages) 24
    return ProcStat(
        minflt=int(fields[7]),
        majflt=int(fields[9]),
        utime=int(fields[11]),
        stime=int(fields[12]),
        vsize=int(fields[20]),
        rss_pages=int(fields[21]),
    )


def parse_proc_io(data: str) -> ProcIO:
    """Parse /proc/<pid>/io.

    Requires same uid (or CAP_SYS_PTRACE) to read. Lines without a ``:``
    separator (e.g. blank lines) are ignored instead of aborting the parse.

    Raises:
        KeyError: if ``read_bytes`` or ``write_bytes`` is absent.
        ValueError: if a counter value is not an integer.
    """
    out: dict[str, int] = {}
    for line in data.splitlines():
        key, sep, value = line.partition(":")
        if not sep:
            # Not a "key: value" line; skip rather than int('') -> ValueError.
            continue
        out[key.strip()] = int(value.strip())
    return ProcIO(read_bytes=out["read_bytes"], write_bytes=out["write_bytes"])


def parse_proc_status(data: str) -> ProcStatus:
    """Parse /proc/<pid>/status — only the two ctxsw fields we care about.

    A counter whose line is missing is reported as 0.
    """
    vol = nvol = 0
    for line in data.splitlines():
        if line.startswith("voluntary_ctxt_switches:"):
            vol = int(line.split(":", 1)[1].strip())
        elif line.startswith("nonvoluntary_ctxt_switches:"):
            nvol = int(line.split(":", 1)[1].strip())
    return ProcStatus(voluntary_ctxsw=vol, involuntary_ctxsw=nvol)


def _read_text(path: str) -> str | None:
    """Return the contents of ``path`` decoded as ASCII (undecodable bytes
    replaced), or None if the file is missing, the read fails with ESRCH,
    or permission is denied."""
    try:
        with open(path, "rb") as f:
            return f.read().decode("ascii", errors="replace")
    except (FileNotFoundError, ProcessLookupError, PermissionError):
        return None


def collect_once(
    pid: int,
    t_mono_origin_ns: int,
    *,
    proc_root: str = "/proc",
) -> dict | None:
    """Take one sample of ``pid`` and return it as a JSON-serialisable row.

    Args:
        pid: target process id.
        t_mono_origin_ns: ``time.monotonic_ns()`` value treated as t=0; the
            row's ``t_mono_ns`` is relative to it.
        proc_root: overridable for tests against a synthetic /proc tree.

    Returns:
        The row dict, or None when ``<proc_root>/<pid>/stat`` cannot be read
        (missing, process gone, or permission denied). If only ``io`` or
        ``status`` is unreadable, the corresponding fields are None.
    """
    stat_text = _read_text(f"{proc_root}/{pid}/stat")
    if stat_text is None:
        return None
    stat = parse_proc_stat(stat_text)

    io_text = _read_text(f"{proc_root}/{pid}/io")
    io = parse_proc_io(io_text) if io_text is not None else None

    status_text = _read_text(f"{proc_root}/{pid}/status")
    status = parse_proc_status(status_text) if status_text is not None else None

    return {
        "t_mono_ns": time.monotonic_ns() - t_mono_origin_ns,
        "t_wall_ns": time.time_ns(),
        "source": SOURCE,
        "available_in_deployment": AVAILABLE_IN_DEPLOYMENT,
        "cpu_user_jiffies": stat.utime,
        "cpu_sys_jiffies": stat.stime,
        "rss_bytes": stat.rss_pages * PAGE_SIZE,
        "vsize_bytes": stat.vsize,
        "io_read_bytes": io.read_bytes if io is not None else None,
        "io_write_bytes": io.write_bytes if io is not None else None,
        "voluntary_ctxsw": status.voluntary_ctxsw if status is not None else None,
        "involuntary_ctxsw": status.involuntary_ctxsw if status is not None else None,
        "minor_faults": stat.minflt,
        "major_faults": stat.majflt,
    }


def run_loop(
    pid: int,
    output_path: Path,
    t_mono_origin_ns: int,
    interval_ms: int,
    stop_event: threading.Event,
    *,
    proc_root: str = "/proc",
) -> int:
    """Sample at a fixed cadence until stop_event is set or pid disappears.

    Rows are appended to ``output_path`` (parent dirs are created) as JSONL,
    one line per sample, with line buffering. Ticks are scheduled on an
    absolute monotonic timeline; if sampling falls behind, the schedule is
    reset to "now" instead of trying to catch up with a burst.

    Args:
        interval_ms: sampling period in milliseconds; must be positive.

    Returns:
        The number of rows written.

    Raises:
        ValueError: if ``interval_ms`` is not positive (the loop would
            otherwise spin without ever sleeping).
    """
    if interval_ms <= 0:
        raise ValueError(f"interval_ms must be positive, got {interval_ms}")

    interval_ns = interval_ms * 1_000_000
    next_tick = time.monotonic_ns()
    rows = 0

    output_path.parent.mkdir(parents=True, exist_ok=True)
    with output_path.open("a", buffering=1, encoding="utf-8") as f:  # line-buffered
        while not stop_event.is_set():
            row = collect_once(pid, t_mono_origin_ns, proc_root=proc_root)
            if row is None:
                log.info("target pid %d disappeared; stopping", pid)
                break
            f.write(json.dumps(row) + "\n")
            rows += 1

            next_tick += interval_ns
            sleep_ns = next_tick - time.monotonic_ns()
            if sleep_ns > 0:
                # Sleep via the event so setting stop_event (e.g. from a
                # signal handler) wakes us immediately.
                stop_event.wait(sleep_ns / 1_000_000_000)
            else:
                # We are behind schedule; resync.
                next_tick = time.monotonic_ns()
    return rows