"""Tests for the pcap collector's pure-Python parser + bucketizer. We synthesize a tiny pcap file in memory (Ethernet + IPv4 + TCP/UDP records with controlled timestamps), feed it to ``bucketize()``, and verify the produced netflow.jsonl rows are correct. """ from __future__ import annotations import json import struct from pathlib import Path import pytest from collectors import pcap # --------------------------------------------------------------------------- # pcap synthesis helpers # --------------------------------------------------------------------------- _PCAP_GLOBAL_HDR = struct.pack( " bytes: s = bytes(int(x) for x in src.split(".")) d = bytes(int(x) for x in dst.split(".")) total_len = 20 + len(payload) return struct.pack( ">BBHHHBBHII"[:0] + "BBHHHBBH", 0x45, # version=4, IHL=5 0, # tos total_len, 0, 0, 64, proto, 0, # checksum (don't care) ) + s + d + payload def _tcp(sport: int, dport: int, flags: int) -> bytes: # Minimal 20-byte TCP header: sport, dport, seq, ack, off+flags, win, csum, urg return struct.pack(">HHIIBBHHH", sport, dport, 0, 0, 0x50, # data offset = 5 (no options) flags, 0, 0, 0) def _udp(sport: int, dport: int, length: int = 8) -> bytes: return struct.pack(">HHHH", sport, dport, length, 0) def _ether(payload: bytes, ethertype: int = 0x0800) -> bytes: return b"\x02\x00\x00\x00\x00\x01" + b"\x02\x00\x00\x00\x00\x02" + struct.pack(">H", ethertype) + payload def _record(ts_ns: int, frame: bytes) -> bytes: sec = ts_ns // 1_000_000_000 usec = (ts_ns // 1000) % 1_000_000 return struct.pack(" bytes: out = bytearray(_PCAP_GLOBAL_HDR) for ts, frame in records: out += _record(ts, frame) return bytes(out) def _write_pcap(path: Path, records: list[tuple[int, bytes]]) -> None: path.write_bytes(_build_pcap(records)) # --------------------------------------------------------------------------- # Tests # --------------------------------------------------------------------------- def test_iter_pcap_reads_records_back(tmp_path: Path) -> None: p = tmp_path / "a.pcap" frame = _ether(_ipv4("10.200.0.1", "10.200.0.10", 6, _tcp(40000, 21, flags=0x02))) _write_pcap(p, [(1_000_000_000, frame)]) records = list(pcap._iter_pcap(p)) assert len(records) == 1 t_ns, data = records[0] assert t_ns == 1_000_000_000 assert data == frame def test_decode_tcp_syn() -> None: f = _ether(_ipv4("10.200.0.1", "10.200.0.10", 6, _tcp(40000, 21, flags=0x02))) d = pcap._decode(f) assert d["ethertype"] == 0x0800 assert d["ip_proto"] == 6 assert d["src_ip"] == "10.200.0.1" assert d["dst_ip"] == "10.200.0.10" assert d["src_port"] == 40000 assert d["dst_port"] == 21 assert d["tcp_flags"] & 0x02 def test_decode_udp_dns_query() -> None: f = _ether(_ipv4("10.200.0.10", "10.200.0.1", 17, _udp(33333, 53))) d = pcap._decode(f) assert d["ip_proto"] == 17 assert d["dst_port"] == 53 def test_bucketize_collapses_per_window(tmp_path: Path) -> None: pcap_path = tmp_path / "ep.pcap" netflow_path = tmp_path / "netflow.jsonl" bridge_ip = "10.200.0.1" guest_ip = "10.200.0.10" base_ns = 1_700_000_000_000_000_000 # arbitrary, aligned-friendly records = [ # Bucket A (0..100ms) (base_ns + 5_000_000, _ether(_ipv4(guest_ip, bridge_ip, 6, _tcp(40000, 21, flags=0x02)))), (base_ns + 9_000_000, _ether(_ipv4(bridge_ip, guest_ip, 6, _tcp(21, 40000, flags=0x12)))), # Bucket B (100..200ms): UDP DNS query (base_ns + 105_000_000, _ether(_ipv4(guest_ip, bridge_ip, 17, _udp(33333, 53)))), # Bucket B: TCP RST (base_ns + 199_000_000, _ether(_ipv4(bridge_ip, guest_ip, 6, _tcp(21, 40000, flags=0x04)))), ] _write_pcap(pcap_path, records) rows_written = pcap.bucketize( pcap_path, netflow_path, bucket_ms=100, t_mono_origin_ns=base_ns, bridge_ip=bridge_ip, ) assert rows_written == 2 rows = [json.loads(l) for l in netflow_path.read_text().splitlines()] a, b = rows assert a["bucket_ms"] == 100 # Bucket A: 1 in (SYN), 1 out (SYN-ACK) assert a["pkts_in"] == 1 assert a["pkts_out"] == 1 assert a["syn_count"] == 2 assert a["tcp_new_flows"] == 1 # only the bare SYN counts as new flow assert a["dns_query_count"] == 0 assert a["unique_dst_ips"] == 2 # Bucket B: DNS + RST assert b["dns_query_count"] == 1 assert b["rst_count"] == 1 def test_bucketize_returns_zero_for_missing_file(tmp_path: Path) -> None: rows = pcap.bucketize( tmp_path / "nope.pcap", tmp_path / "netflow.jsonl", bucket_ms=100, t_mono_origin_ns=0, ) assert rows == 0 def test_bucketize_handles_unknown_ethertype(tmp_path: Path) -> None: p = tmp_path / "x.pcap" netflow = tmp_path / "n.jsonl" # ARP frame (ethertype 0x0806) — counted but not decoded. f = _ether(b"\x00" * 28, ethertype=0x0806) _write_pcap(p, [(1_000_000_000, f)]) rows = pcap.bucketize(p, netflow, bucket_ms=100, t_mono_origin_ns=0) assert rows == 1 out = json.loads(netflow.read_text().splitlines()[0]) # No IP info, but byte/packet count survives. assert out["pkts_in"] + out["pkts_out"] == 1 assert out["tcp_count"] == 0