CIS490/collectors/qmp.py
max 1b6c7b2f4a Collectors 2/4/5 + fleet runner + sample manifest + Tier-3 setup scripts
This is the chunk that makes "real data" actually flow on multiple
hosts in parallel. End-to-end pipe was up at 613c6fa / 2579683; now
the lab-host side has the diversity + concurrency it needs.

Collectors landed:
  collectors/qmp.py          — source 2 (oracle). Tiny synchronous QMP
                               client + row builder + run loop. Tolerates
                               older qemu without query-stats.
  collectors/guest_agent.py  — source 5 (deployable). Reads the
                               virtio-serial host-side socket, parses
                               agent JSON-lines, re-stamps to the host
                               monotonic clock, persists.
  collectors/pcap.py         — source 4 (deployable). tcpdump capture
                               + pure-Python pcap reader + 100 ms
                               netflow.jsonl bucketizer. Decodes
                               Ethernet/IPv4/TCP/UDP enough for the
                               schema in docs/data-model.md.

In-guest agent:
  vm/guest-agent/cis490_agent.py — stdlib-only Python agent. Reads
    /proc/{stat,meminfo,loadavg,net/dev,net/tcp*}, top-N RSS procs,
    thermal. Writes JSON-lines to /dev/virtio-ports/cis490.guest.agent.
  tools/build_cidata.py — embeds the agent + an OpenRC service into
    user-data so first boot of the Alpine cidata image auto-starts it.

Launchers:
  vm/launch_demo.sh / launch_target.sh — second virtio-serial port for
    the agent socket; SLOT env support so multiple VMs run without
    socket / port collisions; PORT_BASE on launch_target so multiple
    target VMs hostfwd different host ports.
  vm/setup_bridge.sh — creates host-only br-malware (10.200.0.1/24,
    no NAT). Idempotent.

Fleet:
  orchestrator/fleet.py — capacity detector (cores / RAM / load
    headroom) + concurrent-slot runner. Per-slot ENV selects the
    sample. FleetCapacity dataclass round-trips into meta.json so
    "this episode ran with 6 concurrent VMs" is auditable post-hoc.
  tools/run_fleet.py — CLI: --capacity report; --waves N runs N
    waves of (max_concurrent) episodes each, every slot with a
    different sample.
  etc/cis490-orchestrator.service — now drives the fleet runner with
    Restart=always so each invocation runs one wave and respawns,
    giving a continuous stream.

Samples:
  samples/manifest.toml — six profiles spanning the five major
    behaviour shapes. Each entry is real OR mimic (sha256 distinguishes).
  samples/manifest.py — strict TOML loader (rejects dups, unknown
    categories) + deterministic select(host_id, slot, episode_index)
    so different hosts on the network walk the catalog in different
    orders without any coordinator.

EpisodeRunner:
  orchestrator/episode.py — optional qmp_socket + guest_agent_socket
    fields on EpisodeConfig; when set, additional collector threads
    run alongside proc_qemu. EpisodeResult now carries rows_qmp +
    rows_guest counters.

Tier-3 setup automation:
  scripts/install-msfrpcd.sh — installs metasploit-framework where
    the package manager has it, generates a strong password into
    /etc/cis490/msfrpc.env, drops a hardened systemd unit bound to
    127.0.0.1:55553. After this, run_tier3_demo.py works zero-touch
    once MSFRPC_PASSWORD is sourced.
  scripts/fetch-metasploitable2.sh — accepts IMAGE_URL + IMAGE_SHA256
    from the operator (Rapid7 download is registration-walled), pulls,
    verifies, converts vmdk → qcow2, lands at vm/images/.

Tests: 82 pass (was 51). New suites:
  tests/test_qmp.py       — fake QMP server, capability handshake,
                            blockstats, async-event interleaving,
                            5-failure backoff
  tests/test_guest_agent.py — fake virtio socket, JSON-lines read +
                              re-stamp, malformed-line tolerance
  tests/test_pcap.py      — synthetic pcap with TCP/UDP/ARP frames,
                            bucketize correctness across windows
  tests/test_fleet.py     — capacity math (8-core idle / low-RAM /
                            high-load / Pi5 / 1-core box), manifest
                            selection determinism + diversity

What's queued for the next commit (already discussed in convo):
  - MSFExploitDriver v2: map sample.profile → distinct in-session
    workload so Tier-3 episodes don't all produce the same yes-loop
    envelope. Critical for ML to learn varied malware shapes.
  - Real-sample fetch from MalwareBazaar by sha256.

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
2026-04-30 00:02:27 -05:00

244 lines
8.5 KiB
Python

"""Source 2 (oracle): QEMU QMP sampler.
Connects to the QEMU monitor protocol socket exposed by the launcher
($RUN_DIR/qmp.sock) and periodically queries the hypervisor for
per-VM stats that don't show up in /proc/<qemu_pid>:
- per-disk block I/O (rd_bytes, wr_bytes, rd_ops, wr_ops)
- VM run state (running / paused / shutdown)
- per-netdev tx/rx counters (when available)
- KVM stat counters (when available; introspection differs by qemu
version, so anything we can't read is skipped silently)
This source is **oracle-only** — it does not exist on a deployed
device. Every row carries ``available_in_deployment: false``.
Wire format: QMP is line-delimited JSON. The handshake is fixed:
server → {"QMP": {capabilities: [...], version: ...}}
client → {"execute": "qmp_capabilities"}
server → {"return": {}}
(client may now issue commands)
We use a dedicated synchronous client because QMP is request/response
and we don't need pipelining; one query batch per tick keeps the
on-disk schema simple.
"""
from __future__ import annotations
import json
import logging
import socket
import threading
import time
from dataclasses import dataclass
from pathlib import Path
from typing import Any
log = logging.getLogger("cis490.collectors.qmp")
SOURCE = "host_qmp"
AVAILABLE_IN_DEPLOYMENT = False
class QMPError(RuntimeError):
pass
@dataclass
class _SockReader:
sock: socket.socket
buf: bytes = b""
def read_line(self, timeout_s: float = 5.0) -> str:
deadline = time.monotonic() + timeout_s
while b"\n" not in self.buf:
self.sock.settimeout(max(0.1, deadline - time.monotonic()))
try:
chunk = self.sock.recv(8192)
except socket.timeout as e:
raise QMPError(f"QMP read timed out: {e}") from e
if not chunk:
raise QMPError("QMP connection closed by peer")
self.buf += chunk
line, _, rest = self.buf.partition(b"\n")
self.buf = rest
return line.decode("utf-8", errors="replace")
class QMPClient:
"""Tiny synchronous QMP client over a unix socket."""
def __init__(self, socket_path: str | Path) -> None:
self.path = str(socket_path)
self._sock: socket.socket | None = None
self._reader: _SockReader | None = None
def connect(self, timeout_s: float = 5.0) -> dict[str, Any]:
s = socket.socket(socket.AF_UNIX, socket.SOCK_STREAM)
s.settimeout(timeout_s)
s.connect(self.path)
self._sock = s
self._reader = _SockReader(s)
# Read greeting.
greeting = json.loads(self._reader.read_line(timeout_s=timeout_s))
if "QMP" not in greeting:
raise QMPError(f"unexpected QMP greeting: {greeting!r}")
# Negotiate capabilities (no flags requested).
self.execute("qmp_capabilities")
return greeting["QMP"]
def execute(self, command: str, **arguments: Any) -> Any:
if self._sock is None or self._reader is None:
raise QMPError("not connected")
msg: dict[str, Any] = {"execute": command}
if arguments:
msg["arguments"] = arguments
body = (json.dumps(msg) + "\n").encode("utf-8")
self._sock.sendall(body)
# QMP can interleave async events with the response — drain
# until we see the matching {"return": ...} or {"error": ...}.
for _ in range(64): # bounded to avoid an infinite loop on bugs
line = self._reader.read_line()
if not line.strip():
continue
resp = json.loads(line)
if "return" in resp:
return resp["return"]
if "error" in resp:
raise QMPError(f"{command}: {resp['error']}")
# Otherwise it's an async event; ignore and keep reading.
raise QMPError(f"{command}: too many async events without a response")
def close(self) -> None:
if self._sock is not None:
try:
self._sock.close()
except OSError:
pass
self._sock = None
self._reader = None
# ---- row builders ----------------------------------------------------------
def _flatten_blockstats(blockstats: list[dict] | None) -> dict[str, dict[str, int]]:
"""Compact ``query-blockstats`` to ``{device: {rd_ops, wr_ops, ...}}``."""
out: dict[str, dict[str, int]] = {}
for entry in blockstats or []:
name = entry.get("device") or entry.get("qdev") or "unknown"
s = entry.get("stats") or {}
out[name] = {
"rd_ops": int(s.get("rd_operations", 0)),
"wr_ops": int(s.get("wr_operations", 0)),
"rd_bytes": int(s.get("rd_bytes", 0)),
"wr_bytes": int(s.get("wr_bytes", 0)),
"flush_ops": int(s.get("flush_operations", 0)),
}
return out
def collect_once(client: QMPClient, t_mono_origin_ns: int) -> dict[str, Any]:
row: dict[str, Any] = {
"t_mono_ns": time.monotonic_ns() - t_mono_origin_ns,
"t_wall_ns": time.time_ns(),
"source": SOURCE,
"available_in_deployment": AVAILABLE_IN_DEPLOYMENT,
}
# query-status is dirt cheap and tells us whether the guest is
# paused (rare) or running.
try:
status = client.execute("query-status")
row["vm_status"] = status.get("status")
row["vm_running"] = bool(status.get("running"))
except QMPError as e:
log.debug("query-status failed: %s", e)
try:
bs = client.execute("query-blockstats")
row["blockstats"] = _flatten_blockstats(bs)
except QMPError as e:
log.debug("query-blockstats failed: %s", e)
# query-stats is QEMU 7.1+ and the schema varies across versions.
# We only ask for KVM stats and tolerate any subset of fields.
try:
stats = client.execute("query-stats", target="vm")
row["kvm_stats"] = _summarize_query_stats(stats)
except QMPError as e:
log.debug("query-stats not supported: %s", e)
return row
def _summarize_query_stats(stats_resp: list[dict] | dict) -> dict[str, int]:
"""Reduce ``query-stats`` to a flat name→value map of integer
counters. The full payload is verbose and version-specific; we only
ever want individual scalar counters downstream."""
flat: dict[str, int] = {}
items = stats_resp if isinstance(stats_resp, list) else [stats_resp]
for entry in items:
for s in entry.get("stats", []) or []:
name = s.get("name")
value = s.get("value")
if isinstance(name, str) and isinstance(value, int):
flat[name] = value
return flat
# ---- run loop --------------------------------------------------------------
def run_loop(
socket_path: str | Path,
output_path: Path,
t_mono_origin_ns: int,
interval_ms: int,
stop_event: threading.Event,
) -> int:
"""Connect to ``socket_path`` and sample at ``interval_ms`` until
``stop_event``. Returns the number of rows written.
A single missed sample (transient QMP error) is logged and skipped;
repeated failures terminate the loop so the episode finishes cleanly
rather than hanging on a dead hypervisor."""
interval_ns = interval_ms * 1_000_000
client = QMPClient(socket_path)
try:
client.connect(timeout_s=5.0)
except (OSError, QMPError) as e:
log.warning("QMP connect to %s failed: %s — collector exits cleanly", socket_path, e)
return 0
rows = 0
consecutive_failures = 0
next_tick = time.monotonic_ns()
output_path.parent.mkdir(parents=True, exist_ok=True)
try:
with output_path.open("a", buffering=1) as f:
while not stop_event.is_set():
try:
row = collect_once(client, t_mono_origin_ns)
f.write(json.dumps(row) + "\n")
rows += 1
consecutive_failures = 0
except (QMPError, OSError) as e:
consecutive_failures += 1
log.warning("QMP sample %d failed: %s", rows, e)
if consecutive_failures >= 5:
log.warning("5 consecutive QMP failures; bailing")
break
next_tick += interval_ns
sleep_ns = next_tick - time.monotonic_ns()
if sleep_ns > 0:
stop_event.wait(sleep_ns / 1_000_000_000)
else:
next_tick = time.monotonic_ns()
finally:
client.close()
return rows