CIS490/training/dashboard/feeder.py
Max Gorog 51f2437b71 baseline: phase mix from sampled dataset, not 5-min window
The widget was waiting on live `phase` events that don't flow when no
orchestrator is running, so it sat empty. Replace the rolling
5-minute window with a periodic feeder that samples 500 random
episode tarballs from /var/lib/cis490/episodes, extracts each
labels.jsonl, and aggregates phase durations using consecutive
t_mono_ns deltas. Result lands in broadcaster.state["phase_mix"]
(survives snapshot cycles via dict.update) and re-broadcasts every
~10 min.

Frontend reads phase_mix from snapshot on connect and from live
phase_mix events on refresh; the bar uses time-weighted proportions
when available (falls back to label counts), and only sums canonical
phases for the denominator so non-displayed `failed` records don't
shrink the visible bars. Eyebrow and sub-line update with live
sample/population/label counts.

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
2026-05-08 13:04:36 -05:00

499 lines
19 KiB
Python

"""Real producers that wire the receiver's on-disk state to the
dashboard message bus.
Four feeders, all started by ``app.lifespan``:
- ``watch_index_jsonl`` — tails ``/var/lib/cis490/index.jsonl`` and
  publishes one ``episode`` event per new line. Survives file
  rotation by tracking inode.
- ``snapshot_loop`` — periodically derives ground-truth from disk
  (per-host episode counts, total counts, alert tail) and updates
  the broadcaster's persistent ``state`` so reconnecting clients
  see warm numbers, not zero.
- ``watch_alerts_jsonl`` — same as the index tailer but for the
  receiver's alerts log.
- ``phase_mix_loop`` — periodically samples episode tarballs under
  ``episodes/``, aggregates their labelled phase durations, and
  publishes the result as a ``phase_mix`` event.
If a path doesn't exist (e.g. ``health/`` on a fresh deploy) the
feeder logs once and keeps polling — it'll start producing events the
moment the path appears.
"""
from __future__ import annotations
import asyncio
import json
import logging
import os
import random
import subprocess
import tarfile
from pathlib import Path
from typing import Any, Awaitable, Callable
# Module-level logger used by every feeder in this file.
log = logging.getLogger("cis490.dashboard.feeder")
# Default on-disk root; ``start_feeders`` resolves ``index.jsonl`` and
# ``alerts.jsonl`` under it, and the snapshot / phase-mix code looks for
# an ``episodes/`` directory below it.
DEFAULT_DATA_ROOT = Path("/var/lib/cis490")
# Shape of an async publish callable: takes one event dict and resolves
# to an int. The meaning of that int is defined by the broadcaster,
# which is not visible from this module.
PublishFn = Callable[[dict[str, Any]], Awaitable[int]]
# ─────────────────────────────────────────────────────────────────────
# Tail helpers
# ─────────────────────────────────────────────────────────────────────
async def _tail_jsonl(
    path: Path,
    publish: PublishFn,
    parse: Callable[[dict], dict | None],
    *,
    poll_interval: float = 1.0,
    label: str = "tail",
) -> None:
    """Follow an append-only JSONL file forever and publish its records.

    Args:
        path: File to follow. It may not exist yet; we keep polling.
        publish: Awaited once for every event ``parse`` produces.
        parse: Shapes one decoded JSON object into the event dict to
            publish; returning ``None`` skips the record.
        poll_interval: Seconds between reads. A missing file is
            re-checked every ``poll_interval * 5`` seconds.
        label: Prefix used in log messages.

    Behaviour:
        - The first open (and any re-open after an error) starts at the
          end of the file, so only new lines are published.
        - Rotation is detected by inode change. The old handle is
          drained first, then the new file is read from its beginning so
          lines written before we noticed the switch are not skipped.
        - If the file shrinks in place (truncation) reading restarts
          from offset 0.
        - The file is read as bytes and only newline-terminated lines
          are parsed; an unfinished trailing line is buffered until the
          writer completes it instead of being discarded as malformed.
        - Lines that are not valid JSON, or that decode to something
          other than a JSON object, are logged and skipped.

    Never returns normally; ``asyncio.CancelledError`` is propagated.
    """
    fd = None
    inode: int | None = None
    missing_logged = False
    pending = b""  # bytes after the last newline seen so far (unfinished line)
    while True:
        try:
            if not path.exists():
                if not missing_logged:
                    log.info("[%s] %s does not exist; will retry", label, path)
                    missing_logged = True
                await asyncio.sleep(poll_interval * 5)
                continue
            missing_logged = False
            st = path.stat()
            if fd is None or inode != st.st_ino:
                rotated = fd is not None
                if fd is not None:
                    # Pick up anything appended to the old file between
                    # our last read and the rotation.
                    try:
                        pending += await asyncio.to_thread(fd.read)
                    except OSError:
                        pass
                    try:
                        fd.close()
                    except Exception:
                        pass  # best-effort close of a handle we are abandoning
                    fd = None
                    # Terminate a dangling last line so it cannot fuse
                    # with the first line of the new file.
                    if pending and not pending.endswith(b"\n"):
                        pending += b"\n"
                fd = path.open("rb")
                # Use the inode of the handle we actually hold; the path
                # may have been swapped again since ``stat``.
                inode = os.fstat(fd.fileno()).st_ino
                if not rotated:
                    fd.seek(0, os.SEEK_END)
                log.info("[%s] watching %s (inode=%d, starting at %d bytes)",
                         label, path, inode, fd.tell())
            elif st.st_size < fd.tell():
                # Same inode but shorter than our offset: truncated in place.
                log.info("[%s] %s was truncated; rereading from start", label, path)
                fd.seek(0)
                pending = b""
            chunk = await asyncio.to_thread(fd.read)
            # Everything before the final newline is complete; the rest
            # waits in ``pending`` for the next read.
            *complete, pending = (pending + chunk).split(b"\n")
            for raw in complete:
                line = raw.decode("utf-8", errors="replace").strip()
                if not line:
                    continue
                try:
                    rec = json.loads(line)
                except json.JSONDecodeError:
                    log.warning("[%s] skipping malformed line: %r", label, line[:120])
                    continue
                if not isinstance(rec, dict):
                    log.warning("[%s] skipping non-object record: %r", label, line[:120])
                    continue
                out = parse(rec)
                if out is not None:
                    await publish(out)
            await asyncio.sleep(poll_interval)
        except asyncio.CancelledError:
            raise
        except Exception:
            log.exception("[%s] error; reopening in 5s", label)
            if fd is not None:
                try:
                    fd.close()
                except Exception:
                    pass
            fd = None
            inode = None
            pending = b""
            await asyncio.sleep(5)
# ─────────────────────────────────────────────────────────────────────
# Specific feeders
# ─────────────────────────────────────────────────────────────────────
async def watch_index_jsonl(broadcaster, path: Path) -> None:
    """Tail the episode ingest log and publish one ``episode`` event per line.

    As a side effect of shaping each record, the broadcaster's
    persistent ``state`` is kept current: the newest episode is pushed
    onto the front of ``state["recent_episodes"]`` (capped at
    ``RECENT_EPISODES_LIMIT``) and ``total_bytes`` / ``total_episodes``
    are bumped, so a reconnecting client gets warm history without
    index.jsonl being re-read.
    """

    def to_event(record: dict) -> dict | None:
        # Records lacking either identifier are not episodes; skip them.
        if not ("episode_id" in record and "host_id" in record):
            return None

        episode = {
            "episode_id": record.get("episode_id"),
            "host_id": record.get("host_id"),
            "sha256": record.get("sha256"),
            "size_bytes": record.get("size_bytes"),
            "received_at": record.get("received_at_wall"),
        }

        state = broadcaster.state

        # Newest-first ring buffer, trimmed in place so the list object
        # held in ``state`` stays the same.
        ring = state.setdefault("recent_episodes", [])
        ring.insert(0, episode)
        del ring[RECENT_EPISODES_LIMIT:]

        # Running totals; a non-numeric size only skips the byte count.
        size = episode["size_bytes"]
        if isinstance(size, (int, float)):
            state["total_bytes"] = int(state.get("total_bytes", 0)) + int(size)
        state["total_episodes"] = int(state.get("total_episodes", 0)) + 1

        return {"type": "episode", **episode}

    await _tail_jsonl(path, broadcaster.publish, to_event, label="index.jsonl")
async def watch_alerts_jsonl(publish: PublishFn, path: Path) -> None:
    """Tail the receiver's alerts log and publish one ``alert`` event per line.

    Every record is forwarded; fields missing from a record come
    through as ``None``.
    """
    # (key in the published event, key in the alerts.jsonl record)
    field_map = (
        ("host_id", "host"),
        ("symptom", "symptom"),
        ("detail", "detail"),
        ("suggested_fix", "suggested_fix"),
        ("detected_at", "detected_at_wall"),
        ("dedup_key", "dedup_key"),
    )

    def to_event(record: dict) -> dict | None:
        event = {"type": "alert"}
        for event_key, record_key in field_map:
            event[event_key] = record.get(record_key)
        return event

    await _tail_jsonl(path, publish, to_event, label="alerts.jsonl")
def _count_lines(p: Path) -> int:
    """Return the number of lines in ``p``, or 0 if it cannot be read.

    Used for bootstrap totals only, so being off by one when the file
    is read while a writer is mid-line is accepted as noise.
    """
    total = 0
    try:
        with p.open("rb") as handle:
            for _ in handle:
                total += 1
    except OSError:
        # Missing / unreadable file (or a failure part-way) counts as empty.
        return 0
    return total
# Maximum number of entries kept in ``state["recent_episodes"]``: the
# index tailer trims its ring buffer to this length and the snapshot
# reader uses it as the default size of its trailing window.
RECENT_EPISODES_LIMIT = 200
def _snapshot_state(
    data_root: Path,
    index_path: Path,
    alerts_path: Path,
    *,
    recent_limit: int = RECENT_EPISODES_LIMIT,
) -> dict:
    """Derive the canonical dashboard view from what is on disk.

    Args:
        data_root: Directory expected to contain ``episodes/<host>/``.
        index_path: The index JSONL file, read front-to-back once.
        alerts_path: The alerts JSONL file; only its line count is used.
        recent_limit: How many trailing index records to return in
            ``recent_episodes``. Zero or less yields an empty list.

    Returns:
        A dict with ``total_episodes``, ``total_alerts``,
        ``host_counts``, ``total_bytes`` and ``recent_episodes``
        (newest first).

    Per-host counts come from the filesystem listing rather than the
    index, so they are unaffected by JSON parse problems. Index lines
    that are not valid JSON, or are valid JSON but not an object, are
    ignored. The function is blocking (the original author measured
    ~1 s for ~76k lines on the Pi); the caller runs it via ``to_thread``.
    """
    from collections import deque  # local import: only this function needs it

    host_counts: dict[str, int] = {}
    episodes_root = data_root / "episodes"
    try:
        if episodes_root.exists():
            for host_dir in episodes_root.iterdir():
                if not host_dir.is_dir():
                    continue
                try:
                    host_counts[host_dir.name] = sum(
                        1 for entry in host_dir.iterdir() if entry.is_file()
                    )
                except OSError:
                    continue
    except OSError:
        # Root vanished or is unreadable: report the hosts counted so
        # far instead of failing the whole snapshot.
        pass

    total_episodes = 0
    total_bytes = 0
    # A bounded deque keeps only the trailing window while streaming the
    # whole file, and handles ``recent_limit == 0`` correctly (a plain
    # ``lst[-0:]`` slice would return everything).
    recent: deque = deque(maxlen=max(0, recent_limit))
    if index_path.exists():
        try:
            with index_path.open("r", encoding="utf-8", errors="replace") as f:
                for line in f:
                    line = line.strip()
                    if not line:
                        continue
                    try:
                        rec = json.loads(line)
                    except json.JSONDecodeError:
                        continue
                    if not isinstance(rec, dict):
                        # Valid JSON but not a record (e.g. ``null``, a
                        # number, a list); ``rec.get`` would raise.
                        continue
                    total_episodes += 1
                    sb = rec.get("size_bytes")
                    if isinstance(sb, (int, float)):
                        total_bytes += int(sb)
                    recent.append({
                        "episode_id": rec.get("episode_id"),
                        "host_id": rec.get("host_id"),
                        "received_at": rec.get("received_at_wall"),
                        "size_bytes": sb,
                        "sha256": rec.get("sha256"),
                    })
        except OSError:
            # Keep whatever was tallied before the read failed.
            pass

    return {
        "total_episodes": total_episodes,
        "total_alerts": _count_lines(alerts_path),
        "host_counts": host_counts,
        "total_bytes": total_bytes,
        "recent_episodes": list(reversed(recent)),  # newest-first for the UI
    }
async def snapshot_loop(
    broadcaster,
    *,
    data_root: Path,
    index_path: Path,
    alerts_path: Path,
    poll_interval: float = 30.0,
) -> None:
    """Periodically re-derive disk truth and push it to the broadcaster.

    Each cycle computes a snapshot off the event loop, merges it into
    ``broadcaster.state`` (so reconnecting clients start from disk
    truth rather than the in-session delta) and publishes it as a
    ``snapshot`` event for widgets that want to hard-reset. Failures
    are logged and the loop carries on; cancellation propagates.
    """
    summary_logged = False
    while True:
        try:
            snap = await asyncio.to_thread(
                _snapshot_state, data_root, index_path, alerts_path
            )
            # Merge rather than replace: keys owned by other loops
            # (e.g. phase_mix) must survive a snapshot cycle.
            broadcaster.state.update(snap)
            if not summary_logged:
                log.info(
                    "snapshot: total_episodes=%d total_alerts=%d hosts=%d",
                    snap["total_episodes"],
                    snap["total_alerts"],
                    len(snap["host_counts"]),
                )
                summary_logged = True
            event = {"type": "snapshot"}
            event.update(snap)
            await broadcaster.publish(event)
        except asyncio.CancelledError:
            raise
        except Exception:
            log.exception("snapshot_loop error")
        await asyncio.sleep(poll_interval)
# ─────────────────────────────────────────────────────────────────────
# Phase mix (dataset)
# ─────────────────────────────────────────────────────────────────────
#
# The baseline scene shows the proportion of time the workload spent
# in each labelled phase. Originally a rolling 5-min window of live
# `phase` events — but live events only flow when the orchestrator is
# running. To keep the slide reflecting *actual data* we sample N
# random episode tarballs on disk, extract the labels.jsonl from
# each (which is a list of phase-transition events stamped with
# t_mono_ns), and aggregate phase durations across the sample.
# Default number of episode tarballs drawn per phase-mix recompute.
PHASE_MIX_SAMPLE = 500
# Default pause between recomputes in ``phase_mix_loop``.
PHASE_MIX_INTERVAL = 600.0 # seconds; ~10 min
def _read_episode_labels(path: Path, *, timeout: float = 10.0) -> list[dict] | None:
"""Stream-extract labels.jsonl from an episode tarball.
Uses ``zstd -dc | tarfile r|`` so we can break out of the stream as
soon as labels.jsonl appears (it's near the front of the tar) and
kill the zstd subprocess immediately after — avoiding a full
decompress when we only need ~10 lines.
"""
if not path.is_file():
return None
try:
proc = subprocess.Popen(
["zstd", "-dc", str(path)],
stdout=subprocess.PIPE, stderr=subprocess.DEVNULL,
)
except OSError:
return None
out: list[dict] | None = None
try:
with tarfile.open(fileobj=proc.stdout, mode="r|") as tar:
for member in tar:
if not member.isfile():
continue
name = member.name.rsplit("/", 1)[-1]
if name == "labels.jsonl":
f = tar.extractfile(member)
if f is None:
continue
data = f.read()
out = []
for line in data.splitlines():
line = line.strip()
if not line:
continue
try:
out.append(json.loads(line))
except json.JSONDecodeError:
pass
break
except (tarfile.TarError, OSError):
out = None
finally:
if proc.stdout:
try: proc.stdout.close()
except Exception: pass
try: proc.kill() # short-circuit zstd if labels was the only file we needed
except Exception: pass
try: proc.wait(timeout=timeout)
except Exception: pass
return out
def _aggregate_episode_labels(
    records: list[dict],
    counts: dict[str, int],
    weighted: dict[str, float],
) -> int:
    """Fold one episode's labels into the running counters.

    Each record is a phase-transition event carrying ``phase`` and
    ``t_mono_ns``. After ordering by timestamp, the time spent in
    ``rec[i].phase`` is ``t_mono_ns[i+1] - t_mono_ns[i]`` (converted to
    seconds). The final record gets a count bump but no duration
    weight, because its end time is not known from the labels alone.

    Args:
        records: The episode's label records. Entries that are not
            dicts are ignored.
        counts: Mutated in place: number of labels seen per phase.
        weighted: Mutated in place: seconds accumulated per phase.

    Returns:
        How many labels from this episode were counted.

    Malformed data never raises: a record whose ``t_mono_ns`` is
    missing or non-numeric is ordered as if its timestamp were 0, and
    any non-final record whose own or successor timestamp is
    non-numeric is skipped (neither counted nor weighted).
    """

    def is_number(value) -> bool:
        return isinstance(value, (int, float))

    def order_key(record: dict):
        # Mixed types (e.g. a string timestamp) cannot be compared with
        # ints, so anything non-numeric sorts as 0.
        stamp = record.get("t_mono_ns")
        return stamp if is_number(stamp) else 0

    rec = sorted((r for r in records if isinstance(r, dict)), key=order_key)
    if not rec:
        return 0

    n_labels = 0
    for current, following in zip(rec, rec[1:]):
        p = current.get("phase")
        if not p:
            continue
        t0 = current.get("t_mono_ns")
        t1 = following.get("t_mono_ns")
        if not (is_number(t0) and is_number(t1)):
            continue
        dur_s = max(0.0, (float(t1) - float(t0)) / 1e9)
        weighted[p] = weighted.get(p, 0.0) + dur_s
        counts[p] = counts.get(p, 0) + 1
        n_labels += 1

    # Trailing record: counted, but its duration is unknown.
    tail = rec[-1].get("phase")
    if tail:
        counts[tail] = counts.get(tail, 0) + 1
        n_labels += 1
    return n_labels
def _compute_phase_mix(data_root: Path, sample: int = PHASE_MIX_SAMPLE) -> dict:
    """Aggregate phase durations over a random sample of on-disk episodes.

    Collects every ``*.tar.zst`` file under ``data_root/episodes/<host>/``,
    picks up to ``sample`` of them at random, and folds each one's
    labels into shared counters.

    Returns ``{}`` when the episodes directory is missing, unreadable
    or holds no tarballs; otherwise a dict with ``counts``,
    ``weighted_seconds``, ``sampled_episodes`` (episodes that actually
    contributed labels), ``population_episodes`` (tarballs found) and
    ``total_labels``.
    """
    root = data_root / "episodes"
    if not root.is_dir():
        return {}

    tarballs: list[Path] = []
    try:
        for host in root.iterdir():
            if host.is_dir():
                try:
                    for candidate in host.iterdir():
                        if candidate.is_file() and candidate.name.endswith(".tar.zst"):
                            tarballs.append(candidate)
                except OSError:
                    # One unreadable host directory does not spoil the rest.
                    continue
    except OSError:
        return {}

    population = len(tarballs)
    if population == 0:
        return {}

    # Take everything when the population is no larger than the sample.
    if sample >= population:
        picked = tarballs
    else:
        picked = random.sample(tarballs, sample)

    phase_counts: dict[str, int] = {}
    phase_seconds: dict[str, float] = {}
    contributing = 0
    label_total = 0
    for tarball in picked:
        records = _read_episode_labels(tarball)
        if records:
            folded = _aggregate_episode_labels(records, phase_counts, phase_seconds)
            if folded:
                contributing += 1
                label_total += folded

    return {
        "counts": phase_counts,
        "weighted_seconds": phase_seconds,
        "sampled_episodes": contributing,
        "population_episodes": population,
        "total_labels": label_total,
    }
async def phase_mix_loop(
    broadcaster,
    *,
    data_root: Path,
    poll_interval: float = PHASE_MIX_INTERVAL,
    sample: int = PHASE_MIX_SAMPLE,
) -> None:
    """Recompute the dataset phase mix on a slow timer, forever.

    The computation runs through ``to_thread`` so decompressing and
    parsing the sampled tarballs does not stall the event loop. A
    non-empty result is stored in ``broadcaster.state["phase_mix"]``
    and published as a ``phase_mix`` event. Errors are logged and the
    loop continues; cancellation propagates.
    """
    while True:
        try:
            mix = await asyncio.to_thread(_compute_phase_mix, data_root, sample)
            if not mix:
                log.info("phase_mix: no episodes on disk yet, retrying")
            else:
                broadcaster.state["phase_mix"] = mix
                await broadcaster.publish({"type": "phase_mix", **mix})
                rounded_seconds = {
                    phase: round(seconds, 1)
                    for phase, seconds in mix.get("weighted_seconds", {}).items()
                }
                log.info(
                    "phase_mix: %d/%d episodes sampled, %d labels, "
                    "weighted=%s",
                    mix.get("sampled_episodes", 0),
                    mix.get("population_episodes", 0),
                    mix.get("total_labels", 0),
                    rounded_seconds,
                )
        except asyncio.CancelledError:
            raise
        except Exception:
            log.exception("phase_mix_loop error")
        await asyncio.sleep(poll_interval)
# ─────────────────────────────────────────────────────────────────────
# Lifecycle
# ─────────────────────────────────────────────────────────────────────
def start_feeders(broadcaster, *, data_root: Path = DEFAULT_DATA_ROOT) -> list[asyncio.Task]:
    """Launch every feeder as a named asyncio task and return the tasks.

    ``index.jsonl`` and ``alerts.jsonl`` are resolved under
    ``data_root``. The caller owns the returned tasks and must cancel
    them on shutdown.
    """
    index_path = data_root / "index.jsonl"
    alerts_path = data_root / "alerts.jsonl"

    tasks: list[asyncio.Task] = []
    tasks.append(asyncio.create_task(
        snapshot_loop(
            broadcaster,
            data_root=data_root,
            index_path=index_path,
            alerts_path=alerts_path,
        ),
        name="cis490.feeder.snapshot",
    ))
    tasks.append(asyncio.create_task(
        watch_index_jsonl(broadcaster, index_path),
        name="cis490.feeder.index",
    ))
    tasks.append(asyncio.create_task(
        watch_alerts_jsonl(broadcaster.publish, alerts_path),
        name="cis490.feeder.alerts",
    ))
    tasks.append(asyncio.create_task(
        phase_mix_loop(broadcaster, data_root=data_root),
        name="cis490.feeder.phase_mix",
    ))
    return tasks