End-to-end: ``python -m orchestrator --target-pid <pid> --duration N`` now
writes a complete episode directory matching docs/data-model.md, with phase
labels, events, and a 10 Hz host /proc telemetry stream. No VM yet — the pid
can be any process, so we can validate the loop against e.g. ``sleep 5`` while
the lab side comes up.
collectors/proc_qemu.py — parsers for /proc/<pid>/{stat,io,status} (robust to
parentheses and spaces in comm), a single-shot collect_once(), and a
stop-event-driven run_loop() that ticks at a fixed cadence and exits when the
pid disappears. Rows are tagged ``available_in_deployment: false`` per the
threat-model doc.
orchestrator/episode.py — EpisodeRunner: creates data/episodes/<ulid>/,
writes meta.json atomically, provides events.jsonl and labels.jsonl writers,
drives the collector in a thread for duration_s, and writes done.marker last
so the shipper never sees a half-finished episode.
orchestrator/ulid.py — tiny 26-char Crockford-base32 ULID generator.
Time-sortable, no third-party dep.
orchestrator/__main__.py — CLI entry point.
Tests (15 new, 28 total green):
- proc_qemu: real-ish stat with parens-in-comm, missing /proc/<pid>/io,
missing pid, run_loop cadence, run_loop terminates when pid disappears.
- episode: full directory shape against os.getpid(), id override,
done.marker written after meta.json finalize.
- ulid: length+alphabet, 2000-burst uniqueness, time-sortability.
Smoke-tested against ``sleep 10``: 16 rows over 1.5 s at 100 ms cadence,
monotonic timestamps, and RSS stable at ~3.5 MiB, as expected for an idle
``sleep`` process.
Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
185 lines · 5.2 KiB · Python
from __future__ import annotations
|
|
|
|
import json
|
|
import os
|
|
import threading
|
|
import time
|
|
from pathlib import Path
|
|
|
|
import pytest
|
|
|
|
from collectors.proc_qemu import (
|
|
PAGE_SIZE,
|
|
collect_once,
|
|
parse_proc_io,
|
|
parse_proc_stat,
|
|
parse_proc_status,
|
|
run_loop,
|
|
)
|
|
|
|
|
|
# A real-ish /proc/<pid>/stat line. The comm field ("weird (comm) name")
# contains both spaces and parentheses, so a parser that splits on whitespace
# or stops at the first ")" would mis-align every field after it.
SAMPLE_STAT = (
    " ".join(
        [
            "1234 (weird (comm) name) S 1 1234 1234 0 -1 4194304",
            "412 0 3 0",  # minflt cminflt majflt cmajflt
            "142 38 0 0",  # utime stime cutime cstime
            "20 0 1 0",  # priority nice num_threads itrealvalue
            "100000000",  # starttime
            "1842933760",  # vsize
            "132453",  # rss (pages)
            # Remaining trailing fields; none of them are asserted on here.
            "18446744073709551615 1 1 0 0 0 0 0 0 0 0 0 0 0 17 0 0 0 0 0 0 0 0 0 0 0 0 0 0",
        ]
    )
    + "\n"
)

# /proc/<pid>/io in its "key: value" line format. The tests in this file
# assert on read_bytes and write_bytes.
SAMPLE_IO = "".join(
    f"{key}: {value}\n"
    for key, value in (
        ("rchar", 12345),
        ("wchar", 4096),
        ("syscr", 100),
        ("syscw", 50),
        ("read_bytes", 8192),
        ("write_bytes", 4096),
        ("cancelled_write_bytes", 0),
    )
)

# A trimmed /proc/<pid>/status. Only the two *_ctxt_switches lines are asserted
# on in this file; the other keys give the parser realistic neighbours to skip.
SAMPLE_STATUS = (
    "\n".join(
        [
            "Name: bash",
            "Umask: 0022",
            "State: S (sleeping)",
            "Tgid: 1234",
            "VmPeak: 12345 kB",
            "VmSize: 10000 kB",
            "VmRSS: 1024 kB",
            "voluntary_ctxt_switches: 12",
            "nonvoluntary_ctxt_switches: 3",
        ]
    )
    + "\n"
)
|
|
|
|
|
|
def test_parse_proc_stat_handles_parens_in_comm() -> None:
    """Fields after a comm containing spaces and ')' must still line up."""
    parsed = parse_proc_stat(SAMPLE_STAT)
    # Page-fault counters.
    assert (parsed.minflt, parsed.majflt) == (412, 3)
    # CPU time, in clock ticks.
    assert (parsed.utime, parsed.stime) == (142, 38)
    # Memory: vsize in bytes, rss reported in pages.
    assert parsed.vsize == 1842933760
    assert parsed.rss_pages == 132453
|
|
|
|
|
|
def test_parse_proc_io() -> None:
    """read_bytes / write_bytes are picked out of the key: value listing."""
    parsed = parse_proc_io(SAMPLE_IO)
    assert (parsed.read_bytes, parsed.write_bytes) == (8192, 4096)
|
|
|
|
|
|
def test_parse_proc_status() -> None:
    """The *_ctxt_switches lines map onto the *_ctxsw attributes."""
    parsed = parse_proc_status(SAMPLE_STATUS)
    assert (parsed.voluntary_ctxsw, parsed.involuntary_ctxsw) == (12, 3)
|
|
|
|
|
|
def _write_synthetic_proc(root: Path, pid: int, *, with_io: bool = True) -> None:
    """Materialise a fake ``<root>/<pid>/`` directory mimicking /proc.

    Always writes ``stat`` and ``status``. Writes ``io`` only when *with_io*
    is true, so callers can simulate a process whose io file is unreadable.
    """
    proc_dir = root / f"{pid}"
    proc_dir.mkdir(parents=True, exist_ok=True)
    contents = {"stat": SAMPLE_STAT, "status": SAMPLE_STATUS}
    if with_io:
        contents["io"] = SAMPLE_IO
    for name, text in contents.items():
        (proc_dir / name).write_text(text)
|
|
|
|
|
|
def test_collect_once_against_synthetic_proc(tmp_path: Path) -> None:
    """A complete synthetic /proc entry yields one row with every field mapped."""
    _write_synthetic_proc(tmp_path, 1234)

    row = collect_once(1234, t_mono_origin_ns=0, proc_root=str(tmp_path))
    assert row is not None

    assert row["source"] == "host_proc"
    # Host-side telemetry is flagged as unavailable in deployment; checked with
    # `is` so a falsy stand-in such as 0 would not pass.
    assert row["available_in_deployment"] is False

    expected = {
        # From stat (rss converted from pages to bytes).
        "cpu_user_jiffies": 142,
        "cpu_sys_jiffies": 38,
        "rss_bytes": 132453 * PAGE_SIZE,
        "vsize_bytes": 1842933760,
        "minor_faults": 412,
        "major_faults": 3,
        # From io.
        "io_read_bytes": 8192,
        "io_write_bytes": 4096,
        # From status.
        "voluntary_ctxsw": 12,
        "involuntary_ctxsw": 3,
    }
    for key, want in expected.items():
        assert row[key] == want, key

    assert row["t_mono_ns"] >= 0
    assert row["t_wall_ns"] > 0
|
|
|
|
|
|
def test_collect_once_handles_missing_io(tmp_path: Path) -> None:
    """No io file: a row is still produced, with the io fields set to None."""
    _write_synthetic_proc(tmp_path, 1234, with_io=False)
    row = collect_once(1234, t_mono_origin_ns=0, proc_root=str(tmp_path))
    assert row is not None
    for key in ("io_read_bytes", "io_write_bytes"):
        assert row[key] is None, key
|
|
|
|
|
|
def test_collect_once_returns_none_for_missing_pid(tmp_path: Path) -> None:
    """An empty proc_root (no ``<pid>/`` directory) yields None, not a row."""
    assert (
        collect_once(99999, t_mono_origin_ns=0, proc_root=str(tmp_path)) is None
    )
|
|
|
|
|
|
def test_collect_once_against_real_self() -> None:
    """Smoke: read our own /proc entry. Most fields will be present.

    Skipped where no Linux-style /proc exists (e.g. macOS). There the call uses
    collect_once's default proc root, which presumably resolves to /proc, and
    would just report the pid as missing instead of exercising the parsers.
    """
    if not Path("/proc/self/stat").exists():
        pytest.skip("requires a Linux-style /proc filesystem")

    row = collect_once(os.getpid(), t_mono_origin_ns=time.monotonic_ns())
    assert row is not None
    assert row["source"] == "host_proc"
    assert row["available_in_deployment"] is False
    assert row["rss_bytes"] > 0
    assert row["vsize_bytes"] > 0
|
|
|
|
|
|
def test_run_loop_writes_rows_until_stopped(tmp_path: Path) -> None:
    """run_loop keeps sampling until stop_event is set, one JSON row per tick."""
    _write_synthetic_proc(tmp_path, 1234)
    telemetry = tmp_path / "telemetry-proc.jsonl"
    stop = threading.Event()

    # Set the stop event from a background timer ~250 ms into the loop.
    stopper = threading.Timer(0.25, stop.set)
    stopper.start()
    written = run_loop(
        pid=1234,
        output_path=telemetry,
        t_mono_origin_ns=time.monotonic_ns(),
        interval_ms=50,
        stop_event=stop,
        proc_root=str(tmp_path),
    )
    stopper.join()

    # Roughly 5 ticks at 50 ms over 250 ms; the bound leaves slack for jitter.
    assert written >= 3
    rows = [json.loads(line) for line in telemetry.read_text().splitlines()]
    # The returned count must agree with what actually landed on disk.
    assert len(rows) == written
    for row in rows:
        assert row["source"] == "host_proc"
        assert row["available_in_deployment"] is False
        assert row["cpu_user_jiffies"] == 142
|
|
|
|
|
|
def test_run_loop_stops_when_pid_disappears(tmp_path: Path) -> None:
    """run_loop must return on its own once ``<proc_root>/<pid>/`` vanishes.

    The stop event is only ever set by a watchdog, as a safety net. Without it,
    a run_loop that failed to notice the exit would spin forever and hang the
    whole test session. The final ``not stop.is_set()`` assertion turns that
    case into an ordinary failure.
    """
    _write_synthetic_proc(tmp_path, 1234)
    pid_dir = tmp_path / "1234"
    out = tmp_path / "telemetry-proc.jsonl"
    stop = threading.Event()

    def vanish() -> None:
        time.sleep(0.1)
        # Simulate exit with one atomic rename, so the whole <pid>/ directory
        # disappears at once. The previous file-by-file unlink could expose a
        # half-deleted directory (e.g. stat gone but status still present),
        # which does not model how a real /proc/<pid>/ disappears.
        pid_dir.rename(tmp_path / "1234.exited")

    vanisher = threading.Thread(target=vanish)
    watchdog = threading.Timer(5.0, stop.set)
    watchdog.daemon = True
    vanisher.start()
    watchdog.start()
    try:
        rows = run_loop(
            pid=1234,
            output_path=out,
            t_mono_origin_ns=time.monotonic_ns(),
            interval_ms=20,
            stop_event=stop,
            proc_root=str(tmp_path),
        )
    finally:
        # Cancel the watchdog even if run_loop raised.
        watchdog.cancel()
        vanisher.join()

    assert rows >= 1
    # Same contract as the stop-event test: returned count matches file lines.
    assert len(out.read_text().splitlines()) == rows
    # Loop terminated because the pid vanished, not because the watchdog fired.
    assert not stop.is_set()
|