"""Tests for the host-side guest-agent collector. We simulate the in-guest agent by spinning up a unix socket server (stand-in for the QEMU virtio-serial chardev) that writes a few JSON-lines rows. The collector should read them, re-stamp with the host's monotonic clock, and persist to telemetry-guest.jsonl. """ from __future__ import annotations import json import socket import threading import time from pathlib import Path import pytest from collectors import guest_agent class FakeAgentServer(threading.Thread): def __init__(self, sock_path: Path, rows: list[dict], delay_s: float = 0.05) -> None: super().__init__(daemon=True) self.sock_path = sock_path self.rows = rows self.delay_s = delay_s self._sock = socket.socket(socket.AF_UNIX, socket.SOCK_STREAM) self._sock.bind(str(sock_path)) self._sock.listen(1) self._sock.settimeout(5.0) def run(self) -> None: try: conn, _ = self._sock.accept() except socket.timeout: return try: for row in self.rows: conn.sendall((json.dumps(row) + "\n").encode()) time.sleep(self.delay_s) time.sleep(0.1) finally: conn.close() self._sock.close() def test_collector_reads_jsonl_and_restamps(tmp_path: Path) -> None: sock_path = tmp_path / "agent.sock" rows_in = [ { "t_guest_mono_ns": 1, "t_guest_wall_ns": 2, "source": "guest_agent", "available_in_deployment": True, "mem_total_bytes": 256 * 1024 * 1024, "mem_available_bytes": 200 * 1024 * 1024, "load_1m_5m_15m": [0.1, 0.05, 0.0], "cpu_total_jiffies": {"user": 10, "system": 5, "idle": 1000}, }, { "t_guest_mono_ns": 100_000_000, "t_guest_wall_ns": 100_000_002, "source": "guest_agent", "available_in_deployment": True, "mem_total_bytes": 256 * 1024 * 1024, "mem_available_bytes": 198 * 1024 * 1024, }, ] server = FakeAgentServer(sock_path, rows_in, delay_s=0.02) server.start() out_path = tmp_path / "telemetry-guest.jsonl" stop = threading.Event() def stop_after(ms: int) -> None: time.sleep(ms / 1000.0) stop.set() threading.Thread(target=stop_after, args=(300,), daemon=True).start() rows_written = guest_agent.run_loop( socket_path=sock_path, output_path=out_path, t_mono_origin_ns=time.monotonic_ns(), stop_event=stop, connect_timeout_s=2.0, ) server.join(timeout=2) assert rows_written == 2 persisted = [json.loads(l) for l in out_path.read_text().splitlines()] assert len(persisted) == 2 for orig, got in zip(rows_in, persisted): # Original guest timestamps preserved. assert got["t_guest_mono_ns"] == orig["t_guest_mono_ns"] # Host-clock fields added. assert "t_mono_ns" in got assert "t_wall_ns" in got assert got["source"] == "guest_agent" assert got["available_in_deployment"] is True def test_collector_returns_zero_when_socket_missing(tmp_path: Path) -> None: rows = guest_agent.run_loop( socket_path=tmp_path / "no-socket-here.sock", output_path=tmp_path / "out.jsonl", t_mono_origin_ns=time.monotonic_ns(), stop_event=threading.Event(), connect_timeout_s=0.5, ) assert rows == 0 def test_collector_drops_malformed_lines_but_keeps_going(tmp_path: Path) -> None: sock_path = tmp_path / "agent.sock" # Will be sent verbatim; the malformed line should be skipped. payload = ( b'{"source":"guest_agent","mem_total_bytes":1}\n' b'this-is-not-json\n' b'{"source":"guest_agent","mem_total_bytes":2}\n' ) class Server(threading.Thread): def __init__(self) -> None: super().__init__(daemon=True) self._sock = socket.socket(socket.AF_UNIX, socket.SOCK_STREAM) self._sock.bind(str(sock_path)) self._sock.listen(1) def run(self) -> None: conn, _ = self._sock.accept() try: conn.sendall(payload) time.sleep(0.2) finally: conn.close() self._sock.close() s = Server() s.start() out_path = tmp_path / "out.jsonl" stop = threading.Event() threading.Thread( target=lambda: (time.sleep(0.4), stop.set()), daemon=True ).start() rows = guest_agent.run_loop( socket_path=sock_path, output_path=out_path, t_mono_origin_ns=time.monotonic_ns(), stop_event=stop, connect_timeout_s=2.0, ) s.join(timeout=2) assert rows == 2 persisted = [json.loads(l) for l in out_path.read_text().splitlines()] assert [r["mem_total_bytes"] for r in persisted] == [1, 2]