Diagnoses + fixes for the silent-collector / never-lands-session
failures that the 200-episode quality probe surfaced (§3 evidence).
All four address the producer; no compensating layers added.

perf collector (rows_perf=0 on 100% of episodes):
  - perf stat -j writes to stderr by default with -p; we read stdout.
    Add --log-fd 1 so JSON reaches stdout where the parser sees it.
  - Event names come back annotated with the privilege scope perf
    actually measured ("cycles:u" under perf_event_paranoid=2). Strip
    the suffix so _build_row's plain-name lookups hit. Without this
    every metric was None even when perf reported real numbers.
  - tests/test_collectors_emit.py covers the regression with a real
    busy-loop fixture; emit-test discipline per §4.4.
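
The suffix stripping described above can be sketched roughly as
follows. This is a minimal illustration, not the collector's actual
code: strip_event_modifier, the modifier alphabet, and the sample JSON
line (including its field names) are assumptions made for the example.

```python
import json

# Assumption: single-letter modifiers perf may append after the colon.
# Illustrative, not an exhaustive list.
_PERF_MODIFIERS = frozenset("ukhHGIDSPp")


def strip_event_modifier(event: str) -> str:
    """Drop a trailing ':<modifiers>' suffix, e.g. 'cycles:u' -> 'cycles'."""
    name, sep, tail = event.rpartition(":")
    # Only strip when the tail consists purely of modifier letters, so
    # tracepoint names such as 'sched:sched_switch' are left intact.
    if sep and name and tail and set(tail) <= _PERF_MODIFIERS:
        return name
    return event


# Made-up line in roughly the shape of one perf stat JSON record.
sample = '{"counter-value": "1234.0", "unit": "", "event": "cycles:u"}'
row = json.loads(sample)
metrics = {strip_event_modifier(row["event"]): float(row["counter-value"])}
print(metrics)
```

Keying the metrics dict by the stripped name is what lets a
plain-name lookup such as metrics.get("cycles") succeed.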

guest-agent collector (rows_guest=0 on 100% of episodes):
  - Alpine cloud image doesn't ship python3, so the in-guest agent's
    `#!/usr/bin/env python3` shebang silently fails. Add packages:
    [python3] to cidata user-data so cloud-init installs it before
    the OpenRC service starts.
  - Guest agent now exits nonzero (was: silent stdout fallback) when
    /dev/virtio-ports/cis490.guest.agent is missing, so OpenRC
    reports the failure to /var/log/cis490-agent.log instead of the
    bytes vanishing into the void. Refs §1.
  - Host-side collector emits guest_agent_connected /
    guest_agent_first_byte / guest_agent_silent_window into the
    orchestrator's events.jsonl. Future episodes show the in-guest
    failure mode per episode instead of leaving it to be inferred
    from rows_guest=0.
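
For illustration, a callback compatible with the collector's
emit_event(name, **fields) call shape might look like the sketch
below. make_emit_event and the record layout ("event", "t_mono_ns")
are hypothetical; the orchestrator's real events.jsonl schema is not
shown in this change.

```python
import json
import time
from pathlib import Path
from typing import Any, Callable


def make_emit_event(events_path: Path, t_mono_origin_ns: int) -> Callable[..., None]:
    """Return an emit_event(name, **fields) callback that appends one
    JSON line per event to events_path (hypothetical record layout)."""

    def emit_event(name: str, **fields: Any) -> None:
        record = {
            "event": name,
            # Same episode-relative monotonic timeline the collectors use.
            "t_mono_ns": time.monotonic_ns() - t_mono_origin_ns,
            **fields,
        }
        # Open in append mode per event so each marker is on disk even
        # if the collector dies right afterwards.
        with events_path.open("a") as f:
            f.write(json.dumps(record) + "\n")

    return emit_event
```

With markers like these, an episode that logs guest_agent_connected
but never guest_agent_first_byte points at the in-guest agent rather
than at host-side wiring.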

k-gamingcom missing qmp/netflow/pcap (also affected elliott on
Tier-3 episodes, where it was misclassified as host divergence):
  - tools/run_tier3_demo.py was building EpisodeConfig WITHOUT
    qmp_socket / guest_agent_socket / bridge_iface, even though
    launch_target.sh creates the underlying chardevs and BRIDGE
    supplies the iface. tools/run_real_vm_demo.py wires them
    correctly; Tier-3 had a copy-paste gap.
  - tests/test_collectors_emit.py adds a source-grep regression so
    the wiring stays honest.

samba_usermap_script never lands session (0/67 in §3 probe):
  - Bind handler default WfsDelay (~5s) gives up before bind_perl on
    Metasploitable2 has finished forking + binding LPORT under
    SLIRP+hostfwd. Bump to 30s; matches session_open_timeout_s in
    exploits/driver.py so framework + driver agree on the wait
    budget. Add ConnectTimeout=15 so the handler's bind connect has
    a retry budget instead of a single attempt.

orchestrator/fleet.py: usable_modules + BRIDGE handling were both
unconditional, so:
  - With BRIDGE set, requires_bridge modules were still being
    dropped, so the picker only ever returned samba_usermap_script
    across every slot/episode (the
    test_fleet_uses_all_modules_when_bridge_set failure on HEAD).
  - env.pop("BRIDGE") fired even when BRIDGE was the operator's
    explicit setup, breaking modules that need bridge mode (vsftpd
    backdoor on hardcoded port 6200, distccd, etc.).
Both are now conditional on bridge_set so the picker walks the full
catalog under bridge mode and SLIRP-only modules still get a
clean SLIRP env when BRIDGE is unset.
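
The conditional behaviour described above can be sketched as below.
This is a simplified stand-in, not the code in orchestrator/fleet.py:
the Module dataclass, build_env, and the function signatures are
assumptions; only the requires_bridge / bridge_set / BRIDGE names come
from this change.

```python
from dataclasses import dataclass


@dataclass(frozen=True)
class Module:
    # Hypothetical catalog entry; the real catalog type is not shown here.
    name: str
    requires_bridge: bool = False


def usable_modules(catalog: list[Module], bridge_set: bool) -> list[Module]:
    """Full catalog under bridge mode, SLIRP-compatible modules otherwise."""
    if bridge_set:
        return list(catalog)
    return [m for m in catalog if not m.requires_bridge]


def build_env(base_env: dict[str, str], bridge_set: bool) -> dict[str, str]:
    """Copy the environment; drop BRIDGE only when bridge mode is not in use."""
    env = dict(base_env)
    if not bridge_set:
        env.pop("BRIDGE", None)
    return env


catalog = [
    Module("samba_usermap_script"),
    Module("vsftpd_backdoor", requires_bridge=True),
]
print([m.name for m in usable_modules(catalog, bridge_set=True)])
print([m.name for m in usable_modules(catalog, bridge_set=False)])
```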

receiver/app.py: HEAD was in a half-pregnant v2 schema state. app.py
called store.ingest_stream(episode_type=..., benign_profile=...),
but the store.py change that accepts those kwargs was still in the
WIP stash. Removed v2 awareness from app.py so v1 episodes (what the
producer ships today) get accepted again. SCHEMA_VERSION default
reset to 1 to match.

229 passed, 0 failed. (HEAD had 15 failures, all linked to the
half-pregnant v2 state above.)

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
183 lines
7.2 KiB
Python
"""Source 5 (feature, deployable): in-guest agent reader.

QEMU exposes a virtio-serial channel two ways:
  - inside the guest: ``/dev/virtio-ports/cis490.guest.agent``
  - on the host: a unix socket at ``$RUN_DIR/agent.sock``

The in-guest agent (`vm/guest-agent/cis490_agent.py`) writes one
JSON-lines row per tick into the guest-side device. Bytes traverse the
virtio bus and surface on the host socket. This collector reads them,
re-stamps with the host's monotonic clock (so rows align with all
other telemetry on a single timeline), and persists to
``telemetry-guest.jsonl``.

Why re-stamp? The agent's clock is the *guest* clock, which can drift
from the host (rare in KVM, but happens during live-migration tests
and on heavy host load). The original guest timestamps stay in the row
under ``t_guest_*`` so analysts can quantify drift if they care.

This source is the **deployable** side: every row is tagged
``available_in_deployment: true``. See docs/threat-model.md.
"""

from __future__ import annotations

import json
import logging
import socket
import threading
import time
from pathlib import Path


log = logging.getLogger("cis490.collectors.guest_agent")

SOURCE = "guest_agent"
AVAILABLE_IN_DEPLOYMENT = True


def _connect(socket_path: Path, timeout_s: float) -> socket.socket | None:
    deadline = time.monotonic() + timeout_s
    last_err: OSError | None = None
    while time.monotonic() < deadline:
        try:
            s = socket.socket(socket.AF_UNIX, socket.SOCK_STREAM)
            s.settimeout(2.0)
            s.connect(str(socket_path))
            return s
        except OSError as e:
            last_err = e
            time.sleep(0.5)
    if last_err is not None:
        log.warning("guest-agent socket %s never came up: %s", socket_path, last_err)
    return None


def _stamp(row: dict, t_mono_origin_ns: int) -> dict:
    """Replace the agent's wall-only timestamps with host-clock ones,
    keeping the originals under ``t_guest_*`` for drift analysis."""
    out = dict(row)
    # dict(row) already carries any t_guest_* keys the agent sent; these
    # setdefault calls only fill in None when the agent omitted them.
    out.setdefault("t_guest_mono_ns", row.get("t_guest_mono_ns"))
    out.setdefault("t_guest_wall_ns", row.get("t_guest_wall_ns"))
    out["t_mono_ns"] = time.monotonic_ns() - t_mono_origin_ns
    out["t_wall_ns"] = time.time_ns()
    out.setdefault("source", SOURCE)
    out.setdefault("available_in_deployment", AVAILABLE_IN_DEPLOYMENT)
    return out


def run_loop(
    socket_path: str | Path,
    output_path: Path,
    t_mono_origin_ns: int,
    stop_event: threading.Event,
    *,
    connect_timeout_s: float = 30.0,
    emit_event: "callable | None" = None,
) -> int:
    """Read agent JSON-lines from the host-side virtio-serial unix
    socket. Re-stamp each row with the host clock and persist.

    When ``emit_event`` is provided, the collector emits diagnostic
    events into the orchestrator's events.jsonl on each lifecycle
    boundary (connect-failed / connected / first-byte / silent-window).
    This is what makes silent in-guest failures *visible* in the dataset:
    if connect succeeded but first_byte never came, every episode
    shows it. Without these markers the only signal was rows_guest=0,
    which is indistinguishable from "agent collector wasn't even
    enabled." Refs PIPELINE.md §1 + §4.4.
    """
    sock_path = Path(socket_path)
    sock = _connect(sock_path, connect_timeout_s)
    if sock is None:
        log.warning(
            "guest-agent: socket %s never came up after %.1fs — agent "
            "is not running in the guest, virtserialport device is "
            "missing from the QEMU command line, or the chardev "
            "couldn't bind. 0 rows will be emitted.",
            sock_path, connect_timeout_s,
        )
        if emit_event is not None:
            emit_event("guest_agent_connect_failed",
                       socket_path=str(sock_path),
                       timeout_s=connect_timeout_s)
        return 0

    if emit_event is not None:
        emit_event("guest_agent_connected", socket_path=str(sock_path))

    rows = 0
    output_path.parent.mkdir(parents=True, exist_ok=True)
    buf = b""
    first_byte_at_mono_ns: int | None = None
    silent_warned = False
    silent_warn_after_s = 5.0
    connect_mono_ns = time.monotonic_ns()
    try:
        with output_path.open("a", buffering=1) as f:
            while not stop_event.is_set():
                try:
                    sock.settimeout(0.5)
                    chunk = sock.recv(8192)
                except socket.timeout:
                    # The socket is open but nothing's arriving. Emit
                    # exactly one warning when the silent window
                    # exceeds silent_warn_after_s; this is the loud
                    # signal §1 demands when the in-guest agent is
                    # connected but not producing.
                    if (not silent_warned and first_byte_at_mono_ns is None
                            and (time.monotonic_ns() - connect_mono_ns)
                            > silent_warn_after_s * 1e9):
                        log.warning(
                            "guest-agent: socket connected but no bytes "
                            "after %.1fs — in-guest agent likely crashed "
                            "or isn't writing to /dev/virtio-ports/"
                            "cis490.guest.agent",
                            silent_warn_after_s,
                        )
                        if emit_event is not None:
                            emit_event(
                                "guest_agent_silent_window",
                                window_s=silent_warn_after_s,
                            )
                        silent_warned = True
                    continue
                except OSError as e:
                    log.warning("guest-agent recv failed: %s", e)
                    break
                if not chunk:
                    log.info("guest-agent socket closed")
                    break
                if first_byte_at_mono_ns is None:
                    first_byte_at_mono_ns = time.monotonic_ns()
                    log.info(
                        "guest-agent: first byte received %.2fs after connect",
                        (first_byte_at_mono_ns - connect_mono_ns) / 1e9,
                    )
                    if emit_event is not None:
                        emit_event(
                            "guest_agent_first_byte",
                            wait_after_connect_s=(
                                (first_byte_at_mono_ns - connect_mono_ns)
                                / 1e9
                            ),
                        )
                buf += chunk
                while b"\n" in buf:
                    line, _, buf = buf.partition(b"\n")
                    line = line.strip()
                    if not line:
                        continue
                    try:
                        row = json.loads(line)
                    except json.JSONDecodeError as e:
                        log.warning("dropping malformed guest-agent line: %s", e)
                        continue
                    f.write(json.dumps(_stamp(row, t_mono_origin_ns)) + "\n")
                    rows += 1
    finally:
        try:
            sock.close()
        except OSError:
            pass
    return rows
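
The line framing in run_loop has to cope with recv() returning chunks
that end mid-row. The standalone sketch below isolates that buffering
step so it can be exercised without a socket; feed is a hypothetical
helper written for this example and is not part of the module.

```python
import json


def feed(buf: bytes, chunk: bytes) -> tuple[list[dict], bytes]:
    """Append chunk to buf; return (rows parsed so far, leftover bytes)."""
    buf += chunk
    rows: list[dict] = []
    while b"\n" in buf:
        line, _, buf = buf.partition(b"\n")
        line = line.strip()
        if not line:
            continue
        try:
            rows.append(json.loads(line))
        except json.JSONDecodeError:
            # Malformed rows are skipped, mirroring the collector's
            # drop-and-continue behaviour.
            continue
    return rows, buf


# A row split across two chunks is only emitted once its newline arrives.
rows1, rest = feed(b"", b'{"cpu": 1}\n{"cp')
rows2, rest = feed(rest, b'u": 2}\n')
print(rows1, rows2, rest)
```

Carrying the leftover bytes into the next call is what keeps a row
split across two reads from being dropped as malformed JSON.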