"""Plot a single episode's envelope. Renders a multi-panel chart from whatever telemetry the episode dir contains, with phase bands underneath each panel: panel 1 — host /proc CPU% (source 1, always) panel 2 — host /proc RSS (source 1, always) panel 3 — host /proc IO write (source 1, always) panel 4 — QMP block I/O ops (source 2, if telemetry-qmp.jsonl) panel 5 — perf IPC + miss-rate (source 3, if telemetry-perf.jsonl) panel 6 — bridge pcap pkts/s (source 4, if netflow.jsonl) panel 7 — guest agent CPU/load (source 5, if telemetry-guest.jsonl) Missing sources are silently skipped — a Tier-1 episode dir with only proc telemetry still gets the original 3-panel plot. A Tier-3+ run with all five sources gets the full stack on a shared time axis. Two modes: - Default: render to ``/envelope.png``. - ``--show``: serve interactively via matplotlib's WebAgg backend (zoom/pan/hover in the browser). On NixOS, run via ``tools/show_envelope.sh`` so libstdc++ is on LD_LIBRARY_PATH. """ from __future__ import annotations import argparse import json import os import sys from pathlib import Path PHASE_COLORS = { "clean": "#9bd09b", "armed": "#f0d27a", "infecting": "#ec9b58", "infected_running": "#d05757", "dormant": "#6f8ad6", "reverting": "#bbbbbb", } def _load_jsonl(path: Path) -> list[dict]: return [json.loads(l) for l in path.read_text().splitlines() if l.strip()] def main() -> int: parser = argparse.ArgumentParser(prog="plot_envelope") parser.add_argument("episode_dir", type=Path) parser.add_argument("--out", type=Path, default=None) parser.add_argument( "--show", action="store_true", help="open an interactive plot in your browser via WebAgg " "(localhost server)", ) parser.add_argument( "--port", type=int, default=8988, help="port for the WebAgg server (default 8988)", ) args = parser.parse_args() # Pick backend BEFORE importing pyplot. import matplotlib if args.show: matplotlib.use("WebAgg") # Bind to all interfaces so it works over the WG overlay too. matplotlib.rcParams["webagg.address"] = "0.0.0.0" matplotlib.rcParams["webagg.port"] = args.port matplotlib.rcParams["webagg.open_in_browser"] = True else: matplotlib.use("Agg") import matplotlib.pyplot as plt from matplotlib.patches import Patch d: Path = args.episode_dir if not d.exists(): print(f"no such directory: {d}", file=sys.stderr) return 2 proc_rows = _load_jsonl(d / "telemetry-proc.jsonl") labels = _load_jsonl(d / "labels.jsonl") if not proc_rows: print("no proc telemetry rows found", file=sys.stderr) return 2 t = [r["t_mono_ns"] / 1e9 for r in proc_rows] cpu_jiffies = [r["cpu_user_jiffies"] + r["cpu_sys_jiffies"] for r in proc_rows] rss_mib = [r["rss_bytes"] / (1024 * 1024) for r in proc_rows] io_w = [r["io_write_bytes"] or 0 for r in proc_rows] clk_tck = os.sysconf("SC_CLK_TCK") cpu_pct: list[float] = [0.0] io_kb_s: list[float] = [0.0] for i in range(1, len(proc_rows)): dt = t[i] - t[i - 1] if dt <= 0: cpu_pct.append(0.0) io_kb_s.append(0.0) continue d_jiffies = cpu_jiffies[i] - cpu_jiffies[i - 1] cpu_pct.append(100.0 * (d_jiffies / clk_tck) / dt) io_kb_s.append(((io_w[i] - io_w[i - 1]) / 1024.0) / dt) end_t = t[-1] if t else 0.0 spans: list[tuple[float, float, str]] = [] for i, lbl in enumerate(labels): start = lbl["t_mono_ns"] / 1e9 end = labels[i + 1]["t_mono_ns"] / 1e9 if i + 1 < len(labels) else end_t spans.append((start, end, lbl["phase"])) # Discover optional sources. qmp_rows = _load_jsonl(d / "telemetry-qmp.jsonl") if (d / "telemetry-qmp.jsonl").exists() else [] perf_rows = _load_jsonl(d / "telemetry-perf.jsonl") if (d / "telemetry-perf.jsonl").exists() else [] netflow_rows = _load_jsonl(d / "netflow.jsonl") if (d / "netflow.jsonl").exists() else [] guest_rows = _load_jsonl(d / "telemetry-guest.jsonl") if (d / "telemetry-guest.jsonl").exists() else [] panels: list[tuple[str, callable]] = [] # (ylabel, plot_fn(ax)) panels.append(("CPU % (proc)", lambda ax: ( ax.plot(t, cpu_pct, color="#222222", linewidth=1.0), ax.set_ylim(-3, 110), ))) panels.append(("RSS (MiB)", lambda ax: ax.plot(t, rss_mib, color="#222222", linewidth=1.0))) panels.append(("IO write (KiB/s)", lambda ax: ax.plot(t, io_kb_s, color="#222222", linewidth=1.0))) if qmp_rows: qt = [r["t_mono_ns"] / 1e9 for r in qmp_rows] # Sum block I/O ops across devices. wr_ops = [] rd_ops = [] for r in qmp_rows: bs = r.get("blockstats") or {} wr_ops.append(sum(d.get("wr_ops", 0) for d in bs.values())) rd_ops.append(sum(d.get("rd_ops", 0) for d in bs.values())) panels.append(("QMP block ops (cum)", lambda ax: ( ax.plot(qt, wr_ops, color="#cc4444", linewidth=1.0, label="wr_ops"), ax.plot(qt, rd_ops, color="#4488cc", linewidth=1.0, label="rd_ops"), ax.legend(loc="upper left", fontsize=8), ))) if perf_rows: pt = [r["t_mono_ns"] / 1e9 for r in perf_rows] ipc = [r.get("ipc") or 0 for r in perf_rows] miss = [r.get("cache_miss_rate") or 0 for r in perf_rows] panels.append(("perf IPC / miss-rate", lambda ax: ( ax.plot(pt, ipc, color="#222222", linewidth=1.0, label="IPC"), ax.plot(pt, miss, color="#cc4444", linewidth=1.0, label="cache miss rate"), ax.legend(loc="upper right", fontsize=8), ))) if netflow_rows: nt = [r["t_mono_ns"] / 1e9 for r in netflow_rows] pkts = [(r.get("pkts_in", 0) + r.get("pkts_out", 0)) for r in netflow_rows] synf = [r.get("syn_count", 0) for r in netflow_rows] panels.append(("bridge pkts / SYNs (per 100 ms)", lambda ax: ( ax.plot(nt, pkts, color="#222222", linewidth=1.0, label="pkts"), ax.plot(nt, synf, color="#cc4444", linewidth=1.0, label="syn"), ax.legend(loc="upper right", fontsize=8), ))) if guest_rows: gt = [r["t_mono_ns"] / 1e9 for r in guest_rows] load1 = [(r.get("load_1m_5m_15m") or [0])[0] for r in guest_rows] mem_used = [ ((r.get("mem_total_bytes") or 0) - (r.get("mem_available_bytes") or 0)) / (1024 * 1024) for r in guest_rows ] panels.append(("guest load1 / mem_used (MiB)", lambda ax: ( ax.plot(gt, load1, color="#222222", linewidth=1.0, label="load1"), ax.twinx().plot(gt, mem_used, color="#4488cc", linewidth=1.0, label="mem MiB"), ))) n = len(panels) fig, axes = plt.subplots(n, 1, figsize=(13, 2 + 1.6 * n), sharex=True) if n == 1: axes = [axes] for ax, (ylabel, plot_fn) in zip(axes, panels): plot_fn(ax) ax.set_ylabel(ylabel) ax.grid(alpha=0.25) axes[-1].set_xlabel("time (s)") for ax in axes: for start, end, phase in spans: ax.axvspan( start, end, color=PHASE_COLORS.get(phase, "#cccccc"), alpha=0.30, linewidth=0, ) legend_handles = [ Patch(facecolor=PHASE_COLORS[p], alpha=0.5, label=p) for p in PHASE_COLORS if any(s[2] == p for s in spans) ] axes[0].legend( handles=legend_handles, loc="upper right", ncols=len(legend_handles), fontsize=9, framealpha=0.85, ) fig.suptitle( f"Episode {d.name} — envelope ({len(proc_rows)} samples, {end_t:.1f}s)" ) fig.tight_layout() if args.show: print(f"WebAgg interactive plot starting on port {args.port}...") print(f"open: http://127.0.0.1:{args.port}/") print("(ctrl-C in this terminal to stop the server)") plt.show() return 0 out = args.out or (d / "envelope.png") fig.savefig(out, dpi=120) print(f"wrote {out}") return 0 if __name__ == "__main__": sys.exit(main())