"""Source 4 (feature, deployable): bridge-side pcap + bucketed netflow. Captures packets on the host-only ``br-malware`` bridge during an episode, writes the raw pcap, and produces a bucketed JSONL file the trainer can consume directly. The capture is **gateway-side** — the orchestrator sees the same packets a real upstream router/gateway would see in deployment, so features derived here transfer 1:1 to the deployment-time gateway observer. Implementation: - ``run_capture()`` spawns ``tcpdump -i -U -w `` as a subprocess for the episode duration. ``-U`` flushes per packet so the file is consumable mid-flight. - ``bucketize()`` reads a finished pcap and emits 100 ms-bucketed rows into ``netflow.jsonl``. Pure-Python pcap parser (no scapy / dpkt dependency); decodes Ethernet + IPv4 + TCP/UDP enough to fill the schema in docs/data-model.md. The pure-Python parser is intentionally minimal — it does NOT do fragment reassembly, IPv6, VLAN tags, or anything fancy. It handles the cases that occur on a host-only bridge for malware behaviour: plain Ethernet II, IPv4, TCP/UDP. Other frames are still counted at the byte/packet level but skipped for protocol-specific stats. """ from __future__ import annotations import json import logging import os import struct import subprocess import threading import time from collections import defaultdict from dataclasses import dataclass from pathlib import Path log = logging.getLogger("cis490.collectors.pcap") SOURCE = "bridge_pcap" AVAILABLE_IN_DEPLOYMENT = True # Pcap file-level header _PCAP_GLOBAL_HDR = " CaptureHandle: """Start a tcpdump capture on ``bridge``. Returns a handle the caller stops via ``stop_capture()``.""" pcap_path.parent.mkdir(parents=True, exist_ok=True) args = ["tcpdump", "-i", bridge, "-U", "-s", str(snaplen), "-w", str(pcap_path)] if bpf: args.append(bpf) log.info("starting pcap: %s", " ".join(args)) proc = subprocess.Popen( args, stdout=subprocess.DEVNULL, stderr=subprocess.PIPE, # tcpdump may need root or CAP_NET_RAW. We don't elevate here. ) return CaptureHandle( proc=proc, pcap_path=pcap_path, bridge=bridge, started_mono_ns=time.monotonic_ns(), ) def stop_capture(handle: CaptureHandle, *, timeout_s: float = 5.0) -> int: """SIGINT tcpdump (the Right Signal — flushes buffers + exits 0). Returns the process exit code.""" proc = handle.proc if proc.poll() is None: proc.send_signal(2) # SIGINT try: proc.wait(timeout=timeout_s) except subprocess.TimeoutExpired: proc.kill() proc.wait(timeout=timeout_s) return proc.returncode # --------------------------------------------------------------------------- # Pure-Python pcap parser # --------------------------------------------------------------------------- def _iter_pcap(path: Path): """Yield ``(t_pkt_ns, frame_bytes)`` for every record in a pcap file. Tolerates either microsecond or nanosecond magics.""" with path.open("rb") as f: hdr = f.read(_PCAP_GLOBAL_HDR_SIZE) if len(hdr) < _PCAP_GLOBAL_HDR_SIZE: return magic = struct.unpack(" dict: """Decode an Ethernet/IPv4/{TCP,UDP} frame to a flat dict. Unknown protocols return only the ethertype + lengths.""" out: dict = {"size": len(frame)} if len(frame) < 14: return out ethertype = struct.unpack(">H", frame[12:14])[0] out["ethertype"] = ethertype if ethertype != 0x0800: # not IPv4 — count, don't decode further return out ip = frame[14:] if len(ip) < 20: return out ihl = (ip[0] & 0x0F) * 4 if ihl < 20 or len(ip) < ihl: return out proto = ip[9] src = ip[12:16] dst = ip[16:20] out["ip_proto"] = proto out["src_ip"] = ".".join(str(b) for b in src) out["dst_ip"] = ".".join(str(b) for b in dst) payload = ip[ihl:] if proto == 6 and len(payload) >= 20: # TCP sport, dport, _, _, off_flags = struct.unpack(">HHIIH", payload[:14]) flags = off_flags & 0x003F out["src_port"] = sport out["dst_port"] = dport out["tcp_flags"] = flags # FIN=1 SYN=2 RST=4 PSH=8 ACK=16 URG=32 elif proto == 17 and len(payload) >= 8: # UDP sport, dport, _, _ = struct.unpack(">HHHH", payload[:8]) out["src_port"] = sport out["dst_port"] = dport return out def bucketize( pcap_path: Path, netflow_path: Path, *, bucket_ms: int = 100, t_mono_origin_ns: int = 0, bridge_ip: str = "10.200.0.1", ) -> int: """Read a pcap and emit one row per ``bucket_ms`` window into ``netflow.jsonl``. The ``in/out`` direction is from the bridge perspective (host = ``bridge_ip``): out = packet whose src is the host-side address (host → guest) in = anything else seen on the bridge (guest → host or guest-to-guest) Returns the number of rows written.""" if not pcap_path.exists(): return 0 bucket_ns = bucket_ms * 1_000_000 netflow_path.parent.mkdir(parents=True, exist_ok=True) rows = 0 bucket_start: int | None = None agg: dict = _empty_bucket() with netflow_path.open("a", buffering=1) as out: for t_pkt_ns, frame in _iter_pcap(pcap_path): d = _decode(frame) # Establish first bucket origin on first packet. if bucket_start is None: bucket_start = t_pkt_ns - (t_pkt_ns % bucket_ns) while t_pkt_ns >= bucket_start + bucket_ns: _flush(out, agg, bucket_start, bucket_ns, t_mono_origin_ns) rows += 1 agg = _empty_bucket() bucket_start += bucket_ns _accumulate(agg, d, bridge_ip) if bucket_start is not None and any(v for v in agg.values() if v): _flush(out, agg, bucket_start, bucket_ns, t_mono_origin_ns) rows += 1 return rows def _empty_bucket() -> dict: return { "pkts_in": 0, "pkts_out": 0, "bytes_in": 0, "bytes_out": 0, "syn_count": 0, "fin_count": 0, "rst_count": 0, "udp_count": 0, "tcp_count": 0, "dns_query_count": 0, "dst_ips": set(), "dst_ports": set(), "tcp_new_flows": 0, } def _accumulate(agg: dict, d: dict, bridge_ip: str) -> None: sz = d.get("size", 0) is_out = d.get("src_ip") == bridge_ip if is_out: agg["pkts_out"] += 1 agg["bytes_out"] += sz else: agg["pkts_in"] += 1 agg["bytes_in"] += sz proto = d.get("ip_proto") if proto == 6: agg["tcp_count"] += 1 flags = d.get("tcp_flags", 0) if flags & 0x02: # SYN agg["syn_count"] += 1 if not (flags & 0x10): # SYN without ACK = new flow agg["tcp_new_flows"] += 1 if flags & 0x01: agg["fin_count"] += 1 if flags & 0x04: agg["rst_count"] += 1 elif proto == 17: agg["udp_count"] += 1 if d.get("dst_port") == 53: agg["dns_query_count"] += 1 dst = d.get("dst_ip") if dst: agg["dst_ips"].add(dst) dport = d.get("dst_port") if dport is not None: agg["dst_ports"].add(dport) def _flush(out, agg: dict, bucket_start_ns: int, bucket_ns: int, t_mono_origin_ns: int) -> None: row = { "t_mono_ns": bucket_start_ns - t_mono_origin_ns, "t_wall_ns": bucket_start_ns, "source": SOURCE, "available_in_deployment": AVAILABLE_IN_DEPLOYMENT, "bucket_ms": bucket_ns // 1_000_000, "pkts_in": agg["pkts_in"], "pkts_out": agg["pkts_out"], "bytes_in": agg["bytes_in"], "bytes_out": agg["bytes_out"], "syn_count": agg["syn_count"], "fin_count": agg["fin_count"], "rst_count": agg["rst_count"], "udp_count": agg["udp_count"], "tcp_count": agg["tcp_count"], "dns_query_count": agg["dns_query_count"], "unique_dst_ips": len(agg["dst_ips"]), "unique_dst_ports": len(agg["dst_ports"]), "tcp_new_flows": agg["tcp_new_flows"], } out.write(json.dumps(row) + "\n")