"""Read + filter the receiver's ``index.jsonl``. Usage: # All episodes from one host: cis490-index --host lab-host-1 # All episodes for a particular sample: cis490-index --sample xmrig-cryptominer # Today's episodes, sorted by size: cis490-index --since 2026-04-30 --sort size # Group/count by host: cis490-index --count-by host_id The index file is the closest thing to a database the receiver has until we move to Postgres/Timescale. This tool is the temporary CLI view over it; it's intentionally read-only and never opens episode tarballs (just the index rows). """ from __future__ import annotations import argparse import json import sys from collections import Counter from datetime import datetime, timezone from pathlib import Path DEFAULT_INDEX = "/var/lib/cis490/index.jsonl" def _parse_since(s: str) -> datetime: # Accept ISO-8601 with or without time. for fmt in ("%Y-%m-%dT%H:%M:%S%z", "%Y-%m-%d", "%Y-%m-%dT%H:%M:%S"): try: dt = datetime.strptime(s, fmt) if dt.tzinfo is None: dt = dt.replace(tzinfo=timezone.utc) return dt except ValueError: continue # Last resort: fromisoformat which handles a wider range in 3.11+. dt = datetime.fromisoformat(s) if dt.tzinfo is None: dt = dt.replace(tzinfo=timezone.utc) return dt def _row_time(row: dict) -> datetime | None: s = row.get("received_at_wall") if not s: return None try: return datetime.fromisoformat(s.replace("Z", "+00:00")) except ValueError: return None def main(argv: list[str] | None = None) -> int: p = argparse.ArgumentParser(prog="cis490-index") p.add_argument("--index", default=DEFAULT_INDEX, help=f"path to index.jsonl (default {DEFAULT_INDEX})") p.add_argument("--host", help="only rows from this host_id") p.add_argument("--sample", help="only rows whose meta.sample.name matches " "(requires meta.json from a recent commit)") p.add_argument("--since", help="ISO date or datetime; only rows received on/after") p.add_argument("--until", help="ISO date or datetime; only rows received before") p.add_argument("--sort", choices=("time", "size", "host"), default="time") p.add_argument("--count-by", choices=("host_id", "schema_version"), help="instead of printing rows, group + count by this field") p.add_argument("--limit", type=int, default=0, help="cap output rows (0 = all)") args = p.parse_args(argv) path = Path(args.index) if not path.exists(): print(f"no index at {path}", file=sys.stderr) return 2 since = _parse_since(args.since) if args.since else None until = _parse_since(args.until) if args.until else None rows: list[dict] = [] with path.open() as f: for line in f: line = line.strip() if not line: continue try: row = json.loads(line) except json.JSONDecodeError: continue if args.host and row.get("host_id") != args.host: continue if since or until: t = _row_time(row) if t is None: continue if since and t < since: continue if until and t >= until: continue rows.append(row) if args.count_by: counts = Counter(r.get(args.count_by, "") for r in rows) for k, n in counts.most_common(): print(f"{n:>6} {k}") return 0 sort_keys = { "time": lambda r: r.get("received_at_wall", ""), "size": lambda r: r.get("size_bytes", 0), "host": lambda r: r.get("host_id", ""), } rows.sort(key=sort_keys[args.sort]) if args.limit: rows = rows[-args.limit:] if args.sort != "size" else rows[:args.limit] # Print TSV-ish for quick eyeballing + downstream pipe-friendliness. print("received_at_wall\thost_id\tepisode_id\tsize_bytes\tschema_version\tsha256") for r in rows: print("\t".join(str(r.get(k, "")) for k in ("received_at_wall", "host_id", "episode_id", "size_bytes", "schema_version", "sha256"))) return 0 if __name__ == "__main__": sys.exit(main())