CIS490/training/producers/profiles.py
Max 05bccac29f producers: phase-aware attack envelopes + tickable KNN metric/perf
profiles.py — non-shortcut fit:

  Old: pick one accepted episode per profile, emit its raw
       fraction-of-duration curve. Confounded by single-episode noise,
       phase-budget timing variance, and the cumulative-counter
       startup-spike artifact.

  New: aggregate up to N=100 accepted episodes per profile, slice each
       by labels.jsonl phase events, resample EACH PHASE to a fixed
       budget so the median across episodes captures the canonical
       per-phase shape rather than smearing peaks across the timeline.
       Save median + p25/p75 band to data/processed/attack_profiles_v1.parquet.
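
The phase-aware alignment described above can be illustrated with a small, stdlib-only sketch. It is not the code in profiles.py: the helper names (`interp`, `phase_aligned`, `episode`), the two-phase toy budget, and the synthetic step signal are all assumptions made for illustration. It shows that when each phase is resampled to its own fixed budget, the phase transition lands on the same index in every episode, so the per-index median stays sharp even though phase timing differs.

```python
from bisect import bisect_right
from statistics import median


def interp(x, ts, vs):
    """Linear interpolation of (ts, vs) at x, clamped to the end values."""
    if x <= ts[0]:
        return vs[0]
    if x >= ts[-1]:
        return vs[-1]
    j = bisect_right(ts, x)  # ts[j - 1] <= x < ts[j]
    w = (x - ts[j - 1]) / (ts[j] - ts[j - 1])
    return vs[j - 1] + w * (vs[j] - vs[j - 1])


def phase_aligned(ts, vs, phases, budget):
    """Resample each (name, t_start, t_end) phase to budget[name] points.

    Assumes every budget is at least 2 so the grid includes both endpoints.
    """
    curve = []
    for name, t0, t1 in phases:
        n = budget[name]
        curve += [interp(t0 + (t1 - t0) * i / (n - 1), ts, vs) for i in range(n)]
    return curve


# Hypothetical toy budget: 4 points per phase, 8 points in total.
BUDGET = {"clean": 4, "running": 4}


def episode(run_start, run_end):
    """Synthetic episode: the signal is 0 while clean and 1 while running."""
    ts = [float(t) for t in range(run_end + 1)]
    vs = [1.0 if t >= run_start else 0.0 for t in ts]
    phases = [
        ("clean", 0.0, run_start - 1.0),
        ("running", float(run_start), float(run_end)),
    ]
    return phase_aligned(ts, vs, phases, BUDGET)


# Three episodes with very different phase timing.
curves = [episode(5, 10), episode(8, 20), episode(3, 9)]

# Per-index median across episodes, as in the aggregation step.
median_curve = [median(column) for column in zip(*curves)]
print(median_curve)
```

Because the transition sits at index 4 in all three curves, the median is a clean step regardless of how long each episode spent in either phase.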

  Per-phase point budget (sums to 80):
       clean_lead 10, armed 5, infecting 10, infected_running 40,
       clean_tail 15. dormant (when present) folded into infected_running.
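
As a quick check of the budget above, the index range each phase occupies in the 80-point curve follows from a running sum. This is a sketch only; the dict below restates the numbers from this message and the variable names are illustrative.

```python
from itertools import accumulate

# Per-phase point budget as listed in the commit message.
BUDGET = {
    "clean_lead": 10,
    "armed": 5,
    "infecting": 10,
    "infected_running": 40,
    "clean_tail": 15,
}

# Running sum gives the exclusive end index of each phase.
ends = list(accumulate(BUDGET.values()))
starts = [0] + ends[:-1]

# Inclusive (first_idx, last_idx) per phase.
layout = {name: (s, e - 1) for name, s, e in zip(BUDGET, starts, ends)}

for name, (first, last) in layout.items():
    print(f"{name:17s} idx {first:2d}..{last:2d}")
```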

  Channel swap: io-walk uses proc.cpu_sys_jiffies, NOT
  proc.io_write_bytes. Host /proc on QEMU doesn't see virtio-blk
  writes via io.write_bytes (writes go through KVM's I/O path, not
  write() syscalls); cpu_sys_jiffies tracks kernel time which spikes
  during heavy I/O scheduling.

  Concrete result: cpu-saturate now shows the proper plateau-during-
  infected_running with peak at 100 j/s (was 30 j/s spike at idx 0
  then mostly zero); low-and-slow shows its distinctive low-amplitude
  profile (peak 21 vs cpu-saturate's 100); io-walk shows the
  rapid-rise-then-decay shape consistent with dd finishing mid-phase.
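
The j/s figures above are per-second rates derived from cumulative jiffy counters. The sketch below shows one way such a counter-diff can be computed and how a spike at index 0 can arise when the first sample is divided by the first interval without a previous reading to subtract. It is an assumption about the general technique, not a copy of channel_arrays(); the function name and sample values are hypothetical.

```python
def counter_to_rate(ts, counts):
    """Per-second rate between consecutive samples of a cumulative counter."""
    rates = []
    for i in range(1, len(ts)):
        rates.append((counts[i] - counts[i - 1]) / (ts[i] - ts[i - 1]))
    return rates


# Hypothetical samples: the counter already stood at 300 jiffies when
# sampling began, then grew by a steady 100 jiffies per second.
ts = [0.0, 1.0, 2.0, 3.0]
counts = [300, 400, 500, 600]

rates = counter_to_rate(ts, counts)

# A naive first point that treats the pre-sampling counter as zero turns
# the whole since-process-start total into one inflated rate.
first_dt = ts[1] - ts[0]
startup_spike = counts[0] / first_dt

print(rates, startup_spike)
```

Slicing by phase start time, as the new fit does, skips that first point whenever the slice begins after t=0.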

knn.py — sticky model_metric / model_perf:

  Stream subcommand gains --also-metric / --also-perf-latency-us
  flags. When set, each cycle publishes a model_metric event
  (tagged model=knn) for scene-8 (model bars) and a model_perf
  event for scene-12 (accuracy vs inference cost). Republishing on
  the cycle keeps reconnecting browsers populated without depending
  on the dashboard's not-yet-built sticky-event cache.
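
The republish-per-cycle idea can be sketched with asyncio as follows. This is a minimal illustration, not the knn.py implementation: `republish`, `fake_publish`, the `value` field name, and the cycle count are assumptions; only the event types, the model tag, and the numbers come from this message.

```python
import asyncio


async def republish(events, publish, *, cycles, pause_s):
    """Publish every event once per cycle so late subscribers catch up."""
    for cycle in range(cycles):
        for event in events:
            await publish(event)
        if cycle + 1 < cycles:
            await asyncio.sleep(pause_s)


received = []


async def fake_publish(event):
    """Stand-in publisher that records events instead of sending them."""
    received.append(event)


events = [
    {"type": "model_metric", "model": "knn", "value": 0.762},
    {"type": "model_perf", "model": "knn", "latency_us": 3410, "accuracy": 0.762},
]

asyncio.run(republish(events, fake_publish, cycles=3, pause_s=0.0))
print(len(received))
```

A browser that connects between cycles waits at most one pause interval before it sees both events again, which is why no server-side sticky cache is needed for now.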

  Measured KNN inference latency on the 150k-trained classifier:
      single-window predict: 61.5 ms (sklearn brute-force at 230 D)
      per-window in batch=64: 3.4 ms (the production-realistic number)
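
One plausible way to obtain a per-window figure like the one above is to time a whole batch and divide by the batch size. The sketch below is an assumption about the measurement approach, not the procedure actually used; `fake_predict` is a stand-in for the classifier, and only the batch size (64) and feature width (230) are taken from this message.

```python
import time


def per_window_latency_us(predict, batch, repeats=5):
    """Best-of-N wall time for one batch, amortized per window, in microseconds."""
    best = float("inf")
    for _ in range(repeats):
        t0 = time.perf_counter()
        predict(batch)
        best = min(best, time.perf_counter() - t0)
    return best / len(batch) * 1e6


def fake_predict(batch):
    """Stand-in for a classifier: one cheap pass over every window."""
    return [sum(window) for window in batch]


# 64 windows of 230 features each, filled with placeholder values.
batch = [[0.5] * 230 for _ in range(64)]

latency_us = per_window_latency_us(fake_predict, batch)
print(f"{latency_us:.2f} us per window")
```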

  Streamer published: model_metric{knn, 0.762} +
                      model_perf{knn, latency_us=3410, accuracy=0.762}.

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
2026-05-08 14:08:03 -05:00


"""Emit `attack_profile` events for scene-6 (/proc signatures per profile).

Each event names a profile and ships an 80-point curve representing the
canonical /proc signature of that profile across the dataset.

The honest version (this file): aggregate the channel timeseries
across N (default 100) accepted episodes per profile, compute the
per-timestep MEDIAN, save median + p25/p75 band to disk, stream the
median. Episodes are aligned phase by phase: each episode is sliced at
its labels.jsonl phase boundaries and every phase is resampled to a
fixed point budget (see PHASE_POINT_BUDGET), so a given index falls in
the same phase in every episode.

The shortcut (NOT this file): pick one accepted episode per profile
and render its raw fraction-of-duration curve. Confounded by
single-episode noise, phase-budget timing variance, and hardware-host
heterogeneity.

Channels are /proc-derived per the slide title:

    cpu-saturate    proc.cpu_user_jiffies   sustained 1-vCPU peg
    io-walk         proc.cpu_sys_jiffies    fs walk + urandom writes
    bursty-c2       proc.cpu_user_jiffies   long idle + bursts
    low-and-slow    proc.cpu_user_jiffies   low baseline + bumps
    scan-and-dial   proc.cpu_sys_jiffies    many short network syscalls
    shell-resident  proc.cpu_sys_jiffies    persistent socket + ticks

(These are counter channels; channel_arrays converts them to
per-second rates.)

Two subcommands:

    fit     aggregate N episodes per profile, save the median + band
            to data/processed/attack_profiles_v1.parquet
    stream  load the saved parquet, publish attack_profile events
            on a tick (with --loop for long-running)

Same producer pattern as knn.py: fit-once, stream-from-disk.
"""
from __future__ import annotations

import argparse
import asyncio
import json
import logging
import sys
from pathlib import Path

import numpy as np
import pyarrow as pa
import pyarrow.parquet as pq

sys.path.insert(0, str(Path(__file__).resolve().parents[3]))

from training._episode_io import open_episode
from training._features import channel_arrays, episode_t0_wall_ns
from training.producers._publish import (
    PublishFn, http_publisher, null_publisher,
)

log = logging.getLogger("cis490.dashboard.producers.profiles")

# Slide-title is "/proc signature per profile", so we use proc-side
# channels exclusively. After channel_arrays() these become per-second
# rates (counter-diff).
#
# io-walk uses proc.cpu_sys_jiffies, NOT proc.io_write_bytes, because
# the host /proc on the QEMU process doesn't see virtio-blk writes
# via the io.write_bytes counter (writes go through KVM's I/O path,
# not write() syscalls). cpu_sys_jiffies tracks kernel time, which
# does spike during heavy host-side I/O scheduling.
PROFILE_TO_CHANNEL: dict[str, tuple[str, str]] = {
    "cpu-saturate":   ("proc.cpu_user_jiffies", "sustained 1-vCPU peg (XMRig)"),
    "io-walk":        ("proc.cpu_sys_jiffies",  "fs traversal + urandom writes"),
    "bursty-c2":      ("proc.cpu_user_jiffies", "long idle + 3-packet egress bursts"),
    "low-and-slow":   ("proc.cpu_user_jiffies", "minimal CPU + periodic memory churn"),
    "scan-and-dial":  ("proc.cpu_sys_jiffies",  "SYN-style probes + dial-home"),
    "shell-resident": ("proc.cpu_sys_jiffies",  "persistent TCP socket + command ticks"),
}

# Phase-aware alignment: per-phase points in the concatenated 80-point
# curve. The canonical schedule is
#   clean(10s) → armed(2s) → infecting(3s) → infected_running(25s) → clean(5s)
# but the per-phase point allocation is shape-driven, not time-driven:
# we want infected_running (the operationally distinctive phase) to
# dominate the visual.
PHASE_POINT_BUDGET = {
    "clean": 10,             # opening clean; the baseline
    "armed": 5,
    "infecting": 10,
    "infected_running": 40,  # the signature phase
    "dormant": 10,           # not counted in the 80-point total and not
                             # read by _phase_aligned_curve: dormant time
                             # is folded into the infected_running slice
    "_tail_clean": 15,       # closing clean (after infected_running);
                             # kept separate from leading clean
}
# Total: 10 + 5 + 10 + 40 + 15 = 80 (the dormant entry is excluded)

# ─────────────────────────────────────────────────────────────────────
# Fit
# ─────────────────────────────────────────────────────────────────────
def _resample_to_grid(t: np.ndarray, v: np.ndarray, n: int,
                      *, t_start: float | None = None,
                      t_end: float | None = None) -> np.ndarray | None:
    """Resample (t, v) to n points across [t_start, t_end] (or [t.min,
    t.max] if not given). Linear interp on finite samples; grid points
    outside the finite samples' time range come back as NaN. Returns
    None if there are fewer than two finite samples to interpolate
    or the interval is empty.

    The first sample of a counter-diff'd rate is often a startup-artifact
    spike (cumulative-since-process-start divided by first sample dt).
    We don't drop it here (that is left to the caller), but the
    slice-by-time approach used by phase-aware alignment naturally
    avoids it when t_start > 0.
    """
    if len(t) < 2:
        return None
    finite = np.isfinite(v)
    if int(finite.sum()) < 2:
        return None
    t_f = t[finite]
    v_f = v[finite]
    lo = t_start if t_start is not None else float(t_f.min())
    hi = t_end if t_end is not None else float(t_f.max())
    if hi - lo < 1e-9:
        return None
    grid = np.linspace(lo, hi, n)
    return np.interp(grid, t_f, v_f, left=np.nan, right=np.nan)


def _phase_segments(epi) -> list[tuple[str, float, float]]:
    """Return [(phase, t_start_s, t_end_s)] sequence for one episode.

    Uses labels.jsonl events. The last segment ends at the episode's
    duration (from meta.json). Times are episode-relative:
    (t_wall_ns - first label's t_wall_ns) / 1e9. This is the same
    clock the feature extractor uses.
    """
    if not epi.labels:
        return []
    t0 = int(epi.labels[0]["t_wall_ns"])
    duration = (epi.meta.get("result") or {}).get("duration_observed_s") or 0.0
    out: list[tuple[str, float, float]] = []
    labels = epi.labels
    for i, label in enumerate(labels):
        ph = label.get("phase")
        if ph is None:
            continue
        t_s = (label["t_wall_ns"] - t0) / 1e9
        if i + 1 < len(labels):
            # A phase ends where the next label event begins.
            t_e = (labels[i + 1]["t_wall_ns"] - t0) / 1e9
        else:
            # Last phase: run to the observed duration, or 1 s if unknown.
            t_e = duration if duration > t_s else t_s + 1.0
        if t_e > t_s:
            out.append((ph, t_s, t_e))
    return out


def _phase_aligned_curve(epi, channel_name: str, n_points: int = 80
                         ) -> np.ndarray | None:
    """Build a fixed-length per-phase-aligned curve for one episode.

    The output is ``n_points`` long with each phase taking the budget
    in PHASE_POINT_BUDGET. Layout (canonical 5-phase walk):

        idx  0..9    clean (leading)
        idx 10..14   armed
        idx 15..24   infecting
        idx 25..64   infected_running (the distinctive phase)
        idx 65..79   clean (trailing)

    If the episode has a `dormant` phase (longer walks), it gets
    folded into the infected_running budget: we resample
    `infected_running` + `dormant` together since both are post-armed
    activity and the median across episodes loses precision otherwise.

    Returns None if the episode's labels don't include the canonical walk.
    """
    segs = _phase_segments(epi)
    if len(segs) < 4:
        return None

    # Find the first occurrence of each canonical phase
    phase_starts: dict[str, float] = {}
    phase_ends: dict[str, float] = {}
    for ph, ts, te in segs:
        if ph not in phase_starts:
            phase_starts[ph] = ts
        # Last end-time we see for this phase (may be reset on re-entry)
        phase_ends[ph] = te

    # Fold dormant into infected_running's slice if present
    if "infected_running" in phase_starts and "dormant" in phase_starts:
        # Extend infected_running's end through any dormant occurrences
        last_relevant = max(
            phase_ends.get("infected_running", 0.0),
            phase_ends.get("dormant", 0.0),
        )
        phase_ends["infected_running"] = last_relevant

    # We want: leading_clean, armed, infecting, infected_running, trailing_clean
    # Find where leading_clean ends (= first armed) and where trailing_clean
    # starts (= the LAST clean's start, after infected_running/dormant).
    if "armed" not in phase_starts or "infected_running" not in phase_starts:
        return None
    leading_clean_start = 0.0
    leading_clean_end = phase_starts["armed"]
    armed_end = phase_starts.get("infecting", phase_starts["armed"] + 0.1)
    infecting_end = phase_starts["infected_running"]
    inf_run_end = phase_ends["infected_running"]
    duration = (epi.meta.get("result") or {}).get("duration_observed_s") or 0.0
    # Trailing clean = anything after infected_running's end
    trailing_clean_start = inf_run_end
    trailing_clean_end = duration if duration > inf_run_end else inf_run_end + 0.1

    arrs = channel_arrays(epi, episode_t0_wall_ns(epi))
    t, v = arrs.get(channel_name, (np.zeros(0), np.zeros(0)))
    if len(t) < 2:
        return None

    # Per-phase resample then concat. Non-finite samples are skipped by
    # _resample_to_grid. np.interp's left/right=NaN only take effect where
    # a phase extends past the channel's data range; those NaNs are
    # ignored later by the nan-aware median/percentiles.
    pieces: list[np.ndarray] = []
    for (label, t_s, t_e, n) in [
        ("clean_lead", leading_clean_start, leading_clean_end, PHASE_POINT_BUDGET["clean"]),
        ("armed", leading_clean_end, armed_end, PHASE_POINT_BUDGET["armed"]),
        ("infecting", armed_end, infecting_end, PHASE_POINT_BUDGET["infecting"]),
        ("infected_running", infecting_end, inf_run_end, PHASE_POINT_BUDGET["infected_running"]),
        ("clean_tail", trailing_clean_start, trailing_clean_end, PHASE_POINT_BUDGET["_tail_clean"]),
    ]:
        if t_e <= t_s:
            # Empty or inverted phase: keep the slot, fill with NaN
            pieces.append(np.full(n, np.nan, dtype=np.float64))
            continue
        seg = _resample_to_grid(t, v, n=n, t_start=t_s, t_end=t_e)
        if seg is None:
            pieces.append(np.full(n, np.nan, dtype=np.float64))
        else:
            pieces.append(seg)
    out = np.concatenate(pieces, axis=0)
    if out.shape[0] != n_points:
        # Defensive: pad/truncate to expected length
        if out.shape[0] < n_points:
            out = np.concatenate(
                [out, np.full(n_points - out.shape[0], np.nan)]
            )
        else:
            out = out[:n_points]
    return out


def aggregate_envelope(
    *, validation_path: Path, store_root: Path,
    profile: str, channel_name: str,
    n_episodes_target: int = 100, n_points: int = 80,
    seed: int = 0,
) -> dict | None:
    """Aggregate the channel timeseries across up to ``n_episodes_target``
    accepted episodes of ``profile``. Returns a dict with the canonical
    median curve + p25/p75 band, plus diagnostics; ``None`` if no
    episodes were usable.

    Aggregation method:
      1. Random-sample up to N accepted episodes of this profile
      2. For each, decompress the tarball, extract channel via
         channel_arrays() (counter-diff applied)
      3. Slice by labels.jsonl phase and resample each phase to its
         point budget (_phase_aligned_curve), giving n_points per episode
      4. Stack → (n_used, n_points)
      5. Per-timestep median + p25/p75 across episodes
      6. Normalize median to [0, 1] for the dashboard renderer
    """
    rng = np.random.default_rng(seed)
    val = pq.read_table(
        validation_path,
        columns=["episode_id", "host_id", "profile", "status"],
    ).to_pylist()
    candidates = [r for r in val
                  if r["status"] == "accepted" and r["profile"] == profile]
    if not candidates:
        log.warning("no accepted episodes for profile %s", profile)
        return None
    if len(candidates) > n_episodes_target:
        idx = rng.choice(len(candidates), n_episodes_target, replace=False)
        candidates = [candidates[i] for i in idx]

    curves: list[np.ndarray] = []
    n_attempted = 0
    n_failed = 0
    for r in candidates:
        n_attempted += 1
        path = Path(store_root) / r["host_id"] / f"{r['episode_id']}.tar.zst"
        if not path.exists():
            n_failed += 1
            continue
        try:
            epi = open_episode(path, host_id=r["host_id"])
        except Exception as e:
            log.debug("open %s failed: %s", path, e)
            n_failed += 1
            continue
        if not epi.labels:
            n_failed += 1
            continue
        # Phase-aware alignment: each episode's curve uses its phase
        # boundaries from labels.jsonl, resampled per phase to a fixed
        # budget so the median across episodes captures the canonical
        # per-phase shape (vs the fraction-of-duration shortcut, which
        # washes out plateaus when phase timing varies).
        curve = _phase_aligned_curve(epi, channel_name, n_points=n_points)
        if curve is None:
            n_failed += 1
            continue
        curves.append(curve)

    if not curves:
        log.warning("no usable episodes for profile %s "
                    "(attempted=%d, failed=%d)",
                    profile, n_attempted, n_failed)
        return None

    arr = np.asarray(curves, dtype=np.float64)  # (n_used, n_points)
    # nanmedian/percentile because individual phase segments may have
    # produced NaN where the channel had no data in that phase.
    median = np.nanmedian(arr, axis=0)
    median = np.where(np.isnan(median), 0.0, median)
    p25 = np.nan_to_num(np.nanpercentile(arr, 25, axis=0), nan=0.0)
    p75 = np.nan_to_num(np.nanpercentile(arr, 75, axis=0), nan=0.0)

    # Normalize the median for the renderer; keep raw values so the band
    # can be plotted in the same scale on a future widget.
    m_lo, m_hi = float(np.nanmin(median)), float(np.nanmax(median))
    if m_hi - m_lo < 1e-9:
        median_norm = np.zeros_like(median)
    else:
        median_norm = (median - m_lo) / (m_hi - m_lo)

    log.info("%-15s channel=%-26s n_used=%d/%d range=[%.3g, %.3g]",
             profile, channel_name, len(curves), n_attempted, m_lo, m_hi)
    return {
        "profile": profile,
        "channel": channel_name,
        "n_episodes_used": len(curves),
        "n_episodes_attempted": n_attempted,
        "median": median_norm.astype(np.float32).tolist(),
        "median_raw": median.astype(np.float32).tolist(),
        "p25_raw": p25.astype(np.float32).tolist(),
        "p75_raw": p75.astype(np.float32).tolist(),
        "median_min_raw": m_lo,
        "median_max_raw": m_hi,
    }


def _save_fit(rows: list[dict], path: Path) -> None:
    """Save aggregation result as a parquet (one row per profile)."""
    if not rows:
        raise ValueError("nothing to save")
    n_points = len(rows[0]["median"])
    schema = pa.schema([
        ("profile", pa.string()),
        ("channel", pa.string()),
        ("n_episodes_used", pa.int32()),
        ("n_episodes_attempted", pa.int32()),
        ("median", pa.list_(pa.float32(), n_points)),
        ("median_raw", pa.list_(pa.float32(), n_points)),
        ("p25_raw", pa.list_(pa.float32(), n_points)),
        ("p75_raw", pa.list_(pa.float32(), n_points)),
        ("median_min_raw", pa.float64()),
        ("median_max_raw", pa.float64()),
    ])
    cols = {n: [r[n] for r in rows] for n in schema.names}
    tbl = pa.table(cols, schema=schema)
    path.parent.mkdir(parents=True, exist_ok=True)
    pq.write_table(tbl, path, compression="zstd")


async def _fit(args) -> int:
    rows: list[dict] = []
    for prof, (ch_name, _) in PROFILE_TO_CHANNEL.items():
        res = aggregate_envelope(
            validation_path=args.validation,
            store_root=args.store,
            profile=prof, channel_name=ch_name,
            n_episodes_target=args.n_episodes_per_profile,
            n_points=args.n_points,
            seed=args.seed,
        )
        if res is not None:
            rows.append(res)
    if not rows:
        log.error("no profiles had usable episodes")
        return 1
    _save_fit(rows, args.fit_out)
    log.info("wrote %s (%d profile rows)", args.fit_out, len(rows))
    print(json.dumps({
        "saved_to": str(args.fit_out),
        "n_profiles": len(rows),
        "profiles": [r["profile"] for r in rows],
    }, indent=2))
    return 0


# ─────────────────────────────────────────────────────────────────────
# Stream
# ─────────────────────────────────────────────────────────────────────
async def _stream(args) -> int:
    if not args.load_fit.exists():
        log.error("fit parquet not found: %s; run `profiles fit --fit-out` first",
                  args.load_fit)
        return 1
    tbl = pq.read_table(args.load_fit).to_pylist()
    log.info("loaded %d profile aggregates from %s", len(tbl), args.load_fit)
    publisher = (null_publisher() if args.dry_run
                 else http_publisher(args.publish_url))
    while True:
        # One cycle = one attack_profile event per saved profile row.
        for r in tbl:
            shape_text = PROFILE_TO_CHANNEL.get(r["profile"], (None, ""))[1]
            await publisher({
                "type": "attack_profile",
                "name": r["profile"],
                "shape": shape_text,
                "curve": list(r["median"]),
            })
        log.info("cycle complete: %d profiles", len(tbl))
        if not args.loop:
            return 0
        await asyncio.sleep(args.cycle_pause_s)


# ─────────────────────────────────────────────────────────────────────
# CLI
# ─────────────────────────────────────────────────────────────────────
def main() -> int:
    p = argparse.ArgumentParser()
    sub = p.add_subparsers(dest="cmd", required=True)

    pf = sub.add_parser("fit",
                        help="aggregate N episodes per profile and save "
                             "the canonical envelope to disk")
    pf.add_argument("--validation", required=True, type=Path)
    pf.add_argument("--store", required=True, type=Path,
                    help="receiver episode store root, e.g. "
                         "/var/lib/cis490/episodes")
    pf.add_argument("--fit-out", type=Path,
                    default=Path("data/processed/attack_profiles_v1.parquet"))
    pf.add_argument("--n-episodes-per-profile", type=int, default=100)
    pf.add_argument("--n-points", type=int, default=80,
                    help="resampled curve length (default 80, what "
                         "the dashboard's profile thumbnail expects)")
    pf.add_argument("--seed", type=int, default=0)
    pf.add_argument("--log-level", default="INFO")
    pf.set_defaults(func=_fit)

    ps = sub.add_parser("stream",
                        help="publish attack_profile events from a saved "
                             "fit parquet")
    ps.add_argument("--load-fit", type=Path,
                    default=Path("data/processed/attack_profiles_v1.parquet"))
    ps.add_argument("--publish-url", default="http://127.0.0.1:8447/publish")
    ps.add_argument("--loop", action="store_true",
                    help="cycle indefinitely so reconnecting browsers "
                         "stay populated")
    ps.add_argument("--cycle-pause-s", type=float, default=20.0,
                    help="pause between cycles when --loop is set")
    ps.add_argument("--dry-run", action="store_true")
    ps.add_argument("--log-level", default="INFO")
    ps.set_defaults(func=_stream)

    args = p.parse_args()
    logging.basicConfig(level=args.log_level,
                        format="%(asctime)s %(levelname)s %(name)s %(message)s")
    return asyncio.run(args.func(args))


if __name__ == "__main__":
    raise SystemExit(main())