"""Tests for the QMP collector against an in-process fake QMP server. The fake speaks just enough QMP to exercise: - the greeting + qmp_capabilities handshake - query-status - query-blockstats - query-stats target=vm - error responses - async events interleaved with command responses """ from __future__ import annotations import json import socket import tempfile import threading import time from pathlib import Path from typing import Any import pytest from collectors import qmp # --------------------------------------------------------------------------- # Fake QMP server # --------------------------------------------------------------------------- class FakeQMPServer(threading.Thread): """Single-connection fake. Each line received from the client is parsed as JSON; we look up ``execute`` in ``responses`` and emit the configured reply. Optionally interleaves an async event before the response.""" def __init__( self, socket_path: Path, *, responses: dict[str, Any] | None = None, emit_event_before: set[str] | None = None, ) -> None: super().__init__(daemon=True) self.socket_path = socket_path self.responses = responses or {} self.emit_event_before = emit_event_before or set() self.received: list[dict] = [] self._stop = threading.Event() self._sock = socket.socket(socket.AF_UNIX, socket.SOCK_STREAM) self._sock.bind(str(socket_path)) self._sock.listen(1) self._sock.settimeout(5.0) def run(self) -> None: try: conn, _ = self._sock.accept() except socket.timeout: return conn.settimeout(5.0) try: # Greeting conn.sendall(b'{"QMP": {"version": {"qemu": {"major":9,"minor":0,"micro":0}}, "capabilities": []}}\n') buf = b"" while not self._stop.is_set(): try: chunk = conn.recv(4096) except socket.timeout: if self._stop.is_set(): return continue if not chunk: return buf += chunk while b"\n" in buf: line, _, buf = buf.partition(b"\n") if not line.strip(): continue msg = json.loads(line) self.received.append(msg) cmd = msg.get("execute") if cmd == "qmp_capabilities": conn.sendall(b'{"return": {}}\n') continue if cmd in self.emit_event_before: conn.sendall(b'{"event": "STOP", "timestamp": {"seconds": 1, "microseconds": 0}}\n') if cmd in self.responses: resp = self.responses[cmd] conn.sendall((json.dumps(resp) + "\n").encode()) else: conn.sendall(b'{"error": {"class": "CommandNotFound", "desc": "unknown"}}\n') finally: conn.close() def shutdown(self) -> None: self._stop.set() try: self._sock.close() except OSError: pass @pytest.fixture def qmp_server(tmp_path: Path): sock_path = tmp_path / "qmp.sock" return sock_path # --------------------------------------------------------------------------- # Client tests # --------------------------------------------------------------------------- def test_connect_negotiates_capabilities(qmp_server: Path) -> None: server = FakeQMPServer(qmp_server) server.start() try: client = qmp.QMPClient(qmp_server) greeting = client.connect() assert "version" in greeting finally: client.close() server.shutdown() # Server saw exactly the qmp_capabilities call. assert any(m.get("execute") == "qmp_capabilities" for m in server.received) def test_execute_returns_payload(qmp_server: Path) -> None: server = FakeQMPServer( qmp_server, responses={ "query-status": {"return": {"status": "running", "running": True}}, }, ) server.start() try: client = qmp.QMPClient(qmp_server) client.connect() out = client.execute("query-status") assert out == {"status": "running", "running": True} finally: client.close() server.shutdown() def test_execute_skips_async_events_before_response(qmp_server: Path) -> None: server = FakeQMPServer( qmp_server, responses={ "query-status": {"return": {"status": "running", "running": True}}, }, emit_event_before={"query-status"}, ) server.start() try: client = qmp.QMPClient(qmp_server) client.connect() out = client.execute("query-status") assert out["running"] is True finally: client.close() server.shutdown() def test_execute_raises_on_qmp_error(qmp_server: Path) -> None: server = FakeQMPServer(qmp_server) # no responses → server sends error server.start() try: client = qmp.QMPClient(qmp_server) client.connect() with pytest.raises(qmp.QMPError): client.execute("totally-fake-command") finally: client.close() server.shutdown() # --------------------------------------------------------------------------- # Row builder tests # --------------------------------------------------------------------------- def test_collect_once_assembles_full_row(qmp_server: Path) -> None: server = FakeQMPServer( qmp_server, responses={ "query-status": {"return": {"status": "running", "running": True}}, "query-blockstats": {"return": [{ "device": "virtio0", "stats": { "rd_operations": 12, "wr_operations": 4, "rd_bytes": 49152, "wr_bytes": 16384, "flush_operations": 1, }, }]}, "query-stats": {"return": [{"stats": [ {"name": "halt_exits", "value": 17000}, {"name": "io_exits", "value": 942}, {"name": "string-skipped", "value": "not-an-int"}, ]}]}, }, ) server.start() try: client = qmp.QMPClient(qmp_server) client.connect() row = qmp.collect_once(client, t_mono_origin_ns=time.monotonic_ns()) finally: client.close() server.shutdown() assert row["source"] == "host_qmp" assert row["available_in_deployment"] is False assert row["vm_running"] is True assert row["blockstats"]["virtio0"]["rd_bytes"] == 49152 assert row["blockstats"]["virtio0"]["flush_ops"] == 1 assert row["kvm_stats"]["halt_exits"] == 17000 assert "string-skipped" not in row["kvm_stats"] def test_collect_once_tolerates_missing_query_stats(qmp_server: Path) -> None: server = FakeQMPServer( qmp_server, responses={ "query-status": {"return": {"status": "running", "running": True}}, "query-blockstats": {"return": []}, # query-stats deliberately absent → server returns CommandNotFound }, ) server.start() try: client = qmp.QMPClient(qmp_server) client.connect() row = qmp.collect_once(client, t_mono_origin_ns=time.monotonic_ns()) finally: client.close() server.shutdown() # Older qemu without query-stats: row still exists, kvm_stats absent. assert "kvm_stats" not in row assert row["vm_running"] is True assert row["blockstats"] == {} # --------------------------------------------------------------------------- # run_loop tests # --------------------------------------------------------------------------- def test_run_loop_writes_rows_and_stops_cleanly(qmp_server: Path, tmp_path: Path) -> None: server = FakeQMPServer( qmp_server, responses={ "query-status": {"return": {"status": "running", "running": True}}, "query-blockstats": {"return": []}, "query-stats": {"error": {"class": "CommandNotFound", "desc": "n/a"}}, }, ) server.start() out_path = tmp_path / "telemetry-qmp.jsonl" stop = threading.Event() def stop_after(ms: int) -> None: time.sleep(ms / 1000.0) stop.set() threading.Thread(target=stop_after, args=(350,), daemon=True).start() rows = qmp.run_loop( socket_path=qmp_server, output_path=out_path, t_mono_origin_ns=time.monotonic_ns(), interval_ms=100, stop_event=stop, ) server.shutdown() assert rows >= 2, f"expected >=2 rows, got {rows}" lines = [json.loads(l) for l in out_path.read_text().splitlines()] assert len(lines) == rows for r in lines: assert r["source"] == "host_qmp" assert r["vm_running"] is True def test_savevm_and_loadvm_via_human_monitor(qmp_server: Path) -> None: server = FakeQMPServer( qmp_server, responses={ "human-monitor-command": {"return": ""}, }, ) server.start() try: client = qmp.QMPClient(qmp_server) client.connect() out_save = client.savevm("baseline") out_load = client.loadvm("baseline") assert out_save == "" assert out_load == "" finally: client.close() server.shutdown() # Both calls go out as human-monitor-command with the right cmdline. hmcs = [m for m in server.received if m.get("execute") == "human-monitor-command"] cmds = [m["arguments"]["command-line"] for m in hmcs] assert "savevm baseline" in cmds assert "loadvm baseline" in cmds def test_loadvm_surface_error(qmp_server: Path) -> None: server = FakeQMPServer(qmp_server) # no responses → error reply server.start() try: client = qmp.QMPClient(qmp_server) client.connect() with pytest.raises(qmp.QMPError): client.loadvm("does-not-exist") finally: client.close() server.shutdown() def test_run_loop_returns_zero_when_socket_missing(tmp_path: Path) -> None: # No server bound to the socket path. rows = qmp.run_loop( socket_path=tmp_path / "nonexistent.sock", output_path=tmp_path / "telemetry-qmp.jsonl", t_mono_origin_ns=time.monotonic_ns(), interval_ms=100, stop_event=threading.Event(), ) assert rows == 0