"""Tests for the ``collectors.proc_qemu`` /proc sampler.

The parsers are exercised against canned text, while ``collect_once`` and
``run_loop`` are pointed at a synthetic proc tree built under ``tmp_path``.
"""

from __future__ import annotations

import json
import os
import threading
import time
from pathlib import Path

import pytest

from collectors.proc_qemu import (
    PAGE_SIZE,
    collect_once,
    parse_proc_io,
    parse_proc_stat,
    parse_proc_status,
    run_loop,
)

# The real-/proc smoke test cannot pass on hosts without procfs, so skip it
# there instead of reporting a spurious failure.
_requires_procfs = pytest.mark.skipif(
    not Path("/proc/self/stat").exists(),
    reason="requires a Linux-style /proc filesystem",
)

# A real-ish /proc/<pid>/stat line where comm contains both spaces and parens.
SAMPLE_STAT = (
    "1234 (weird (comm) name) S 1 1234 1234 0 -1 4194304 "
    "412 0 3 0 "  # minflt cminflt majflt cmajflt
    "142 38 0 0 "  # utime stime cutime cstime
    "20 0 1 0 "  # priority nice num_threads itrealvalue
    "100000000 "  # starttime
    "1842933760 "  # vsize
    "132453 "  # rss (pages)
    "18446744073709551615 1 1 0 0 0 0 0 0 0 0 0 0 0 17 0 0 0 0 0 0 0 0 0 0 0 0 0 0\n"
)

SAMPLE_IO = (
    "rchar: 12345\n"
    "wchar: 4096\n"
    "syscr: 100\n"
    "syscw: 50\n"
    "read_bytes: 8192\n"
    "write_bytes: 4096\n"
    "cancelled_write_bytes: 0\n"
)

SAMPLE_STATUS = """\
Name: bash
Umask: 0022
State: S (sleeping)
Tgid: 1234
VmPeak: 12345 kB
VmSize: 10000 kB
VmRSS: 1024 kB
voluntary_ctxt_switches: 12
nonvoluntary_ctxt_switches: 3
"""


def test_parse_proc_stat_handles_parens_in_comm() -> None:
    """Fields after comm are read correctly despite spaces/parens in comm."""
    s = parse_proc_stat(SAMPLE_STAT)
    assert s.minflt == 412
    assert s.majflt == 3
    assert s.utime == 142
    assert s.stime == 38
    assert s.vsize == 1842933760
    assert s.rss_pages == 132453


def test_parse_proc_io() -> None:
    """The read_bytes / write_bytes counters are extracted from io text."""
    io = parse_proc_io(SAMPLE_IO)
    assert io.read_bytes == 8192
    assert io.write_bytes == 4096


def test_parse_proc_status() -> None:
    """Both context-switch counters are extracted from status text."""
    s = parse_proc_status(SAMPLE_STATUS)
    assert s.voluntary_ctxsw == 12
    assert s.involuntary_ctxsw == 3


def _write_synthetic_proc(root: Path, pid: int, *, with_io: bool = True) -> None:
    """Create ``root/<pid>/`` holding the sample stat, status and (optionally) io files."""
    pdir = root / str(pid)
    pdir.mkdir(parents=True, exist_ok=True)
    (pdir / "stat").write_text(SAMPLE_STAT)
    (pdir / "status").write_text(SAMPLE_STATUS)
    if with_io:
        (pdir / "io").write_text(SAMPLE_IO)


def test_collect_once_against_synthetic_proc(tmp_path: Path) -> None:
    """A full synthetic entry yields a row carrying every sampled value."""
    _write_synthetic_proc(tmp_path, 1234)
    row = collect_once(1234, t_mono_origin_ns=0, proc_root=str(tmp_path))
    assert row is not None
    assert row["source"] == "host_proc"
    assert row["available_in_deployment"] is False
    assert row["cpu_user_jiffies"] == 142
    assert row["cpu_sys_jiffies"] == 38
    assert row["rss_bytes"] == 132453 * PAGE_SIZE
    assert row["vsize_bytes"] == 1842933760
    assert row["io_read_bytes"] == 8192
    assert row["io_write_bytes"] == 4096
    assert row["voluntary_ctxsw"] == 12
    assert row["involuntary_ctxsw"] == 3
    assert row["minor_faults"] == 412
    assert row["major_faults"] == 3
    assert row["t_mono_ns"] >= 0
    assert row["t_wall_ns"] > 0


def test_collect_once_handles_missing_io(tmp_path: Path) -> None:
    """Without an io file the row is still produced, with io fields set to None."""
    _write_synthetic_proc(tmp_path, 1234, with_io=False)
    row = collect_once(1234, t_mono_origin_ns=0, proc_root=str(tmp_path))
    assert row is not None
    assert row["io_read_bytes"] is None
    assert row["io_write_bytes"] is None


def test_collect_once_returns_none_for_missing_pid(tmp_path: Path) -> None:
    """A pid with no directory under proc_root yields None."""
    row = collect_once(99999, t_mono_origin_ns=0, proc_root=str(tmp_path))
    assert row is None


@_requires_procfs
def test_collect_once_against_real_self() -> None:
    """Smoke: read our own /proc entry. Most fields will be present."""
    row = collect_once(os.getpid(), t_mono_origin_ns=time.monotonic_ns())
    assert row is not None
    assert row["source"] == "host_proc"
    assert row["rss_bytes"] > 0
    assert row["vsize_bytes"] > 0


def test_run_loop_writes_rows_until_stopped(tmp_path: Path) -> None:
    """run_loop writes one JSON line per counted row until stop_event is set."""
    _write_synthetic_proc(tmp_path, 1234)
    out = tmp_path / "telemetry-proc.jsonl"
    stop = threading.Event()

    def stopper() -> None:
        time.sleep(0.25)
        stop.set()

    t = threading.Thread(target=stopper)
    t.start()
    try:
        rows = run_loop(
            pid=1234,
            output_path=out,
            t_mono_origin_ns=time.monotonic_ns(),
            interval_ms=50,
            stop_event=stop,
            proc_root=str(tmp_path),
        )
    finally:
        # Join even when run_loop raises so the helper thread never outlives
        # this test.
        t.join()

    assert rows >= 3  # roughly 5 ticks @ 50ms over 250ms
    lines = out.read_text().splitlines()
    assert len(lines) == rows
    # Each line is valid JSON with the expected schema.
    for line in lines:
        row = json.loads(line)
        assert row["source"] == "host_proc"
        assert row["available_in_deployment"] is False
        assert row["cpu_user_jiffies"] == 142


def test_run_loop_stops_when_pid_disappears(tmp_path: Path) -> None:
    """run_loop returns on its own once the pid's directory is removed."""
    _write_synthetic_proc(tmp_path, 1234)
    out = tmp_path / "telemetry-proc.jsonl"
    stop = threading.Event()

    def vanish() -> None:
        time.sleep(0.1)
        # Delete the synthetic /proc/<pid>/ to simulate exit.
        for f in (tmp_path / "1234").iterdir():
            f.unlink()
        (tmp_path / "1234").rmdir()

    t = threading.Thread(target=vanish)
    t.start()
    try:
        rows = run_loop(
            pid=1234,
            output_path=out,
            t_mono_origin_ns=time.monotonic_ns(),
            interval_ms=20,
            stop_event=stop,
            proc_root=str(tmp_path),
        )
    finally:
        # Join even when run_loop raises so the helper thread never outlives
        # this test.
        t.join()

    assert rows >= 1
    # Loop terminated even though stop was never set.
    assert not stop.is_set()