The model layer of the project, built honestly:
- tools/dataset_validate.py — full-sweep validator over the receiver
store (sha256, schema, monotonic labels, telemetry-row gate). On the
current corpus: 64,798 accepted + 8,154 degraded + 3,701 rejected +
7 errored across 76,660 shipped episodes. data/processed/validation_v1.parquet
is committed as the per-episode acceptance index.
- training/_features.py — channel registry (46 channels across
proc/guest/qmp/netflow), summary-stat windowing AND channel×time
tensor extraction at 10s/5s windowing. Time alignment uses t_wall_ns
(Unix ns) — tested fix for a real netflow-vs-host clock-base
inconsistency that was silently dropping every netflow channel.
- training/_split.py — three held-out recipes (host / sample / time)
with profile-stratification assertions. held_out_host carries
untested_profiles for cases like scan-and-dial absent from the test
host (5 of 6 profiles tested cross-device, never silently averaged).
- training/models/ — 6 architectures behind a common BaseModel
interface: gbt (XGBoost), mlp, cnn, gru, lstm, transformer. Each
trained twice (realistic / oracle) per the deployment threat model.
Schema-hashed checkpoints refuse to load if _features.py changed
since training (silent-input-drift protection, tested).
- training/trainer/ — unified training loop: class-weighted CE, LR
warmup + cosine, gradient clipping, mixed precision when CUDA,
early stopping on val macro F1, best-on-val checkpoint. Same loop
runs MLP/CNN/GRU/LSTM/Transformer; GBT uses XGBoost
early_stopping_rounds on val mlogloss.
- training/eval_/ — bootstrap 95% CIs on macro F1, per-class F1,
per-profile and per-host breakdown, paired-bootstrap significance
for model-vs-model gap. Confusion matrix uses union of seen labels.
- training/dashboard/producers/ — replay/metrics/perf/profiles
emitting the six event types the dashboard's awaiting scenes
consume; on-demand tensor extraction so the Pi can run live
inference without 65 GB of shards.
- 17 unit tests (split coverage, features round-trip, schema mismatch,
determinism, time-base alignment regression).
End-to-end smoke-trained all six on a 567-episode subset; held-out
test macro F1 reported with paired-bootstrap significance. The
methodology now reports honest cross-device generalization, not
in-distribution validation.
Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
121 lines
3.3 KiB
Python
121 lines
3.3 KiB
Python
"""Read an episode tarball (.tar.zst) into structured arrays.
|
|
|
|
Used by both the validator and the feature extractor so they share one
schema decoder. Episode layout per PIPELINE.md and the on-disk reality:

    <episode_id>/
        meta.json
        labels.jsonl
        events.jsonl
        telemetry-proc.jsonl    host /proc/<qemu_pid> @ ~10 Hz
        telemetry-guest.jsonl   in-guest agent @ ~10 Hz
        telemetry-qmp.jsonl     QEMU QMP @ ~1 Hz
        netflow.jsonl           bridge pcap aggregated @ ~10 Hz
        network.pcap
        done.marker
|
|
"""
|
|
from __future__ import annotations
|
|
|
|
import io
|
|
import json
|
|
import tarfile
|
|
from dataclasses import dataclass, field
|
|
from pathlib import Path
|
|
from typing import Any
|
|
|
|
import zstandard as zstd
|
|
|
|
|
|
# Basenames of the members an episode tarball is expected to carry.
# network.pcap appears in the documented layout but is not in this set;
# open_episode() reports its presence separately through has_pcap.
EXPECTED_FILES = {
    # episode metadata, labels and events
    "meta.json",
    "labels.jsonl",
    "events.jsonl",
    # telemetry streams
    "telemetry-proc.jsonl",
    "telemetry-guest.jsonl",
    "telemetry-qmp.jsonl",
    # aggregated network flow rows
    "netflow.jsonl",
    # completion sentinel
    "done.marker",
}
|
|
|
|
|
|
@dataclass
class Episode:
    """Decoded contents of a single episode tarball.

    Instances are produced by ``open_episode``. Field order and defaults are
    part of the constructor signature and must not be rearranged.
    """

    # --- identity ---------------------------------------------------------
    episode_id: str
    host_id: str  # supplied by the caller of open_episode, not read from the archive

    # --- parsed meta.json -------------------------------------------------
    meta: dict

    # --- decoded JSONL members, one entry per non-blank line --------------
    labels: list[dict]   # labels.jsonl
    events: list[dict]   # events.jsonl
    proc: list[dict]     # telemetry-proc.jsonl
    guest: list[dict]    # telemetry-guest.jsonl
    qmp: list[dict]      # telemetry-qmp.jsonl
    netflow: list[dict]  # netflow.jsonl

    # --- presence flags and archive listing -------------------------------
    has_done_marker: bool = False  # a done.marker member was found
    has_pcap: bool = False         # a network.pcap member was found (content not kept)
    raw_files: list[str] = field(default_factory=list)  # member names as stored in the tar
|
|
|
|
|
|
def _read_jsonl(buf: bytes) -> list[dict]:
|
|
out: list[dict] = []
|
|
for line in buf.splitlines():
|
|
line = line.strip()
|
|
if not line:
|
|
continue
|
|
out.append(json.loads(line))
|
|
return out
|
|
|
|
|
|
def open_episode(tarball_path: Path, host_id: str) -> Episode:
    """Decode one ``.tar.zst`` episode tarball into an ``Episode``.

    The archive is decompressed and walked as a forward-only stream, so the
    full decompressed tar is never materialized and ``network.pcap`` is
    skipped without its bytes being retained. Every other regular member is
    read into memory, keyed by its basename.

    Args:
        tarball_path: Path of the zstd-compressed tar archive.
        host_id: Stored unchanged on the returned ``Episode``.

    Returns:
        An ``Episode``. JSONL members absent from the archive decode to
        empty lists. ``episode_id`` is taken from ``meta.json`` and falls
        back to the tarball file name up to its first dot.

    Raises:
        ValueError: If the archive has no ``meta.json`` member, or its
            content is not a JSON object.
    """
    files: dict[str, bytes] = {}
    raw_files: list[str] = []
    has_pcap = False

    dctx = zstd.ZstdDecompressor()
    with tarball_path.open("rb") as compressed:
        with dctx.stream_reader(compressed) as reader:
            # "r|" reads an uncompressed tar as a non-seekable stream, which
            # lets tarfile pull bytes from the zstd reader on demand.
            with tarfile.open(fileobj=reader, mode="r|") as tar:
                for ti in tar:
                    if not ti.isfile():
                        continue
                    raw_files.append(ti.name)
                    # Each tarball nests under <episode_id>/
                    base = Path(ti.name).name
                    if base == "network.pcap":
                        # Only presence is recorded; the capture is not read.
                        has_pcap = True
                        continue
                    member = tar.extractfile(ti)
                    if member is None:
                        continue
                    # In stream mode a member must be consumed before the
                    # iterator advances, so read it right away.
                    with member:
                        files[base] = member.read()

    if "meta.json" not in files:
        raise ValueError(f"{tarball_path}: meta.json missing")
    meta = json.loads(files["meta.json"])
    if not isinstance(meta, dict):
        # Without this check the .get() below fails with an AttributeError
        # that does not name the offending tarball.
        raise ValueError(f"{tarball_path}: meta.json is not a JSON object")
    episode_id = meta.get("episode_id", tarball_path.stem.split(".")[0])

    return Episode(
        episode_id=episode_id,
        host_id=host_id,
        meta=meta,
        labels=_read_jsonl(files.get("labels.jsonl", b"")),
        events=_read_jsonl(files.get("events.jsonl", b"")),
        proc=_read_jsonl(files.get("telemetry-proc.jsonl", b"")),
        guest=_read_jsonl(files.get("telemetry-guest.jsonl", b"")),
        qmp=_read_jsonl(files.get("telemetry-qmp.jsonl", b"")),
        netflow=_read_jsonl(files.get("netflow.jsonl", b"")),
        has_done_marker="done.marker" in files,
        has_pcap=has_pcap,
        raw_files=raw_files,
    )
|
|
|
|
|
|
def hash_only(tarball_path: Path) -> tuple[str, int]:
    """Return ``(sha256 hex digest, size in bytes)`` of the file as stored.

    The raw file bytes are hashed in reads of up to 1 MiB; nothing is
    decompressed.
    """
    import hashlib

    digest = hashlib.sha256()
    total = 0
    with tarball_path.open("rb") as fh:
        while True:
            block = fh.read(1 << 20)
            if not block:
                break
            digest.update(block)
            total += len(block)
    return digest.hexdigest(), total
|