profiles.py — non-shortcut fit:
Old: pick one accepted episode per profile, emit its raw
fraction-of-duration curve. Confounded by single-episode noise,
phase-budget timing variance, and the cumulative-counter
startup-spike artifact.
New: aggregate up to N=100 accepted episodes per profile, slice each
by labels.jsonl phase events, resample EACH PHASE to a fixed
budget so the median across episodes captures the canonical
per-phase shape rather than smearing peaks across the timeline.
Save median + p25/p75 band to data/processed/attack_profiles_v1.parquet.
Per-phase point budget (sums to 80):
clean_lead 10, armed 5, infecting 10, infected_running 40,
clean_tail 15. dormant (when present) folded into infected_running.
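
A minimal sketch of the per-phase resampling idea, using only the standard library and two made-up episodes. The helper names (`interp`, `phase_aligned`, `synthetic_episode`) and the phase boundaries are illustrative assumptions, not the code in profiles.py; unlike the real fit, this sketch clamps at the edges instead of emitting NaN.

```python
from statistics import median

# Per-phase point budget quoted in the commit message (sums to 80).
BUDGET = [("clean_lead", 10), ("armed", 5), ("infecting", 10),
          ("infected_running", 40), ("clean_tail", 15)]


def interp(t, v, x):
    """Linear interpolation of samples (t, v) at x, clamped at both ends."""
    if x <= t[0]:
        return float(v[0])
    for i in range(1, len(t)):
        if x <= t[i]:
            w = (x - t[i - 1]) / (t[i] - t[i - 1])
            return v[i - 1] + w * (v[i] - v[i - 1])
    return float(v[-1])


def phase_aligned(t, v, bounds):
    """Resample each phase window (start, end) to its fixed point budget."""
    curve = []
    for name, n in BUDGET:
        start, end = bounds[name]
        for k in range(n):
            curve.append(interp(t, v, start + (end - start) * k / (n - 1)))
    return curve


def synthetic_episode(run_start, run_end, duration):
    """1 Hz samples: 100 j/s while infected_running, 0 otherwise."""
    t = list(range(duration + 1))
    v = [100.0 if run_start <= s <= run_end else 0.0 for s in t]
    return t, v


# Two hypothetical episodes whose infected_running phase starts and ends
# at different wall-clock offsets.
ep_a = (*synthetic_episode(15, 40, 45),
        {"clean_lead": (0, 10), "armed": (10, 12), "infecting": (12, 15),
         "infected_running": (15, 40), "clean_tail": (40, 45)})
ep_b = (*synthetic_episode(20, 38, 50),
        {"clean_lead": (0, 10), "armed": (10, 14), "infecting": (14, 20),
         "infected_running": (20, 38), "clean_tail": (38, 50)})

curves = [phase_aligned(t, v, bounds) for t, v, bounds in (ep_a, ep_b)]
median_curve = [median(col) for col in zip(*curves)]
```

Because both episodes land their infected_running samples on indices 25..64, the per-index median keeps the full plateau there even though the phase began 5 s later in the second episode.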
Channel swap: io-walk uses proc.cpu_sys_jiffies, NOT
proc.io_write_bytes. The host-side /proc entry for the QEMU process
doesn't count virtio-blk writes in io.write_bytes (writes go through
KVM's I/O path, not write() syscalls); cpu_sys_jiffies tracks kernel
time, which spikes during heavy host-side I/O scheduling.
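
For context, a rough sketch of how a cumulative jiffies counter becomes a per-second rate, and where the startup spike mentioned above comes from. This is an assumption about the general technique; the actual conversion lives in channel_arrays(), which is not shown here, and `counter_to_rate` plus the sample values are made up.

```python
def counter_to_rate(t_s, counter):
    """Per-second rate between consecutive samples of a cumulative counter.

    Returns one value per interval, so the result has len(t_s) - 1 entries.
    """
    rates = []
    for i in range(1, len(t_s)):
        dt = t_s[i] - t_s[i - 1]
        # Emit NaN instead of dividing on a repeated or out-of-order timestamp.
        rates.append((counter[i] - counter[i - 1]) / dt if dt > 0 else float("nan"))
    return rates


# Made-up samples: the process had already accumulated 5000 sys jiffies
# before the first reading, then idles, then gets busy.
t_s = [0.0, 1.0, 2.0, 3.0, 4.0]
sys_jiffies = [5000, 5002, 5004, 5054, 5104]

rates = counter_to_rate(t_s, sys_jiffies)

# Treating the first reading as "growth since zero" divides the whole
# since-process-start total by one sample interval: the startup spike.
naive_first = sys_jiffies[0] / (t_s[1] - t_s[0])
```

Differencing only between consecutive readings keeps the rate in the 2 to 50 j/s range here, whereas the naive first value is 5000 j/s.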
Concrete result: cpu-saturate now shows the proper plateau-during-
infected_running with peak at 100 j/s (was 30 j/s spike at idx 0
then mostly zero); low-and-slow shows its distinctive low-amplitude
profile (peak 21 vs cpu-saturate's 100); io-walk shows the
rapid-rise-then-decay shape consistent with dd finishing mid-phase.
knn.py — sticky model_metric / model_perf:
Stream subcommand gains --also-metric / --also-perf-latency-us
flags. When set, each cycle publishes a model_metric event
(tagged model=knn) for scene-8 (model bars) and a model_perf
event for scene-12 (accuracy vs inference cost). Republishing on
the cycle keeps reconnecting browsers populated without depending
on the dashboard's not-yet-built sticky-event cache.
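
A small sketch of the republish-on-cycle pattern, assuming an async publisher callable similar in spirit to the one used by the producers. The `republish` helper, the `value` field name, and the in-memory `collect` publisher are hypothetical; only the event types, the model tag, and the latency/accuracy fields come from this message.

```python
import asyncio

# Event payloads; "value" is an assumed field name for the metric.
EVENTS = [
    {"type": "model_metric", "model": "knn", "value": 0.762},
    {"type": "model_perf", "model": "knn", "latency_us": 3410, "accuracy": 0.762},
]


async def republish(events, publish, cycles, pause_s):
    """Publish every event once per cycle, pausing between cycles."""
    for cycle in range(cycles):
        for ev in events:
            await publish(ev)
        if cycle + 1 < cycles:
            await asyncio.sleep(pause_s)


seen = []


async def collect(ev):
    """Stand-in publisher that records events instead of POSTing them."""
    seen.append(ev)


asyncio.run(republish(EVENTS, collect, cycles=3, pause_s=0.0))
```

A browser that connects between cycles waits at most one pause interval before it receives both events again, which is why no server-side cache is needed for now.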
Measured KNN inference latency on the 150k-trained classifier:
single-window predict: 61.5 ms (sklearn brute-force at 230 D)
per-window in batch=64: 3.4 ms (the production-realistic number)
Streamer published: model_metric{knn, 0.762} +
model_perf{knn, latency_us=3410, accuracy=0.762}.
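
One way the single-window and batched numbers could be measured, sketched with a stand-in predictor so it runs without sklearn. `per_window_latency_us` and `toy_predict` are hypothetical names; the toy model will not reproduce the 61.5 ms vs 3.4 ms gap, which presumably comes from per-call overhead in the real classifier.

```python
import time


def per_window_latency_us(predict, windows, batch_size, repeats=5):
    """Best-of-`repeats` wall time per window, in microseconds."""
    best = float("inf")
    for _ in range(repeats):
        start = time.perf_counter()
        for i in range(0, len(windows), batch_size):
            predict(windows[i:i + batch_size])
        best = min(best, time.perf_counter() - start)
    return best / len(windows) * 1e6


def toy_predict(batch):
    """Stand-in classifier: labels a window 1 if its mean exceeds 0.5."""
    return [1 if sum(w) / len(w) > 0.5 else 0 for w in batch]


# 64 windows of 230 features each, matching the dimensionality quoted above.
windows = [[(i * 31 + j) % 7 / 6.0 for j in range(230)] for i in range(64)]

single_us = per_window_latency_us(toy_predict, windows, batch_size=1)
batched_us = per_window_latency_us(toy_predict, windows, batch_size=64)
```

Dividing the batch wall time by the number of windows is what makes the batch=64 figure comparable to the single-window one.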
Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
485 lines
20 KiB
Python
"""Emit `attack_profile` events for scene-6 (/proc signatures per profile).

Each event names a profile and ships an 80-point curve representing the
canonical /proc signature of that profile across the dataset.

The honest version (this file): aggregate the channel timeseries
across N (default 100) accepted episodes per profile, compute the
per-timestep MEDIAN, save median + p25/p75 band to disk, stream the
median. Episodes are aligned by *phase*: each episode is sliced at its
labels.jsonl phase boundaries and every phase is resampled to a fixed
point budget (see PHASE_POINT_BUDGET), so on the canonical walk index 0
is t=0 and index 79 is t=duration.

The shortcut (NOT this file): pick one accepted episode per profile
and render its raw fraction-of-duration curve. Confounded by
single-episode noise, phase-budget timing variance, and hardware-host
heterogeneity.

Channels are /proc-derived per the slide title:

    cpu-saturate     proc.cpu_user_jiffies   sustained 1-vCPU peg
    io-walk          proc.cpu_sys_jiffies    fs walk + urandom writes
    bursty-c2        proc.cpu_user_jiffies   long idle + bursts
    low-and-slow     proc.cpu_user_jiffies   low baseline + bumps
    scan-and-dial    proc.cpu_sys_jiffies    many short network syscalls
    shell-resident   proc.cpu_sys_jiffies    persistent socket + ticks

(Counter channels: channel_arrays converts them to per-second rates.)

Two subcommands:

    fit      aggregate N episodes per profile, save the median + band
             to data/processed/attack_profiles_v1.parquet
    stream   load the saved parquet, publish attack_profile events
             on a tick (with --loop for long-running)

Same producer pattern as knn.py: fit-once, stream-from-disk.
"""
from __future__ import annotations

import argparse
import asyncio
import json
import logging
import sys
from pathlib import Path

import numpy as np
import pyarrow as pa
import pyarrow.parquet as pq

sys.path.insert(0, str(Path(__file__).resolve().parents[3]))
from training._episode_io import open_episode
from training._features import channel_arrays, episode_t0_wall_ns
from training.producers._publish import (
    PublishFn, http_publisher, null_publisher,
)


log = logging.getLogger("cis490.dashboard.producers.profiles")


# Slide-title is "/proc signature per profile", so we use proc-side
# channels exclusively. After channel_arrays() these become per-second
# rates (counter-diff).
#
# io-walk uses proc.cpu_sys_jiffies, NOT proc.io_write_bytes, because
# the host /proc on the QEMU process doesn't see virtio-blk writes
# via the io.write_bytes counter (writes go through KVM's I/O path,
# not write() syscalls). cpu_sys_jiffies tracks kernel time, which
# does spike during heavy host-side I/O scheduling.
PROFILE_TO_CHANNEL: dict[str, tuple[str, str]] = {
    "cpu-saturate":   ("proc.cpu_user_jiffies", "sustained 1-vCPU peg (XMRig)"),
    "io-walk":        ("proc.cpu_sys_jiffies",  "fs traversal + urandom writes"),
    "bursty-c2":      ("proc.cpu_user_jiffies", "long idle + 3-packet egress bursts"),
    "low-and-slow":   ("proc.cpu_user_jiffies", "minimal CPU + periodic memory churn"),
    "scan-and-dial":  ("proc.cpu_sys_jiffies",  "SYN-style probes + dial-home"),
    "shell-resident": ("proc.cpu_sys_jiffies",  "persistent TCP socket + command ticks"),
}


# Phase-aware alignment: per-phase points in the concatenated 80-point
# curve. The canonical schedule is
#   clean(10s) → armed(2s) → infecting(3s) → infected_running(25s) → clean(5s)
# but the per-phase point allocation is shape-driven, not time-driven:
# we want infected_running (the operationally distinctive phase) to
# dominate the visual.
PHASE_POINT_BUDGET = {
    "clean": 10,             # opening clean; the baseline
    "armed": 5,
    "infecting": 10,
    "infected_running": 40,  # the signature phase
    "dormant": 10,           # not used in the layout: when an episode
                             # walks through dormant, that span is folded
                             # into the infected_running slice
    "_tail_clean": 15,       # closing clean (after infected_running);
                             # kept separate from leading clean
}
# Total: 10 + 5 + 10 + 40 + 15 = 80 (the dormant entry is not counted)


# ─────────────────────────────────────────────────────────────────────
# Fit
# ─────────────────────────────────────────────────────────────────────


def _resample_to_grid(t: np.ndarray, v: np.ndarray, n: int,
                      *, t_start: float | None = None,
                      t_end: float | None = None) -> np.ndarray | None:
    """Resample (t, v) to n points across [t_start, t_end] (or the
    finite samples' [t.min, t.max] if not given). Linear interp on finite
    samples; grid points outside the finite samples' time range come
    back as NaN. Returns None if there are fewer than two finite samples
    or the window has (near-)zero width.

    The first sample of a counter-diff'd rate is often a startup-artifact
    spike (cumulative-since-process-start divided by first sample dt).
    We don't drop it here (that is left to the caller), but the slice-
    by-time approach used by phase-aware alignment naturally avoids it
    when t_start > 0.
    """
    if len(t) < 2:
        return None
    finite = np.isfinite(v)
    if int(finite.sum()) < 2:
        return None
    t_f, v_f = t[finite], v[finite]
    lo = t_start if t_start is not None else float(t_f.min())
    hi = t_end if t_end is not None else float(t_f.max())
    if hi - lo < 1e-9:
        return None
    grid = np.linspace(lo, hi, n)
    return np.interp(grid, t_f, v_f, left=np.nan, right=np.nan)


def _phase_segments(epi) -> list[tuple[str, float, float]]:
    """Return [(phase, t_start_s, t_end_s)] sequence for one episode.

    Uses labels.jsonl events. The last segment ends at the episode's
    duration (from meta.json). Times are episode-relative:
    (t_wall_ns - first label's t_wall_ns) / 1e9. This is the same clock
    the feature extractor uses.
    """
    if not epi.labels:
        return []
    t0 = int(epi.labels[0]["t_wall_ns"])
    duration = (epi.meta.get("result") or {}).get("duration_observed_s") or 0.0
    out: list[tuple[str, float, float]] = []
    labels = epi.labels
    for i, L in enumerate(labels):
        ph = L.get("phase")
        if ph is None:
            continue
        t_s = (L["t_wall_ns"] - t0) / 1e9
        if i + 1 < len(labels):
            # A phase ends where the next label event begins.
            t_e = (labels[i + 1]["t_wall_ns"] - t0) / 1e9
        else:
            # Last label: run to the observed duration, or 1 s if unknown.
            t_e = duration if duration > t_s else t_s + 1.0
        if t_e > t_s:
            out.append((ph, t_s, t_e))
    return out


def _phase_aligned_curve(epi, channel_name: str, n_points: int = 80
                         ) -> np.ndarray | None:
    """Build a fixed-length per-phase-aligned curve for one episode.

    The output is ``n_points`` long with each phase taking the budget
    in PHASE_POINT_BUDGET. Layout (canonical 5-phase walk):

        idx  0..9    clean (leading)
        idx 10..14   armed
        idx 15..24   infecting
        idx 25..64   infected_running (the distinctive phase)
        idx 65..79   clean (trailing)

    If the episode has a `dormant` phase (longer walks), it gets
    folded into the infected_running budget: we resample
    `infected_running` ∪ `dormant` together since both are post-armed
    activity and the median across episodes loses precision otherwise.
    Returns None if the episode's labels don't include the canonical walk.
    """
    segs = _phase_segments(epi)
    if len(segs) < 4:
        return None

    # Find the first occurrence of each canonical phase
    phase_starts: dict[str, float] = {}
    phase_ends: dict[str, float] = {}
    for ph, ts, te in segs:
        if ph not in phase_starts:
            phase_starts[ph] = ts
        # Last end-time we see for this phase (may be reset on re-entry)
        phase_ends[ph] = te
    # Fold dormant into infected_running's slice if present
    if "infected_running" in phase_starts and "dormant" in phase_starts:
        # Extend infected_running's end through any dormant occurrences
        last_relevant = max(
            phase_ends.get("infected_running", 0.0),
            phase_ends.get("dormant", 0.0),
        )
        phase_ends["infected_running"] = last_relevant

    # We want: leading_clean, armed, infecting, infected_running, trailing_clean
    # Find where leading_clean ends (= first armed) and where trailing_clean
    # starts (= the LAST clean's start, after infected_running/dormant).
    if "armed" not in phase_starts or "infected_running" not in phase_starts:
        return None
    leading_clean_start = 0.0
    leading_clean_end = phase_starts["armed"]
    armed_end = phase_starts.get("infecting", phase_starts["armed"] + 0.1)
    infecting_end = phase_starts["infected_running"]
    inf_run_end = phase_ends["infected_running"]
    duration = (epi.meta.get("result") or {}).get("duration_observed_s") or 0.0

    # Trailing clean = anything after infected_running's end
    trailing_clean_start = inf_run_end
    trailing_clean_end = duration if duration > inf_run_end else inf_run_end + 0.1

    arrs = channel_arrays(epi, episode_t0_wall_ns(epi))
    t, v = arrs.get(channel_name, (np.zeros(0), np.zeros(0)))
    if len(t) < 2:
        return None

    # Per-phase resample then concat. Non-finite samples are dropped
    # before interpolation, so np.interp bridges across them; grid points
    # outside the channel's sampled time range come back NaN and are
    # absorbed by the nan-aware aggregation downstream.
    pieces: list[np.ndarray] = []
    for (label, t_s, t_e, n) in [
        ("clean_lead", leading_clean_start, leading_clean_end, PHASE_POINT_BUDGET["clean"]),
        ("armed", leading_clean_end, armed_end, PHASE_POINT_BUDGET["armed"]),
        ("infecting", armed_end, infecting_end, PHASE_POINT_BUDGET["infecting"]),
        ("infected_running", infecting_end, inf_run_end, PHASE_POINT_BUDGET["infected_running"]),
        ("clean_tail", trailing_clean_start, trailing_clean_end, PHASE_POINT_BUDGET["_tail_clean"]),
    ]:
        if t_e <= t_s:
            pieces.append(np.full(n, np.nan, dtype=np.float64))
            continue
        seg = _resample_to_grid(t, v, n=n, t_start=t_s, t_end=t_e)
        if seg is None:
            pieces.append(np.full(n, np.nan, dtype=np.float64))
        else:
            pieces.append(seg)

    out = np.concatenate(pieces, axis=0)
    if out.shape[0] != n_points:
        # Defensive: pad/truncate to expected length
        if out.shape[0] < n_points:
            out = np.concatenate(
                [out, np.full(n_points - out.shape[0], np.nan)]
            )
        else:
            out = out[:n_points]
    return out


def aggregate_envelope(
    *, validation_path: Path, store_root: Path,
    profile: str, channel_name: str,
    n_episodes_target: int = 100, n_points: int = 80,
    seed: int = 0,
) -> dict | None:
    """Aggregate the channel timeseries across up to ``n_episodes_target``
    accepted episodes of ``profile``. Returns a dict with the canonical
    median curve + p25/p75 band, plus diagnostics; ``None`` if no
    episodes were usable.

    Aggregation method:
      1. Random-sample up to N accepted episodes of this profile
      2. For each, decompress the tarball, extract channel via
         channel_arrays() (counter-diff applied)
      3. Resample each phase to its fixed point budget
         (_phase_aligned_curve), giving n_points per episode
      4. Stack → (n_used, n_points)
      5. Per-timestep median + p25/p75 across episodes
      6. Normalize median to [0, 1] for the dashboard renderer
    """
    rng = np.random.default_rng(seed)
    val = pq.read_table(
        validation_path,
        columns=["episode_id", "host_id", "profile", "status"],
    ).to_pylist()
    candidates = [r for r in val
                  if r["status"] == "accepted" and r["profile"] == profile]
    if not candidates:
        log.warning("no accepted episodes for profile %s", profile)
        return None
    if len(candidates) > n_episodes_target:
        idx = rng.choice(len(candidates), n_episodes_target, replace=False)
        candidates = [candidates[i] for i in idx]

    curves: list[np.ndarray] = []
    n_attempted = 0
    n_failed = 0
    for r in candidates:
        n_attempted += 1
        path = Path(store_root) / r["host_id"] / f"{r['episode_id']}.tar.zst"
        if not path.exists():
            n_failed += 1
            continue
        try:
            epi = open_episode(path, host_id=r["host_id"])
        except Exception as e:
            log.debug("open %s failed: %s", path, e)
            n_failed += 1
            continue
        if not epi.labels:
            n_failed += 1
            continue
        # Phase-aware alignment: each episode's curve uses its phase
        # boundaries from labels.jsonl, resampled per phase to a fixed
        # budget so the median across episodes captures the canonical
        # per-phase shape (vs the fraction-of-duration shortcut, which
        # washes out plateaus when phase timing varies).
        curve = _phase_aligned_curve(epi, channel_name, n_points=n_points)
        if curve is None:
            n_failed += 1
            continue
        curves.append(curve)

    if not curves:
        log.warning("no usable episodes for profile %s "
                    "(attempted=%d, failed=%d)",
                    profile, n_attempted, n_failed)
        return None

    arr = np.asarray(curves, dtype=np.float64)  # (n_used, n_points)
    # nanmedian/percentile because individual phase segments may have
    # produced NaN where the channel had no data in that phase.
    median = np.nanmedian(arr, axis=0)
    median = np.where(np.isnan(median), 0.0, median)
    p25 = np.nan_to_num(np.nanpercentile(arr, 25, axis=0), nan=0.0)
    p75 = np.nan_to_num(np.nanpercentile(arr, 75, axis=0), nan=0.0)

    # Normalize the median for the renderer; keep raw values so the band
    # can be plotted in the same scale on a future widget.
    m_lo, m_hi = float(np.nanmin(median)), float(np.nanmax(median))
    if m_hi - m_lo < 1e-9:
        median_norm = np.zeros_like(median)
    else:
        median_norm = (median - m_lo) / (m_hi - m_lo)

    log.info("%-15s channel=%-26s n_used=%d/%d range=[%.3g, %.3g]",
             profile, channel_name, len(curves), n_attempted, m_lo, m_hi)

    return {
        "profile": profile,
        "channel": channel_name,
        "n_episodes_used": len(curves),
        "n_episodes_attempted": n_attempted,
        "median": median_norm.astype(np.float32).tolist(),
        "median_raw": median.astype(np.float32).tolist(),
        "p25_raw": p25.astype(np.float32).tolist(),
        "p75_raw": p75.astype(np.float32).tolist(),
        "median_min_raw": m_lo,
        "median_max_raw": m_hi,
    }


def _save_fit(rows: list[dict], path: Path) -> None:
    """Save aggregation result as a parquet (one row per profile)."""
    if not rows:
        raise ValueError("nothing to save")
    n_points = len(rows[0]["median"])
    schema = pa.schema([
        ("profile", pa.string()),
        ("channel", pa.string()),
        ("n_episodes_used", pa.int32()),
        ("n_episodes_attempted", pa.int32()),
        ("median", pa.list_(pa.float32(), n_points)),
        ("median_raw", pa.list_(pa.float32(), n_points)),
        ("p25_raw", pa.list_(pa.float32(), n_points)),
        ("p75_raw", pa.list_(pa.float32(), n_points)),
        ("median_min_raw", pa.float64()),
        ("median_max_raw", pa.float64()),
    ])
    cols = {n: [r[n] for r in rows] for n in schema.names}
    tbl = pa.table(cols, schema=schema)
    path.parent.mkdir(parents=True, exist_ok=True)
    pq.write_table(tbl, path, compression="zstd")


async def _fit(args) -> int:
    rows: list[dict] = []
    for prof, (ch_name, _) in PROFILE_TO_CHANNEL.items():
        res = aggregate_envelope(
            validation_path=args.validation,
            store_root=args.store,
            profile=prof, channel_name=ch_name,
            n_episodes_target=args.n_episodes_per_profile,
            n_points=args.n_points,
            seed=args.seed,
        )
        if res is not None:
            rows.append(res)
    if not rows:
        log.error("no profiles had usable episodes")
        return 1
    _save_fit(rows, args.fit_out)
    log.info("wrote %s (%d profile rows)", args.fit_out, len(rows))
    print(json.dumps({
        "saved_to": str(args.fit_out),
        "n_profiles": len(rows),
        "profiles": [r["profile"] for r in rows],
    }, indent=2))
    return 0


# ─────────────────────────────────────────────────────────────────────
# Stream
# ─────────────────────────────────────────────────────────────────────


async def _stream(args) -> int:
    if not args.load_fit.exists():
        log.error("fit parquet not found: %s — run `profiles fit --fit-out` first",
                  args.load_fit)
        return 1
    tbl = pq.read_table(args.load_fit).to_pylist()
    log.info("loaded %d profile aggregates from %s", len(tbl), args.load_fit)

    publisher = (null_publisher() if args.dry_run
                 else http_publisher(args.publish_url))

    while True:
        for r in tbl:
            shape_text = PROFILE_TO_CHANNEL.get(r["profile"], (None, ""))[1]
            await publisher({
                "type": "attack_profile",
                "name": r["profile"],
                "shape": shape_text,
                "curve": list(r["median"]),
            })
        log.info("cycle complete: %d profiles", len(tbl))
        if not args.loop:
            return 0
        await asyncio.sleep(args.cycle_pause_s)


# ─────────────────────────────────────────────────────────────────────
# CLI
# ─────────────────────────────────────────────────────────────────────


def main() -> int:
    p = argparse.ArgumentParser()
    sub = p.add_subparsers(dest="cmd", required=True)

    pf = sub.add_parser("fit",
                        help="aggregate N episodes per profile and save "
                             "the canonical envelope to disk")
    pf.add_argument("--validation", required=True, type=Path)
    pf.add_argument("--store", required=True, type=Path,
                    help="receiver episode store root, e.g. "
                         "/var/lib/cis490/episodes")
    pf.add_argument("--fit-out", type=Path,
                    default=Path("data/processed/attack_profiles_v1.parquet"))
    pf.add_argument("--n-episodes-per-profile", type=int, default=100)
    pf.add_argument("--n-points", type=int, default=80,
                    help="resampled curve length (default 80, what "
                         "the dashboard's profile thumbnail expects)")
    pf.add_argument("--seed", type=int, default=0)
    pf.add_argument("--log-level", default="INFO")
    pf.set_defaults(func=_fit)

    ps = sub.add_parser("stream",
                        help="publish attack_profile events from a saved "
                             "fit parquet")
    ps.add_argument("--load-fit", type=Path,
                    default=Path("data/processed/attack_profiles_v1.parquet"))
    ps.add_argument("--publish-url", default="http://127.0.0.1:8447/publish")
    ps.add_argument("--loop", action="store_true",
                    help="cycle indefinitely so reconnecting browsers "
                         "stay populated")
    ps.add_argument("--cycle-pause-s", type=float, default=20.0,
                    help="pause between cycles when --loop is set")
    ps.add_argument("--dry-run", action="store_true")
    ps.add_argument("--log-level", default="INFO")
    ps.set_defaults(func=_stream)

    args = p.parse_args()
    logging.basicConfig(level=args.log_level,
                        format="%(asctime)s %(levelname)s %(name)s %(message)s")
    return asyncio.run(args.func(args))


if __name__ == "__main__":
    raise SystemExit(main())