Wraps the gaps surfaced in the "what is not implemented" audit so the
fleet really is shippable end-to-end. Verified live on the Pi:
- cis490-shipper --ping → HTTP 200 through Caddy + mTLS via the
new wg-pki client CA leaf
- real episode dir → tar+zstd → PUT → HTTP 201 stored
- re-ship same bytes → 200 (idempotent)
- re-ship different bytes under same id → 409 (conflict)
Changes:
orchestrator/episode.py
- EpisodeConfig.revert_at_start / revert_at_end (Tier 0+ snapshot/
revert per docs/architecture.md). When set + qmp_socket present,
EpisodeRunner issues loadvm <snapshot_name> and emits
snapshot_revert / snapshot_revert_failed events on the same
monotonic clock as everything else.
collectors/qmp.py
- savevm() / loadvm() helpers using human-monitor-command, plus a
test against the fake QMP server.
exploits/workloads.py
- chunked_real_binary_upload() returns a ChunkedUpload plan: 8 KiB
base64 chunks (~6 KiB binary each) so msfrpc never sees a buffer-
busting payload. Includes a finalize step that sha256-verifies on
the guest before exec.
- real_binary_workload() now wraps the chunked plan for backwards
compat with single-shot callers.
exploits/driver.py
- Tier-4 dispatch walks the chunked plan in MSFExploitDriver:
each chunk is a separate session_shell_write; finalize verifies;
exec only runs on sha-ok. New events: real_binary_upload_begin,
real_binary_verify, real_binary_aborted.
etc/cis490-orchestrator.service
- Reads /etc/cis490/lab-host.env (FLEET_HOST_ID + optional BRIDGE).
- Grants AmbientCapabilities CAP_NET_RAW (tcpdump for source 4) +
CAP_SYS_ADMIN + CAP_PERFMON (perf for source 3) so collectors
work under hardening.
scripts/install-lab-host.sh
- Writes /etc/cis490/lab-host.env on first install with FLEET_HOST_ID
defaulting to `hostname -s`.
- Best-effort: fetches the Alpine baseline qcow2 (sha512-pinned) and
builds cidata.iso with the in-guest agent embedded; symlinks both
into /opt/cis490/vm/images/ so launchers find them.
scripts/fetch-alpine-baseline.sh
- Idempotent fetcher for the Alpine 3.21 cloud-init nocloud qcow2
matching the sha512 in docs/sources.md.
tools/plot_envelope.py
- Rebuilt to render whatever telemetry the episode dir contains:
proc → QMP block ops → perf IPC/miss-rate → bridge pkts/SYNs →
guest agent load/mem. Missing sources are silently skipped.
tools/index_reader.py
- cis490-index CLI: filter receiver's index.jsonl by host / sample
/ time range, sort, count-by group. Closest thing to a query
interface until we stand up Postgres/Timescale.
samples/README.md
- Rewritten to match the new manifest schema, the kind=real vs mimic
split, the per-(host, slot, ep) selection mechanic, and the
chunked-upload safety story.
Tests: 106 pass (was 102). New cases:
- test_qmp.py — savevm + loadvm (HMP wrapper + error path)
- test_tier4.py — chunked plan splitting, sha-pinned finalize,
end-to-end driver walks all chunks + verify + exec via the fake
msfrpc client
Closes the "what is not implemented" punch list.
Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
258 lines
9.4 KiB
Python
258 lines
9.4 KiB
Python
"""Tests for the Tier-4 path:
|
|
- real_binary_workload constructs valid shell commands
|
|
- Sample.binary_path resolves correctly
|
|
- MSFExploitDriver.real-sample dispatch picks the upload+exec path
|
|
when a binary is staged, mimic when it isn't
|
|
- tools/fetch_sample input validation (we don't hit the live API)
|
|
"""
|
|
|
|
from __future__ import annotations
|
|
|
|
import hashlib
|
|
from pathlib import Path
|
|
|
|
import pytest
|
|
|
|
from exploits.driver import DriverConfig, MSFExploitDriver
|
|
from exploits.modules import load_module_config
|
|
from exploits.workloads import (
|
|
chunked_real_binary_upload, real_binary_workload,
|
|
)
|
|
from samples.manifest import Sample
|
|
|
|
|
|
REPO_ROOT = Path(__file__).resolve().parent.parent
|
|
MODULES_DIR = REPO_ROOT / "exploits" / "modules"
|
|
|
|
|
|
# Reuse the FakeMSFRpcClient from test_exploits.py.
|
|
from tests.test_exploits import FakeMSFRpcClient # noqa: E402
|
|
|
|
|
|
# ---------------------------------------------------------------------------
|
|
# real_binary_workload
|
|
# ---------------------------------------------------------------------------
|
|
|
|
|
|
def test_real_binary_workload_embeds_base64() -> None:
|
|
payload = b"\x7fELF" + b"\x00" * 64 # tiny ELF-shaped header
|
|
w = real_binary_workload(payload)
|
|
# Start command bundles a chunked upload (printf '%s' '<b64>' >> file).
|
|
# Pull all b64 segments out and confirm they round-trip.
|
|
import base64 as _b64
|
|
import re
|
|
matches = re.findall(r"printf '%s' '([A-Za-z0-9+/=]+)'", w.start_cmd)
|
|
assert matches, "expected printf-based b64 chunks in start_cmd"
|
|
decoded = _b64.b64decode("".join(matches))
|
|
assert decoded == payload
|
|
|
|
|
|
def test_chunked_real_binary_upload_splits_correctly() -> None:
|
|
"""A binary larger than the chunk size should produce >1 chunks
|
|
plus a finalize + exec. Each chunk's payload must be individually
|
|
valid base64 and the concatenation must round-trip."""
|
|
import base64 as _b64
|
|
import hashlib as _hashlib
|
|
import re
|
|
|
|
# Build a payload large enough to force multiple chunks.
|
|
payload = (b"\x90\xab" * 8000)
|
|
plan = chunked_real_binary_upload(payload)
|
|
assert plan.n_chunks >= 3 # 1 init + 2+ data chunks
|
|
assert plan.expected_sha256 == _hashlib.sha256(payload).hexdigest()
|
|
|
|
# Reconstruct from chunks.
|
|
segs = []
|
|
for c in plan.chunks:
|
|
m = re.search(r"printf '%s' '([A-Za-z0-9+/=]+)'", c)
|
|
if m:
|
|
segs.append(m.group(1))
|
|
assert segs, "no data chunks parsed"
|
|
decoded = _b64.b64decode("".join(segs))
|
|
assert decoded == payload
|
|
|
|
# finalize_cmd verifies the sha256 we computed.
|
|
assert plan.expected_sha256 in plan.finalize_cmd
|
|
assert "sha256sum" in plan.finalize_cmd
|
|
|
|
|
|
def test_real_binary_workload_stop_kills_pidfile() -> None:
|
|
w = real_binary_workload(b"x" * 16)
|
|
assert "kill" in w.stop_cmd
|
|
assert ".cis490-real" in w.stop_cmd
|
|
|
|
|
|
def test_real_binary_workload_per_profile_isolation() -> None:
|
|
a = real_binary_workload(b"\x00", sample=Sample(name="a", family="A", category="rat", profile="cpu-saturate"))
|
|
b = real_binary_workload(b"\x00", sample=Sample(name="b", family="B", category="rat", profile="bursty-c2"))
|
|
# Different profiles → different /tmp paths so concurrent samples
|
|
# don't stomp each other in the same guest.
|
|
assert a.profile != b.profile
|
|
assert a.start_cmd != b.start_cmd
|
|
|
|
|
|
# ---------------------------------------------------------------------------
|
|
# Sample.binary_path
|
|
# ---------------------------------------------------------------------------
|
|
|
|
|
|
def test_binary_path_resolves_when_staged(tmp_path: Path) -> None:
|
|
sha = "a" * 64
|
|
(tmp_path / sha).write_bytes(b"hello")
|
|
s = Sample(name="x", family="X", category="rat", profile="cpu-saturate", sha256=sha)
|
|
assert s.binary_path(tmp_path) == tmp_path / sha
|
|
|
|
|
|
def test_binary_path_none_when_missing(tmp_path: Path) -> None:
|
|
s = Sample(name="x", family="X", category="rat", profile="cpu-saturate", sha256="b" * 64)
|
|
assert s.binary_path(tmp_path) is None
|
|
|
|
|
|
def test_binary_path_none_for_mimic_sample(tmp_path: Path) -> None:
|
|
s = Sample(name="x", family="X", category="rat", profile="cpu-saturate")
|
|
assert s.binary_path(tmp_path) is None
|
|
|
|
|
|
# ---------------------------------------------------------------------------
|
|
# Driver dispatch
|
|
# ---------------------------------------------------------------------------
|
|
|
|
|
|
def test_driver_picks_real_binary_when_staged(tmp_path: Path) -> None:
|
|
payload = b"\x7fELF\x02" + b"\x00" * 60
|
|
sha = hashlib.sha256(payload).hexdigest()
|
|
(tmp_path / sha).write_bytes(payload)
|
|
|
|
sample = Sample(
|
|
name="real-x", family="X", category="rat",
|
|
profile="cpu-saturate", sha256=sha,
|
|
)
|
|
cfg = load_module_config(MODULES_DIR / "vsftpd_234_backdoor.toml")
|
|
client = FakeMSFRpcClient(sessions_after_fire={1: {"type": "shell"}})
|
|
driver = MSFExploitDriver(
|
|
client=client, # type: ignore[arg-type]
|
|
module=cfg,
|
|
cfg=DriverConfig(
|
|
target_ip="10.200.0.10",
|
|
session_open_timeout_s=0.5,
|
|
sample_store_root=tmp_path,
|
|
),
|
|
emit_event=lambda *a, **kw: None,
|
|
sample=sample,
|
|
)
|
|
# Driver picks the chunked-upload path.
|
|
assert driver.workload is not None
|
|
assert driver.workload.profile.startswith("real:")
|
|
assert driver._chunked is not None
|
|
assert driver._chunked.expected_sha256 == sha
|
|
|
|
|
|
def test_driver_walks_chunked_upload_in_session(tmp_path: Path) -> None:
|
|
"""End-to-end: at infected_running, the driver should issue every
|
|
chunk + finalize + exec as separate shell_write calls. The fake
|
|
client records them in order so we can verify."""
|
|
payload = b"\xde\xad\xbe\xef" * 4096 # 16 KiB → multiple chunks
|
|
sha = hashlib.sha256(payload).hexdigest()
|
|
(tmp_path / sha).write_bytes(payload)
|
|
|
|
sample = Sample(
|
|
name="real-multi", family="X", category="rat",
|
|
profile="bursty-c2", sha256=sha,
|
|
)
|
|
cfg = load_module_config(MODULES_DIR / "vsftpd_234_backdoor.toml")
|
|
|
|
# Patch the fake to return "sha-ok" so the verify step passes.
|
|
client = FakeMSFRpcClient(sessions_after_fire={1: {"type": "shell"}})
|
|
client._verify_response = "sha-ok\n"
|
|
real_read = client.session_shell_read
|
|
def shell_read_with_verify(sid):
|
|
# Return verify token after the finalize command — i.e. once
|
|
# the most recent shell_write contained "sha256sum".
|
|
last = client.shell_writes[-1][1] if client.shell_writes else ""
|
|
if "sha256sum" in last:
|
|
return "sha-ok\n"
|
|
return real_read(sid)
|
|
client.session_shell_read = shell_read_with_verify # type: ignore[assignment]
|
|
|
|
events: list[tuple[str, dict]] = []
|
|
driver = MSFExploitDriver(
|
|
client=client, # type: ignore[arg-type]
|
|
module=cfg,
|
|
cfg=DriverConfig(
|
|
target_ip="10.200.0.10",
|
|
session_open_timeout_s=0.5,
|
|
sample_store_root=tmp_path,
|
|
),
|
|
emit_event=lambda ev, **kw: events.append((ev, kw)),
|
|
sample=sample,
|
|
)
|
|
driver.setup()
|
|
driver.set_phase("armed")
|
|
driver.set_phase("infecting")
|
|
driver.set_phase("infected_running")
|
|
|
|
# All chunks + finalize + exec went through shell_write.
|
|
writes = [w for (_, w) in client.shell_writes]
|
|
n_printf = sum(1 for w in writes if w.startswith("printf '%s'"))
|
|
n_finalize = sum(1 for w in writes if "sha256sum" in w)
|
|
n_exec = sum(1 for w in writes if "nohup" in w and ".cis490-real" in w)
|
|
assert n_printf >= 2, f"expected multiple chunks, saw {n_printf}"
|
|
assert n_finalize == 1
|
|
assert n_exec == 1
|
|
|
|
# Events tell the same story.
|
|
names = [e for (e, _) in events]
|
|
assert "real_binary_upload_begin" in names
|
|
assert "real_binary_verify" in names
|
|
assert any(e == "sample_executed" and kw.get("kind") == "real"
|
|
for (e, kw) in events)
|
|
|
|
|
|
def test_driver_falls_back_to_mimic_when_real_binary_missing(tmp_path: Path) -> None:
|
|
sample = Sample(
|
|
name="real-but-missing", family="X", category="rat",
|
|
profile="bursty-c2", sha256="c" * 64,
|
|
)
|
|
cfg = load_module_config(MODULES_DIR / "vsftpd_234_backdoor.toml")
|
|
client = FakeMSFRpcClient(sessions_after_fire={1: {"type": "shell"}})
|
|
driver = MSFExploitDriver(
|
|
client=client, # type: ignore[arg-type]
|
|
module=cfg,
|
|
cfg=DriverConfig(
|
|
target_ip="10.200.0.10",
|
|
session_open_timeout_s=0.5,
|
|
sample_store_root=tmp_path, # empty
|
|
),
|
|
emit_event=lambda *a, **kw: None,
|
|
sample=sample,
|
|
)
|
|
# Mimic workload selected because the binary isn't staged.
|
|
assert driver.workload is not None
|
|
assert driver.workload.profile == "bursty-c2"
|
|
assert "real:" not in driver.workload.profile
|
|
|
|
|
|
# ---------------------------------------------------------------------------
|
|
# Fetcher input validation
|
|
# ---------------------------------------------------------------------------
|
|
|
|
|
|
def test_fetch_sample_rejects_bad_sha(tmp_path: Path) -> None:
|
|
from tools.fetch_sample import fetch_sample
|
|
|
|
with pytest.raises(ValueError, match="64 hex chars"):
|
|
fetch_sample("not-a-hash", tmp_path, api_key="x")
|
|
|
|
|
|
def test_fetch_sample_returns_existing_when_hash_matches(tmp_path: Path) -> None:
|
|
from tools.fetch_sample import fetch_sample
|
|
|
|
payload = b"already staged bytes"
|
|
sha = hashlib.sha256(payload).hexdigest()
|
|
p = tmp_path / sha
|
|
p.write_bytes(payload)
|
|
# api_key is unused on the cached path; pass anything.
|
|
out = fetch_sample(sha, tmp_path, api_key="ignored")
|
|
assert out == p
|
|
# File untouched.
|
|
assert p.read_bytes() == payload
|