diff --git a/training/dashboard/feeder.py b/training/dashboard/feeder.py index fd26e0d..d97603d 100644 --- a/training/dashboard/feeder.py +++ b/training/dashboard/feeder.py @@ -23,6 +23,9 @@ import asyncio import json import logging import os +import random +import subprocess +import tarfile from pathlib import Path from typing import Any, Awaitable, Callable @@ -252,7 +255,9 @@ async def snapshot_loop( snap = await asyncio.to_thread( _snapshot_state, data_root, index_path, alerts_path ) - broadcaster.state = snap + # `update` instead of `=` so out-of-band keys (phase_mix, + # anything future loops add) survive snapshot cycles. + broadcaster.state.update(snap) if first: log.info( "snapshot: total_episodes=%d total_alerts=%d hosts=%d", @@ -267,6 +272,205 @@ async def snapshot_loop( await asyncio.sleep(poll_interval) +# ───────────────────────────────────────────────────────────────────── +# Phase mix (dataset) +# ───────────────────────────────────────────────────────────────────── +# +# The baseline scene shows the proportion of time the workload spent +# in each labelled phase. Originally a rolling 5-min window of live +# `phase` events — but live events only flow when the orchestrator is +# running. To keep the slide reflecting *actual data* we sample N +# random episode tarballs on disk, extract the labels.jsonl from +# each (which is a list of phase-transition events stamped with +# t_mono_ns), and aggregate phase durations across the sample. + +PHASE_MIX_SAMPLE = 500 +PHASE_MIX_INTERVAL = 600.0 # seconds; ~10 min + + +def _read_episode_labels(path: Path, *, timeout: float = 10.0) -> list[dict] | None: + """Stream-extract labels.jsonl from an episode tarball. + + Uses ``zstd -dc | tarfile r|`` so we can break out of the stream as + soon as labels.jsonl appears (it's near the front of the tar) and + kill the zstd subprocess immediately after — avoiding a full + decompress when we only need ~10 lines. + """ + if not path.is_file(): + return None + try: + proc = subprocess.Popen( + ["zstd", "-dc", str(path)], + stdout=subprocess.PIPE, stderr=subprocess.DEVNULL, + ) + except OSError: + return None + out: list[dict] | None = None + try: + with tarfile.open(fileobj=proc.stdout, mode="r|") as tar: + for member in tar: + if not member.isfile(): + continue + name = member.name.rsplit("/", 1)[-1] + if name == "labels.jsonl": + f = tar.extractfile(member) + if f is None: + continue + data = f.read() + out = [] + for line in data.splitlines(): + line = line.strip() + if not line: + continue + try: + out.append(json.loads(line)) + except json.JSONDecodeError: + pass + break + except (tarfile.TarError, OSError): + out = None + finally: + if proc.stdout: + try: proc.stdout.close() + except Exception: pass + try: proc.kill() # short-circuit zstd if labels was the only file we needed + except Exception: pass + try: proc.wait(timeout=timeout) + except Exception: pass + return out + + +def _aggregate_episode_labels( + records: list[dict], + counts: dict[str, int], + weighted: dict[str, float], +) -> int: + """Fold one episode's labels into running counters. + + Each record is a phase-transition event with ``t_mono_ns``. The + duration of phase ``rec[i].phase`` is ``t_mono_ns[i+1] - + t_mono_ns[i]``; the trailing record gets a count bump but no + duration weight (we don't know its end without meta.json, and the + last phase is usually short ``infected_running`` cleanup so the + bias is acceptable for proportional display). + """ + if not records: + return 0 + rec = sorted(records, key=lambda r: r.get("t_mono_ns") or 0) + n_labels = 0 + if len(rec) == 1: + p = rec[0].get("phase") + if p: + counts[p] = counts.get(p, 0) + 1 + n_labels = 1 + return n_labels + for i in range(len(rec) - 1): + p = rec[i].get("phase") + if not p: + continue + t0 = rec[i].get("t_mono_ns") + t1 = rec[i + 1].get("t_mono_ns") + if not (isinstance(t0, (int, float)) and isinstance(t1, (int, float))): + continue + dur_s = max(0.0, (float(t1) - float(t0)) / 1e9) + weighted[p] = weighted.get(p, 0.0) + dur_s + counts[p] = counts.get(p, 0) + 1 + n_labels += 1 + tail = rec[-1].get("phase") + if tail: + counts[tail] = counts.get(tail, 0) + 1 + n_labels += 1 + return n_labels + + +def _compute_phase_mix(data_root: Path, sample: int = PHASE_MIX_SAMPLE) -> dict: + """Sample N random episodes, aggregate their phase durations. + + Returns ``{}`` if no episodes are on disk yet. Returns a dict with + ``counts``, ``weighted_seconds``, ``sampled_episodes``, + ``population_episodes``, and ``total_labels`` otherwise. + """ + episodes_root = data_root / "episodes" + if not episodes_root.is_dir(): + return {} + + files: list[Path] = [] + try: + for host_dir in episodes_root.iterdir(): + if not host_dir.is_dir(): + continue + try: + for entry in host_dir.iterdir(): + if entry.is_file() and entry.name.endswith(".tar.zst"): + files.append(entry) + except OSError: + continue + except OSError: + return {} + if not files: + return {} + population = len(files) + + chosen = files if sample >= population else random.sample(files, sample) + + counts: dict[str, int] = {} + weighted: dict[str, float] = {} + sampled_episodes = 0 + total_labels = 0 + for path in chosen: + labels = _read_episode_labels(path) + if not labels: + continue + added = _aggregate_episode_labels(labels, counts, weighted) + if added: + sampled_episodes += 1 + total_labels += added + + return { + "counts": counts, + "weighted_seconds": weighted, + "sampled_episodes": sampled_episodes, + "population_episodes": population, + "total_labels": total_labels, + } + + +async def phase_mix_loop( + broadcaster, + *, + data_root: Path, + poll_interval: float = PHASE_MIX_INTERVAL, + sample: int = PHASE_MIX_SAMPLE, +) -> None: + """Recompute the dataset phase mix on a slow timer. + + Lives off the main event loop via ``to_thread`` because the zstd + decompress + tar parse for a few hundred episodes takes long + enough to be noticeable on a Pi (still typically < 30 s). + """ + while True: + try: + mix = await asyncio.to_thread(_compute_phase_mix, data_root, sample) + if mix: + broadcaster.state["phase_mix"] = mix + await broadcaster.publish({"type": "phase_mix", **mix}) + log.info( + "phase_mix: %d/%d episodes sampled, %d labels, " + "weighted=%s", + mix.get("sampled_episodes", 0), + mix.get("population_episodes", 0), + mix.get("total_labels", 0), + {k: round(v, 1) for k, v in mix.get("weighted_seconds", {}).items()}, + ) + else: + log.info("phase_mix: no episodes on disk yet, retrying") + except asyncio.CancelledError: + raise + except Exception: + log.exception("phase_mix_loop error") + await asyncio.sleep(poll_interval) + + # ───────────────────────────────────────────────────────────────────── # Lifecycle # ───────────────────────────────────────────────────────────────────── @@ -288,5 +492,8 @@ def start_feeders(broadcaster, *, data_root: Path = DEFAULT_DATA_ROOT) -> list[a asyncio.create_task( watch_alerts_jsonl(publish, alerts_path), name="cis490.feeder.alerts"), + asyncio.create_task( + phase_mix_loop(broadcaster, data_root=data_root), + name="cis490.feeder.phase_mix"), ] return tasks diff --git a/training/dashboard/static/dashboard.js b/training/dashboard/static/dashboard.js index ed83270..9b0c7ef 100644 --- a/training/dashboard/static/dashboard.js +++ b/training/dashboard/static/dashboard.js @@ -1132,14 +1132,19 @@ for epoch in range(20): }); })(); - // ── Phase mix (rolling 5 min) ───────────────────────────────── - // Real-data widget. Will be empty until phase events flow. + // ── Phase mix (dataset-derived) ─────────────────────────────── + // Real-data widget. Driven by the dashboard's `phase_mix` feeder, + // which periodically samples random episode tarballs on disk and + // aggregates labels.jsonl phase durations across them. The feeder + // tucks the result into `broadcaster.state["phase_mix"]` so it + // arrives in the snapshot the WS sends on connect, and republishes + // a `phase_mix` event each time it recomputes. (function () { const stack = document.getElementById('phase-stack'); const legend = document.getElementById('phase-legend'); + const eyebrow = document.getElementById('phase-mix-eyebrow'); + const sub = document.getElementById('phase-mix-sub'); const PHASES = ['clean', 'armed', 'infecting', 'infected_running', 'dormant']; - const WINDOW_MS = 5 * 60 * 1000; - const samples = []; const segs = new Map(); PHASES.forEach(p => { @@ -1151,21 +1156,45 @@ for epoch in range(20): legend.appendChild(li); }); - function render() { - const now = Date.now(); - while (samples.length && now - samples[0].t > WINDOW_MS) samples.shift(); - const counts = Object.fromEntries(PHASES.map(p => [p, 0])); - samples.forEach(s => { if (counts[s.phase] !== undefined) counts[s.phase]++; }); - const total = Math.max(1, samples.length); - PHASES.forEach(p => { segs.get(p).style.flexGrow = (counts[p] / total).toFixed(4); }); + function fmtInt(n) { return (typeof n === 'number') ? n.toLocaleString() : '—'; } + + function applyMix(mix) { + if (!mix) return; + const w = mix.weighted_seconds || {}; + const c = mix.counts || {}; + // Prefer time-weighted proportions; fall back to label counts. + const useWeighted = Object.values(w).some(v => v > 0); + const src = useWeighted ? w : c; + // Sum only the canonical phases so non-displayed phases (e.g. + // `failed` from the orchestrator) don't shrink the visible bars. + const total = PHASES.reduce((a, p) => a + (src[p] || 0), 0) || 1; + PHASES.forEach(p => { + segs.get(p).style.flexGrow = ((src[p] || 0) / total).toFixed(4); + }); + if (eyebrow) { + const tag = useWeighted ? 'time-weighted' : 'label-count'; + eyebrow.textContent = + `phase mix · ${fmtInt(mix.sampled_episodes)} of ${fmtInt(mix.population_episodes)} episodes · ${tag}`; + } + if (sub) { + const hours = useWeighted + ? Math.round(PHASES.reduce((a, p) => a + (w[p] || 0), 0) / 3600) + : null; + sub.innerHTML = + `Aggregated across ${fmtInt(mix.sampled_episodes)} ` + + `randomly-sampled episodes ` + + `(${fmtInt(mix.total_labels)} phase records` + + (hours != null ? `, ~${fmtInt(hours)} hours` : '') + + `). Refreshes every ~10 min from disk.`; + } } - on('phase', m => { - if (!m.phase) return; - samples.push({ phase: m.phase, t: Date.now() }); render(); + on('snapshot', m => { if (m.phase_mix) applyMix(m.phase_mix); }); + on('phase_mix', applyMix); + on('demo_stop', () => { + // Demo toggle off doesn't wipe the dataset mix — the dataset is + // ground truth, the demo only fakes per-event widgets. }); - on('demo_stop', () => { samples.length = 0; render(); }); - setInterval(render, 1000); })(); // ── Database explorer ───────────────────────────────────────── diff --git a/training/dashboard/static/index.html b/training/dashboard/static/index.html index 3db3100..3ff0aba 100644 --- a/training/dashboard/static/index.html +++ b/training/dashboard/static/index.html @@ -243,13 +243,14 @@
-
phase mix · last 5 min
+
phase mix · sampling dataset…
-
awaiting phase events from - the orchestrator. A clean fleet sits mostly in - clean; skew toward infecting means - the workload is firing.
+
computing the phase + distribution across a random sample of episodes on disk. + A clean fleet sits mostly in clean; skew toward + infecting / infected_running + reflects time spent under attack workloads.
@@ -528,6 +529,6 @@ - +