CIS490/training/producers/profiles.py
Max 697e36a315 training/producers: move out of dashboard/ per ownership boundary
Producers are event *sources* — the renderer is everything inside
training/dashboard/. Sibling layout makes the dependency direction
one-way (producers import from training.dashboard.events; dashboard
never reaches into producers).

  training/dashboard/producers/   →   training/producers/

Internal imports rewritten via sed; eval_/run.py and training/README.md
cross-references updated. CLI entry stays via `python -m training.producers.<sub>`
(replay / metrics / perf / profiles).

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
2026-05-08 12:06:56 -05:00

155 lines
5.6 KiB
Python

"""Emit `attack_profile` events — canonical envelope per profile.
For each known profile (cpu-saturate, scan-and-dial, …) pick a
representative episode from the validated set, extract one observable
channel that reflects the profile's shape, and publish a normalized
80-point curve as `attack_profile`.
Channel choice per profile is defensible:
cpu-saturate → guest.cpu_user (sustained 1-vCPU peg)
scan-and-dial → netflow.syn_count (SYN bursts)
io-walk → proc.io_write_bytes (IO is the loud signal; chosen over
guest.eth0_tx_bytes)
bursty-c2 → netflow.bytes_out (idle + spikes)
low-and-slow → guest.mem_available (slow memory churn)
shell-resident → netflow.tcp_count (one persistent flow)
"""
from __future__ import annotations
import argparse
import asyncio
import json
import logging
import sys
from pathlib import Path
import numpy as np
import pyarrow.parquet as pq
sys.path.insert(0, str(Path(__file__).resolve().parents[3]))
from training._episode_io import open_episode
from training._features import ALL_CHANNELS, channel_arrays
from training.producers._publish import (
PublishFn, http_publisher, null_publisher,
)
log = logging.getLogger("cis490.dashboard.producers.profiles")

# Maps each known attack profile to a pair of
#   (channel name looked up in the episode's channel arrays,
#    short human-readable description sent as the event's "shape").
# Insertion order is the order the profiles are listed here.
PROFILE_TO_CHANNEL: dict[str, tuple[str, str]] = {
    "cpu-saturate": (
        "guest.cpu_user",
        "sustained 1-vCPU peg (XMRig)",
    ),
    "scan-and-dial": (
        "netflow.syn_count",
        "SYN-style probes + dial-home",
    ),
    "io-walk": (
        "proc.io_write_bytes",
        "fs traversal + 4 KiB urandom writes",
    ),
    "bursty-c2": (
        "netflow.bytes_out",
        "long idle + 3-packet egress bursts",
    ),
    "low-and-slow": (
        "guest.mem_available",
        "minimal CPU + periodic memory churn",
    ),
    "shell-resident": (
        "netflow.tcp_count",
        "one persistent TCP socket + ticks",
    ),
}
def _resample(t: np.ndarray, v: np.ndarray, n: int = 80) -> list[float]:
    """Resample a (t, v) series onto ``n`` uniform points, scaled to [0, 1].

    The series is linearly interpolated onto a uniform grid spanning the
    smallest to the largest finite timestamp, then min-max normalized for
    the dashboard's curve renderer.

    Args:
        t: Sample timestamps. They need not be sorted; samples are ordered
            by timestamp before interpolation.
        v: Sample values, same length as ``t``. Samples whose timestamp or
            value is non-finite (NaN/inf) are ignored.
        n: Number of points in the returned curve.

    Returns:
        A list of ``n`` floats in [0, 1]. An all-zero list is returned when
        fewer than two usable samples exist or when the interpolated curve
        is flat (range below 1e-9). An empty list is returned for ``n <= 0``.
    """
    if n <= 0:
        # Nothing to sample; also avoids np.min() on an empty grid below.
        return []
    flat = [0.0] * n
    t = np.asarray(t, dtype=float)
    v = np.asarray(v, dtype=float)
    if len(t) < 2:
        return flat
    t_finite = np.isfinite(t)
    usable = t_finite & np.isfinite(v)
    if usable.sum() < 2:
        return flat
    # Build the grid from finite timestamps only: a single NaN timestamp
    # would otherwise turn min()/max() -- and the whole curve -- into NaN.
    t_span = t[t_finite]
    grid = np.linspace(t_span.min(), t_span.max(), n)
    xs, ys = t[usable], v[usable]
    # np.interp requires increasing x-coordinates; a stable sort keeps the
    # original order of samples that share a timestamp.
    order = np.argsort(xs, kind="stable")
    out = np.interp(grid, xs[order], ys[order])
    # Normalize to [0, 1] for the dashboard's curve renderer.
    lo, hi = float(np.min(out)), float(np.max(out))
    if hi - lo < 1e-9:
        return flat
    return ((out - lo) / (hi - lo)).astype(float).tolist()
def _pick_episode_per_profile(validation_path: Path, store_root: Path
                              ) -> dict[str, tuple[Path, str]]:
    """Pick one example episode tarball for each known profile.

    Reads the validation parquet table and, scanning rows in order, keeps
    the first row per profile whose status is ``"accepted"`` and whose
    tarball exists on disk at
    ``store_root / host_id / "<episode_id>.tar.zst"``.

    Only profiles listed in ``PROFILE_TO_CHANNEL`` are collected. Counting
    other profiles would let them satisfy the "found everything" early
    exit before every known profile has an example.

    Args:
        validation_path: Parquet file with at least the columns
            ``episode_id``, ``host_id``, ``profile`` and ``status``.
        store_root: Directory containing per-host episode tarballs.

    Returns:
        ``{profile: (tarball_path, host_id)}`` for each known profile that
        has at least one accepted episode present on disk.
    """
    rows = pq.read_table(
        validation_path,
        columns=["episode_id", "host_id", "profile", "status"],
    ).to_pylist()
    out: dict[str, tuple[Path, str]] = {}
    for r in rows:
        if r["status"] != "accepted":
            continue
        prof = r["profile"]
        # Skip missing/unknown profiles and profiles we already have.
        if prof not in PROFILE_TO_CHANNEL or prof in out:
            continue
        path = store_root / r["host_id"] / f"{r['episode_id']}.tar.zst"
        if not path.exists():
            continue
        out[prof] = (path, r["host_id"])
        # Every known profile now has an example; no need to scan further.
        if len(out) == len(PROFILE_TO_CHANNEL):
            break
    return out
async def emit_profiles(*, publish: PublishFn, validation_path: Path,
                        store_root: Path) -> int:
    """Publish one ``attack_profile`` event per profile with an example.

    For every picked episode whose profile has an entry in
    ``PROFILE_TO_CHANNEL``, the configured channel is resampled to an
    80-point curve and sent through ``publish``.

    Args:
        publish: Awaitable callback that receives each event dict.
        validation_path: Validation parquet used to pick example episodes.
        store_root: Root directory of the episode tarballs.

    Returns:
        The number of events published.
    """
    examples = _pick_episode_per_profile(validation_path, store_root)
    log.info("found example episodes for: %s", sorted(examples.keys()))

    published = 0
    for profile_name, example in examples.items():
        tarball, host = example
        channel_cfg = PROFILE_TO_CHANNEL.get(profile_name)
        if not channel_cfg:
            # No channel configured for this profile; nothing to emit.
            continue
        channel, description = channel_cfg

        try:
            episode = open_episode(tarball, host_id=host)
        except Exception as exc:
            # Best effort: one unreadable episode must not stop the rest.
            log.warning("open %s failed: %s", tarball, exc)
            continue

        if not episode.labels:
            continue

        # The first label's monotonic timestamp is passed to
        # channel_arrays as the time origin.
        origin_ns = int(episode.labels[0]["t_mono_ns"])
        series = channel_arrays(episode, origin_ns)
        # A missing channel falls back to empty arrays, which resample
        # to an all-zero curve.
        empty = (np.zeros(0), np.zeros(0))
        times, values = series.get(channel, empty)

        event = {
            "type": "attack_profile",
            "name": profile_name,
            "shape": description,
            "curve": _resample(times, values, n=80),
        }
        await publish(event)
        published += 1
    return published
async def _run(args: argparse.Namespace) -> int:
    """Publish the profile curves once, then optionally re-publish forever.

    Args:
        args: Parsed CLI arguments; reads ``dry_run``, ``publish_url``,
            ``validation``, ``store`` and ``interval``.

    Returns:
        0 when running one-shot (``interval <= 0``) or when nothing was
        published. Otherwise the function loops indefinitely and does not
        return.
    """
    logging.basicConfig(
        level=logging.INFO,
        format="%(asctime)s %(levelname)s %(name)s %(message)s",
    )

    if args.dry_run:
        sink = null_publisher()
    else:
        sink = http_publisher(args.publish_url)

    # The envelopes are static, so episodes are sampled only once. Every
    # event is remembered here and replayed on a timer for reconnects.
    replay: list[dict] = []

    async def record_and_send(event: dict) -> None:
        replay.append(event)
        await sink(event)

    await emit_profiles(
        publish=record_and_send,
        validation_path=args.validation,
        store_root=args.store,
    )

    one_shot = args.interval <= 0
    if one_shot or not replay:
        return 0

    while True:
        await asyncio.sleep(args.interval)
        for event in replay:
            await sink(event)
def main() -> int:
    """Parse command-line arguments and run the producer.

    Returns:
        The exit code produced by ``_run``.
    """
    parser = argparse.ArgumentParser()
    parser.add_argument("--validation", required=True, type=Path)
    parser.add_argument("--store", required=True, type=Path)
    parser.add_argument(
        "--publish-url",
        default="http://127.0.0.1:8447/publish",
    )
    parser.add_argument(
        "--interval",
        type=float,
        default=30.0,
        help="re-publish cached profile curves every N seconds; "
             "0 = one-shot.",
    )
    parser.add_argument("--dry-run", action="store_true")
    cli_args = parser.parse_args()
    return asyncio.run(_run(cli_args))


if __name__ == "__main__":
    # Script entry point: the process exit status is main()'s return value.
    raise SystemExit(main())