CIS490/collectors/proc_qemu.py
Maximus Gorog 064387b7a0 Add v0 orchestrator + first oracle collector (host /proc)
End-to-end: ``python -m orchestrator --target-pid <pid> --duration N`` now
writes a complete episode directory matching docs/data-model.md, with phase
labels, events, and a 10 Hz host /proc telemetry stream. No VM yet — pid is
arbitrary so we can validate the loop against e.g. ``sleep 5`` while the lab
side comes up.

collectors/proc_qemu.py — parses /proc/<pid>/{stat,io,status} (handles parens
in comm), single-shot collect_once(), and a stop-event-driven run_loop()
that ticks at a fixed cadence and exits when the pid disappears. Tagged
``available_in_deployment: false`` per the threat-model doc.

orchestrator/episode.py — EpisodeRunner: creates data/episodes/<ulid>/,
atomic meta.json, events.jsonl + labels.jsonl writers, drives the collector
in a thread for duration_s, writes done.marker last so the shipper never
sees a half-finished episode.

orchestrator/ulid.py — tiny 26-char Crockford-base32 ULID generator.
Time-sortable, no third-party dep.

orchestrator/__main__.py — CLI entry point.

Tests (15 new, 28 total green):
- proc_qemu: real-ish stat with parens-in-comm, missing /proc/<pid>/io,
  missing pid, run_loop cadence, run_loop terminates when pid disappears.
- episode: full directory shape against os.getpid(), id override,
  done.marker written after meta.json finalize.
- ulid: length+alphabet, 2000-burst uniqueness, time-sortability.

Smoke-tested against ``sleep 10``: 16 rows over 1.5s at 100ms cadence,
monotonic clock, RSS stable at ~3.5 MiB as expected for an idle sleep.

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
2026-04-28 23:40:25 -06:00

189 lines
6 KiB
Python

"""Source 1 (oracle): host /proc/<pid> sampler.
Polls /proc/<pid>/{stat,io,status} at a fixed interval and emits one JSONL
row per tick into the episode telemetry file.
This source is **oracle-only** — it does not exist on a deployed device, so
every row is tagged ``available_in_deployment: false``. See
``docs/threat-model.md``.
"""
from __future__ import annotations
import json
import logging
import os
import threading
import time
from dataclasses import dataclass
from pathlib import Path
# Module logger; the name is spelled out explicitly rather than derived
# from __name__.
log = logging.getLogger("cis490.collectors.proc_qemu")
# Value stored under the "source" key of every telemetry row.
SOURCE = "host_proc"
# This collector is oracle-only, so each row is tagged as not being
# available on a deployed device.
AVAILABLE_IN_DEPLOYMENT = False
# Page size reported by the OS; used to convert rss (in pages) to bytes.
PAGE_SIZE = os.sysconf("SC_PAGESIZE")
@dataclass(frozen=True)
class ProcStat:
    """Immutable subset of the counters found in ``/proc/<pid>/stat``.

    Units:
        minflt, majflt: minor / major page-fault counts.
        utime: clock ticks (jiffies) spent in user mode.
        stime: clock ticks (jiffies) spent in kernel mode.
        vsize: virtual memory size, in bytes.
        rss_pages: resident set size, in pages (multiply by PAGE_SIZE
            to obtain bytes).
    """

    minflt: int
    majflt: int
    utime: int
    stime: int
    vsize: int
    rss_pages: int
@dataclass(frozen=True)
class ProcIO:
    """Immutable pair of byte counters taken from ``/proc/<pid>/io``.

    Holds the values of the ``read_bytes`` and ``write_bytes`` lines of
    that file, in that order.
    """

    read_bytes: int
    write_bytes: int
@dataclass(frozen=True)
class ProcStatus:
    """Immutable pair of context-switch counters from ``/proc/<pid>/status``.

    ``voluntary_ctxsw`` comes from the ``voluntary_ctxt_switches`` line and
    ``involuntary_ctxsw`` from the ``nonvoluntary_ctxt_switches`` line.
    """

    voluntary_ctxsw: int
    involuntary_ctxsw: int
def parse_proc_stat(data: str) -> ProcStat:
    """Extract the tracked counters from the text of /proc/<pid>/stat.

    ``comm`` (field 2) is wrapped in parentheses and may itself contain
    spaces or parentheses, so the *last* ')' in the text is taken as the end
    of ``comm``. Everything after it is whitespace-split and read
    positionally, following the field numbering in ``man 5 proc``.

    Raises:
        ValueError: if the text has no ')' or a selected field is not an
            integer.
        IndexError: if fewer fields follow ``comm`` than expected.
    """
    comm_end = data.rindex(")")
    # Step over ") " so that tail[0] is the state field (man-page field 3);
    # in general tail[i] corresponds to man-page field i + 3.
    tail = data[comm_end + 2 :].split()

    # ProcStat attribute -> index into `tail`.
    layout = {
        "minflt": 7,  # man 5 proc field 10
        "majflt": 9,  # field 12
        "utime": 11,  # field 14
        "stime": 12,  # field 15
        "vsize": 20,  # field 23
        "rss_pages": 21,  # field 24 (rss, in pages)
    }
    return ProcStat(**{attr: int(tail[idx]) for attr, idx in layout.items()})
def parse_proc_io(data: str) -> ProcIO:
    """Parse the text of /proc/<pid>/io into a ``ProcIO``.

    Reading that file requires the same uid as the target (or
    CAP_SYS_PTRACE).

    Only the ``read_bytes`` and ``write_bytes`` lines are interpreted.
    Blank lines, lines without a ``:`` separator, and all other keys are
    skipped, so unrelated or unexpected content cannot abort the parse.

    Raises:
        KeyError: if ``read_bytes`` or ``write_bytes`` is absent.
        ValueError: if the value of one of those two keys is not an integer.
    """
    wanted = ("read_bytes", "write_bytes")
    found = {}
    for line in data.splitlines():
        key, sep, value = line.partition(":")
        key = key.strip()
        # Ignore anything that is not a "<wanted key>: <value>" line instead
        # of calling int() on fields that are thrown away anyway.
        if not sep or key not in wanted:
            continue
        found[key] = int(value.strip())
    return ProcIO(
        read_bytes=found["read_bytes"],
        write_bytes=found["write_bytes"],
    )
def parse_proc_status(data: str) -> ProcStatus:
    """Pull the two context-switch counters out of /proc/<pid>/status.

    All other lines are ignored. A counter whose line is missing is
    reported as 0; if a line appears more than once the last one wins.
    """
    counters = {
        "voluntary_ctxt_switches": 0,
        "nonvoluntary_ctxt_switches": 0,
    }
    for line in data.splitlines():
        label, colon, value = line.partition(":")
        # `label` is the text before the first ':', so this matches exactly
        # the lines that begin with "<counter name>:".
        if colon and label in counters:
            counters[label] = int(value.strip())
    return ProcStatus(
        voluntary_ctxsw=counters["voluntary_ctxt_switches"],
        involuntary_ctxsw=counters["nonvoluntary_ctxt_switches"],
    )
def _read_text(path: str) -> str | None:
try:
with open(path, "rb") as f:
return f.read().decode("ascii", errors="replace")
except (FileNotFoundError, ProcessLookupError, PermissionError):
return None
def collect_once(
    pid: int,
    t_mono_origin_ns: int,
    *,
    proc_root: str = "/proc",
) -> dict | None:
    """Take a single telemetry sample for ``pid``.

    Reads ``stat``, ``io`` and ``status`` from ``<proc_root>/<pid>/``. When
    ``stat`` cannot be read the target is treated as gone and None is
    returned. ``io`` and ``status`` are optional: if either cannot be read,
    its columns are emitted as None.

    ``t_mono_ns`` in the returned row is the monotonic clock minus
    ``t_mono_origin_ns``. ``proc_root`` can be overridden so tests can point
    at a synthetic /proc tree.
    """
    pid_dir = f"{proc_root}/{pid}"

    raw_stat = _read_text(f"{pid_dir}/stat")
    if raw_stat is None:
        return None
    stat = parse_proc_stat(raw_stat)

    raw_io = _read_text(f"{pid_dir}/io")
    io = None if raw_io is None else parse_proc_io(raw_io)

    raw_status = _read_text(f"{pid_dir}/status")
    status = None if raw_status is None else parse_proc_status(raw_status)

    # Flatten the optional sources up front so the row literal stays simple.
    if io is None:
        io_read = io_write = None
    else:
        io_read, io_write = io.read_bytes, io.write_bytes
    if status is None:
        ctxsw_vol = ctxsw_invol = None
    else:
        ctxsw_vol = status.voluntary_ctxsw
        ctxsw_invol = status.involuntary_ctxsw

    # Key order is part of the emitted JSONL row; keep it stable.
    row = {
        "t_mono_ns": time.monotonic_ns() - t_mono_origin_ns,
        "t_wall_ns": time.time_ns(),
        "source": SOURCE,
        "available_in_deployment": AVAILABLE_IN_DEPLOYMENT,
        "cpu_user_jiffies": stat.utime,
        "cpu_sys_jiffies": stat.stime,
        "rss_bytes": stat.rss_pages * PAGE_SIZE,
        "vsize_bytes": stat.vsize,
        "io_read_bytes": io_read,
        "io_write_bytes": io_write,
        "voluntary_ctxsw": ctxsw_vol,
        "involuntary_ctxsw": ctxsw_invol,
        "minor_faults": stat.minflt,
        "major_faults": stat.majflt,
    }
    return row
def run_loop(
    pid: int,
    output_path: Path,
    t_mono_origin_ns: int,
    interval_ms: int,
    stop_event: threading.Event,
    *,
    proc_root: str = "/proc",
) -> int:
    """Append one JSON line per tick to ``output_path`` until told to stop.

    The loop ends when ``stop_event`` is set or when ``collect_once`` reports
    that the target pid is gone. The parent directory of ``output_path`` is
    created if needed and the file is opened in append mode.

    Returns the number of rows written.
    """
    period_ns = interval_ms * 1_000_000
    deadline_ns = time.monotonic_ns()
    written = 0

    output_path.parent.mkdir(parents=True, exist_ok=True)
    # buffering=1 is line-buffered, so each row is flushed as it is written.
    with output_path.open("a", buffering=1) as sink:
        while not stop_event.is_set():
            sample = collect_once(pid, t_mono_origin_ns, proc_root=proc_root)
            if sample is None:
                log.info("target pid %d disappeared; stopping", pid)
                break

            sink.write(json.dumps(sample) + "\n")
            written += 1

            deadline_ns += period_ns
            remaining_ns = deadline_ns - time.monotonic_ns()
            if remaining_ns <= 0:
                # Behind schedule: restart the cadence from now instead of
                # trying to catch up on missed ticks.
                deadline_ns = time.monotonic_ns()
                continue
            # Sleep on the event rather than time.sleep() so a stop request
            # wakes the loop immediately.
            stop_event.wait(remaining_ns / 1_000_000_000)
    return written