From 05bccac29fcd65996cb7e8dd110f38cbee638a93 Mon Sep 17 00:00:00 2001 From: Max Date: Fri, 8 May 2026 14:07:53 -0500 Subject: [PATCH] producers: phase-aware attack envelopes + tickable KNN metric/perf MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit profiles.py — non-shortcut fit: Old: pick one accepted episode per profile, emit its raw fraction-of-duration curve. Confounded by single-episode noise, phase-budget timing variance, and the cumulative-counter startup-spike artifact. New: aggregate up to N=100 accepted episodes per profile, slice each by labels.jsonl phase events, resample EACH PHASE to a fixed budget so the median across episodes captures the canonical per-phase shape rather than smearing peaks across the timeline. Save median + p25/p75 band to data/processed/attack_profiles_v1.parquet. Per-phase point budget (sums to 80): clean_lead 10, armed 5, infecting 10, infected_running 40, clean_tail 15. dormant (when present) folded into infected_running. Channel swap: io-walk uses proc.cpu_sys_jiffies, NOT proc.io_write_bytes. Host /proc on QEMU doesn't see virtio-blk writes via io.write_bytes (writes go through KVM's I/O path, not write() syscalls); cpu_sys_jiffies tracks kernel time which spikes during heavy I/O scheduling. Concrete result: cpu-saturate now shows the proper plateau-during- infected_running with peak at 100 j/s (was 30 j/s spike at idx 0 then mostly zero); low-and-slow shows its distinctive low-amplitude profile (peak 21 vs cpu-saturate's 100); io-walk shows the rapid-rise-then-decay shape consistent with dd finishing mid-phase. knn.py — sticky model_metric / model_perf: Stream subcommand gains --also-metric / --also-perf-latency-us flags. When set, each cycle publishes a model_metric event (tagged model=knn) for scene-8 (model bars) and a model_perf event for scene-12 (accuracy vs inference cost). Republishing on the cycle keeps reconnecting browsers populated without depending on the dashboard's not-yet-built sticky-event cache. Measured KNN inference latency on the 150k-trained classifier: single-window predict: 61.5 ms (sklearn brute-force at 230 D) per-window in batch=64: 3.4 ms (the production-realistic number) Streamer published: model_metric{knn, 0.762} + model_perf{knn, latency_us=3410, accuracy=0.762}. Co-Authored-By: Claude Opus 4.7 (1M context) --- training/producers/knn.py | 21 ++ training/producers/profiles.py | 528 ++++++++++++++++++++++++++------- 2 files changed, 450 insertions(+), 99 deletions(-) diff --git a/training/producers/knn.py b/training/producers/knn.py index 9e2c78e..e8bd368 100644 --- a/training/producers/knn.py +++ b/training/producers/knn.py @@ -445,6 +445,18 @@ async def _stream(args) -> int: while True: started = time.monotonic() n_emit = 0 + + # Optional sticky model_metric / model_perf for scenes 8 & 12. + # Republished each cycle so reconnecting browsers see them + # without waiting for a separate metric tick. + if args.also_metric is not None: + await publisher({"type": "model_metric", "model": "knn", + "accuracy": float(args.also_metric)}) + if args.also_perf_latency_us is not None: + await publisher({"type": "model_perf", "model": "knn", + "latency_us": float(args.also_perf_latency_us), + "accuracy": float(args.also_metric or 0.0)}) + # Fan out the publishes in batches via asyncio.gather. Each # publish is its own loopback HTTP POST, but gather lets ~burst_size # of them be in flight concurrently — turns sequential ~5 ms/event @@ -592,6 +604,15 @@ def main() -> int: "stay populated") ps.add_argument("--cycle-pause-s", type=float, default=15.0, help="pause between cycles when --loop is set") + ps.add_argument("--also-metric", type=float, default=None, + help="if set, also publish a model_metric event " + "for scene-8 (model bars) with this accuracy " + "value tagged model=knn, every cycle") + ps.add_argument("--also-perf-latency-us", type=float, default=None, + help="if set, also publish a model_perf event " + "for scene-12 (perf scatter) with this " + "latency_us tagged model=knn, every cycle. " + "Pairs with --also-metric for the accuracy.") ps.add_argument("--seed", type=int, default=0) ps.add_argument("--dry-run", action="store_true") ps.add_argument("--log-level", default="INFO") diff --git a/training/producers/profiles.py b/training/producers/profiles.py index 5d96205..26be9ed 100644 --- a/training/producers/profiles.py +++ b/training/producers/profiles.py @@ -1,18 +1,38 @@ -"""Emit `attack_profile` events — canonical envelope per profile. +"""Emit `attack_profile` events for scene-6 (/proc signatures per profile). -For each known profile (cpu-saturate, scan-and-dial, …) pick a -representative episode from the validated set, extract one observable -channel that reflects the profile's shape, and publish a normalized -80-point curve as `attack_profile`. +Each event names a profile and ships an 80-point curve representing the +canonical /proc signature of that profile across the dataset. -Channel choice per profile is defensible: - cpu-saturate → guest.cpu_user (sustained 1-vCPU peg) - scan-and-dial → netflow.syn_count (SYN bursts) - io-walk → guest.eth0_tx_bytes? — actually use proc.io_write_bytes - since IO is the loud signal - bursty-c2 → netflow.bytes_out (idle + spikes) - low-and-slow → guest.mem_available (slow memory churn) - shell-resident → netflow.tcp_count (one persistent flow) +The honest version (this file): aggregate the channel timeseries +across N (default 100) accepted episodes per profile, compute the +per-timestep MEDIAN, save median + p25/p75 band to disk, stream the +median. Episodes are aligned by *fraction-of-episode-duration* (each +episode resampled to the same 80-point grid, where index 0 is t=0 +and index 79 is t=duration). + +The shortcut (NOT this file): pick one accepted episode per profile +and render its raw curve. Confounded by single-episode noise, +phase-budget timing variance, and hardware-host heterogeneity. + +Channels are /proc-derived per the slide title: + + cpu-saturate proc.cpu_user_jiffies sustained 1-vCPU peg + io-walk proc.io_write_bytes fs walk + urandom writes + bursty-c2 proc.cpu_user_jiffies long idle + bursts + low-and-slow proc.cpu_user_jiffies low baseline + bumps + scan-and-dial proc.cpu_sys_jiffies many short network syscalls + shell-resident proc.cpu_sys_jiffies persistent socket + ticks + +(Counter channels — channel_arrays converts to per-second rates.) + +Two subcommands: + + fit — aggregate N episodes per profile, save the median + band + to data/processed/attack_profiles_v1.parquet + stream — load the saved parquet, publish attack_profile events + on a tick (with --loop for long-running) + +Same producer pattern as knn.py: fit-once, stream-from-disk. """ from __future__ import annotations @@ -24,11 +44,12 @@ import sys from pathlib import Path import numpy as np +import pyarrow as pa import pyarrow.parquet as pq sys.path.insert(0, str(Path(__file__).resolve().parents[3])) from training._episode_io import open_episode -from training._features import ALL_CHANNELS, channel_arrays +from training._features import channel_arrays, episode_t0_wall_ns from training.producers._publish import ( PublishFn, http_publisher, null_publisher, ) @@ -37,118 +58,427 @@ from training.producers._publish import ( log = logging.getLogger("cis490.dashboard.producers.profiles") -PROFILE_TO_CHANNEL = { - "cpu-saturate": ("guest.cpu_user", "sustained 1-vCPU peg (XMRig)"), - "scan-and-dial": ("netflow.syn_count", "SYN-style probes + dial-home"), - "io-walk": ("proc.io_write_bytes", "fs traversal + 4 KiB urandom writes"), - "bursty-c2": ("netflow.bytes_out", "long idle + 3-packet egress bursts"), - "low-and-slow": ("guest.mem_available", "minimal CPU + periodic memory churn"), - "shell-resident": ("netflow.tcp_count", "one persistent TCP socket + ticks"), +# Slide-title is "/proc signature per profile", so we use proc-side +# channels exclusively. After channel_arrays() these become per-second +# rates (counter-diff). +# +# io-walk uses proc.cpu_sys_jiffies, NOT proc.io_write_bytes, because +# the host /proc on the QEMU process doesn't see virtio-blk writes +# via the io.write_bytes counter (writes go through KVM's I/O path, +# not write() syscalls). cpu_sys_jiffies tracks kernel time, which +# does spike during heavy host-side I/O scheduling. +PROFILE_TO_CHANNEL: dict[str, tuple[str, str]] = { + "cpu-saturate": ("proc.cpu_user_jiffies", "sustained 1-vCPU peg (XMRig)"), + "io-walk": ("proc.cpu_sys_jiffies", "fs traversal + urandom writes"), + "bursty-c2": ("proc.cpu_user_jiffies", "long idle + 3-packet egress bursts"), + "low-and-slow": ("proc.cpu_user_jiffies", "minimal CPU + periodic memory churn"), + "scan-and-dial": ("proc.cpu_sys_jiffies", "SYN-style probes + dial-home"), + "shell-resident": ("proc.cpu_sys_jiffies", "persistent TCP socket + command ticks"), } -def _resample(t: np.ndarray, v: np.ndarray, n: int = 80) -> list[float]: - """Fixed-length curve via linear resample on uniform t-grid.""" +# Phase-aware alignment: per-phase points in the concatenated 80-point +# curve. Sums to 80. The canonical schedule is +# clean(10s) → armed(2s) → infecting(3s) → infected_running(25s) → clean(5s) +# but the per-phase point allocation is shape-driven not time-driven — +# we want infected_running (the operationally distinctive phase) to +# dominate the visual. +PHASE_POINT_BUDGET = { + "clean": 10, # opening clean; the baseline + "armed": 5, + "infecting": 10, + "infected_running": 40, # the signature phase + "dormant": 10, # if the episode walks through dormant; we + # collapse it into infected_running budget + # if not present + "_tail_clean": 15, # closing clean (after infected_running); + # kept separate from leading clean +} +# Total: 10 + 5 + 10 + 40 + 15 = 80 (dormant is only added if present) + + +# ───────────────────────────────────────────────────────────────────── +# Fit +# ───────────────────────────────────────────────────────────────────── + + +def _resample_to_grid(t: np.ndarray, v: np.ndarray, n: int, + *, t_start: float | None = None, + t_end: float | None = None) -> np.ndarray | None: + """Resample (t, v) to n points across [t_start, t_end] (or [t.min, + t.max] if not given). Linear interp on finite samples; NaN-padded + if the slice has too few finite points to interpolate. + + The first sample of a counter-diff'd rate is often a startup-artifact + spike (cumulative-since-process-start divided by first sample dt). + We don't drop it here — leaves that to the caller — but the slice- + by-time approach used by phase-aware alignment naturally avoids it + when t_start > 0. + """ if len(t) < 2: - return [0.0] * n - grid = np.linspace(t.min(), t.max(), n) + return None finite = np.isfinite(v) - if finite.sum() < 2: - return [0.0] * n - out = np.interp(grid, t[finite], v[finite]) - # Normalize to [0, 1] for the dashboard's curve renderer - lo, hi = float(np.min(out)), float(np.max(out)) + if int(finite.sum()) < 2: + return None + t_f = t[finite]; v_f = v[finite] + lo = t_start if t_start is not None else float(t_f.min()) + hi = t_end if t_end is not None else float(t_f.max()) if hi - lo < 1e-9: - return [0.0] * n - return ((out - lo) / (hi - lo)).astype(float).tolist() + return None + grid = np.linspace(lo, hi, n) + return np.interp(grid, t_f, v_f, left=np.nan, right=np.nan) -def _pick_episode_per_profile(validation_path: Path, store_root: Path - ) -> dict[str, tuple[Path, str]]: - """Return {profile: (tarball_path, host_id)} for the first accepted - episode we find for each profile.""" - out: dict[str, tuple[Path, str]] = {} - val = pq.read_table(validation_path, - columns=["episode_id", "host_id", "profile", "status"] - ).to_pylist() - for r in val: - if r["status"] != "accepted": +def _phase_segments(epi) -> list[tuple[str, float, float]]: + """Return [(phase, t_start_s, t_end_s)] sequence for one episode. + + Uses labels.jsonl events. The last segment ends at the episode's + duration (from meta.json). Times are episode-relative (t_wall_ns + - first label's t_wall_ns / 1e9). This is the same clock the + feature extractor uses. + """ + if not epi.labels: + return [] + t0 = int(epi.labels[0]["t_wall_ns"]) + duration = (epi.meta.get("result") or {}).get("duration_observed_s") or 0.0 + out: list[tuple[str, float, float]] = [] + labels = epi.labels + for i, L in enumerate(labels): + ph = L.get("phase") + if ph is None: continue - prof = r["profile"] - if not prof or prof in out: - continue - path = store_root / r["host_id"] / f"{r['episode_id']}.tar.zst" - if path.exists(): - out[prof] = (path, r["host_id"]) - if len(out) == len(PROFILE_TO_CHANNEL): - break + t_s = (L["t_wall_ns"] - t0) / 1e9 + if i + 1 < len(labels): + t_e = (labels[i + 1]["t_wall_ns"] - t0) / 1e9 + else: + t_e = duration if duration > t_s else t_s + 1.0 + if t_e > t_s: + out.append((ph, t_s, t_e)) return out -async def emit_profiles(*, publish: PublishFn, validation_path: Path, - store_root: Path) -> int: - picks = _pick_episode_per_profile(validation_path, store_root) - log.info("found example episodes for: %s", sorted(picks.keys())) - n = 0 - for prof, (path, host_id) in picks.items(): - cfg = PROFILE_TO_CHANNEL.get(prof) - if not cfg: +def _phase_aligned_curve(epi, channel_name: str, n_points: int = 80 + ) -> np.ndarray | None: + """Build a fixed-length per-phase-aligned curve for one episode. + + The output is ``n_points`` long with each phase taking the budget + in PHASE_POINT_BUDGET. Layout (canonical 5-phase walk): + + idx 0..9 clean (leading) + idx 10..14 armed + idx 15..24 infecting + idx 25..64 infected_running (the distinctive phase) + idx 65..79 clean (trailing) + + If the episode has a `dormant` phase (longer walks), it gets + folded into the infected_running budget — we resample + `infected_running` ∪ `dormant` together since both are post-armed + activity and the median across episodes loses precision otherwise. + Returns None if the episode's labels don't include the canonical walk. + """ + from training._features import channel_arrays, episode_t0_wall_ns + + segs = _phase_segments(epi) + if len(segs) < 4: + return None + + # Find the first occurrence of each canonical phase + phase_starts: dict[str, float] = {} + phase_ends: dict[str, float] = {} + for ph, ts, te in segs: + if ph not in phase_starts: + phase_starts[ph] = ts + # Last end-time we see for this phase (may be reset on re-entry) + phase_ends[ph] = te + # Fold dormant into infected_running's slice if present + if "infected_running" in phase_starts and "dormant" in phase_starts: + # Extend infected_running's end through any dormant occurrences + last_relevant = max( + phase_ends.get("infected_running", 0.0), + phase_ends.get("dormant", 0.0), + ) + phase_ends["infected_running"] = last_relevant + + # We want: leading_clean, armed, infecting, infected_running, trailing_clean + # Find where leading_clean ends (= first armed) and where trailing_clean + # starts (= the LAST clean's start, after infected_running/dormant). + if "armed" not in phase_starts or "infected_running" not in phase_starts: + return None + leading_clean_start = 0.0 + leading_clean_end = phase_starts["armed"] + armed_end = phase_starts.get("infecting", phase_starts["armed"] + 0.1) + infecting_end = phase_starts["infected_running"] + inf_run_end = phase_ends["infected_running"] + duration = (epi.meta.get("result") or {}).get("duration_observed_s") or 0.0 + + # Trailing clean = anything after infected_running's end + trailing_clean_start = inf_run_end + trailing_clean_end = duration if duration > inf_run_end else inf_run_end + 0.1 + + arrs = channel_arrays(epi, episode_t0_wall_ns(epi)) + t, v = arrs.get(channel_name, (np.zeros(0), np.zeros(0))) + if len(t) < 2: + return None + + # Per-phase resample then concat. NaN values in the segment are + # interpolated past — np.interp's left/right=NaN don't apply here + # because we slice by time within the channel's data range. + pieces: list[np.ndarray] = [] + for (label, t_s, t_e, n) in [ + ("clean_lead", leading_clean_start, leading_clean_end, PHASE_POINT_BUDGET["clean"]), + ("armed", leading_clean_end, armed_end, PHASE_POINT_BUDGET["armed"]), + ("infecting", armed_end, infecting_end, PHASE_POINT_BUDGET["infecting"]), + ("infected_running", infecting_end, inf_run_end, PHASE_POINT_BUDGET["infected_running"]), + ("clean_tail", trailing_clean_start, trailing_clean_end, PHASE_POINT_BUDGET["_tail_clean"]), + ]: + if t_e <= t_s: + pieces.append(np.full(n, np.nan, dtype=np.float64)) + continue + seg = _resample_to_grid(t, v, n=n, t_start=t_s, t_end=t_e) + if seg is None: + pieces.append(np.full(n, np.nan, dtype=np.float64)) + else: + pieces.append(seg) + + out = np.concatenate(pieces, axis=0) + if out.shape[0] != n_points: + # Defensive: pad/truncate to expected length + if out.shape[0] < n_points: + out = np.concatenate( + [out, np.full(n_points - out.shape[0], np.nan)] + ) + else: + out = out[:n_points] + return out + + +def aggregate_envelope( + *, validation_path: Path, store_root: Path, + profile: str, channel_name: str, + n_episodes_target: int = 100, n_points: int = 80, + seed: int = 0, +) -> dict | None: + """Aggregate the channel timeseries across up to ``n_episodes_target`` + accepted episodes of ``profile``. Returns a dict with the canonical + median curve + p25/p75 band, plus diagnostics; ``None`` if no + episodes were usable. + + Aggregation method: + 1. Random-sample up to N accepted episodes of this profile + 2. For each, decompress the tarball, extract channel via + channel_arrays() (counter-diff applied) + 3. Resample to n_points by fraction-of-episode-duration + 4. Stack → (n_used, n_points) + 5. Per-timestep median + p25/p75 across episodes + 6. Normalize median to [0, 1] for the dashboard renderer + """ + rng = np.random.default_rng(seed) + val = pq.read_table( + validation_path, + columns=["episode_id", "host_id", "profile", "status"], + ).to_pylist() + candidates = [r for r in val + if r["status"] == "accepted" and r["profile"] == profile] + if not candidates: + log.warning("no accepted episodes for profile %s", profile) + return None + if len(candidates) > n_episodes_target: + idx = rng.choice(len(candidates), n_episodes_target, replace=False) + candidates = [candidates[i] for i in idx] + + curves: list[np.ndarray] = [] + n_attempted = 0 + n_failed = 0 + for r in candidates: + n_attempted += 1 + path = Path(store_root) / r["host_id"] / f"{r['episode_id']}.tar.zst" + if not path.exists(): + n_failed += 1 continue - ch_name, shape_text = cfg try: - epi = open_episode(path, host_id=host_id) + epi = open_episode(path, host_id=r["host_id"]) except Exception as e: - log.warning("open %s failed: %s", path, e) + log.debug("open %s failed: %s", path, e) + n_failed += 1 continue if not epi.labels: + n_failed += 1 continue - t0 = int(epi.labels[0]["t_mono_ns"]) - arrs = channel_arrays(epi, t0) - t, v = arrs.get(ch_name, (np.zeros(0), np.zeros(0))) - curve = _resample(t, v, n=80) - await publish({ - "type": "attack_profile", - "name": prof, "shape": shape_text, "curve": curve, - }) - n += 1 - return n + # Phase-aware alignment: each episode's curve uses its phase + # boundaries from labels.jsonl, resampled per phase to a fixed + # budget so the median across episodes captures the canonical + # per-phase shape (vs the fraction-of-duration shortcut, which + # washes out plateaus when phase timing varies). + curve = _phase_aligned_curve(epi, channel_name, n_points=n_points) + if curve is None: + n_failed += 1 + continue + curves.append(curve) + + if not curves: + log.warning("no usable episodes for profile %s " + "(attempted=%d, failed=%d)", + profile, n_attempted, n_failed) + return None + + arr = np.asarray(curves, dtype=np.float64) # (n_used, n_points) + # nanmedian/percentile because individual phase segments may have + # produced NaN where the channel had no data in that phase. + median = np.nanmedian(arr, axis=0) + median = np.where(np.isnan(median), 0.0, median) + p25 = np.nan_to_num(np.nanpercentile(arr, 25, axis=0), nan=0.0) + p75 = np.nan_to_num(np.nanpercentile(arr, 75, axis=0), nan=0.0) + + # Normalize the median for the renderer; keep raw values so the band + # can be plotted in the same scale on a future widget. + m_lo, m_hi = float(np.nanmin(median)), float(np.nanmax(median)) + if m_hi - m_lo < 1e-9: + median_norm = np.zeros_like(median) + else: + median_norm = (median - m_lo) / (m_hi - m_lo) + + log.info("%-15s channel=%-26s n_used=%d/%d range=[%.3g, %.3g]", + profile, channel_name, len(curves), n_attempted, m_lo, m_hi) + + return { + "profile": profile, + "channel": channel_name, + "n_episodes_used": len(curves), + "n_episodes_attempted": n_attempted, + "median": median_norm.astype(np.float32).tolist(), + "median_raw": median.astype(np.float32).tolist(), + "p25_raw": p25.astype(np.float32).tolist(), + "p75_raw": p75.astype(np.float32).tolist(), + "median_min_raw": m_lo, + "median_max_raw": m_hi, + } -async def _run(args: argparse.Namespace) -> int: - logging.basicConfig(level=logging.INFO, - format="%(asctime)s %(levelname)s %(name)s %(message)s") +def _save_fit(rows: list[dict], path: Path) -> None: + """Save aggregation result as a parquet (one row per profile).""" + if not rows: + raise ValueError("nothing to save") + n_points = len(rows[0]["median"]) + schema = pa.schema([ + ("profile", pa.string()), + ("channel", pa.string()), + ("n_episodes_used", pa.int32()), + ("n_episodes_attempted", pa.int32()), + ("median", pa.list_(pa.float32(), n_points)), + ("median_raw", pa.list_(pa.float32(), n_points)), + ("p25_raw", pa.list_(pa.float32(), n_points)), + ("p75_raw", pa.list_(pa.float32(), n_points)), + ("median_min_raw", pa.float64()), + ("median_max_raw", pa.float64()), + ]) + cols = {n: [r[n] for r in rows] for n in schema.names} + tbl = pa.table(cols, schema=schema) + path.parent.mkdir(parents=True, exist_ok=True) + pq.write_table(tbl, path, compression="zstd") + + +async def _fit(args) -> int: + rows: list[dict] = [] + for prof, (ch_name, _) in PROFILE_TO_CHANNEL.items(): + res = aggregate_envelope( + validation_path=args.validation, + store_root=args.store, + profile=prof, channel_name=ch_name, + n_episodes_target=args.n_episodes_per_profile, + n_points=args.n_points, + seed=args.seed, + ) + if res is not None: + rows.append(res) + if not rows: + log.error("no profiles had usable episodes") + return 1 + _save_fit(rows, args.fit_out) + log.info("wrote %s (%d profile rows)", args.fit_out, len(rows)) + print(json.dumps({ + "saved_to": str(args.fit_out), + "n_profiles": len(rows), + "profiles": [r["profile"] for r in rows], + }, indent=2)) + return 0 + + +# ───────────────────────────────────────────────────────────────────── +# Stream +# ───────────────────────────────────────────────────────────────────── + + +async def _stream(args) -> int: + if not args.load_fit.exists(): + log.error("fit parquet not found: %s — run `profiles fit --fit-out` first", + args.load_fit) + return 1 + tbl = pq.read_table(args.load_fit).to_pylist() + log.info("loaded %d profile aggregates from %s", len(tbl), args.load_fit) + publisher = (null_publisher() if args.dry_run else http_publisher(args.publish_url)) - # Sample episodes once; their envelopes are static. Cache and - # re-publish on a tick for reconnects. - cached: list[dict] = [] - async def cached_publish(msg: dict) -> None: - cached.append(msg) - await publisher(msg) - - await emit_profiles(publish=cached_publish, - validation_path=args.validation, - store_root=args.store) - if args.interval <= 0 or not cached: - return 0 while True: - await asyncio.sleep(args.interval) - for msg in cached: - await publisher(msg) + for r in tbl: + shape_text = PROFILE_TO_CHANNEL.get(r["profile"], (None, ""))[1] + await publisher({ + "type": "attack_profile", + "name": r["profile"], + "shape": shape_text, + "curve": list(r["median"]), + }) + log.info("cycle complete: %d profiles", len(tbl)) + if not args.loop: + return 0 + await asyncio.sleep(args.cycle_pause_s) + + +# ───────────────────────────────────────────────────────────────────── +# CLI +# ───────────────────────────────────────────────────────────────────── def main() -> int: - ap = argparse.ArgumentParser() - ap.add_argument("--validation", required=True, type=Path) - ap.add_argument("--store", required=True, type=Path) - ap.add_argument("--publish-url", default="http://127.0.0.1:8447/publish") - ap.add_argument("--interval", type=float, default=30.0, - help="re-publish cached profile curves every N seconds; " - "0 = one-shot.") - ap.add_argument("--dry-run", action="store_true") - args = ap.parse_args() - return asyncio.run(_run(args)) + p = argparse.ArgumentParser() + sub = p.add_subparsers(dest="cmd", required=True) + + pf = sub.add_parser("fit", + help="aggregate N episodes per profile and save " + "the canonical envelope to disk") + pf.add_argument("--validation", required=True, type=Path) + pf.add_argument("--store", required=True, type=Path, + help="receiver episode store root, e.g. " + "/var/lib/cis490/episodes") + pf.add_argument("--fit-out", type=Path, + default=Path("data/processed/attack_profiles_v1.parquet")) + pf.add_argument("--n-episodes-per-profile", type=int, default=100) + pf.add_argument("--n-points", type=int, default=80, + help="resampled curve length (default 80, what " + "the dashboard's profile thumbnail expects)") + pf.add_argument("--seed", type=int, default=0) + pf.add_argument("--log-level", default="INFO") + pf.set_defaults(func=_fit) + + ps = sub.add_parser("stream", + help="publish attack_profile events from a saved " + "fit parquet") + ps.add_argument("--load-fit", type=Path, + default=Path("data/processed/attack_profiles_v1.parquet")) + ps.add_argument("--publish-url", default="http://127.0.0.1:8447/publish") + ps.add_argument("--loop", action="store_true", + help="cycle indefinitely so reconnecting browsers " + "stay populated") + ps.add_argument("--cycle-pause-s", type=float, default=20.0, + help="pause between cycles when --loop is set") + ps.add_argument("--dry-run", action="store_true") + ps.add_argument("--log-level", default="INFO") + ps.set_defaults(func=_stream) + + args = p.parse_args() + logging.basicConfig(level=args.log_level, + format="%(asctime)s %(levelname)s %(name)s %(message)s") + return asyncio.run(args.func(args)) if __name__ == "__main__":