Closes the gaps surfaced by the "what is not implemented" audit so the
fleet is actually shippable end-to-end. Verified live on the Pi:
- cis490-shipper --ping → HTTP 200 through Caddy + mTLS, using a leaf
cert issued by the new wg-pki client CA
- real episode dir → tar+zstd → PUT → HTTP 201 stored
- re-ship same bytes → 200 (idempotent)
- re-ship different bytes under same id → 409 (conflict)
Changes:
orchestrator/episode.py
- EpisodeConfig.revert_at_start / revert_at_end (Tier 0+ snapshot/
revert per docs/architecture.md). When either flag is set and a
qmp_socket is configured, EpisodeRunner issues `loadvm <snapshot_name>`
and emits snapshot_revert / snapshot_revert_failed events on the same
monotonic clock as every other event.
collectors/qmp.py
- savevm() / loadvm() helpers using human-monitor-command, plus a
test against the fake QMP server.
exploits/workloads.py
- chunked_real_binary_upload() returns a ChunkedUpload plan: 8 KiB
base64 chunks (~6 KiB binary each) so msfrpc never sees a buffer-
busting payload. Includes a finalize step that sha256-verifies on
the guest before exec.
- real_binary_workload() now wraps the chunked plan for backwards
compat with single-shot callers.
exploits/driver.py
- Tier-4 dispatch walks the chunked plan in MSFExploitDriver:
each chunk is a separate session_shell_write; finalize verifies;
exec only runs on sha-ok. New events: real_binary_upload_begin,
real_binary_verify, real_binary_aborted.
etc/cis490-orchestrator.service
- Reads /etc/cis490/lab-host.env (FLEET_HOST_ID + optional BRIDGE).
- Grants AmbientCapabilities CAP_NET_RAW (tcpdump for source 4) +
CAP_SYS_ADMIN + CAP_PERFMON (perf for source 3) so collectors
work under hardening.
scripts/install-lab-host.sh
- Writes /etc/cis490/lab-host.env on first install with FLEET_HOST_ID
defaulting to `hostname -s`.
- Best-effort: fetches the Alpine baseline qcow2 (sha512-pinned) and
builds cidata.iso with the in-guest agent embedded; symlinks both
into /opt/cis490/vm/images/ so launchers find them.
scripts/fetch-alpine-baseline.sh
- Idempotent fetcher for the Alpine 3.21 cloud-init nocloud qcow2
matching the sha512 in docs/sources.md.
tools/plot_envelope.py
- Rebuilt to render whatever telemetry the episode dir contains:
proc → QMP block ops → perf IPC/miss-rate → bridge pkts/SYNs →
guest agent load/mem. Missing sources are silently skipped.
tools/index_reader.py
- cis490-index CLI: filter receiver's index.jsonl by host / sample
/ time range, sort, count-by group. Closest thing to a query
interface until we stand up Postgres/Timescale.
samples/README.md
- Rewritten to match the new manifest schema, the kind=real vs mimic
split, the per-(host, slot, ep) selection mechanic, and the
chunked-upload safety story.
Tests: 106 pass (was 102). New cases:
- test_qmp.py — savevm + loadvm (HMP wrapper + error path)
- test_tier4.py — chunked plan splitting, sha-pinned finalize,
end-to-end driver walks all chunks + verify + exec via the fake
msfrpc client
Closes the "what is not implemented" punch list.
Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
333 lines
10 KiB
Python
333 lines
10 KiB
Python
"""Tests for the QMP collector against an in-process fake QMP server.
|
|
|
|
The fake speaks just enough QMP to exercise:
|
|
- the greeting + qmp_capabilities handshake
|
|
- query-status
|
|
- query-blockstats
|
|
- query-stats target=vm
|
|
- error responses
|
|
- async events interleaved with command responses
|
|
"""
|
|
|
|
from __future__ import annotations
|
|
|
|
import json
|
|
import socket
|
|
import tempfile
|
|
import threading
|
|
import time
|
|
from pathlib import Path
|
|
from typing import Any
|
|
|
|
import pytest
|
|
|
|
from collectors import qmp
|
|
|
|
|
|
# ---------------------------------------------------------------------------
|
|
# Fake QMP server
|
|
# ---------------------------------------------------------------------------
|
|
|
|
|
|
class FakeQMPServer(threading.Thread):
    """Single-connection fake QMP server listening on a Unix socket.

    On connect it sends a QMP greeting. It then parses every
    newline-terminated line from the client as JSON and appends it to
    ``received``.

    - ``qmp_capabilities`` always gets ``{"return": {}}``.
    - Any other ``execute`` name is looked up in ``responses``. The
      configured reply is sent verbatim; a ``CommandNotFound`` error is
      sent when the name is absent.
    - Commands listed in ``emit_event_before`` get an async ``STOP``
      event written immediately before their reply, so clients must skip
      interleaved events.

    Args:
        socket_path: Filesystem path the listening socket is bound to.
        responses: Map of QMP command name -> full reply object to send.
        emit_event_before: Command names whose reply is preceded by a
            STOP event.
    """

    def __init__(
        self,
        socket_path: Path,
        *,
        responses: dict[str, Any] | None = None,
        emit_event_before: set[str] | None = None,
    ) -> None:
        super().__init__(daemon=True)
        self.socket_path = socket_path
        self.responses = responses or {}
        self.emit_event_before = emit_event_before or set()
        # Every JSON message received from the client, in arrival order.
        self.received: list[dict] = []
        # Deliberately NOT named ``_stop``: threading.Thread has a private
        # ``_stop()`` method that join()/is_alive() call on CPython <= 3.12.
        # Shadowing it with an Event makes those calls raise TypeError.
        self._stop_event = threading.Event()
        # The accepted client connection, kept so shutdown() can wake a
        # recv() that is blocked waiting for client input.
        self._conn: socket.socket | None = None
        self._sock = socket.socket(socket.AF_UNIX, socket.SOCK_STREAM)
        self._sock.bind(str(socket_path))
        self._sock.listen(1)
        self._sock.settimeout(5.0)

    def run(self) -> None:
        """Accept one client and answer its commands until EOF or shutdown."""
        try:
            conn, _ = self._sock.accept()
        except OSError:
            # Covers socket.timeout (no client within 5 s) as well as the
            # listener being closed underneath us by shutdown().
            return
        self._conn = conn
        conn.settimeout(5.0)
        try:
            # Greeting
            conn.sendall(b'{"QMP": {"version": {"qemu": {"major":9,"minor":0,"micro":0}}, "capabilities": []}}\n')
            buf = b""
            while not self._stop_event.is_set():
                try:
                    chunk = conn.recv(4096)
                except socket.timeout:
                    if self._stop_event.is_set():
                        return
                    continue
                if not chunk:
                    # Client closed (or shutdown() shut the socket down).
                    return
                buf += chunk
                # QMP is newline-delimited JSON; a recv() may carry several
                # messages or a partial one, so only consume full lines.
                while b"\n" in buf:
                    line, _, buf = buf.partition(b"\n")
                    if not line.strip():
                        continue
                    msg = json.loads(line)
                    self.received.append(msg)
                    cmd = msg.get("execute")
                    if cmd == "qmp_capabilities":
                        conn.sendall(b'{"return": {}}\n')
                        continue
                    if cmd in self.emit_event_before:
                        conn.sendall(b'{"event": "STOP", "timestamp": {"seconds": 1, "microseconds": 0}}\n')
                    if cmd in self.responses:
                        resp = self.responses[cmd]
                        conn.sendall((json.dumps(resp) + "\n").encode())
                    else:
                        conn.sendall(b'{"error": {"class": "CommandNotFound", "desc": "unknown"}}\n')
        finally:
            conn.close()

    def shutdown(self, timeout: float = 5.0) -> None:
        """Stop the server thread and release its sockets.

        Shuts down any accepted connection so a blocked ``recv()`` returns
        immediately. Then waits up to ``timeout`` seconds for the thread
        to exit, so callers can inspect ``received`` without racing it.
        Calling this on a server that was never started is a no-op apart
        from closing the listener.
        """
        self._stop_event.set()
        try:
            self._sock.close()
        except OSError:
            pass
        conn = self._conn
        if conn is not None:
            try:
                conn.shutdown(socket.SHUT_RDWR)
            except OSError:
                pass  # already closed by the server thread
        if self.is_alive():
            self.join(timeout)
|
|
|
|
|
|
@pytest.fixture
def qmp_server(tmp_path: Path):
    """Provide the Unix-socket path that a test's FakeQMPServer should bind.

    Despite the name, no server is started here. Each test builds its own
    FakeQMPServer with the canned responses it needs. The path sits under
    pytest's per-test ``tmp_path``, so every test gets a distinct socket.
    """
    return tmp_path.joinpath("qmp.sock")
|
|
|
|
|
|
# ---------------------------------------------------------------------------
|
|
# Client tests
|
|
# ---------------------------------------------------------------------------
|
|
|
|
|
|
def test_connect_negotiates_capabilities(qmp_server: Path) -> None:
    """connect() returns the greeting and performs the qmp_capabilities handshake."""
    server = FakeQMPServer(qmp_server)
    server.start()
    try:
        # The client is created before its own try block. If construction
        # fails, cleanup cannot hit an unbound ``client``, which would raise
        # UnboundLocalError and skip server.shutdown().
        client = qmp.QMPClient(qmp_server)
        try:
            greeting = client.connect()
            assert "version" in greeting
        finally:
            client.close()
    finally:
        server.shutdown()
    # The server must have seen the qmp_capabilities handshake call.
    assert any(m.get("execute") == "qmp_capabilities" for m in server.received)
|
|
|
|
|
|
def test_execute_returns_payload(qmp_server: Path) -> None:
    """execute() unwraps the reply and returns only its ``return`` payload."""
    server = FakeQMPServer(
        qmp_server,
        responses={
            "query-status": {"return": {"status": "running", "running": True}},
        },
    )
    server.start()
    try:
        # Separate try blocks: server.shutdown() runs even if client
        # construction or close() raises.
        client = qmp.QMPClient(qmp_server)
        try:
            client.connect()
            out = client.execute("query-status")
            assert out == {"status": "running", "running": True}
        finally:
            client.close()
    finally:
        server.shutdown()
|
|
|
|
|
|
def test_execute_skips_async_events_before_response(qmp_server: Path) -> None:
    """An async STOP event sent ahead of the reply must not be mistaken for it."""
    server = FakeQMPServer(
        qmp_server,
        responses={
            "query-status": {"return": {"status": "running", "running": True}},
        },
        emit_event_before={"query-status"},
    )
    server.start()
    try:
        # Separate try blocks: server.shutdown() runs even if client
        # construction or close() raises.
        client = qmp.QMPClient(qmp_server)
        try:
            client.connect()
            out = client.execute("query-status")
            assert out["running"] is True
        finally:
            client.close()
    finally:
        server.shutdown()
|
|
|
|
|
|
def test_execute_raises_on_qmp_error(qmp_server: Path) -> None:
    """An ``error`` reply from the server surfaces as qmp.QMPError."""
    server = FakeQMPServer(qmp_server)  # no responses → server sends error
    server.start()
    try:
        # Separate try blocks: server.shutdown() runs even if client
        # construction or close() raises.
        client = qmp.QMPClient(qmp_server)
        try:
            client.connect()
            with pytest.raises(qmp.QMPError):
                client.execute("totally-fake-command")
        finally:
            client.close()
    finally:
        server.shutdown()
|
|
|
|
|
|
# ---------------------------------------------------------------------------
|
|
# Row builder tests
|
|
# ---------------------------------------------------------------------------
|
|
|
|
|
|
def test_collect_once_assembles_full_row(qmp_server: Path) -> None:
    """collect_once merges status, blockstats and KVM stats into one row.

    Block stats are keyed by device, with ``flush_operations`` reported as
    ``flush_ops``. KVM stats with non-integer values are dropped.
    """
    server = FakeQMPServer(
        qmp_server,
        responses={
            "query-status": {"return": {"status": "running", "running": True}},
            "query-blockstats": {"return": [{
                "device": "virtio0",
                "stats": {
                    "rd_operations": 12, "wr_operations": 4,
                    "rd_bytes": 49152, "wr_bytes": 16384,
                    "flush_operations": 1,
                },
            }]},
            "query-stats": {"return": [{"stats": [
                {"name": "halt_exits", "value": 17000},
                {"name": "io_exits", "value": 942},
                {"name": "string-skipped", "value": "not-an-int"},
            ]}]},
        },
    )
    server.start()
    try:
        # Separate try blocks: server.shutdown() runs even if client
        # construction or close() raises.
        client = qmp.QMPClient(qmp_server)
        try:
            client.connect()
            row = qmp.collect_once(client, t_mono_origin_ns=time.monotonic_ns())
        finally:
            client.close()
    finally:
        server.shutdown()

    assert row["source"] == "host_qmp"
    assert row["available_in_deployment"] is False
    assert row["vm_running"] is True
    assert row["blockstats"]["virtio0"]["rd_bytes"] == 49152
    assert row["blockstats"]["virtio0"]["flush_ops"] == 1
    assert row["kvm_stats"]["halt_exits"] == 17000
    assert "string-skipped" not in row["kvm_stats"]
|
|
|
|
|
|
def test_collect_once_tolerates_missing_query_stats(qmp_server: Path) -> None:
    """A QEMU without query-stats still yields a row, just without kvm_stats."""
    server = FakeQMPServer(
        qmp_server,
        responses={
            "query-status": {"return": {"status": "running", "running": True}},
            "query-blockstats": {"return": []},
            # query-stats deliberately absent → server returns CommandNotFound
        },
    )
    server.start()
    try:
        # Separate try blocks: server.shutdown() runs even if client
        # construction or close() raises.
        client = qmp.QMPClient(qmp_server)
        try:
            client.connect()
            row = qmp.collect_once(client, t_mono_origin_ns=time.monotonic_ns())
        finally:
            client.close()
    finally:
        server.shutdown()

    # Older qemu without query-stats: row still exists, kvm_stats absent.
    assert "kvm_stats" not in row
    assert row["vm_running"] is True
    assert row["blockstats"] == {}
|
|
|
|
|
|
# ---------------------------------------------------------------------------
|
|
# run_loop tests
|
|
# ---------------------------------------------------------------------------
|
|
|
|
|
|
def test_run_loop_writes_rows_and_stops_cleanly(qmp_server: Path, tmp_path: Path) -> None:
    """run_loop samples until stop_event is set and returns the rows written.

    The returned count must match the number of JSONL lines in the output
    file, and every line must be a host_qmp row.
    """
    server = FakeQMPServer(
        qmp_server,
        responses={
            "query-status": {"return": {"status": "running", "running": True}},
            "query-blockstats": {"return": []},
            "query-stats": {"error": {"class": "CommandNotFound", "desc": "n/a"}},
        },
    )
    server.start()
    out_path = tmp_path / "telemetry-qmp.jsonl"
    stop = threading.Event()

    def stop_after(ms: int) -> None:
        time.sleep(ms / 1000.0)
        stop.set()

    # ~350 ms at a 100 ms interval should leave room for at least two samples.
    threading.Thread(target=stop_after, args=(350,), daemon=True).start()
    try:
        rows = qmp.run_loop(
            socket_path=qmp_server,
            output_path=out_path,
            t_mono_origin_ns=time.monotonic_ns(),
            interval_ms=100,
            stop_event=stop,
        )
    finally:
        # Release the fake server even if run_loop raises.
        server.shutdown()

    assert rows >= 2, f"expected >=2 rows, got {rows}"
    written = [json.loads(raw) for raw in out_path.read_text().splitlines()]
    assert len(written) == rows
    for r in written:
        assert r["source"] == "host_qmp"
        assert r["vm_running"] is True
|
|
|
|
|
|
def test_savevm_and_loadvm_via_human_monitor(qmp_server: Path) -> None:
    """savevm()/loadvm() go out as human-monitor-command and return its output."""
    server = FakeQMPServer(
        qmp_server,
        responses={
            "human-monitor-command": {"return": ""},
        },
    )
    server.start()
    try:
        # Separate try blocks: server.shutdown() runs even if client
        # construction or close() raises.
        client = qmp.QMPClient(qmp_server)
        try:
            client.connect()
            out_save = client.savevm("baseline")
            out_load = client.loadvm("baseline")
            assert out_save == ""
            assert out_load == ""
        finally:
            client.close()
    finally:
        server.shutdown()
    # Both calls go out as human-monitor-command with the right cmdline.
    hmcs = [m for m in server.received if m.get("execute") == "human-monitor-command"]
    cmds = [m["arguments"]["command-line"] for m in hmcs]
    assert "savevm baseline" in cmds
    assert "loadvm baseline" in cmds
|
|
|
|
|
|
def test_loadvm_surface_error(qmp_server: Path) -> None:
    """A failing loadvm (error reply) surfaces as qmp.QMPError."""
    server = FakeQMPServer(qmp_server)  # no responses → error reply
    server.start()
    try:
        # Separate try blocks: server.shutdown() runs even if client
        # construction or close() raises.
        client = qmp.QMPClient(qmp_server)
        try:
            client.connect()
            with pytest.raises(qmp.QMPError):
                client.loadvm("does-not-exist")
        finally:
            client.close()
    finally:
        server.shutdown()
|
|
|
|
|
|
def test_run_loop_returns_zero_when_socket_missing(tmp_path: Path) -> None:
    """With nothing listening at the socket path, run_loop reports zero rows."""
    # No FakeQMPServer is started, so this path never exists.
    missing_sock = tmp_path.joinpath("nonexistent.sock")
    jsonl_out = tmp_path.joinpath("telemetry-qmp.jsonl")
    never_set = threading.Event()
    row_count = qmp.run_loop(
        socket_path=missing_sock,
        output_path=jsonl_out,
        t_mono_origin_ns=time.monotonic_ns(),
        interval_ms=100,
        stop_event=never_set,
    )
    assert row_count == 0
|