Merge remote-tracking branch 'origin/main' into Dev_REL1_043026

Elliott Kolden 2026-05-01 07:51:34 -06:00
commit 5568d77df8
24 changed files with 2462 additions and 128 deletions

147
AGENTS.md

@ -78,6 +78,127 @@ common silent failures it catches:
`cis490-doctor --json` is machine-readable for use by other agents.
## Tier 3 + Tier 4 deploy (zero-touch via install-lab-host.sh)
`install-lab-host.sh` runs Tier-3 deploy automatically on its second
pass (after the mTLS cert lands). No operator interaction is needed:
metasploit-framework auto-installs via the Rapid7 omnibus, the
Metasploitable2 image auto-fetches from a public mirror with TOFU
sha256 pinning, the host-only bridge auto-comes-up, and a live
exploit fire is verified before the script returns.
To re-run the deploy by hand or on a host where Tier 3 was skipped:
```sh
sudo /opt/cis490/scripts/install-tier-3-4.sh
```
It's idempotent — re-running on an already-deployed host is a no-op
except for the verify step. Inputs are all optional env vars:
| var | effect |
|---|---|
| `SKIP_VERIFY` | skip the live `vsftpd_234_backdoor` smoke run |
| `SKIP_BRIDGE` | skip `br-malware` setup (limits to 2 of 5 modules) |
| `SKIP_TIER4` | skip the Tier-4 auto-fetch even if API key present |
| `MALWAREBAZAAR_API_KEY` | opt-in: present means Tier-4 auto-fetch runs |
The fleet runner auto-detects Tier-3 readiness via
`orchestrator/fleet.py::_msfrpcd_available()`. Once
`cis490-msfrpcd.service` is up and `metasploitable2.qcow2` is on
disk, the next wave produces Tier-3 episodes (`meta.exploit.module_name`
populated). No orchestrator restart is required, but a restart speeds
up the switch.
### Tier-4 (real malware execution) is mandatory, fully automated
**Real-binary episodes are the project's training target — Tier-4 is
NOT optional.** A lab-host deploy that lands without real samples
fails loudly; mimic-only data does not answer the research question.
There is **no operator step**. No API key, no signup, no manual
provisioning. `install-tier-3-4.sh` runs `tools/auto_fetch_samples.py`,
which:
1. Clones (or pulls) `theZoo` from
`https://github.com/ytisf/theZoo` to `/var/lib/cis490/theZoo`
(~500 MB shallow clone, public, GPL-3.0, security-research repo)
2. For each `[[sample]]` in `manifest.toml` without a sha256, locates
a directory in `theZoo/malware/Binaries/` whose name matches
the entry's `family` (case-insensitive substring + prefix priority)
3. Extracts the password-protected `.zip` (well-known password
`infected`)
4. Picks the largest non-text payload as the binary, computes its
sha256, copies to `/opt/cis490/samples/store/<sha256>`
5. Rewrites `manifest.toml` in place, atomically (tempfile +
`os.replace` preserving stat), adding `source = "theZoo"`,
`sha256 = "<hex>"`, and the upstream URL
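The atomic rewrite in step 5 can be sketched as follows. This is a minimal illustration of the tempfile + `os.replace` pattern, not the code in `tools/auto_fetch_samples.py`; the function name `atomic_rewrite` is hypothetical, and "preserving stat" is assumed here to mean carrying over the permission bits only.

```python
import os
import tempfile
from pathlib import Path


def atomic_rewrite(path: Path, new_text: str) -> None:
    """Replace `path` with `new_text` so readers never see a partial file.

    Hypothetical helper: only the permission bits of the original file
    are carried over (an assumption about what "preserving stat" means).
    """
    st = path.stat()
    # Create the temp file in the same directory so os.replace never
    # has to cross a filesystem boundary.
    fd, tmp_name = tempfile.mkstemp(
        dir=path.parent, prefix=path.name + ".", suffix=".tmp"
    )
    try:
        with os.fdopen(fd, "w", encoding="utf-8") as f:
            f.write(new_text)
            f.flush()
            os.fsync(f.fileno())  # make sure the bytes hit disk first
        os.chmod(tmp_name, st.st_mode & 0o7777)
        os.replace(tmp_name, path)  # atomic rename on POSIX
    except BaseException:
        Path(tmp_name).unlink(missing_ok=True)
        raise
```

A concurrent reader sees either the old manifest or the new one, never a half-written file. A hand edit that lands between the fetcher's read and its `os.replace` is still overwritten, so this pattern alone does not make manual edits safe.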
If `auto_fetch_samples.py` lands zero binaries (theZoo layout drift,
git clone failure, or a family has no matching directory),
`install-tier-3-4.sh` exits non-zero. **No silent mimic-only fallback.**
The orchestrator's next selection that picks a sample with
`kind == "real"` runs the real binary via the chunked-upload path
(`exploits.driver._resolve_workload`).
Set `MALWAREBAZAAR_API_KEY` (free signup at https://bazaar.abuse.ch/)
before running `install-tier-3-4.sh` and step 5 runs
`tools/auto_fetch_samples.py` automatically:
1. For each `[[sample]]` in `samples/manifest.toml` without a
`sha256`, query MalwareBazaar by `family` (signature match)
2. Download the first matching binary (sha256-verified on the way in)
3. Edit the manifest in place — add `source`, `sha256`, `url`
4. Episodes that select that sample now run the real binary via the
chunked-upload path (`exploits.driver._resolve_workload`)
The mimic profile remains the fallback for episodes that select a
sample whose binary isn't on disk. Trainers filter on
`meta.sample.kind ∈ {"real", "mimic"}`.
### Confirm Tier 3+4 are flowing
```sh
# On the Pi maintainer side:
sudo python3 -c "
import json, glob, subprocess, tarfile, io
from collections import Counter
mods = Counter(); kinds = Counter()
for tar in glob.glob('/var/lib/cis490/episodes/*/*.tar.zst'):
    z = subprocess.check_output(['zstd','-q','-d','--stdout',tar],stderr=subprocess.DEVNULL)
    with tarfile.open(fileobj=io.BytesIO(z)) as t:
        for m in t.getmembers():
            if m.name.endswith('meta.json') and m.isfile():
                meta = json.load(t.extractfile(m))
                mods[(meta.get('exploit') or {}).get('module_name','<none>')] += 1
                kinds[(meta.get('sample') or {}).get('kind','<none>')] += 1
                break
print('exploit modules used:', dict(mods))
print('sample kinds:', dict(kinds))
"
```
If `mods` is `{'<none>': N}` and `kinds` is `{'mimic': N}`, Tier 3
hasn't kicked in yet on any lab host — re-run
`install-tier-3-4.sh` there. If `mods` shows
`{'vsftpd_234_backdoor': N, ...}` and `kinds` shows a non-zero
`'real'` count, both tiers are live.
### Don't shortcut
- DO NOT install `metasploit-framework` system-wide outside
`install-msfrpcd.sh`. The script wires the systemd unit + creds;
a manual install bypasses the orchestrator's
`_msfrpcd_available()` probe.
- DO NOT push your MalwareBazaar API key. `samples/.bazaar.token`
is gitignored.
- DO NOT add bogus sha256 entries to `manifest.toml`;
`fetch_sample.py` hash-verifies every download.
- DO NOT add real-binary entries by hand when `auto_fetch_samples.py`
exists. Hand-edits are racy with the auto-fetcher's tempfile
rewrite.
## Securing the connection (mTLS) — DO NOT mint your own certs
The lab-host ↔ Pi connection is mTLS over WireGuard. **The cert
@ -201,6 +322,32 @@ older clone:
**If you hit any of these on a fresh install, pull main first** before
filing an issue — the issue is probably already closed.
### The classifier is multi-source — don't gut episodes on /proc alone
`tools/prune_episodes.py` cross-checks four telemetry sources before
flagging an episode as flat:
- `telemetry-proc.jsonl` — host qemu-system /proc CPU%
- `netflow.jsonl` — bridge_pcap byte counters (network profiles)
- `telemetry-qmp.jsonl` — virtio blockstats per-phase delta (io-walk,
ransomware-shape)
- `telemetry-guest.jsonl` — in-guest agent load_1m (low-and-slow,
any host with a working agent)
An episode flags as `flat-cpu` only when EVERY available source
shows no inter-phase variation. If `/proc` is flat but qmp blockstats
show 90 MB written during `infected_running`, the episode is kept —
the host /proc collector loses signal under contention but qmp sees
through. This is essential on laptop-class lab hosts (e.g.
elliott-thinkpad) where the guest is co-scheduled with 13 other VMs
and the per-VM /proc CPU% gets buried.
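A minimal sketch of the "flag only when every available source is flat" rule, assuming each source has already been reduced to one number per phase. The function names, the 5% relative tolerance, the phase names other than `infected_running`, and the treatment of "no sources available" are illustrative assumptions, not the logic in `tools/prune_episodes.py`.

```python
from __future__ import annotations

from statistics import pstdev


def is_flat(phase_values: dict[str, float], rel_tol: float = 0.05) -> bool:
    """True when per-phase values vary by less than rel_tol of their mean."""
    vals = list(phase_values.values())
    if len(vals) < 2:
        return True
    mean = sum(vals) / len(vals)
    if mean == 0:
        return all(v == 0 for v in vals)
    return pstdev(vals) / abs(mean) < rel_tol


def flags_flat(sources: dict[str, dict[str, float] | None]) -> bool:
    """Flag an episode only if EVERY available source is flat.

    A source mapped to None (or an empty dict) counts as unavailable.
    Assumption: with no sources at all, the episode is not flagged.
    """
    available = {name: v for name, v in sources.items() if v}
    if not available:
        return False
    return all(is_flat(v) for v in available.values())


# /proc is flat, but qmp blockstats show a burst during infected_running.
proc = {"baseline": 2.0, "infected_running": 2.0, "recovery": 2.0}
qmp = {"baseline": 0.0, "infected_running": 90e6, "recovery": 0.0}
keep = not flags_flat({"proc": proc, "qmp": qmp})
```

In this example `keep` is true: the qmp series varies across phases, so the flat `/proc` series alone does not condemn the episode.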
All four sources stamp `t_wall_ns`; phase mapping uses that, not
`t_mono_ns`, because /proc and labels are orchestrator-relative
while netflow/guest are wall-clock-anchored. If you add a new
collector, emit `t_wall_ns` from CLOCK_REALTIME on every row or your
data will silently bucket into "(pre)".
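For a new collector, stamping each row could look like the sketch below. `make_row` and the payload field are hypothetical; `time.time_ns()` reads the system wall clock (the realtime clock on Linux), which is what the phase mapper needs.

```python
import json
import time


def make_row(payload: dict) -> str:
    """Serialize one telemetry row with both clocks stamped.

    t_wall_ns comes from the wall clock so rows from different
    collectors can be bucketed into the same phases; t_mono_ns is only
    meaningful relative to other rows from this same process.
    """
    row = {
        "t_wall_ns": time.time_ns(),
        "t_mono_ns": time.monotonic_ns(),
        **payload,
    }
    return json.dumps(row, separators=(",", ":"))


line = make_row({"load_1m": 0.25})
```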
### Don't trust the in-guest probe alone — cross-check host CPU
The `pre_kill_probe.yes` / `pre_kill_probe.sh` fields in

View file

@ -13,3 +13,25 @@ max_episode_bytes = 268_435_456 # 256 MiB
# is for dev testing or as a belt-and-suspenders alongside mTLS.
# [auth]
# bearer_token = "REPLACE_ME_WITH_SECRET"
# Code-version gate. Every PUT must carry X-Cis490-Code-Commit and that
# commit must be in the receiver's allow-list. The allow-list is the
# last `window` commits of `repo_path` (auto-refreshed every 5s, so a
# `git pull` on the Pi makes new commits acceptable instantly). This
# keeps episodes from out-of-date lab hosts out of the index.
[version_gate]
enabled = true
window = 100
# Production: hit the local Forgejo for the canonical commit list. The
# maintainer pushes to this repo; lab hosts pull from it. When the
# receiver checks each PUT it sees the same commits the lab hosts
# would see if they pulled at the same instant.
forgejo_url = "http://10.100.0.1:3000"
repo_owner = "spectral"
repo_name = "CIS490"
branch = "main"
# Optional Forgejo token for private repos; remove for public.
# auth_token = "..."
#
# Dev-only fallback (used iff forgejo_url is unset):
# local_repo_path = "/home/max/cis490"
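On the sending side, a client that satisfies this gate has to attach its commit hash to every PUT. The sketch below only builds the request and does not send it. `build_episode_put` and the example URL are hypothetical, and any other headers the receiver requires are omitted.

```python
from __future__ import annotations

import re
import urllib.request

_COMMIT_RE = re.compile(r"[0-9a-f]{40}")


def build_episode_put(
    url: str, body: bytes, commit: str, schema_version: int = 1
) -> urllib.request.Request:
    """Build (but do not send) a PUT carrying the code-commit header."""
    commit = commit.strip().lower()
    if not _COMMIT_RE.fullmatch(commit):
        # Fail locally rather than collecting a 400 from the receiver.
        raise ValueError("commit must be 40 lowercase hex characters")
    return urllib.request.Request(
        url,
        data=body,
        method="PUT",
        headers={
            "X-Cis490-Code-Commit": commit,
            "X-Schema-Version": str(schema_version),
        },
    )


# The URL below is a placeholder, not the receiver's real route.
req = build_episode_put("https://receiver.example/episodes/h/e", b"x", "A" * 40)
```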

View file

@ -29,6 +29,7 @@ from __future__ import annotations
import json
import logging
import os
import subprocess
import threading
import time
from dataclasses import dataclass, field
@ -42,6 +43,79 @@ from samples.manifest import Sample
from .ulid import new_ulid
# Repo root for the version probe — orchestrator/episode.py lives at
# <repo>/orchestrator/episode.py.
_REPO_ROOT = Path(__file__).resolve().parent.parent
# Cached so we don't fork `git` on every episode.
_CODE_VERSION_CACHE: dict | None = None
def _resolve_code_version() -> dict:
    """Return a small dict identifying the code that produced this episode.

    Order of resolution:
      1. ``$INSTALL_ROOT/VERSION`` (written by install-lab-host.sh at
         install time; the typical production path, since /opt/cis490
         doesn't carry a .git/ dir)
      2. ``git rev-parse HEAD`` from the repo root (dev clones)
      3. ``{"commit": "unknown"}`` so meta.json always has the field

    Output shape (always present):
      {"commit": "<40-hex>" | "unknown",
       "branch": "<name>" | None,
       "dirty": bool | None,
       "source": "VERSION-file" | "git" | "unknown"}

    Result is cached at module level so per-episode meta emission is
    free after the first read."""
    global _CODE_VERSION_CACHE
    if _CODE_VERSION_CACHE is not None:
        return _CODE_VERSION_CACHE

    # 1. VERSION file (production install).
    for cand in (_REPO_ROOT / "VERSION", Path("/opt/cis490/VERSION")):
        if cand.is_file():
            try:
                v = json.loads(cand.read_text())
                if isinstance(v, dict) and v.get("commit"):
                    v.setdefault("source", "VERSION-file")
                    _CODE_VERSION_CACHE = v
                    return v
            except (json.JSONDecodeError, OSError):
                pass

    # 2. git rev-parse from repo root (dev clones).
    try:
        commit = subprocess.run(
            ["git", "-C", str(_REPO_ROOT), "rev-parse", "HEAD"],
            capture_output=True, text=True, timeout=2, check=True,
        ).stdout.strip()
        branch = subprocess.run(
            ["git", "-C", str(_REPO_ROOT), "rev-parse", "--abbrev-ref", "HEAD"],
            capture_output=True, text=True, timeout=2,
        ).stdout.strip() or None
        # `git status --porcelain` is empty iff the working tree is clean.
        porcelain = subprocess.run(
            ["git", "-C", str(_REPO_ROOT), "status", "--porcelain"],
            capture_output=True, text=True, timeout=2,
        ).stdout
        _CODE_VERSION_CACHE = {
            "commit": commit,
            "branch": branch,
            "dirty": bool(porcelain.strip()),
            "source": "git",
        }
        return _CODE_VERSION_CACHE
    except (subprocess.SubprocessError, FileNotFoundError, OSError):
        pass

    _CODE_VERSION_CACHE = {
        "commit": "unknown", "branch": None, "dirty": None, "source": "unknown",
    }
    return _CODE_VERSION_CACHE
log = logging.getLogger("cis490.orchestrator")
SCHEMA_VERSION = 1
@ -364,6 +438,7 @@ class EpisodeRunner:
return {
"episode_id": self.episode_id,
"schema_version": SCHEMA_VERSION,
"code_version": _resolve_code_version(),
"started_at_wall": started_at_wall,
"ended_at_wall": None,
"host_fingerprint": {

View file

@ -9,6 +9,7 @@ import uvicorn
from .app import make_app
from .config import ReceiverConfig
from .store import EpisodeStore
from .version_gate import VersionGate
def main() -> None:
@ -31,10 +32,22 @@ def main() -> None:
incoming_root=cfg.incoming_root,
index_path=cfg.index_path,
)
version_gate = None
if cfg.version_gate_enabled:
version_gate = VersionGate(
repo_path=cfg.version_gate_local_repo,
window=cfg.version_gate_window,
forgejo_url=cfg.version_gate_forgejo_url,
repo_owner=cfg.version_gate_repo_owner,
repo_name=cfg.version_gate_repo_name,
branch=cfg.version_gate_branch,
auth_token=cfg.version_gate_auth_token,
)
app = make_app(
store=store,
max_episode_bytes=cfg.max_episode_bytes,
bearer_token=cfg.bearer_token,
version_gate=version_gate,
)
uvicorn.run(
app,

View file

@ -12,6 +12,7 @@ from starlette.responses import JSONResponse, Response
from starlette.routing import Route
from .store import EpisodeStore, is_valid_id
from .version_gate import VersionGate
log = logging.getLogger("cis490.receiver")
@ -38,6 +39,7 @@ def make_app(
store: EpisodeStore,
max_episode_bytes: int,
bearer_token: str | None = None,
version_gate: VersionGate | None = None,
) -> Starlette:
async def health(request: Request) -> JSONResponse:
return JSONResponse({"status": "ok"})
@ -89,6 +91,56 @@ def make_app(
except ValueError:
return JSONResponse({"error": "bad X-Schema-Version"}, status_code=400)
# Code-version gate. Every PUT must carry the orchestrator's
# commit hash and that hash must be in the receiver's current
# allow-list (last N commits on the maintainer's working clone).
# Missing → 400 (client bug); not-in-window → 412 (out-of-date
# lab host, must pull main).
commit = request.headers.get("x-cis490-code-commit", "").strip().lower()
if version_gate is not None:
ok, reason = version_gate.check(commit) if commit else (False, "missing")
if not ok:
head = version_gate.head()
if reason == "missing":
body = {
"error": "missing X-Cis490-Code-Commit header",
"remediation": (
"Lab-host is shipping with no code_version stamp. "
"Pull origin/main and re-run install-lab-host.sh "
"so the orchestrator emits meta.json.code_version "
"and the shipper forwards X-Cis490-Code-Commit."
),
}
return JSONResponse(body, status_code=400)
if reason == "bad-format":
return JSONResponse(
{"error": "X-Cis490-Code-Commit must be 40 lowercase hex"},
status_code=400,
)
# not-in-window: out-of-date lab host
body = {
"error": "code commit rejected: not in receiver's allow-list",
"your_commit": commit,
"valid_window_size": version_gate.valid_count(),
"head_commit": head,
"remediation": (
"Pull origin/main on this lab host and rebuild before "
"shipping further:\n"
" cd /opt/cis490 && sudo -u cis490 git pull origin main\n"
" sudo /opt/cis490/scripts/install-lab-host.sh\n"
" sudo systemctl restart cis490-orchestrator\n"
"Episodes from old code stay queued; the next ship will "
"succeed once the lab-host's HEAD is in the receiver's "
"allow-list. Do NOT bypass this check — it exists to "
"keep buggy pre-fix data out of the training set."
),
}
log.warning(
"rejected episode host=%s id=%s commit=%s reason=%s",
host_id, episode_id, commit[:12], reason,
)
return JSONResponse(body, status_code=412)
cl = request.headers.get("content-length")
if cl is not None:
try:
@ -104,6 +156,7 @@ def make_app(
episode_id=episode_id,
expected_sha256=expected_sha,
schema_version=schema_version,
commit=commit or None,
body=request.stream(),
max_bytes=max_episode_bytes,
)

View file

@ -17,6 +17,17 @@ class ReceiverConfig:
index_path: Path
max_episode_bytes: int
bearer_token: str | None
# Code-version gate. Production source is the local Forgejo
# (canonical repo both lab hosts and the receiver pull from);
# local-git path is a dev-only fallback.
version_gate_enabled: bool
version_gate_window: int
version_gate_forgejo_url: str | None
version_gate_repo_owner: str | None
version_gate_repo_name: str | None
version_gate_branch: str
version_gate_auth_token: str | None
version_gate_local_repo: Path | None
@classmethod
def load(cls, path: str | Path) -> "ReceiverConfig":
@ -25,6 +36,8 @@ class ReceiverConfig:
listen_addr = data.get("listen_addr", "127.0.0.1:8443")
host, _, port = listen_addr.rpartition(":")
version_gate = data.get("version_gate", {})
local_repo = version_gate.get("local_repo_path")
return cls(
listen_host=host or "127.0.0.1",
listen_port=int(port),
@ -35,4 +48,12 @@ class ReceiverConfig:
data.get("limits", {}).get("max_episode_bytes", DEFAULT_MAX_EPISODE_BYTES)
),
bearer_token=data.get("auth", {}).get("bearer_token"),
version_gate_enabled=bool(version_gate.get("enabled", True)),
version_gate_window=int(version_gate.get("window", 100)),
version_gate_forgejo_url=version_gate.get("forgejo_url"),
version_gate_repo_owner=version_gate.get("repo_owner"),
version_gate_repo_name=version_gate.get("repo_name"),
version_gate_branch=version_gate.get("branch", "main"),
version_gate_auth_token=version_gate.get("auth_token"),
version_gate_local_repo=Path(local_repo).resolve() if local_repo else None,
)

View file

@ -59,6 +59,7 @@ class EpisodeStore:
schema_version: int,
body: AsyncIterator[bytes],
max_bytes: int,
commit: str | None = None,
) -> StoreResult:
final = self.final_path(host_id, episode_id)
if final.exists():
@ -109,16 +110,17 @@ class EpisodeStore:
final.parent.mkdir(parents=True, exist_ok=True)
os.replace(partial, final)
self._append_index(
{
"received_at_wall": datetime.now(timezone.utc).isoformat(),
"host_id": host_id,
"episode_id": episode_id,
"sha256": actual,
"size_bytes": bytes_written,
"schema_version": schema_version,
}
)
row = {
"received_at_wall": datetime.now(timezone.utc).isoformat(),
"host_id": host_id,
"episode_id": episode_id,
"sha256": actual,
"size_bytes": bytes_written,
"schema_version": schema_version,
}
if commit:
row["commit"] = commit
self._append_index(row)
return StoreResult(status="stored", sha256=actual, size_bytes=bytes_written)
except BaseException:
partial.unlink(missing_ok=True)

175
receiver/version_gate.py Normal file

@ -0,0 +1,175 @@
"""Live commit allow-list for the receiver.
The receiver only stores episodes whose `meta.json::code_version.commit`
matches a commit in the canonical repository's recent history. Two
backends are supported:

  forgejo: queries
             GET /api/v1/repos/<owner>/<name>/commits?sha=<branch>&limit=<n>
           on a Forgejo instance the maintainer pushes to. PRODUCTION
           DEFAULT: Forgejo is the authoritative source of truth that
           both lab hosts and the receiver pull from, so when the
           maintainer pushes new code the new commit becomes acceptable
           automatically.
  git:     runs `git log -n <window> --format=%H` against a local
           checkout. Used by tests + dev-only setups where a Forgejo
           instance isn't available.

Cache TTL: 5s by default. Push a commit, wait 5s, and the new hash is
in the allow-list. No service restart.

Episodes from older code (before a known bug fix) get rejected with
HTTP 412 + a remediation block telling the lab-host operator to pull
main and re-run the install. That keeps bad data out of the index.
"""
from __future__ import annotations
import json
import logging
import subprocess
import threading
import time
import urllib.parse
import urllib.request
from pathlib import Path
log = logging.getLogger("cis490.receiver.version_gate")
class VersionGate:
    """Maintains the set of acceptable commit hashes via either a
    Forgejo HTTP API call or a local `git log`.

    Args:
        forgejo_url: e.g. "http://10.100.0.1:3000". Setting this enables
            the Forgejo backend; ``repo_owner``/``repo_name``/``branch``
            must also be set. ``auth_token`` is optional but recommended
            (so a private Forgejo doesn't need to be world-readable).
        repo_path: local checkout (fallback / test backend). Used iff
            ``forgejo_url`` is None.
        window: how many recent commits count as valid.
        cache_ttl_s: how long to trust the cache before refreshing.
    """

    def __init__(
        self,
        repo_path: Path | None = None,
        *,
        window: int = 100,
        cache_ttl_s: float = 5.0,
        forgejo_url: str | None = None,
        repo_owner: str | None = None,
        repo_name: str | None = None,
        branch: str = "main",
        auth_token: str | None = None,
    ) -> None:
        self.repo_path = Path(repo_path) if repo_path else None
        self.window = int(window)
        self.cache_ttl_s = float(cache_ttl_s)
        self.forgejo_url = forgejo_url.rstrip("/") if forgejo_url else None
        self.repo_owner = repo_owner
        self.repo_name = repo_name
        self.branch = branch
        self.auth_token = auth_token
        if not self.forgejo_url and not self.repo_path:
            raise ValueError("VersionGate needs forgejo_url or repo_path")
        self._lock = threading.Lock()
        self._cached_hashes: frozenset[str] = frozenset()
        self._cached_at: float = 0.0
        self._head: str | None = None

    # ---- backend dispatch -----------------------------------------------
    def _refresh(self) -> None:
        if self.forgejo_url:
            hashes, head = self._refresh_forgejo()
        else:
            hashes, head = self._refresh_git()
        if not hashes:
            log.warning("version-gate refresh empty; keeping prior cache "
                        "of %d hashes", len(self._cached_hashes))
            self._cached_at = time.monotonic()
            return
        with self._lock:
            self._cached_hashes = frozenset(hashes)
            self._cached_at = time.monotonic()
            self._head = head
        log.info("version-gate refreshed: %d valid hashes, head=%s, source=%s",
                 len(hashes), head[:12] if head else "?",
                 "forgejo" if self.forgejo_url else "git")

    def _refresh_forgejo(self) -> tuple[set[str], str | None]:
        """GET /api/v1/repos/<owner>/<name>/commits?sha=<branch>&limit=<n>."""
        url = (
            f"{self.forgejo_url}/api/v1/repos/{self.repo_owner}/"
            f"{self.repo_name}/commits"
            f"?sha={urllib.parse.quote(self.branch)}&limit={self.window}"
        )
        req = urllib.request.Request(url)
        if self.auth_token:
            req.add_header("Authorization", f"token {self.auth_token}")
        try:
            with urllib.request.urlopen(req, timeout=3) as r:
                rows = json.loads(r.read().decode("utf-8"))
        except (urllib.request.HTTPError, urllib.request.URLError,
                json.JSONDecodeError, OSError) as e:
            log.warning("forgejo refresh failed (%s); keeping prior cache", e)
            return set(), self._head
        hashes: set[str] = set()
        head: str | None = None
        for i, row in enumerate(rows or []):
            sha = row.get("sha")
            if isinstance(sha, str) and len(sha) == 40:
                sha = sha.lower()
                hashes.add(sha)
                if i == 0:
                    head = sha
        return hashes, head

    def _refresh_git(self) -> tuple[set[str], str | None]:
        """`git log -n <window> --format=%H` from `repo_path`."""
        try:
            out = subprocess.run(
                ["git", "-C", str(self.repo_path),
                 "log", f"-n{self.window}", "--format=%H"],
                check=True, capture_output=True, text=True, timeout=3,
            ).stdout
        except (subprocess.SubprocessError, FileNotFoundError, OSError) as e:
            log.warning("git refresh failed (%s); keeping prior cache", e)
            return set(), self._head
        lines = [h.strip().lower() for h in out.splitlines() if h.strip()]
        head = lines[0] if lines else None
        return set(lines), head

    def _maybe_refresh(self) -> None:
        if (time.monotonic() - self._cached_at) > self.cache_ttl_s:
            self._refresh()

    def head(self) -> str | None:
        """Return the most recent valid commit (HEAD of the branch
        the receiver is mirroring). Used by the 412 response so the
        client knows what to pull to."""
        self._maybe_refresh()
        return self._head

    def valid_count(self) -> int:
        self._maybe_refresh()
        return len(self._cached_hashes)

    def check(self, commit: str | None) -> tuple[bool, str | None]:
        """Return (ok, reason). ``reason`` is None on success, a
        short string identifying the failure mode otherwise."""
        if not commit:
            return False, "missing"
        c = commit.strip().lower()
        if len(c) != 40 or not all(ch in "0123456789abcdef" for ch in c):
            return False, "bad-format"
        self._maybe_refresh()
        with self._lock:
            allowed = self._cached_hashes
        if c in allowed:
            return True, None
        return False, "not-in-window"
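The decision table the gate implements, together with the status codes the receiver attaches to each outcome, can be exercised without a Forgejo instance or a git checkout. The sketch below reimplements only the membership check against a fixed set. `check_commit` and `STATUS_FOR_REASON` are illustrative names, not part of `receiver/version_gate.py`.

```python
from __future__ import annotations

_HEX = set("0123456789abcdef")

# Status codes as described in the receiver's PUT handler:
# client bugs map to 400, an out-of-date lab host maps to 412.
STATUS_FOR_REASON = {"missing": 400, "bad-format": 400, "not-in-window": 412}


def check_commit(
    commit: str | None, allowed: frozenset[str]
) -> tuple[bool, str | None]:
    """Same outcomes as VersionGate.check, minus caching and refresh."""
    if not commit:
        return False, "missing"
    c = commit.strip().lower()
    if len(c) != 40 or not set(c) <= _HEX:
        return False, "bad-format"
    if c in allowed:
        return True, None
    return False, "not-in-window"


allowed = frozenset({"a" * 40})
results = [
    check_commit(None, allowed),
    check_commit("not-a-hash", allowed),
    check_commit("b" * 40, allowed),
    check_commit("A" * 40, allowed),  # case-insensitive match
]
```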

View file

@ -4,58 +4,74 @@
# - identity (name, family, category) for labeling
# - acquisition (source, sha256, url) for reproducibility
# - behaviour (profile) so the synthetic load mimic can run a
# reasonable proxy until the real sample lands at vm/images/
# reasonable proxy until the real sample lands at samples/store/.
#
# When the real malware binary is present at samples/store/<sha256>,
# the orchestrator runs THAT inside the guest. When it's absent, the
# orchestrator falls back to running tools/load_mimic.py with the
# matching profile so the fleet still produces *labeled, varied* data
# while we collect the real samples. Either way, meta.json records
# which path the episode took, so trainers can filter on
# orchestrator falls back to the mimic workload with the matching
# profile so the fleet still produces *labeled, varied* data while
# we collect the real samples. Either way, meta.json records which
# path the episode took, so trainers can filter on
# meta.sample.kind ∈ {real, mimic}.
#
# Families below are CHOSEN AND TESTED to match theZoo entries that
# contain a Linux 32-bit Intel 80386 ELF binary — i.e. binaries that
# will execute natively inside our Metasploitable2 (Ubuntu 8.04 i386)
# target VM. Verified against a fresh theZoo clone on 2026-05-01;
# tools/auto_fetch_samples.py prefers the Linux-i386 ELF in each
# multi-binary zip via `_is_linux_i386_elf` magic-byte sniffing.
[[sample]]
name = "linux-encoder-ransomware"
family = "Linux.Encoder"
category = "ransomware"
profile = "io-walk"
description = "Linux.Encoder.1 (Linux i386 ELF). The first known Linux ransomware. Heavy disk write + fs walk producing a per-file overwrite envelope."
[[sample]]
name = "linux-wirenet-rat"
family = "Linux.Wirenet"
category = "rat"
profile = "shell-resident"
description = "Linux.Wirenet (Linux i386 ELF). RAT with a long-lived TCP socket pinned to a fixed peer; occasional command bursts."
[[sample]]
name = "linux-rex-ransomware"
family = "Ransomware.Rex"
category = "ransomware"
profile = "io-walk"
description = "Ransomware.Rex (Linux i386 ELF, written in Go). File-walk encryption envelope with periodic CPU spikes during AES."
[[sample]]
name = "linux-neurevt-bot"
family = "Neurevt"
category = "botnet"
profile = "scan-and-dial"
description = "Neurevt 1.7 (Linux i386 ELF). Botnet panel binary; SYN scans + periodic dial-home pattern."
[[sample]]
name = "linux-earthkrahang-apt"
family = "EarthKrahang"
category = "rat"
profile = "bursty-c2"
description = "EarthKrahang 2024 (Linux i386 ELF). APT backdoor; long idle + periodic small TCP egress bursts."
# Mimic-only fallback families. theZoo doesn't have a clean Linux i386
# binary for these; auto_fetch_samples.py logs a warning and the
# orchestrator stays on the mimic workload until a real binary is
# staged manually at samples/store/<sha256>. Kept here so the trainer
# can still collect cpu-saturate and low-and-slow envelopes (those
# profiles' theZoo coverage is sparse).
[[sample]]
name = "xmrig-cryptominer"
family = "XMRig"
category = "cryptominer"
profile = "cpu-saturate"
# A real XMRig fetch goes here when MalwareBazaar pull is wired up:
# source = "MalwareBazaar"
# sha256 = "TBD"
# url = "https://bazaar.abuse.ch/sample/TBD/"
description = "Sustained 1-vCPU saturation, very low IO/net. Pure compute."
[[sample]]
name = "mirai-class-bot"
family = "Mirai"
category = "botnet"
profile = "scan-and-dial"
description = "SYN scans across the bridge IP space + periodic dial-home. High net, low CPU."
[[sample]]
name = "ransomware-mimic"
family = "Cryptolocker-class"
category = "ransomware"
profile = "io-walk"
description = "Heavy disk write + filesystem walk producing a per-file overwrite envelope."
[[sample]]
name = "dridex-class-trojan"
family = "Dridex"
category = "banking-trojan"
profile = "bursty-c2"
description = "Long idle, periodic short bursts of TCP egress to a fixed peer (C2 beacon shape)."
description = "Mimic only on Metasploitable2 (no Linux-i386 XMRig in theZoo)."
[[sample]]
name = "kovter-class-stealth"
family = "Kovter"
category = "fileless"
profile = "low-and-slow"
description = "Low CPU, periodic memory churn, no persistent on-disk artifacts. Hardest to label from /proc alone."
[[sample]]
name = "reverse-shell-resident"
family = "Reverse-Shell"
category = "rat"
profile = "shell-resident"
description = "Single TCP socket pinned to an attacker IP, occasional command bursts."
description = "Mimic only — Kovter is Windows-native; theZoo's binary won't run on Metasploitable2 i386."

View file

@ -1,26 +1,34 @@
#!/usr/bin/env bash
# Fetch + sha256-verify the Metasploitable2 disk image.
# Fetch the Metasploitable2 disk image with no operator interaction.
#
# Rapid7's official download is gated behind a registration form, so
# we accept the URL + sha256 from env vars (with sane defaults pointing
# at a public mirror). The user installs this once per lab host.
# Defaults to the SourceForge public mirror — the canonical
# freely-redistributable copy of Metasploitable2 (Rapid7's own
# download is registration-walled but the same VMDK is on
# SourceForge, downloaded ~2M times). HTTPS protects the transport.
#
# Inputs (env):
# IMAGE_URL — direct download URL for the metasploitable2 archive
# IMAGE_SHA256 — expected sha256 of the archive
# OUT_DIR — where to drop the qcow2 (default vm/images/)
# Idempotent: if the qcow2 is already on disk we do nothing.
#
# Inputs (env, all optional):
# IMAGE_URL — override the default mirror URL
# IMAGE_SHA256 — verify against this hash. If unset and a sha256
# has been recorded by a prior successful fetch
# ($OUT_DIR/metasploitable2.qcow2.sha256), use that.
# If neither is available, do TOFU (trust on first
# use): record the hash of what was downloaded so
# subsequent runs verify against it.
# OUT_DIR — where to drop the qcow2 (default vm/images/)
#
# Outputs:
# $OUT_DIR/metasploitable2.qcow2 — converted from the original VMDK
# if needed.
#
# We do NOT bake an image url+hash into the repo because the canonical
# distribution is a registration-walled zip on Rapid7. Operators must
# supply both; the rest is mechanical.
# $OUT_DIR/metasploitable2.qcow2 — the disk image
# $OUT_DIR/metasploitable2.qcow2.sha256 — recorded archive hash
set -euo pipefail
IMAGE_URL="${IMAGE_URL:-}"
# SourceForge public mirror. Direct-download URL — no auth, no
# registration. /download 302s to a regional mirror.
DEFAULT_IMAGE_URL='https://downloads.sourceforge.net/project/metasploitable/Metasploitable2/metasploitable-linux-2.0.0.zip'
IMAGE_URL="${IMAGE_URL:-$DEFAULT_IMAGE_URL}"
IMAGE_SHA256="${IMAGE_SHA256:-}"
OUT_DIR="${OUT_DIR:-$(cd "$(dirname "$0")/../vm/images" 2>/dev/null && pwd)}"
WORK_DIR="${WORK_DIR:-/tmp/cis490-metasploitable-fetch}"
@ -28,26 +36,44 @@ WORK_DIR="${WORK_DIR:-/tmp/cis490-metasploitable-fetch}"
log() { printf '[fetch-metasploitable2] %s\n' "$*" >&2; }
die() { log "FATAL: $*"; exit 1; }
[[ -n "$IMAGE_URL" ]] || die "set IMAGE_URL to the Metasploitable2 download URL"
[[ -n "$IMAGE_SHA256" ]] || die "set IMAGE_SHA256 to the expected sha256 of the archive"
mkdir -p "$OUT_DIR" "$WORK_DIR"
# Short-circuit if the qcow2 is already on disk.
if [[ -f "$OUT_DIR/metasploitable2.qcow2" ]]; then
log "$OUT_DIR/metasploitable2.qcow2 already present; nothing to do"
exit 0
fi
# Use the recorded sha256 from a prior successful fetch if present
# and the env var didn't override it. This pins TOFU across runs
# so a tampered re-download fails noisily.
SHA_FILE="$OUT_DIR/metasploitable2.qcow2.sha256"
if [[ -z "$IMAGE_SHA256" && -f "$SHA_FILE" ]]; then
IMAGE_SHA256="$(awk '{print $1}' "$SHA_FILE")"
log "using pinned sha256 from $SHA_FILE: $IMAGE_SHA256"
fi
ARCHIVE="$WORK_DIR/$(basename "$IMAGE_URL")"
log "downloading $IMAGE_URL → $ARCHIVE"
if [[ -f "$ARCHIVE" ]]; then
log "archive already present; skipping download"
log "archive already present in work dir; skipping download"
else
# -L follows SourceForge's redirect to the actual mirror.
curl -fL --retry 3 --retry-delay 5 -o "$ARCHIVE.partial" "$IMAGE_URL"
mv "$ARCHIVE.partial" "$ARCHIVE"
fi
log "verifying sha256"
ACTUAL="$(sha256sum "$ARCHIVE" | awk '{print $1}')"
if [[ "$ACTUAL" != "$IMAGE_SHA256" ]]; then
die "sha256 mismatch: expected $IMAGE_SHA256, got $ACTUAL"
if [[ -n "$IMAGE_SHA256" ]]; then
if [[ "$ACTUAL" != "$IMAGE_SHA256" ]]; then
die "sha256 mismatch: expected $IMAGE_SHA256, got $ACTUAL"
fi
log "sha256 ok"
else
log "no sha256 supplied — first-run TOFU; pinning $ACTUAL for future runs"
fi
log "sha256 ok"
# Always (re)record so future runs verify against the working hash.
echo "$ACTUAL $(basename "$ARCHIVE")" > "$SHA_FILE"
# Extract — handle either zip or 7z, since various mirrors choose one
# or the other.
@ -65,5 +91,8 @@ log "converting $VMDK → qcow2"
command -v qemu-img >/dev/null || die "qemu-img required (apt install qemu-utils)"
qemu-img convert -O qcow2 "$VMDK" "$OUT_DIR/metasploitable2.qcow2"
# Best-effort cleanup of the work dir — keeps lab-host disk clean.
rm -rf "$WORK_DIR"
log "done: $OUT_DIR/metasploitable2.qcow2"
log "Tier-3 ready when msfrpcd is up. See scripts/install-msfrpcd.sh."
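The trust-on-first-use pinning in the fetch script reduces to a short decision: verify against an explicit hash if one is given, else against a previously recorded pin, else record what was downloaded. A Python sketch of the same idea follows. `verify_or_pin` is a hypothetical helper, and the pin-file layout (`<hash>  <name>`) mirrors `sha256sum` output.

```python
from __future__ import annotations

import hashlib
import tempfile
from pathlib import Path


def verify_or_pin(archive: Path, pin_file: Path, expected: str | None = None) -> str:
    """Verify `archive` against `expected` or the pin file, then (re)pin."""
    h = hashlib.sha256()
    with archive.open("rb") as f:
        for chunk in iter(lambda: f.read(1 << 20), b""):
            h.update(chunk)
    actual = h.hexdigest()

    # Fall back to the hash recorded by a prior successful fetch.
    if expected is None and pin_file.is_file():
        fields = pin_file.read_text().split()
        expected = fields[0] if fields else None

    if expected is not None and actual != expected.lower():
        raise ValueError(f"sha256 mismatch: expected {expected}, got {actual}")

    # First run (no expected hash): trust on first use and record it.
    pin_file.write_text(f"{actual}  {archive.name}\n")
    return actual


workdir = Path(tempfile.mkdtemp())
archive = workdir / "image.zip"
archive.write_bytes(b"first download")
pin = workdir / "image.sha256"
first = verify_or_pin(archive, pin)  # pins the hash
again = verify_or_pin(archive, pin)  # verifies against the pin
```

TOFU only detects a change relative to the first download. It cannot tell whether that first download was itself the genuine image, which is why an explicit `IMAGE_SHA256` takes precedence when supplied.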

View file

@ -69,6 +69,26 @@ install -d -o "$SERVICE_USER" -g "$SERVICE_USER" -m 0755 "$INSTALL_ROOT"
cp -aT "$REPO_ROOT" "$INSTALL_ROOT"
chown -R "$SERVICE_USER":"$SERVICE_USER" "$INSTALL_ROOT"
# Stamp a VERSION file at install time so episodes can record the
# code commit they were generated by. /opt/cis490 is a flat copy
# (no .git/), so we capture the source repo's HEAD here. Trainers
# read meta.json.code_version to filter out episodes from buggy
# pre-fix code.
if VC="$(cd "$REPO_ROOT" && git rev-parse HEAD 2>/dev/null)"; then
VB="$(cd "$REPO_ROOT" && git rev-parse --abbrev-ref HEAD 2>/dev/null || echo unknown)"
VD="false"
if [[ -n "$(git -C "$REPO_ROOT" status --porcelain 2>/dev/null)" ]]; then
VD="true"
fi
install -o "$SERVICE_USER" -g "$SERVICE_USER" -m 0644 /dev/stdin \
"$INSTALL_ROOT/VERSION" <<EOF
{"commit": "$VC", "branch": "$VB", "dirty": $VD, "installed_at_wall": "$(date -u +%Y-%m-%dT%H:%M:%SZ)"}
EOF
log "VERSION stamp: $VC ($VB)$([[ "$VD" == "true" ]] && echo " [dirty]")"
else
log "WARN: $REPO_ROOT not a git checkout; episodes will record code_version.commit='unknown'"
fi
log "building venv"
if [[ "$USE_UV" -eq 1 ]]; then
sudo -u "$SERVICE_USER" -- env HOME="$INSTALL_ROOT" \
@ -203,6 +223,28 @@ install -d -o "$SERVICE_USER" -g "$SERVICE_USER" -m 0755 "$INSTALL_ROOT/vm/image
ln -sf "$ALPINE_IMG" "$INSTALL_ROOT/vm/images/alpine-baseline.qcow2" 2>/dev/null || true
ln -sf "$CIDATA_ISO" "$INSTALL_ROOT/vm/images/cidata.iso" 2>/dev/null || true
# --- 8. Tier-3 + Tier-4 deploy (auto, idempotent) ----------------------
# Bring up msfrpcd + Metasploitable2 + bridge + verify. Skipped only if
# certs aren't on disk yet (Tier-3 fire writes episodes that the
# shipper ships, so it's pointless to run before mTLS is live) or the
# operator passed --skip-tier3.
SKIP_TIER3="${SKIP_TIER3:-}"
for arg in "$@"; do
[[ "$arg" == "--skip-tier3" ]] && SKIP_TIER3=1
done
if [[ -z "$SKIP_TIER3" && -f "$ETC_ROOT/certs/lab-host.pem" ]]; then
log "deploying Tier 3 (msfrpcd + Metasploitable2 + bridge)"
if "$INSTALL_ROOT/scripts/install-tier-3-4.sh"; then
log "Tier-3 deploy ✓"
else
log "WARN: Tier-3 deploy failed — Tier 2 will keep running."
log " Re-run later: sudo $INSTALL_ROOT/scripts/install-tier-3-4.sh"
fi
elif [[ -z "$SKIP_TIER3" ]]; then
log "skipping Tier-3 deploy (no mTLS cert yet — re-run this script after"
log "host_id is set so the cert auto-fetches first)"
fi
if [[ "$NEW_INSTALL" == "1" ]]; then
log ""
log "================================================================="
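
The VERSION stamp written earlier in this file exists so that episodes can record a `code_version`. A reader on the Python side might look roughly like the sketch below. It is an assumption-laden illustration: `resolve_code_version` is a hypothetical name, and the lookup order (VERSION file first, then `git`) is a choice made for this example. Only the three `source` values and the `commit` field come from the tests in this commit.

```python
import json
import subprocess
from pathlib import Path


def resolve_code_version(root: Path) -> dict:
    """Return {"commit": ..., "source": ...} for the code under `root`."""
    # 1. Flat install (no .git/): trust the install-time VERSION stamp.
    try:
        stamp = json.loads((root / "VERSION").read_text())
        commit = stamp.get("commit")
        if isinstance(commit, str) and commit:
            return {"commit": commit, "source": "VERSION-file"}
    except (OSError, ValueError, AttributeError):
        pass  # missing, unreadable, or not a JSON object
    # 2. Development checkout: ask git directly.
    try:
        out = subprocess.run(
            ["git", "-C", str(root), "rev-parse", "HEAD"],
            capture_output=True, text=True, check=True,
        ).stdout.strip()
        if len(out) == 40:
            return {"commit": out, "source": "git"}
    except (OSError, subprocess.CalledProcessError):
        pass  # git missing, or root is not inside a work tree
    # 3. Neither is available.
    return {"commit": "unknown", "source": "unknown"}
```

Returning a dict with an explicit `source` lets a trainer distinguish a stamped install from a developer checkout when filtering episodes.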


@ -32,33 +32,55 @@ die() { log "FATAL: $*"; exit 1; }
command -v systemctl >/dev/null || die "systemd not found"
# --- 1. install metasploit-framework -----------------------------------
if ! command -v msfrpcd >/dev/null; then
log "msfrpcd not found; installing metasploit-framework"
# Auto-install paths per package manager. Rapid7's omnibus installer
# is the canonical zero-touch path for Debian/Ubuntu — it adds the
# apt repo, the GPG key, and apt-installs the framework. Other
# distros use their native package or fall back to the omnibus shell
# script.
if ! command -v msfrpcd >/dev/null && [[ ! -x /opt/metasploit-framework/bin/msfrpcd ]]; then
log "msfrpcd not found; installing metasploit-framework (~1 GiB)"
if command -v apt-get >/dev/null; then
# The Debian/Ubuntu metasploit-framework package isn't in
# the default repos for most distros. Use Rapid7's official
# nightly installer when available.
if [[ ! -x /opt/metasploit-framework/bin/msfrpcd ]]; then
log "fetching Rapid7 nightly installer"
curl -fsSL https://raw.githubusercontent.com/rapid7/metasploit-omnibus/master/config/templates/metasploit-framework-wrappers/msfupdate.erb \
-o /tmp/msfinstall.sh || true
log "automated install not available — install manually:"
log " https://docs.metasploit.com/docs/using-metasploit/getting-started/nightly-installers.html"
die "rerun once msfrpcd is on PATH"
fi
# Symlink the wrapper so ``msfrpcd`` is on PATH.
ln -sf /opt/metasploit-framework/bin/msfrpcd /usr/local/bin/msfrpcd
# Rapid7's omnibus installer wraps the apt-repo + GPG-key
# bootstrap + apt install in a single script. We fetch and
# exec it non-interactively. The script does:
# 1. add apt.metasploit.com to /etc/apt/sources.list.d/
# 2. install the GPG key
# 3. apt-get install -y metasploit-framework
log "running Rapid7 omnibus installer"
TMP="$(mktemp -d)"
curl -fsSL \
https://raw.githubusercontent.com/rapid7/metasploit-omnibus/master/config/templates/metasploit-framework-wrappers/msfupdate.erb \
-o "$TMP/msfinstall"
chmod +x "$TMP/msfinstall"
DEBIAN_FRONTEND=noninteractive "$TMP/msfinstall" </dev/null
rm -rf "$TMP"
elif command -v pacman >/dev/null; then
log "pacman -S metasploit"
pacman -Sy --noconfirm metasploit
elif command -v dnf >/dev/null; then
die "Fedora/RHEL: install metasploit-framework manually, then re-run"
# The omnibus installer also supports rpm distros via the
# same script — it auto-detects and uses dnf/yum.
log "running Rapid7 omnibus installer (dnf path)"
TMP="$(mktemp -d)"
curl -fsSL \
https://raw.githubusercontent.com/rapid7/metasploit-omnibus/master/config/templates/metasploit-framework-wrappers/msfupdate.erb \
-o "$TMP/msfinstall"
chmod +x "$TMP/msfinstall"
"$TMP/msfinstall" </dev/null
rm -rf "$TMP"
else
die "unknown package manager — install metasploit-framework manually"
die "unknown package manager — install metasploit-framework manually, then re-run"
fi
fi
command -v msfrpcd >/dev/null || die "msfrpcd still missing after install attempt"
# After install, msfrpcd may live at /opt/metasploit-framework/bin/
# (omnibus) or on PATH (apt repo). Symlink so callers find it.
if ! command -v msfrpcd >/dev/null; then
if [[ -x /opt/metasploit-framework/bin/msfrpcd ]]; then
ln -sf /opt/metasploit-framework/bin/msfrpcd /usr/local/bin/msfrpcd
fi
fi
command -v msfrpcd >/dev/null || die "msfrpcd still missing after install — see journalctl"
# --- 2. generate password ----------------------------------------------
install -d -m 0755 -o root -g root "$ETC_ROOT"

166
scripts/install-tier-3-4.sh Executable file

@ -0,0 +1,166 @@
#!/usr/bin/env bash
# Tier-3 + Tier-4 deploy orchestrator. Idempotent. ZERO operator
# interaction — including no API key, no signup, no manual upload.
#
# Steps (each idempotent on its own):
# 1. install-msfrpcd.sh — auto-install metasploit-framework via
# Rapid7 omnibus + drop systemd unit
# 2. fetch-metasploitable2.sh — pull the disk image from the
# SourceForge public mirror (TOFU)
# 3. setup_bridge.sh — bring up br-malware host-only bridge
# for callback-payload modules
# 4. Tier-3 verify — fire vsftpd_234_backdoor against the
# freshly-fetched VM, confirm session
# lands and an episode is recorded
# 5. Tier-4 deploy — clone theZoo (public security-research
# repo, no auth), extract one real
# binary per manifest family, stage at
# samples/store/<sha256>, rewrite
# manifest.toml in place. MANDATORY:
# the deploy fails if zero samples land.
#
# Inputs (env, all optional):
# SKIP_VERIFY — set to skip the live Tier-3 fire test
# SKIP_BRIDGE — set to skip bridge setup (limits to non-callback modules)
# SKIP_TIER4 — set to skip Tier-4 deploy entirely (DEPRECATED;
# leaves you with mimic-only data, defeats the project)
#
# Run as root from anywhere on the lab host. Sub-scripts handle their
# own root checks.
set -euo pipefail
REPO_ROOT="$(cd "$(dirname "$0")/.." && pwd)"
INSTALL_ROOT="${INSTALL_ROOT:-/opt/cis490}"
DATA_ROOT="${DATA_ROOT:-/var/lib/cis490}"
ETC_ROOT="${ETC_ROOT:-/etc/cis490}"
log() { printf '[install-tier-3-4] %s\n' "$*" >&2; }
die() { log "FATAL: $*"; exit 1; }
[[ $EUID -eq 0 ]] || die "must run as root"
# Resolve script paths — prefer $INSTALL_ROOT (production) over
# $REPO_ROOT (dev clone) so a re-run under systemd uses the same
# scripts the orchestrator does.
script_path() {
local name="$1"
if [[ -x "$INSTALL_ROOT/scripts/$name" ]]; then echo "$INSTALL_ROOT/scripts/$name"; return
elif [[ -x "$REPO_ROOT/scripts/$name" ]]; then echo "$REPO_ROOT/scripts/$name"; return
elif [[ -x "$INSTALL_ROOT/vm/$name" ]]; then echo "$INSTALL_ROOT/vm/$name"; return
elif [[ -x "$REPO_ROOT/vm/$name" ]]; then echo "$REPO_ROOT/vm/$name"; return
else die "$name not found in $INSTALL_ROOT or $REPO_ROOT"
fi
}
# --- 1. msfrpcd --------------------------------------------------------
log "[1/5] install metasploit-framework + msfrpcd unit"
"$(script_path install-msfrpcd.sh)"
if ! systemctl is-active --quiet cis490-msfrpcd; then
log "starting cis490-msfrpcd"
systemctl enable --now cis490-msfrpcd
fi
sleep 3
if ! ss -ltn 2>/dev/null | grep -q ':55553'; then
log "cis490-msfrpcd not listening on 127.0.0.1:55553 yet — waiting up to 30s"
for _ in $(seq 1 30); do
ss -ltn 2>/dev/null | grep -q ':55553' && break
sleep 1
done
fi
ss -ltn 2>/dev/null | grep -q ':55553' || \
die "msfrpcd never bound to :55553 — check 'journalctl -u cis490-msfrpcd'"
log "msfrpcd ✓"
# --- 2. metasploitable2 image ------------------------------------------
log "[2/5] fetch Metasploitable2 disk image"
OUT_DIR="$DATA_ROOT/vm/images"
install -d -m 0755 -o cis490 -g cis490 "$OUT_DIR"
OUT_DIR="$OUT_DIR" "$(script_path fetch-metasploitable2.sh)"
chown cis490:cis490 "$OUT_DIR/metasploitable2.qcow2" 2>/dev/null || true
log "metasploitable2.qcow2 ✓"
# --- 3. bridge ---------------------------------------------------------
if [[ -z "${SKIP_BRIDGE:-}" ]]; then
log "[3/5] bring up br-malware host-only bridge"
if "$(script_path setup_bridge.sh)"; then
log "br-malware ✓"
else
log "bridge setup failed (non-fatal); only non-callback modules will fire"
fi
else
log "[3/5] SKIP_BRIDGE set — limiting to non-callback modules"
fi
# --- 4. Tier-3 verify --------------------------------------------------
if [[ -z "${SKIP_VERIFY:-}" ]]; then
log "[4/5] verify Tier-3 fire (vsftpd_234_backdoor)"
set -a
# shellcheck disable=SC1091
. "$ETC_ROOT/msfrpc.env"
set +a
PY="$INSTALL_ROOT/.venv/bin/python"
[[ -x "$PY" ]] || PY="$(command -v python3)"
if ! sudo -E -u cis490 "$PY" "$INSTALL_ROOT/tools/run_tier3_demo.py" \
--module vsftpd_234_backdoor \
--target-port 21 \
--target-boot-timeout 240 \
> /tmp/cis490-tier3-verify.log 2>&1; then
log "verify run failed — log at /tmp/cis490-tier3-verify.log; dumping last 30 lines:"
tail -30 /tmp/cis490-tier3-verify.log >&2 || true
die "Tier-3 fire failed"
fi
if grep -q '^episode_id = ' /tmp/cis490-tier3-verify.log; then
log "Tier-3 verified ✓ ($(grep '^episode_id = ' /tmp/cis490-tier3-verify.log))"
else
log "verify run finished but no episode_id seen — log at /tmp/cis490-tier3-verify.log"
fi
else
log "[4/5] SKIP_VERIFY set"
fi
# --- 5. Tier-4 deploy (MANDATORY, no auth required) --------------------
if [[ -n "${SKIP_TIER4:-}" ]]; then
log "[5/5] SKIP_TIER4 set — leaving this host on Tier 2/3 mimic-only."
log " This is NOT the recommended configuration; the project's"
log " training target is real-binary episodes."
else
log "[5/5] Tier-4 deploy (real malware fetch from theZoo — mandatory)"
command -v git >/dev/null || die "git not installed; need it to clone theZoo"
PY="$INSTALL_ROOT/.venv/bin/python"
[[ -x "$PY" ]] || PY="$(command -v python3)"
# theZoo clone lives on shared persistent storage so re-runs don't
# re-download. cis490 user owns it for periodic git pull.
THEZOO_DIR="${THEZOO_DIR:-$DATA_ROOT/theZoo}"
install -d -o cis490 -g cis490 -m 0755 "$(dirname "$THEZOO_DIR")"
if ! sudo -E -u cis490 "$PY" \
"$INSTALL_ROOT/tools/auto_fetch_samples.py" \
--thezoo-clone-dir "$THEZOO_DIR" \
> /tmp/cis490-tier4-deploy.log 2>&1; then
log "Tier-4 fetch failed — last 30 lines of /tmp/cis490-tier4-deploy.log:"
tail -30 /tmp/cis490-tier4-deploy.log >&2 || true
die "Tier-4 deploy failed; without real binaries this host produces only mimics"
fi
REAL_COUNT="$(ls "$INSTALL_ROOT/samples/store/" 2>/dev/null | wc -l)"
if [[ "$REAL_COUNT" -lt 1 ]]; then
log "auto_fetch_samples.py exited 0 but samples/store/ is empty — see /tmp/cis490-tier4-deploy.log"
tail -30 /tmp/cis490-tier4-deploy.log >&2 || true
die "Tier-4 deploy failed: no real binaries staged"
fi
log "Tier-4 ✓ ($REAL_COUNT real binaries staged in $INSTALL_ROOT/samples/store/)"
fi
log ""
log "================================================================="
log " Tier-3 deploy complete on $(hostname)"
log "================================================================="
log " - metasploit-framework + cis490-msfrpcd.service active"
log " - $OUT_DIR/metasploitable2.qcow2 staged"
log " - bridge: $(ip link show br-malware >/dev/null 2>&1 && echo up || echo skipped)"
log " - Tier-4: $(ls "$INSTALL_ROOT/samples/store/" 2>/dev/null | wc -l) real binaries staged"
log ""
log " Optional: restart the orchestrator to switch to Tier-3 immediately"
log " (otherwise the next wave picks it up on its own):"
log " sudo systemctl restart cis490-orchestrator"
log "================================================================="
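
Step 1 of the script above waits for msfrpcd by polling `ss -ltn` for the port. The same bounded-wait pattern can be sketched in Python. This is an illustration only: `wait_for_port` is a hypothetical helper, and unlike `ss` it opens a real TCP connection rather than inspecting the listening-socket table.

```python
import socket
import time


def wait_for_port(host: str, port: int, timeout_s: float = 30.0,
                  interval_s: float = 1.0) -> bool:
    """Poll until a TCP connect to (host, port) succeeds or the deadline passes."""
    deadline = time.monotonic() + timeout_s
    while True:
        try:
            # A successful connect means something is accepting on the port.
            with socket.create_connection((host, port), timeout=interval_s):
                return True
        except OSError:
            pass  # refused or timed out; retry until the deadline
        if time.monotonic() >= deadline:
            return False
        time.sleep(interval_s)
```

A caller would treat a `False` return the way the script treats its final `ss` check: stop and point the operator at the service journal.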


@ -31,6 +31,7 @@ on a matching sha256, 409 on a divergent one.
from __future__ import annotations
import json
import logging
import shutil
import subprocess
@ -84,11 +85,25 @@ class ShipperQueue:
transient += 1
continue
res = self.transport.ship_tarball(episode_id, tarball, sha)
log.info(
"ship %s -> %s (%d) %s",
episode_id, res.status, res.status_code, res.error or "",
)
commit = self._read_episode_commit(ep_dir)
res = self.transport.ship_tarball(episode_id, tarball, sha,
commit=commit)
# Receiver returns 412 with a remediation block when the
# commit isn't in its allow-list — surface the body so the
# lab-host operator (or its on-device AI) sees what to run.
if res.status_code == 412 and res.body:
log.error(
"ship %s -> 412 commit-rejected. your_commit=%s "
"head=%s. Remediation:\n%s",
episode_id, (res.body or {}).get("your_commit"),
(res.body or {}).get("head_commit"),
(res.body or {}).get("remediation", "<none>"),
)
else:
log.info(
"ship %s -> %s (%d) %s",
episode_id, res.status, res.status_code, res.error or "",
)
if res.status in ("stored", "already-present"):
self._retire(ep_dir, tarball)
@ -135,6 +150,24 @@ class ShipperQueue:
out.append(ep)
return out
def _read_episode_commit(self, ep_dir: Path) -> str | None:
"""Pull meta.json::code_version.commit so the shipper can send
it as X-Cis490-Code-Commit. Returns None if the file is
missing/malformed; the receiver will reject with a 400 in that
case and the operator can see the lab host needs a re-install."""
meta_path = ep_dir / "meta.json"
if not meta_path.exists():
return None
try:
meta = json.loads(meta_path.read_text())
except (json.JSONDecodeError, OSError):
return None
cv = meta.get("code_version") or {}
commit = cv.get("commit")
if isinstance(commit, str) and len(commit) == 40:
return commit.lower()
return None
def _tar_episode(self, ep_dir: Path) -> tuple[Path, str]:
"""Tar+zstd the episode dir into outbox. Idempotent — overwrites
any prior partial. Returns ``(tarball_path, sha256_hex)``."""
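
`_read_episode_commit` above accepts any 40-character string, so a malformed value would only be caught by the receiver. A stricter client-side variant could also check that the value is hexadecimal. The sketch below is one way to do that; the function name and the regex are assumptions for this example, not part of the commit.

```python
import json
import re
from pathlib import Path
from typing import Optional

# A full SHA-1 object name: exactly 40 hex digits, either case.
_SHA1_HEX = re.compile(r"[0-9a-fA-F]{40}")


def read_episode_commit(ep_dir: Path) -> Optional[str]:
    """Return meta.json's code_version.commit, lower-cased, or None."""
    try:
        meta = json.loads((ep_dir / "meta.json").read_text())
    except (OSError, ValueError):
        return None  # missing, unreadable, or malformed meta.json
    if not isinstance(meta, dict):
        return None
    cv = meta.get("code_version")
    commit = cv.get("commit") if isinstance(cv, dict) else None
    if isinstance(commit, str) and _SHA1_HEX.fullmatch(commit):
        return commit.lower()
    return None
```

Rejecting bad values locally yields the same outcome as a missing header (no commit is sent), which keeps the failure mode uniform for the operator.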


@ -162,6 +162,7 @@ class ShipperTransport:
episode_id: str,
tarball_path: Path,
sha256_hex: str,
commit: str | None = None,
) -> ShipResult:
if not self._try_build_verify():
return ShipResult(
@ -180,6 +181,10 @@ class ShipperTransport:
"X-Content-SHA256": sha256_hex,
"X-Episode-Id": episode_id,
}
if commit:
# Receiver enforces this against its commit-allow-list and
# rejects with 412 if not in window. See receiver/version_gate.py.
headers["X-Cis490-Code-Commit"] = commit
try:
with httpx.Client(verify=self._verify, timeout=self.cfg.request_timeout_s) as c, \
@ -225,6 +230,18 @@ class ShipperTransport:
body=body_json,
error="receiver already has a different sha256 for this id",
)
if r.status_code == 412:
# Code-commit not in receiver's allow-list. The operator
# of THIS lab host needs to pull main + reinstall;
# retrying without that won't help. Treat as fatal so
# queue.run_once() doesn't loop on it.
return ShipResult(
status="fatal",
status_code=412,
sha256=None,
body=body_json,
error="code commit rejected — pull origin/main and reinstall",
)
if 500 <= r.status_code < 600:
return ShipResult(
status="transient",


@ -0,0 +1,200 @@
"""Tests for tools/auto_fetch_samples.py.
Exercises the parts that can be tested without a real theZoo clone:
- ELF magic-byte sniffing for Linux i386 detection
- family-name directory matching (substring + token fallback)
- manifest in-place rewrite (atomic, stat-preserving)
"""
from __future__ import annotations
import importlib.util
import sys
from pathlib import Path
import pytest
REPO_ROOT = Path(__file__).resolve().parent.parent
spec = importlib.util.spec_from_file_location(
"auto_fetch_samples", REPO_ROOT / "tools" / "auto_fetch_samples.py"
)
afs = importlib.util.module_from_spec(spec)
sys.modules["auto_fetch_samples"] = afs
spec.loader.exec_module(afs)
# ---------------------------------------------------------------------------
# ELF magic detection
# ---------------------------------------------------------------------------
def _write(p: Path, data: bytes) -> Path:
p.parent.mkdir(parents=True, exist_ok=True)
p.write_bytes(data)
return p
def _elf_header(*, ei_class: int = 1, ei_data: int = 1, ei_osabi: int = 0,
e_machine: int = 0x03) -> bytes:
"""Synthesise a minimal ELF header. Default = Linux i386."""
h = bytearray(20)
h[:4] = b"\x7fELF"
h[4] = ei_class # 1=32, 2=64
h[5] = ei_data # 1=little, 2=big
h[6] = 1 # ei_version
h[7] = ei_osabi # 0=SYSV, 3=Linux, 9=FreeBSD
h[18:20] = e_machine.to_bytes(2, "little")
return bytes(h)
def test_is_linux_i386_elf_accepts_sysv(tmp_path: Path) -> None:
p = _write(tmp_path / "x", _elf_header())
assert afs._is_linux_i386_elf(p)
def test_is_linux_i386_elf_accepts_linux_osabi(tmp_path: Path) -> None:
p = _write(tmp_path / "x", _elf_header(ei_osabi=3))
assert afs._is_linux_i386_elf(p)
def test_is_linux_i386_elf_rejects_freebsd(tmp_path: Path) -> None:
"""Snoopy.A in theZoo is FreeBSD/i386 — looks similar but won't
run on Metasploitable2."""
p = _write(tmp_path / "x", _elf_header(ei_osabi=9))
assert not afs._is_linux_i386_elf(p)
def test_is_linux_i386_elf_rejects_x86_64(tmp_path: Path) -> None:
p = _write(tmp_path / "x", _elf_header(ei_class=2, e_machine=0x3E))
assert not afs._is_linux_i386_elf(p)
def test_is_linux_i386_elf_rejects_arm(tmp_path: Path) -> None:
"""Mirai.B in theZoo is ARM — won't run on x86 Metasploitable2."""
p = _write(tmp_path / "x", _elf_header(e_machine=0x28))
assert not afs._is_linux_i386_elf(p)
def test_is_linux_i386_elf_rejects_pe32(tmp_path: Path) -> None:
"""Windows PE32 starts with MZ, not \\x7fELF."""
p = _write(tmp_path / "x", b"MZ" + b"\x00" * 18)
assert not afs._is_linux_i386_elf(p)
def test_is_linux_i386_elf_rejects_text(tmp_path: Path) -> None:
p = _write(tmp_path / "x", b"hello\n")
assert not afs._is_linux_i386_elf(p)
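
The tests above pin down the behaviour expected of `_is_linux_i386_elf` without showing it. A function that satisfies the same cases could look like the sketch below. It is written from the ELF header layout, not copied from `tools/auto_fetch_samples.py`, so treat the name and structure as assumptions.

```python
from pathlib import Path


def is_linux_i386_elf(path: Path) -> bool:
    """True for a 32-bit little-endian x86 ELF with a SYSV or Linux OS ABI."""
    try:
        with path.open("rb") as f:
            h = f.read(20)  # e_ident (16 bytes) + e_type + e_machine
    except OSError:
        return False
    if len(h) < 20 or h[:4] != b"\x7fELF":
        return False
    ei_class, ei_data, ei_osabi = h[4], h[5], h[7]
    # Only little-endian files pass, so e_machine is read little-endian.
    e_machine = int.from_bytes(h[18:20], "little")
    return (
        ei_class == 1             # ELFCLASS32
        and ei_data == 1          # ELFDATA2LSB
        and ei_osabi in (0, 3)    # ELFOSABI_NONE (SYSV) or ELFOSABI_LINUX
        and e_machine == 0x03     # EM_386
    )
```

Many Linux toolchains leave `ei_osabi` at 0, which is why both values are accepted while FreeBSD's value of 9 is rejected.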
# ---------------------------------------------------------------------------
# Family-token expansion (the bug that broke v1: "Cryptolocker-class"
# wouldn't match "CryptoLocker_22Jan2014" because the suffix "-class"
# isn't in the dir name)
# ---------------------------------------------------------------------------
def test_family_tokens_strips_suffix() -> None:
assert afs._family_tokens("Cryptolocker-class") == [
"cryptolocker-class", "cryptolocker"
]
def test_family_tokens_dot_namespaces_kept() -> None:
"""Linux.Mirai stays as `linux.mirai` so it lands on the right dir
rather than matching every Linux.* entry by the head token."""
out = afs._family_tokens("Linux.Mirai")
assert out[0] == "linux.mirai"
# Head token "linux" is appended as a fallback.
assert "linux" in out
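
The token fallback these two tests describe can be sketched as follows. This is a guess at the shape of the logic that happens to satisfy both assertions; `family_tokens`, `match_dir`, and the separator set are hypothetical and may differ from the real `_family_tokens`.

```python
import re
from typing import Iterable, List, Optional


def family_tokens(family: str) -> List[str]:
    """Full lower-cased name first, then the head token as a fallback."""
    full = family.lower()
    tokens = [full]
    # Assumed separators: dash, underscore, dot, space.
    head = re.split(r"[-_. ]", full, maxsplit=1)[0]
    if head and head not in tokens:
        tokens.append(head)
    return tokens


def match_dir(family: str, dir_names: Iterable[str]) -> Optional[str]:
    """Return the first directory whose name contains a token, most specific first."""
    names = list(dir_names)
    for token in family_tokens(family):
        for name in names:
            if token in name.lower():
                return name
    return None
```

Trying the full name before the head token is what keeps `Linux.Mirai` from matching an unrelated `Linux.*` directory when an exact match exists.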
# ---------------------------------------------------------------------------
# Extraction picker prefers Linux i386 ELF
# ---------------------------------------------------------------------------
def test_extract_largest_binary_prefers_linux_i386(tmp_path: Path) -> None:
"""Mimics theZoo's Linux.Encoder.1 layout: multiple binaries in the
same zip, only one of which is Linux i386. The picker must return
that one even though it isn't the largest."""
import zipfile
zip_path = tmp_path / "test.zip"
big_x86_64 = _elf_header(ei_class=2, e_machine=0x3E) + b"\x00" * 5000
small_i386 = _elf_header() + b"\x00" * 100
freebsd_i386 = _elf_header(ei_osabi=9) + b"\x00" * 8000
with zipfile.ZipFile(zip_path, "w") as z:
z.writestr("big-x86-64", big_x86_64)
z.writestr("small-i386", small_i386)
z.writestr("freebsd-i386", freebsd_i386)
work = tmp_path / "extract"
chosen = afs._extract_largest_binary(zip_path, work)
assert chosen is not None
assert chosen.name == "small-i386", (
f"picker should prefer Linux i386 over larger non-Linux ELFs, "
f"got {chosen.name}"
)
def test_extract_largest_binary_falls_back_to_other_elf(tmp_path: Path) -> None:
"""Mimics theZoo's Linux.Mirai.B (ARM ELF only). Picker should
still return something even though it won't run on Metasploitable2."""
import zipfile
zip_path = tmp_path / "test.zip"
arm_elf = _elf_header(e_machine=0x28) + b"\x00" * 200
text = b"placeholder text\n"
with zipfile.ZipFile(zip_path, "w") as z:
z.writestr("arm-binary", arm_elf)
z.writestr("readme.txt", text)
work = tmp_path / "extract"
chosen = afs._extract_largest_binary(zip_path, work)
assert chosen is not None
assert chosen.name == "arm-binary"
# ---------------------------------------------------------------------------
# Manifest rewrite preserves stat
# ---------------------------------------------------------------------------
def test_update_manifest_entry_preserves_mode(tmp_path: Path) -> None:
import stat as _st
m = tmp_path / "manifest.toml"
m.write_text(
'[[sample]]\n'
'name = "x"\n'
'family = "F"\n'
'category = "rat"\n'
'profile = "shell-resident"\n'
'description = "d"\n'
)
m.chmod(0o644)
before = _st.S_IMODE(m.stat().st_mode)
afs.update_manifest_entry(m, "x", source="theZoo",
sha256="a" * 64,
url="https://example.invalid/")
after = _st.S_IMODE(m.stat().st_mode)
assert before == after
text = m.read_text()
assert 'sha256 = "' + ("a" * 64) + '"' in text
assert 'source = "theZoo"' in text
def test_update_manifest_entry_skips_when_sha256_already_set(tmp_path: Path) -> None:
"""Re-running auto_fetch on an already-staged sample is a no-op."""
m = tmp_path / "manifest.toml"
m.write_text(
'[[sample]]\n'
'name = "x"\n'
'family = "F"\n'
'category = "rat"\n'
'profile = "shell-resident"\n'
'sha256 = "' + ("a" * 64) + '"\n'
'description = "d"\n'
)
before = m.read_text()
afs.update_manifest_entry(m, "x", source="theZoo",
sha256="b" * 64,
url="https://example.invalid/")
after = m.read_text()
assert before == after, "should not overwrite an existing sha256"
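
The module docstring describes the manifest rewrite as atomic and stat-preserving. One common way to get both properties is to write a sibling temp file, copy the mode, and rename over the original. The sketch below shows that pattern; `atomic_rewrite` is a hypothetical helper, not the body of `update_manifest_entry`.

```python
import os
import stat
import tempfile
from pathlib import Path


def atomic_rewrite(path: Path, new_text: str) -> None:
    """Replace `path` with `new_text` in one step, keeping its permission bits."""
    mode = stat.S_IMODE(path.stat().st_mode)
    # The temp file must share a directory (hence a filesystem) with the
    # target so that os.replace is a single rename.
    fd, tmp = tempfile.mkstemp(dir=path.parent, prefix=path.name + ".")
    try:
        with os.fdopen(fd, "w") as f:
            f.write(new_text)
            f.flush()
            os.fsync(f.fileno())
        os.chmod(tmp, mode)    # mkstemp creates the file as 0600
        os.replace(tmp, path)  # readers see the old or new file, never a partial one
    except BaseException:
        try:
            os.unlink(tmp)
        except FileNotFoundError:
            pass
        raise
```

This preserves the mode only; preserving ownership as well would need an `os.chown` call and sufficient privileges.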


@ -34,6 +34,15 @@ def test_episode_against_self_pid_produces_full_directory(tmp_path: Path) -> Non
meta = json.loads((d / "meta.json").read_text())
assert meta["episode_id"] == result.episode_id
assert meta["schema_version"] == 1
# code_version stamps which commit produced the episode so trainers
# can filter out pre-fix data without scanning every tarball.
assert "code_version" in meta
cv = meta["code_version"]
assert "commit" in cv and "source" in cv
# Source is "git" (we run tests in a git checkout) or "VERSION-file"
# (someone running tests against /opt/cis490/) or "unknown" (CI
# without git). All three are acceptable; the field is what matters.
assert cv["source"] in {"git", "VERSION-file", "unknown"}
assert meta["started_at_wall"] is not None
assert meta["ended_at_wall"] is not None
assert meta["vm"]["target_pid"] == os.getpid()


@ -69,7 +69,9 @@ def _events(rows: list[dict]) -> bytes:
def _proc_rows(*, flat: bool, n: int = 80) -> bytes:
"""Synthesize /proc rows with either flat-CPU (no phase signal)
or sharply-spiking CPU (clear phase boundaries). The test labels
file pairs with these."""
file pairs with these. Both t_mono_ns and t_wall_ns are emitted;
the classifier uses t_wall_ns for phase mapping (consistent across
sources whose t_mono_ns time-bases differ)."""
out: list[dict] = []
for i in range(n):
t = i * 100_000_000
@ -83,6 +85,7 @@ def _proc_rows(*, flat: bool, n: int = 80) -> bytes:
)
out.append({
"t_mono_ns": t,
"t_wall_ns": t, # synthetic: identity to t_mono_ns for tests
"cpu_user_jiffies": jiff,
"cpu_sys_jiffies": 0,
"rss_bytes": 1024 * 1024,
@ -92,7 +95,8 @@ def _proc_rows(*, flat: bool, n: int = 80) -> bytes:
def _labels(boundary_ns: list[int], names: list[str]) -> bytes:
rows = [
{"t_mono_ns": t, "phase": p, "prev": names[i - 1] if i else None}
{"t_mono_ns": t, "t_wall_ns": t, "phase": p,
"prev": names[i - 1] if i else None}
for i, (t, p) in enumerate(zip(boundary_ns, names))
]
return ("\n".join(json.dumps(r) for r in rows) + "\n").encode()
@ -202,6 +206,58 @@ def test_workload_silent_flag(tmp_path: Path) -> None:
assert "workload-silent" in q.reasons
def test_flat_proc_rescued_by_netflow(tmp_path: Path) -> None:
"""A scan-and-dial / bursty-c2 episode leaves /proc nearly idle but
netflow shows clear inter-phase traffic deltas. Multi-signal
classifier must not flag this episode as flat."""
n = 60
netflow_rows = []
# phase boundaries match _make_episode default
for i in range(n * 5): # 5 netflow buckets per 100 ms /proc sample
t = i * 20_000_000 # 20 ms per bucket
# heavy traffic only during infected_running (middle third)
in_burst = (n // 3 * 100_000_000) <= t < (2 * n // 3 * 100_000_000)
netflow_rows.append({
"t_mono_ns": t,
"t_wall_ns": t,
"bytes_in": 80_000 if in_burst else 0,
"bytes_out": 60_000 if in_burst else 0,
})
netflow_jsonl = ("\n".join(json.dumps(r) for r in netflow_rows) + "\n").encode()
tar = _make_episode(
tmp_path,
**{
"01TEST/telemetry-proc.jsonl": _proc_rows(flat=True, n=n),
"01TEST/netflow.jsonl": netflow_jsonl,
},
)
q = pe.classify_episode(tar, host_id="lab1", episode_id="01TEST")
assert "flat-cpu" not in q.reasons, (
f"netflow burst should rescue this episode; got reasons={q.reasons}"
)
def test_flat_everywhere_still_flags(tmp_path: Path) -> None:
"""If /proc AND netflow AND qmp all show no inter-phase variation,
the episode is genuinely silent and must still flag."""
n = 60
netflow_rows = [
{"t_mono_ns": i * 20_000_000, "t_wall_ns": i * 20_000_000,
"bytes_in": 100, "bytes_out": 50}
for i in range(n * 5)
]
netflow_jsonl = ("\n".join(json.dumps(r) for r in netflow_rows) + "\n").encode()
tar = _make_episode(
tmp_path,
**{
"01TEST/telemetry-proc.jsonl": _proc_rows(flat=True, n=n),
"01TEST/netflow.jsonl": netflow_jsonl,
},
)
q = pe.classify_episode(tar, host_id="lab1", episode_id="01TEST")
assert "flat-cpu" in q.reasons
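
The two tests above imply a rule of the form "flag flat only if every signal is flat across phases". The classifier itself is not shown in this diff, so the sketch below is purely illustrative: the function names, the per-phase mean statistic, and the 10% threshold are all assumptions. Only the use of `t_wall_ns` for phase mapping comes from the source.

```python
from bisect import bisect_right
from typing import Dict, List, Sequence, Tuple

Row = Dict[str, float]


def phase_means(rows: Sequence[Row], boundaries_ns: List[int], key: str) -> List[float]:
    """Mean of rows[key] per phase; phase i starts at boundaries_ns[i] (sorted)."""
    sums = [0.0] * len(boundaries_ns)
    counts = [0] * len(boundaries_ns)
    for r in rows:
        i = bisect_right(boundaries_ns, r["t_wall_ns"]) - 1
        if i >= 0:  # rows before the first boundary belong to no phase
            sums[i] += r[key]
            counts[i] += 1
    return [s / c for s, c in zip(sums, counts) if c]


def is_flat(means: List[float], rel_threshold: float = 0.10) -> bool:
    """Flat when the spread between phases is small relative to the peak."""
    if len(means) < 2 or max(means) == 0:
        return True
    return (max(means) - min(means)) / max(means) < rel_threshold


def episode_is_flat(signals: Sequence[Tuple[Sequence[Row], str]],
                    boundaries_ns: List[int]) -> bool:
    """Flag the episode only if no signal shows inter-phase variation."""
    return all(is_flat(phase_means(rows, boundaries_ns, key))
               for rows, key in signals)
```

Under this rule a netflow burst confined to one phase is enough to clear the flag even when the /proc signal is constant.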
def test_workload_silent_suppressed_when_host_cpu_real(tmp_path: Path) -> None:
"""CIS490#15 regression: busybox pgrep -c is unsupported, so the
in-guest probe always reports yes=0 on Alpine guests even when the


@ -0,0 +1,41 @@
"""Pytest wrapper around tools/verify_tier3_local.py.
The verifier is its own CLI for operators. The test wrapper just runs
the steps that are deterministic without external services so a
regression (e.g. someone breaking chunked_real_binary_upload) gets
caught by `pytest` without needing msfrpcd or a populated store.
"""
from __future__ import annotations
import importlib.util
import sys
from pathlib import Path
REPO_ROOT = Path(__file__).resolve().parent.parent
spec = importlib.util.spec_from_file_location(
"verify_tier3_local", REPO_ROOT / "tools" / "verify_tier3_local.py"
)
v = importlib.util.module_from_spec(spec)
sys.modules["verify_tier3_local"] = v
spec.loader.exec_module(v)
def test_module_configs_parse() -> None:
s = v.step_module_configs_parse()
assert s.outcome == "PASS", f"{s.name}: {s.detail}"
def test_manifest_distribution() -> None:
s = v.step_manifest_distribution()
assert s.outcome == "PASS", f"{s.name}: {s.detail}"
def test_chunked_upload_through_real_sh(tmp_path) -> None:
"""The big one: chunked_real_binary_upload must survive a real
/bin/sh round-trip. This is what proves the Tier-4 binary upload
path will work against an msfrpc shell session: same wire shape,
same shell semantics."""
s = v.step_chunked_upload_via_local_sh(tmp_path)
assert s.outcome == "PASS", f"{s.name}: {s.detail}"

263
tests/test_version_gate.py Normal file

@ -0,0 +1,263 @@
"""Tests for the receiver's commit-allow-list gate.
The gate refreshes the allow-list from `git log` of a configured
repo path. Tests use real git operations on a temp repo so we
exercise the same subprocess code paths the receiver does in
production.
"""
from __future__ import annotations
import subprocess
from pathlib import Path
import pytest
from receiver.version_gate import VersionGate
def _git(cwd: Path, *args: str) -> str:
return subprocess.check_output(
["git", "-c", "user.email=t@t", "-c", "user.name=t",
"-C", str(cwd), *args],
text=True,
).strip()
@pytest.fixture
def repo(tmp_path: Path) -> Path:
r = tmp_path / "repo"
r.mkdir()
_git(r, "init", "--initial-branch=main")
(r / "f").write_text("v1")
_git(r, "add", "f")
_git(r, "commit", "-m", "v1")
return r
def _commits(repo: Path) -> list[str]:
return _git(repo, "log", "--format=%H").splitlines()
def test_forgejo_backend_accepts_returned_commits(tmp_path: Path) -> None:
"""Forgejo-backed gate hits a canned HTTP server returning a
commits list; the parser extracts each sha and treats the first row as HEAD."""
import json as _json, threading as _t
from http.server import BaseHTTPRequestHandler, HTTPServer
HEAD = "abcdef0123456789" * 2 + "0" * 8 # 40 hex
OLD = "1234" * 10
canned = _json.dumps([{"sha": HEAD}, {"sha": OLD}]).encode()
class H(BaseHTTPRequestHandler):
def log_message(self, *a): pass
def do_GET(self):
assert "/api/v1/repos/spectral/CIS490/commits" in self.path
self.send_response(200)
self.send_header("Content-Type", "application/json")
self.send_header("Content-Length", str(len(canned)))
self.end_headers()
self.wfile.write(canned)
srv = HTTPServer(("127.0.0.1", 0), H)
port = srv.server_address[1]
th = _t.Thread(target=srv.serve_forever, daemon=True)
th.start()
try:
g = VersionGate(
forgejo_url=f"http://127.0.0.1:{port}",
repo_owner="spectral", repo_name="CIS490", branch="main",
window=50, cache_ttl_s=0,
)
ok, _ = g.check(HEAD)
assert ok
assert g.head() == HEAD
ok, _ = g.check(OLD)
assert ok
ok, reason = g.check("0" * 40)
assert not ok and reason == "not-in-window"
finally:
srv.shutdown()
th.join(timeout=1)
def test_check_accepts_head_commit(repo: Path) -> None:
g = VersionGate(repo, window=10, cache_ttl_s=0)
head = _commits(repo)[0]
ok, reason = g.check(head)
assert ok and reason is None
assert g.head() == head
def test_check_rejects_unknown_commit(repo: Path) -> None:
g = VersionGate(repo, window=10, cache_ttl_s=0)
ok, reason = g.check("0" * 40)
assert not ok and reason == "not-in-window"
def test_check_rejects_missing_commit(repo: Path) -> None:
g = VersionGate(repo, window=10, cache_ttl_s=0)
ok, reason = g.check(None)
assert not ok and reason == "missing"
ok, reason = g.check("")
assert not ok and reason == "missing"
def test_check_rejects_bad_format(repo: Path) -> None:
g = VersionGate(repo, window=10, cache_ttl_s=0)
ok, reason = g.check("not-a-hash")
assert not ok and reason == "bad-format"
ok, reason = g.check("ABCDEF") # too short, but valid hex
assert not ok and reason == "bad-format"
def test_new_commit_after_pull_is_accepted_within_ttl(repo: Path) -> None:
"""The whole point: when the maintainer commits new code on the
Pi, the receiver picks it up automatically without restart."""
g = VersionGate(repo, window=10, cache_ttl_s=0)
# Add a new commit AFTER gate is constructed.
(repo / "f").write_text("v2")
_git(repo, "commit", "-am", "v2")
new_head = _commits(repo)[0]
# cache_ttl_s=0 forces refresh on next check.
ok, _ = g.check(new_head)
assert ok
assert g.head() == new_head
def test_window_limits_history(repo: Path) -> None:
"""Old commits past the window should drop out of the allow-list."""
# Add 5 more commits.
for i in range(2, 7):
(repo / "f").write_text(f"v{i}")
_git(repo, "commit", "-am", f"v{i}")
all_commits = _commits(repo)
assert len(all_commits) == 6
g = VersionGate(repo, window=3, cache_ttl_s=0)
# Top 3 are valid.
for c in all_commits[:3]:
ok, _ = g.check(c)
assert ok, f"{c[:8]} should be in window"
# Older 3 are not.
for c in all_commits[3:]:
ok, reason = g.check(c)
assert not ok and reason == "not-in-window"
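
Taken together, the tests so far describe a three-way rejection scheme (`missing`, `bad-format`, `not-in-window`) over the newest `window` commits. The sketch below reproduces that decision logic on an in-memory history. It is not `receiver.version_gate.VersionGate`: `check_commit` is a hypothetical function, and the case-insensitive comparison is an assumption.

```python
import re
from typing import Optional, Sequence, Tuple

_SHA1_HEX = re.compile(r"[0-9a-f]{40}")


def check_commit(commit: Optional[str], history: Sequence[str],
                 window: int) -> Tuple[bool, Optional[str]]:
    """Return (ok, reason); `history` is newest-first, as `git log` prints it."""
    if not commit:
        return False, "missing"
    c = commit.lower()
    if not _SHA1_HEX.fullmatch(c):
        return False, "bad-format"
    # Only the newest `window` commits are acceptable.
    allowed = {h.lower() for h in history[:window]}
    if c not in allowed:
        return False, "not-in-window"
    return True, None
```

Checking the format before the window keeps the two failure reasons distinct, so a client sending garbage gets a different answer from one that is merely out of date.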
def test_e2e_receiver_returns_412_for_unknown_commit(repo: Path, tmp_path: Path) -> None:
"""End-to-end: PUT with an out-of-window commit returns 412 with
the remediation block, and the tarball does NOT land on disk."""
import io as _io, json as _json, tarfile as _tar, hashlib as _h
from starlette.testclient import TestClient
from receiver.app import make_app
from receiver.store import EpisodeStore
head = _commits(repo)[0]
rcv_root = tmp_path / "rcv"
store = EpisodeStore(
store_root=rcv_root / "ep",
incoming_root=rcv_root / "in",
index_path=rcv_root / "index.jsonl",
)
gate = VersionGate(repo, window=10, cache_ttl_s=0)
app = make_app(store=store, max_episode_bytes=10_000_000,
bearer_token=None, version_gate=gate)
# Build a tiny valid tarball.
raw = _io.BytesIO()
with _tar.open(fileobj=raw, mode="w") as t:
info = _tar.TarInfo("01TEST/meta.json")
body = b"{}"
info.size = len(body)
t.addfile(info, _io.BytesIO(body))
payload = raw.getvalue()
sha = _h.sha256(payload).hexdigest()
with TestClient(app) as client:
# Wrong commit: rejected with 412 + remediation in body.
bad = "0" * 40
r = client.put(
"/v1/episodes/lab1/01TEST.tar.zst",
content=payload,
headers={
"X-Content-SHA256": sha,
"X-Lab-Host": "lab1",
"X-Cis490-Code-Commit": bad,
},
)
assert r.status_code == 412
body = r.json()
assert "remediation" in body
assert body["your_commit"] == bad
assert body["head_commit"] == head
# Index must NOT have grown.
assert store.index_path.read_text() == ""
# Right commit: accepted (201).
r = client.put(
"/v1/episodes/lab1/01TEST.tar.zst",
content=payload,
headers={
"X-Content-SHA256": sha,
"X-Lab-Host": "lab1",
"X-Cis490-Code-Commit": head,
},
)
assert r.status_code == 201, r.text
# Index gained one row stamped with the commit.
rows = [_json.loads(l) for l in store.index_path.read_text().splitlines() if l.strip()]
assert len(rows) == 1
assert rows[0]["commit"] == head
def test_e2e_receiver_returns_400_when_commit_header_missing(repo: Path, tmp_path: Path) -> None:
"""Missing header is a client bug (lab host pre-stamp-update);
receiver returns 400 with remediation."""
import io as _io, tarfile as _tar, hashlib as _h
from starlette.testclient import TestClient
from receiver.app import make_app
from receiver.store import EpisodeStore
rcv_root = tmp_path / "rcv"
store = EpisodeStore(
store_root=rcv_root / "ep",
incoming_root=rcv_root / "in",
index_path=rcv_root / "index.jsonl",
)
gate = VersionGate(repo, window=10, cache_ttl_s=0)
app = make_app(store=store, max_episode_bytes=10_000_000,
bearer_token=None, version_gate=gate)
raw = _io.BytesIO()
with _tar.open(fileobj=raw, mode="w") as t:
info = _tar.TarInfo("01TEST/meta.json")
info.size = 2
t.addfile(info, _io.BytesIO(b"{}"))
payload = raw.getvalue()
sha = _h.sha256(payload).hexdigest()
with TestClient(app) as client:
r = client.put(
"/v1/episodes/lab1/01TEST.tar.zst",
content=payload,
headers={
"X-Content-SHA256": sha,
"X-Lab-Host": "lab1",
# no X-Cis490-Code-Commit
},
)
assert r.status_code == 400
assert "missing" in r.json()["error"].lower()
def test_missing_repo_keeps_prior_cache(repo: Path) -> None:
"""If the maintainer's clone disappears (or git fails), the gate
keeps its last-known allow-list, which is better than locking out
every shipper at once."""
g = VersionGate(repo, window=10, cache_ttl_s=0)
head = _commits(repo)[0]
ok, _ = g.check(head)
assert ok
# Now break the repo path.
g.repo_path = repo / "does-not-exist"
# Cache should still serve the previously-known head.
ok, _ = g.check(head)
assert ok

354
tools/auto_fetch_samples.py Normal file

@ -0,0 +1,354 @@
"""``cis490-auto-fetch-samples`` — pull one real binary per manifest
family from theZoo and update ``samples/manifest.toml``.

No API key, no signup, no operator interaction. theZoo is a public
security-research repository (https://github.com/ytisf/theZoo)
maintained for malware analysis. Each sample is a password-protected
zip; the password is the well-known ``infected``. We clone the repo
once (~500 MB shallow), then for each manifest entry without a
sha256 we:

  1. Locate a directory in ``theZoo/malware/Binaries/`` matching
     the entry's ``family`` (case-insensitive substring)
  2. Find the .zip in that directory
  3. Extract with password ``infected``
  4. Pick the best payload: the largest Linux i386 ELF, else the
     largest ELF of any kind, else the largest non-text file
  5. Compute its sha256, copy to ``samples/store/<sha256>``
  6. Rewrite ``manifest.toml`` in place adding source/sha256/url

Idempotent: entries with sha256 already set are skipped. Manifest
edits are atomic (tempfile + os.replace, stat preserved). Families
that don't match anything in theZoo are logged and counted as
failed; the exit code is non-zero only when no sample at all is
staged, so the deploy script can decide whether to abort.
"""
from __future__ import annotations
import argparse
import hashlib
import json
import logging
import os
import shutil
import subprocess
import sys
import zipfile
from pathlib import Path
REPO_ROOT = Path(__file__).resolve().parent.parent
sys.path.insert(0, str(REPO_ROOT))
from samples.manifest import SampleManifest # noqa: E402
log = logging.getLogger("cis490.auto_fetch_samples")
THEZOO_URL = "https://github.com/ytisf/theZoo.git"
THEZOO_PASSWORD = b"infected"
def _ensure_thezoo(clone_dir: Path) -> Path:
    """Clone theZoo if missing; pull if present. Returns the clone path."""
    if (clone_dir / ".git").exists():
        log.info("theZoo already cloned at %s; pulling latest", clone_dir)
        try:
            subprocess.run(
                ["git", "-C", str(clone_dir), "pull", "--ff-only"],
                check=True, capture_output=True, text=True, timeout=120,
            )
        except subprocess.CalledProcessError as e:
            log.warning("git pull failed (using existing clone): %s",
                        e.stderr[:200])
        except subprocess.TimeoutExpired:
            # A hung pull should not abort the run; the cached clone is usable.
            log.warning("git pull timed out (using existing clone)")
        return clone_dir
    log.info("cloning %s → %s (~500 MB shallow)", THEZOO_URL, clone_dir)
    clone_dir.parent.mkdir(parents=True, exist_ok=True)
    subprocess.run(
        ["git", "clone", "--depth", "1", THEZOO_URL, str(clone_dir)],
        check=True, timeout=600,
    )
    return clone_dir
def _family_tokens(family: str) -> list[str]:
    """Split a manifest family name into search tokens.

    ``Cryptolocker-class`` → ``["cryptolocker-class", "cryptolocker"]``,
    so the search hits theZoo dirs like ``CryptoLocker_22Jan2014``
    (which contain "cryptolocker" but not "-class").
    ``Linux.Mirai.B`` → ``["linux.mirai.b", "linux"]``: the literal
    first token matches the exact dir, the second is a fallback.
    Tokens are tried in order; the first matching dir wins."""
    f = family.lower().strip()
    out: list[str] = [f]
    # Also try the head before the first "-", "." and "_" (in that
    # order), skipping any head that is already in the list.
    head_dash = f.split("-")[0]
    if head_dash != f:
        out.append(head_dash)
    head_dot = f.split(".")[0]
    if head_dot != f and head_dot not in out:
        out.append(head_dot)
    head_underscore = f.split("_")[0]
    if head_underscore != f and head_underscore not in out:
        out.append(head_underscore)
    return out
def _find_family_dir(thezoo: Path, family: str) -> Path | None:
    """Locate a Binaries subdir matching ``family`` (case-insensitive
    substring). theZoo's layout is ``malware/Binaries/<Family-Name>/``.

    The tokens from ``_family_tokens`` are tried in order: the full
    lower-cased family first, then the head tokens with suffixes like
    ``-class``/``-mimic`` stripped. For the first token that matches
    any directory, a directory whose name starts with the token is
    preferred; otherwise the first substring match in alphabetical
    order is returned."""
    binaries = thezoo / "malware" / "Binaries"
    if not binaries.is_dir():
        log.warning("theZoo layout missing %s; pull broke?", binaries)
        return None
    children = [c for c in sorted(binaries.iterdir()) if c.is_dir()]
    for needle in _family_tokens(family):
        matches = [c for c in children if needle in c.name.lower()]
        if not matches:
            continue
        # Prefer prefix match.
        for m in matches:
            if m.name.lower().startswith(needle):
                return m
        return matches[0]
    return None
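The token fallback and the prefix-over-substring preference can be tried out without a theZoo clone. The sketch below mirrors the logic of `_family_tokens` and `_find_family_dir` but works on plain directory-name strings; the helper names `family_tokens` and `pick_dir` and the three-entry `names` list are made up for the demonstration.

```python
from typing import Optional


def family_tokens(family: str) -> list:
    """Full lower-cased name first, then the head before "-", ".", "_"."""
    f = family.lower().strip()
    out = [f]
    for sep in ("-", ".", "_"):
        head = f.split(sep)[0]
        if head != f and head not in out:
            out.append(head)
    return out


def pick_dir(names: list, family: str) -> Optional[str]:
    """Return the directory name chosen for `family`, or None."""
    children = sorted(names)
    for needle in family_tokens(family):
        matches = [n for n in children if needle in n.lower()]
        if not matches:
            continue
        # A name that starts with the token beats a mid-name match.
        for m in matches:
            if m.lower().startswith(needle):
                return m
        return matches[0]
    return None


# Directory names taken from the docstrings; not a real listing.
names = ["CryptoLocker_22Jan2014", "Linux.Mirai.B", "MirageFox"]

print(family_tokens("Cryptolocker-class"))
print(pick_dir(names, "Cryptolocker-class"))
print(pick_dir(names, "Mirai"))
print(pick_dir(names, "Zeus"))
```

Note that `Mirai` reaches `Linux.Mirai.B` through the substring fallback, not the prefix rule, because the directory name starts with `linux`.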
def _is_linux_i386_elf(path: Path) -> bool:
"""Check magic bytes for ELF 32-bit Intel 80386 (Metasploitable2's
native arch). Pure stdlib so we don't depend on `file`."""
try:
with path.open("rb") as f:
head = f.read(20)
except OSError:
return False
if len(head) < 20 or head[:4] != b"\x7fELF":
return False
# ei_class = 1 (32-bit), ei_data = 1 (little-endian), e_machine
# at offset 18 = 0x03 for i386. ei_osabi at offset 7 == 0 (SYSV)
# OR 3 (Linux). FreeBSD is 9 — exclude.
if head[4] != 1 or head[5] != 1:
return False
if head[7] not in (0, 3): # SYSV or Linux
return False
e_machine = int.from_bytes(head[18:20], "little")
return e_machine == 0x03 # EM_386
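To see which header bytes the check above reads, here is a small sketch that applies the same tests to synthetic 20-byte ELF headers. `fake_header` is a hypothetical helper that packs only the fields the check looks at; the machine and OS-ABI numbers are the standard ELF constants (EM_386 = 3, EM_ARM = 40, EM_X86_64 = 62, ELFOSABI_FREEBSD = 9).

```python
import struct


def is_linux_i386_elf_header(head: bytes) -> bool:
    """Same byte tests as the file-based check, on an in-memory header."""
    if len(head) < 20 or head[:4] != b"\x7fELF":
        return False
    if head[4] != 1 or head[5] != 1:   # ei_class=32-bit, ei_data=little-endian
        return False
    if head[7] not in (0, 3):          # ei_osabi: SYSV or Linux
        return False
    return int.from_bytes(head[18:20], "little") == 0x03  # e_machine: EM_386


def fake_header(ei_class: int, osabi: int, e_machine: int) -> bytes:
    """Build 20 bytes: e_ident (16) + e_type (2) + e_machine (2)."""
    return struct.pack("<4sBBBB8xHH", b"\x7fELF", ei_class, 1, 1, osabi,
                       2, e_machine)   # e_type 2 = ET_EXEC


print(is_linux_i386_elf_header(fake_header(1, 0, 3)))    # i386, SYSV
print(is_linux_i386_elf_header(fake_header(2, 0, 62)))   # x86-64
print(is_linux_i386_elf_header(fake_header(1, 0, 40)))   # 32-bit ARM
print(is_linux_i386_elf_header(fake_header(1, 9, 3)))    # i386, FreeBSD
```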
def _extract_largest_binary(zip_path: Path, work_dir: Path) -> Path | None:
"""Extract the password-protected zip and return the best payload:
1. The largest **Linux i386 ELF** in the archive (prefers binaries
that will actually execute inside Metasploitable2).
2. Any other ELF (some samples are ARM/x86-64; the chunked
uploader will land them but execution is best-effort).
3. The largest non-text file (last-resort fallback for
Windows-PE-only archives, in case Wine is on the target).
Filters out obvious sidecars (md5/sha256/passwords/readmes)."""
work_dir.mkdir(parents=True, exist_ok=True)
with zipfile.ZipFile(zip_path) as z:
try:
z.extractall(path=work_dir, pwd=THEZOO_PASSWORD)
except RuntimeError as e:
log.warning("extract %s failed: %s", zip_path.name, e)
return None
payloads: list[Path] = []
for f in work_dir.rglob("*"):
if not f.is_file():
continue
name = f.name.lower()
if any(name.endswith(suf) for suf in (".md5", ".sha256", ".sha1",
".txt", ".md", ".pass", ".c",
".bat", ".sln", ".vcproj")):
continue
if name in {"readme", "license", "metadata.txt"}:
continue
payloads.append(f)
if not payloads:
return None
# Tier 1: Linux i386 ELF, largest first.
linux_i386 = sorted(
(p for p in payloads if _is_linux_i386_elf(p)),
key=lambda p: p.stat().st_size, reverse=True,
)
if linux_i386:
return linux_i386[0]
# Tier 2: any ELF (best-effort — chunked upload still works,
# the binary may fail to execute inside the target VM but the
# episode records the attempt).
def _is_elf(p: Path) -> bool:
try:
with p.open("rb") as f:
return f.read(4) == b"\x7fELF"
except OSError:
return False
other_elf = sorted(
(p for p in payloads if _is_elf(p)),
key=lambda p: p.stat().st_size, reverse=True,
)
if other_elf:
log.warning("%s: no Linux i386 ELF found; falling back to %s "
"(may not execute on Metasploitable2)",
zip_path.name, other_elf[0].name)
return other_elf[0]
# Tier 3: largest non-text payload (Windows PE etc.).
log.warning("%s: no ELF found; falling back to largest non-text payload",
zip_path.name)
return max(payloads, key=lambda p: p.stat().st_size)
def _sha256_of(path: Path) -> str:
h = hashlib.sha256()
with path.open("rb") as f:
for chunk in iter(lambda: f.read(1024 * 1024), b""):
h.update(chunk)
return h.hexdigest()
def update_manifest_entry(manifest_path: Path, name: str,
source: str, sha256: str, url: str) -> None:
"""Add `source`, `sha256`, `url` lines to the manifest entry whose
`name` matches. Atomic + stat-preserving."""
text = manifest_path.read_text()
needle = f'name = "{name}"'
idx = text.find(needle)
if idx < 0:
raise ValueError(f"name = {name!r} not found in {manifest_path}")
next_block = text.find("[[", idx + len(needle))
end = next_block if next_block != -1 else len(text)
block = text[idx:end]
if "sha256 =" in block:
log.info("entry %s already has sha256; skipping in-place edit", name)
return
insert = (
f'source = "{source}"\n'
f'sha256 = "{sha256}"\n'
f'url = "{url}"\n'
)
desc_idx = block.find("description = ")
if desc_idx >= 0:
new_block = block[:desc_idx] + insert + block[desc_idx:]
else:
new_block = block.rstrip() + "\n" + insert + "\n"
new_text = text[:idx] + new_block + text[end:]
st = manifest_path.stat()
tmp = manifest_path.with_suffix(".toml.partial")
tmp.write_text(new_text)
os.replace(tmp, manifest_path)
try:
os.chown(manifest_path, st.st_uid, st.st_gid)
except (PermissionError, OSError):
pass
os.chmod(manifest_path, st.st_mode & 0o7777)
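The write-then-rename pattern used above can be isolated into a few lines. This is a sketch under two assumptions: a POSIX host (for `os.chown` and permission bits) and a sibling `.partial` file on the same filesystem, which is what makes `os.replace` atomic. The name `atomic_write_text` is illustrative and does not exist in the repo.

```python
import os
import tempfile
from pathlib import Path


def atomic_write_text(path: Path, text: str) -> None:
    """Replace `path` with `text` so readers never see a partial file."""
    st = path.stat()                      # remember owner + mode first
    tmp = path.with_name(path.name + ".partial")
    tmp.write_text(text)
    os.replace(tmp, path)                 # atomic rename on the same filesystem
    try:
        os.chown(path, st.st_uid, st.st_gid)
    except OSError:
        pass                              # not owner/root: keep going
    os.chmod(path, st.st_mode & 0o7777)


with tempfile.TemporaryDirectory() as d:
    p = Path(d) / "manifest.toml"
    p.write_text('name = "demo"\n')
    os.chmod(p, 0o640)
    atomic_write_text(p, 'name = "demo"\nsha256 = "00"\n')
    print(p.read_text())
    print(oct(p.stat().st_mode & 0o7777))
```

Restoring the mode after the rename matters because the temp file is created with the process umask, not with the original file's permissions.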
def fetch_one(thezoo: Path, sample_family: str, sample_name: str,
store_root: Path, work_root: Path) -> tuple[str, Path] | None:
"""Locate, extract, and stage one binary for a manifest family.
Returns (sha256, store_path) or None if the family wasn't found."""
fam_dir = _find_family_dir(thezoo, sample_family)
if fam_dir is None:
log.warning("%s: no theZoo dir matching family=%r", sample_name, sample_family)
return None
zips = sorted(fam_dir.rglob("*.zip"))
if not zips:
log.warning("%s: %s has no .zip — theZoo layout drift?",
sample_name, fam_dir)
return None
work_dir = work_root / sample_name
if work_dir.exists():
shutil.rmtree(work_dir)
binary = _extract_largest_binary(zips[0], work_dir)
if binary is None:
log.warning("%s: %s extraction yielded no payload",
sample_name, zips[0])
return None
sha = _sha256_of(binary)
store_root.mkdir(parents=True, exist_ok=True)
target = store_root / sha
if not target.exists():
shutil.copy2(binary, target)
log.info("%s: staged %s (%d bytes, sha256=%s)",
sample_name, target.name, target.stat().st_size, sha[:12])
# Best-effort: clean the per-sample work dir so disk doesn't grow.
shutil.rmtree(work_dir, ignore_errors=True)
return sha, target
def main(argv: list[str] | None = None) -> int:
p = argparse.ArgumentParser(prog="cis490-auto-fetch-samples")
p.add_argument("--manifest",
default=str(REPO_ROOT / "samples" / "manifest.toml"))
p.add_argument("--store-root",
default=str(REPO_ROOT / "samples" / "store"))
p.add_argument("--thezoo-clone-dir",
default="/var/lib/cis490/theZoo",
help="Where to (re)clone theZoo. Cached across runs.")
p.add_argument("--work-root",
default="/tmp/cis490-thezoo-extract",
help="Per-run extraction scratch dir.")
p.add_argument("--dry-run", action="store_true")
args = p.parse_args(argv)
logging.basicConfig(level=logging.INFO,
format="%(asctime)s %(levelname)s %(message)s")
if shutil.which("git") is None:
log.error("git not on PATH; install git first")
return 2
manifest_path = Path(args.manifest)
store_root = Path(args.store_root)
work_root = Path(args.work_root)
manifest = SampleManifest.load(manifest_path)
thezoo = _ensure_thezoo(Path(args.thezoo_clone_dir))
fetched = 0
skipped = 0
failed = 0
for sample in manifest.samples:
if sample.sha256:
log.info("%s: already real (sha256=%s); skipping",
sample.name, sample.sha256[:12])
skipped += 1
continue
if args.dry_run:
fam = _find_family_dir(thezoo, sample.family)
log.info("%s [dry-run]: family=%s match=%s",
sample.name, sample.family, fam.name if fam else "<none>")
continue
result = fetch_one(thezoo, sample.family, sample.name,
store_root, work_root)
if result is None:
failed += 1
continue
sha, _ = result
url = "https://github.com/ytisf/theZoo/tree/master/malware/Binaries"
update_manifest_entry(manifest_path, sample.name,
source="theZoo", sha256=sha, url=url)
fetched += 1
log.info("done: fetched=%d skipped=%d failed=%d", fetched, skipped, failed)
# Tier 4 is mandatory — non-zero exit if no real samples staged.
if not args.dry_run and fetched == 0 and skipped == 0:
log.error("zero samples staged — check theZoo clone + family-name mapping")
return 1
return 0
if __name__ == "__main__":
sys.exit(main())


@ -42,6 +42,36 @@ def _sha256_of(path: Path) -> str:
return h.hexdigest()
def _commit_from_tarball(tar_path: Path) -> str | None:
"""Extract meta.json::code_version.commit from a tar.zst without
leaving anything on disk. Returns None on any failure; callers then
write the row without a commit field."""
import io as _io, subprocess as _sp, tarfile as _tar
try:
z = _sp.check_output(
["zstd", "-q", "-d", "--stdout", str(tar_path)],
stderr=_sp.DEVNULL,
)
except (_sp.CalledProcessError, FileNotFoundError, OSError):
return None
try:
with _tar.open(fileobj=_io.BytesIO(z)) as t:
for m in t.getmembers():
if m.name.endswith("meta.json") and m.isfile():
f = t.extractfile(m)
if f is None:
return None
meta = json.loads(f.read().decode("utf-8"))
cv = meta.get("code_version") or {}
c = cv.get("commit")
if isinstance(c, str) and len(c) == 40:
return c.lower()
return None
except Exception:
return None
return None
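The in-memory half of this helper (finding `meta.json` inside a tar held in a bytes buffer) can be exercised without the `zstd` binary. The sketch below skips the decompression step and starts from raw tar bytes; `commit_from_tar_bytes` and `make_tar` are hypothetical names, and the narrower exception list is a choice made for the example, not the repo's.

```python
import io
import json
import tarfile
from typing import Optional


def commit_from_tar_bytes(raw: bytes) -> Optional[str]:
    """Return meta.json::code_version.commit (lower-cased) or None."""
    try:
        with tarfile.open(fileobj=io.BytesIO(raw)) as t:
            for m in t.getmembers():
                if m.isfile() and m.name.endswith("meta.json"):
                    f = t.extractfile(m)
                    if f is None:
                        return None
                    meta = json.loads(f.read().decode("utf-8"))
                    c = (meta.get("code_version") or {}).get("commit")
                    if isinstance(c, str) and len(c) == 40:
                        return c.lower()
                    return None
    except (tarfile.TarError, ValueError, AttributeError):
        return None
    return None


def make_tar(meta: dict) -> bytes:
    """Build a one-member tar in memory for the demo."""
    body = json.dumps(meta).encode("utf-8")
    buf = io.BytesIO()
    with tarfile.open(fileobj=buf, mode="w") as t:
        info = tarfile.TarInfo("01TEST/meta.json")
        info.size = len(body)
        t.addfile(info, io.BytesIO(body))
    return buf.getvalue()


print(commit_from_tar_bytes(make_tar({"code_version": {"commit": "AB" * 20}})))
print(commit_from_tar_bytes(make_tar({})))
print(commit_from_tar_bytes(b"not a tar"))
```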
def _existing_episode_ids(index_path: Path) -> set[str]:
if not index_path.exists():
return set()
@ -89,7 +119,7 @@ def main(argv: list[str] | None = None) -> int:
continue
sha = _sha256_of(tar)
size = tar.stat().st_size
rows_to_write.append({
row: dict = {
"received_at_wall": datetime.now(timezone.utc).isoformat(),
"host_id": host_dir.name,
"episode_id": episode_id,
@ -97,7 +127,11 @@ def main(argv: list[str] | None = None) -> int:
"size_bytes": size,
"schema_version": SCHEMA_VERSION,
"backfilled": True,
})
}
commit = _commit_from_tarball(tar)
if commit:
row["commit"] = commit
rows_to_write.append(row)
print(f"scanned: {scanned} already-indexed: {scanned - len(rows_to_write)} "
f"to-backfill: {len(rows_to_write)}")


@ -141,6 +141,13 @@ def classify_episode(tar_zst: Path, host_id: str, episode_id: str) -> EpisodeQua
events = _read_jsonl_from_tar(tar, "events.jsonl")
proc = _read_jsonl_from_tar(tar, "telemetry-proc.jsonl")
labels = _read_jsonl_from_tar(tar, "labels.jsonl")
# Optional secondary telemetry sources — used to rescue
# episodes whose /proc CPU% is flat but whose signal lives in
# network bytes (scan-and-dial, bursty-c2, shell-resident),
# disk I/O (io-walk), or guest-side load (low-and-slow).
netflow = _read_jsonl_from_tar(tar, "netflow.jsonl")
qmp_rows = _read_jsonl_from_tar(tar, "telemetry-qmp.jsonl")
guest_rows = _read_jsonl_from_tar(tar, "telemetry-guest.jsonl")
sample = meta.get("sample")
if sample is None:
@ -175,45 +182,165 @@ def classify_episode(tar_zst: Path, host_id: str, episode_id: str) -> EpisodeQua
probe_says_silent = True
break
# flat-cpu: bucket /proc CPU% by phase, check inter-phase spread.
if proc and labels:
clk_tck = os.sysconf("SC_CLK_TCK")
# Multi-signal flatness: an episode is "flat" only if EVERY
# available telemetry source shows no inter-phase variation. A
# bursty network workload (scan-and-dial, bursty-c2) leaves /proc
# nearly idle but spikes netflow bytes — keeping such an episode
# in the dataset is the whole point. Similarly, io-walk's signal
# lives in qmp blockstats (virtio writes), and low-and-slow's
# lives in guest-side load_1m. Each helper returns True if its
# source DOES distinguish phases (i.e. has signal).
if not labels:
# No labels means no phase boundaries to compare across — skip
# the flatness analysis entirely. Episode is uncategorizable
# but not necessarily bad.
return q
def phase_at(t_ns: int) -> str:
cur = "(pre)"
for l in labels:
if l["t_mono_ns"] <= t_ns:
cur = l["phase"]
else:
break
return cur
# Use t_wall_ns rather than t_mono_ns for phase mapping. The host
# /proc collector and labels use orchestrator-relative t_mono_ns,
# but the bridge_pcap netflow rows use wall-clock-like t_mono_ns
# (qemu boot-monotonic seen from outside) — using a single
# numerical t_mono_ns silently buckets every netflow row into
# whichever phase happens to be last. t_wall_ns is consistent
# across sources because every collector stamps it from
# CLOCK_REALTIME at sample time.
def phase_at(row: dict) -> str:
tw = row.get("t_wall_ns")
if tw is None:
return "(pre)"
cur = "(pre)"
for lab in labels:
if lab.get("t_wall_ns", 0) <= tw:
cur = lab["phase"]
else:
break
return cur
per_phase: dict[str, list[float]] = {}
prev = None
for r in proc:
if prev is not None:
dt = (r["t_mono_ns"] - prev["t_mono_ns"]) / 1e9
if dt > 0:
djiff = (r["cpu_user_jiffies"] + r["cpu_sys_jiffies"]) - \
(prev["cpu_user_jiffies"] + prev["cpu_sys_jiffies"])
pct = 100.0 * (djiff / clk_tck) / dt
per_phase.setdefault(phase_at(r["t_mono_ns"]), []).append(pct)
prev = r
if per_phase:
medians = [statistics.median(v) for v in per_phase.values() if v]
if medians and (max(medians) - min(medians)) < 5.0:
q.reasons.append("flat-cpu")
proc_has_signal = _proc_cpu_has_signal(proc, phase_at)
netflow_has_signal = _netflow_has_signal(netflow, phase_at)
qmp_has_signal = _qmp_block_has_signal(qmp_rows, phase_at)
guest_has_signal = _guest_load_has_signal(guest_rows, phase_at)
# `flat-cpu` retains its name (existing reason) but now means "no
# available telemetry source distinguishes phases". `proc_has_signal`
# is None when /proc data is missing entirely — treat that as
# "unknown", not "flat".
sources = {
"proc": proc_has_signal,
"netflow": netflow_has_signal,
"qmp": qmp_has_signal,
"guest": guest_has_signal,
}
available = {k: v for k, v in sources.items() if v is not None}
if available and not any(available.values()):
q.reasons.append("flat-cpu")
# Confirm workload-silent only when host-side telemetry agrees.
# If the probe said silent but /proc CPU% shows a real inter-phase
# delta (i.e. NOT flat-cpu), trust the host-side ground truth and
# discard the probe result — the probe is busybox-pgrep-broken.
# If the probe said silent but ANY source shows real signal, trust
# the host-side ground truth and discard the probe result — the
# probe was busybox-pgrep-broken on Alpine until 2707709.
if probe_says_silent and "flat-cpu" in q.reasons:
q.reasons.append("workload-silent")
return q
# ---------------------------------------------------------------------------
# Per-source signal detection. Each returns:
# True → source has rows AND distinguishes phases (signal present)
# False → source has rows but every phase looks the same (flat)
# None → source is missing or empty (unknown — don't count it)
# ---------------------------------------------------------------------------
def _proc_cpu_has_signal(proc: list[dict], phase_at) -> bool | None:
"""/proc CPU%: per-phase medians differ by at least 5 percentage points."""
if not proc:
return None
clk_tck = os.sysconf("SC_CLK_TCK")
per_phase: dict[str, list[float]] = {}
prev = None
for r in proc:
if prev is not None:
dt = (r["t_mono_ns"] - prev["t_mono_ns"]) / 1e9
if dt > 0:
djiff = (r["cpu_user_jiffies"] + r["cpu_sys_jiffies"]) - \
(prev["cpu_user_jiffies"] + prev["cpu_sys_jiffies"])
pct = 100.0 * (djiff / clk_tck) / dt
per_phase.setdefault(phase_at(r), []).append(pct)
prev = r
if not per_phase:
return None
medians = [statistics.median(v) for v in per_phase.values() if v]
if not medians:
return None
return (max(medians) - min(medians)) >= 5.0
def _netflow_has_signal(netflow: list[dict], phase_at) -> bool | None:
"""netflow bytes: total bytes_in+bytes_out per phase. Signal means
the busiest phase carried at least 50 KiB more total traffic than
the quietest phase. Catches scan-and-dial, bursty-c2, shell-resident."""
if not netflow:
return None
per_phase_bytes: dict[str, int] = {}
for r in netflow:
ph = phase_at(r)
per_phase_bytes[ph] = per_phase_bytes.get(ph, 0) + \
int(r.get("bytes_in", 0)) + int(r.get("bytes_out", 0))
if not per_phase_bytes:
return None
return (max(per_phase_bytes.values()) - min(per_phase_bytes.values())) >= 50 * 1024
def _qmp_block_has_signal(qmp: list[dict], phase_at) -> bool | None:
"""QMP blockstats wr_bytes+rd_bytes per-phase DELTA. blockstats
are cumulative counters, so comparing last-values across phases
always shows signal (counters monotonically increase). The
correct metric is bytes transferred DURING each phase: subtract
each phase's first sample from its last sample, then check the
inter-phase spread. A spread of at least 100 KiB between the
busiest and the quietest phase means real disk activity was
concentrated in one phase. Catches io-walk."""
if not qmp:
return None
per_phase_first: dict[str, int] = {}
per_phase_last: dict[str, int] = {}
for r in qmp:
bs = r.get("blockstats") or {}
total = 0
for dev, stats in bs.items():
if isinstance(stats, dict):
total += int(stats.get("wr_bytes", 0)) + int(stats.get("rd_bytes", 0))
ph = phase_at(r)
if ph not in per_phase_first:
per_phase_first[ph] = total
per_phase_last[ph] = total
deltas = [per_phase_last[p] - per_phase_first[p] for p in per_phase_last]
if len(deltas) < 2:
return None
return (max(deltas) - min(deltas)) >= 100 * 1024
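A toy run makes the cumulative-counter pitfall concrete. The sketch below uses `(phase, cumulative_bytes)` pairs instead of real QMP rows; the phase names and byte counts are invented, and `per_phase_deltas` is an illustrative helper rather than a function from this module.

```python
def per_phase_deltas(samples: list) -> dict:
    """samples: (phase, cumulative_bytes) pairs in time order.

    Returns bytes transferred during each phase (last minus first
    sample seen in that phase).
    """
    first: dict = {}
    last: dict = {}
    for phase, total in samples:
        first.setdefault(phase, total)   # only the first sample sticks
        last[phase] = total              # overwritten until the final one
    return {p: last[p] - first[p] for p in last}


samples = [
    ("baseline", 1_000_000), ("baseline", 1_001_000),
    ("infected", 1_001_000), ("infected", 1_500_000),
    ("cleanup", 1_500_000), ("cleanup", 1_500_500),
]

deltas = per_phase_deltas(samples)
spread = max(deltas.values()) - min(deltas.values())
print(deltas)
print(spread >= 100 * 1024)
```

Comparing the last raw values instead (1,001,000 / 1,500,000 / 1,500,500) would report a large spread even if every phase had written the same amount, which is why the deltas are used.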
def _guest_load_has_signal(guest: list[dict], phase_at) -> bool | None:
"""Guest agent load_1m: per-phase medians differ by at least 0.10.
Catches low-and-slow (memory churn shows up as load even with idle
/proc) on any host where the guest agent is alive."""
if not guest:
return None
per_phase: dict[str, list[float]] = {}
for r in guest:
load = r.get("load_1m_5m_15m")
if not (isinstance(load, list) and load):
continue
per_phase.setdefault(phase_at(r), []).append(float(load[0]))
if not per_phase:
return None
medians = [statistics.median(v) for v in per_phase.values() if v]
if len(medians) < 2:
return None
return (max(medians) - min(medians)) >= 0.10
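The four helpers share a tri-state contract (True / False / None), and the `flat-cpu` decision depends on treating None as "unknown" rather than "flat". A minimal sketch of that aggregation, with `is_flat` as a hypothetical name for the inline logic in `classify_episode`:

```python
from typing import Optional


def is_flat(sources: dict) -> bool:
    """True only if at least one source reported and none showed signal.

    Values are Optional[bool]: True = signal, False = flat, None = the
    source is missing or empty and must not count either way.
    """
    available = {k: v for k, v in sources.items() if v is not None}
    return bool(available) and not any(available.values())


# /proc is idle but netflow spikes: keep the episode.
print(is_flat({"proc": False, "netflow": True, "qmp": None, "guest": None}))
# Every source that reported is flat: flag it.
print(is_flat({"proc": False, "netflow": False, "qmp": None, "guest": None}))
# Nothing reported at all: unknown, so do not flag.
print(is_flat({"proc": None, "netflow": None, "qmp": None, "guest": None}))
```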
# ---------------------------------------------------------------------------
# Index walking + actions
# ---------------------------------------------------------------------------

417
tools/verify_tier3_local.py Normal file

@ -0,0 +1,417 @@
"""``cis490-verify-tier3-local`` — verify Tier-3/4 components without
needing x86 KVM.

The Pi is ARM64, so it can't boot Metasploitable2 with hardware
acceleration. But most of the Tier-3 chain doesn't need x86 at all:

  * msfrpcd is a Ruby daemon and runs natively on ARM
  * the chunked binary upload is just shell commands over a pipe
  * module configs, sample selection, manifest rewriting are pure Python

This script exercises every component that DOES work on the Pi
end-to-end. What it cannot verify on the Pi is the actual exploit
fire against an x86 vulnerable target; that lives in the
``install-tier-3-4.sh`` verify step on a real lab host.

Run on the Pi (or any host):

    /opt/cis490/.venv/bin/python tools/verify_tier3_local.py

Exit codes:

    0  every step passed
    1  at least one MUST step failed
    2  prerequisites missing (msfrpcd not installed; that's OK, those
       tests skip)

Each step prints a one-line PASS/FAIL/SKIP summary. Exit-on-first-fail
is OFF so you see all problems at once.
"""
from __future__ import annotations
import argparse
import hashlib
import io
import json
import logging
import os
import socket
import subprocess
import sys
import tarfile
import time
from pathlib import Path
from typing import Callable
REPO_ROOT = Path(__file__).resolve().parent.parent
sys.path.insert(0, str(REPO_ROOT))
from exploits.workloads import chunked_real_binary_upload, _wrap_loop # noqa: E402,F401
from exploits.modules import load_module_configs, select_module # noqa: E402
from samples.manifest import SampleManifest # noqa: E402
log = logging.getLogger("cis490.verify_tier3_local")
# ---------------------------------------------------------------------------
# Pretty-printer
# ---------------------------------------------------------------------------
_GREEN, _YELLOW, _RED, _RESET = "\033[32m", "\033[33m", "\033[31m", "\033[0m"
class Step:
def __init__(self, name: str) -> None:
self.name = name
self.outcome: str | None = None
self.detail: str = ""
def passed(self, detail: str = "") -> None:
self.outcome = "PASS"
self.detail = detail
def failed(self, detail: str) -> None:
self.outcome = "FAIL"
self.detail = detail
def skipped(self, detail: str) -> None:
self.outcome = "SKIP"
self.detail = detail
def emit(self) -> None:
c = {"PASS": _GREEN, "FAIL": _RED, "SKIP": _YELLOW}.get(self.outcome or "", "")
print(f" {c}{self.outcome:<5}{_RESET} {self.name}")
if self.detail:
for line in self.detail.splitlines():
print(f" {line}")
# ---------------------------------------------------------------------------
# Step 1 — chunked upload via local /bin/sh subprocess
#
# This is the most important on-Pi test. It proves the entire chunked-
# binary-upload chain works against a real shell, not a mock — every
# byte of base64, the heredoc-free printf chain, the sha256 verify,
# the chmod, the nohup-exec scaffold. If THIS works, the only
# remaining unknown for a Tier-4 ship is "does Metasploit's session
# accept the same shell commands" — and Metasploit shell sessions
# present a POSIX shell interface by design.
# ---------------------------------------------------------------------------
def step_chunked_upload_via_local_sh(work_dir: Path) -> Step:
s = Step("chunked binary upload survives a real /bin/sh round-trip")
binary_bytes = os.urandom(150_000) # 150 KB — exercises ~30 chunks
expected_sha = hashlib.sha256(binary_bytes).hexdigest()
plan = chunked_real_binary_upload(binary_bytes)
# Run /bin/sh with the work_dir as its CWD; rewrite /tmp paths in
# the plan to point inside work_dir so we don't litter the host.
bin_path = work_dir / "uploaded.bin"
pid_path = work_dir / "uploaded.pid"
b64_path = work_dir / "uploaded.b64"
# The plan internally references /tmp/.cis490-real-<profile>.{b64,bin,pid};
# replace those literal paths with files under work_dir. (An earlier
# loop built the same list from plan fields and was immediately
# overwritten, so it is dropped here.)
rewritten = [c.replace("/tmp/.cis490-real-real-binary.b64", str(b64_path))
.replace("/tmp/.cis490-real-real-binary.bin", str(bin_path))
.replace("/tmp/.cis490-real-real-binary.pid", str(pid_path))
for c in plan.chunks]
finalize = plan.finalize_cmd.replace("/tmp/.cis490-real-real-binary.b64", str(b64_path)) \
.replace("/tmp/.cis490-real-real-binary.bin", str(bin_path)) \
.replace("/tmp/.cis490-real-real-binary.pid", str(pid_path))
# Stream chunks into /bin/sh.
proc = subprocess.Popen(
["/bin/sh"], stdin=subprocess.PIPE, stdout=subprocess.PIPE,
stderr=subprocess.STDOUT, text=True,
)
assert proc.stdin and proc.stdout
try:
for c in rewritten:
proc.stdin.write(c + "\n")
proc.stdin.write(finalize + "\n")
proc.stdin.write("echo __CIS490_FINALIZED__\n")
proc.stdin.flush()
# Wait for the marker.
out_lines: list[str] = []
deadline = time.monotonic() + 30.0
while time.monotonic() < deadline:
line = proc.stdout.readline()
if not line:
break
out_lines.append(line.rstrip())
if "__CIS490_FINALIZED__" in line:
break
else:
s.failed("timed out waiting for finalize marker; sh output:\n"
+ "\n".join(out_lines[-10:]))
return s
finally:
try:
proc.stdin.close()
proc.wait(timeout=2)
except Exception:
proc.kill()
if not bin_path.exists():
s.failed(f"binary not produced at {bin_path}; sh output tail:\n"
+ "\n".join(out_lines[-10:]))
return s
actual_sha = hashlib.sha256(bin_path.read_bytes()).hexdigest()
if actual_sha != expected_sha:
s.failed(f"sha256 mismatch: expected {expected_sha[:12]}, got "
f"{actual_sha[:12]} ({bin_path.stat().st_size} bytes)")
return s
if "sha-ok" not in "\n".join(out_lines):
s.failed("finalize_cmd did not print 'sha-ok'; sh output tail:\n"
+ "\n".join(out_lines[-10:]))
return s
s.passed(f"{plan.n_chunks} chunks, {len(binary_bytes):,} bytes, "
f"sha256={expected_sha[:12]}")
return s
# ---------------------------------------------------------------------------
# Step 2 — every exploit module's TOML parses + selector picks one
# ---------------------------------------------------------------------------
def step_module_configs_parse() -> Step:
s = Step("all exploits/modules/*.toml parse + selector returns a valid choice")
modules_dir = REPO_ROOT / "exploits" / "modules"
try:
catalog = load_module_configs(modules_dir)
except Exception as e:
s.failed(f"{modules_dir}: {e}")
return s
if not catalog:
s.failed(f"no TOML files in {modules_dir}")
return s
pick1 = select_module(catalog, host_id="lab1", slot=0, episode_index=0)
pick2 = select_module(catalog, host_id="lab1", slot=0, episode_index=0)
if pick1.name != pick2.name:
s.failed(f"selector non-deterministic: {pick1.name} vs {pick2.name}")
return s
s.passed(f"{len(catalog)} modules parsed; selector → {pick1.name}")
return s
# ---------------------------------------------------------------------------
# Step 3 — staged samples are the right architecture
# ---------------------------------------------------------------------------
def _is_linux_i386_elf(p: Path) -> bool:
    try:
        # Use a context manager so the file handle is closed promptly.
        with p.open("rb") as f:
            head = f.read(20)
    except OSError:
        return False
    if len(head) < 20 or head[:4] != b"\x7fELF":
        return False
    return (head[4] == 1 and head[5] == 1 and head[7] in (0, 3)
            and int.from_bytes(head[18:20], "little") == 0x03)
def step_staged_samples_have_correct_arch(store_root: Path) -> Step:
    s = Step("staged samples are Linux i386 ELF (executable on Metasploitable2)")
    if not store_root.exists():
        s.skipped(f"{store_root} doesn't exist; run install-tier-3-4.sh first")
        return s
    # Only count sha256-named real samples; ignore .gitkeep and other
    # housekeeping files. Real samples are 64-hex names (sha256 of the
    # binary).
    bins = sorted(p for p in store_root.iterdir()
                  if p.is_file() and len(p.name) == 64
                  and all(c in "0123456789abcdef" for c in p.name))
    if not bins:
        s.skipped(f"no sha256-named samples in {store_root}; run "
                  f"install-tier-3-4.sh on a lab host (or auto_fetch_samples.py "
                  f"locally) to populate")
        return s
    i386 = [p for p in bins if _is_linux_i386_elf(p)]
    if not i386:
        s.failed(f"{len(bins)} samples staged, none are Linux i386 ELF")
        return s
    if len(i386) < len(bins):
        s.passed(f"{len(i386)}/{len(bins)} samples are Linux i386 ELF "
                 f"(others are best-effort fallback for theZoo dirs without i386 binaries)")
        return s
    s.passed(f"{len(bins)}/{len(bins)} samples are Linux i386 ELF")
    return s
# ---------------------------------------------------------------------------
# Step 4 — manifest loads + sample selector spreads across families
# ---------------------------------------------------------------------------
def step_manifest_distribution() -> Step:
    s = Step("manifest loads + selector covers every sample within "
             "400 picks (8 slots x 50 episodes)")
    manifest_path = REPO_ROOT / "samples" / "manifest.toml"
    try:
        manifest = SampleManifest.load(manifest_path)
    except Exception as e:
        # Report a load error as a FAIL instead of crashing the verifier.
        s.failed(f"{manifest_path}: {e}")
        return s
    seen: set[str] = set()
    for slot in range(8):
        for ep in range(50):
            chosen = manifest.select(host_id="probe", slot=slot, episode_index=ep)
            seen.add(chosen.name)
            if len(seen) == len(manifest.samples):
                # Picks so far: `slot` full rounds of 50, plus ep + 1 in this round.
                s.passed(f"all {len(manifest.samples)} samples reachable; "
                         f"covered after {slot * 50 + ep + 1} picks")
                return s
    missing = [m.name for m in manifest.samples if m.name not in seen]
    s.failed(f"after 8*50=400 picks, never selected: {missing}")
    return s
# ---------------------------------------------------------------------------
# Step 5 — msfrpcd live probe (skips if not installed)
# ---------------------------------------------------------------------------
def step_msfrpcd_live_probe() -> Step:
    s = Step("msfrpcd reachable on 127.0.0.1:55553 (skip if not installed)")
    try:
        with socket.create_connection(("127.0.0.1", 55553), timeout=1.0):
            pass
    except OSError:
        s.skipped("msfrpcd not listening; install-msfrpcd.sh hasn't been run "
                  "on this host (expected for a Pi-only verifier)")
        return s
    pw_file = Path("/etc/cis490/msfrpc.env")
    if not pw_file.exists():
        s.failed("port open but /etc/cis490/msfrpc.env missing; partial install")
        return s
    pw = ""
    for line in pw_file.read_text().splitlines():
        if line.startswith("MSFRPC_PASSWORD="):
            # Drop stray whitespace around the value (e.g. a trailing space or \r).
            pw = line.split("=", 1)[1].strip()
            break
    if not pw:
        s.failed(f"no MSFRPC_PASSWORD in {pw_file}")
        return s
    # Round-trip an auth.login + core.version via the production client.
    # The import sits inside the try so an import error is reported as FAIL.
    try:
        from exploits.msfrpc import MSFRpcClient, MSFRpcConfig
        c = MSFRpcClient(MSFRpcConfig(
            host="127.0.0.1", port=55553, user="msf", password=pw,
        ))
        ver = c.call("core.version", [])
    except Exception as e:
        s.failed(f"msfrpc round-trip failed: {e}")
        return s
    s.passed(f"core.version → {ver.get('version', '?')} (modules ready)")
    return s
# ---------------------------------------------------------------------------
# Step 6: receiver gate path; a bogus commit must be rejected with 412
# ---------------------------------------------------------------------------
def step_receiver_gate_responds() -> Step:
    s = Step("receiver /v1/health responds + gate enforces commit allow-list")
    try:
        with socket.create_connection(("127.0.0.1", 8444), timeout=1.0):
            pass
    except OSError:
        s.skipped("cis490-receiver not running on this host")
        return s
    import urllib.error
    import urllib.request
    try:
        with urllib.request.urlopen("http://127.0.0.1:8444/v1/health", timeout=2) as r:
            if r.status != 200:
                s.failed(f"/v1/health returned {r.status}")
                return s
    except Exception as e:
        s.failed(f"/v1/health unreachable: {e}")
        return s
    # Send a PUT with a bogus commit; expect 412 (Precondition Failed).
    req = urllib.request.Request(
        "http://127.0.0.1:8444/v1/episodes/probe/01TESTNULL.tar.zst",
        method="PUT", data=b"",
        headers={
            "X-Content-SHA256": "0" * 64,
            "X-Lab-Host": "probe",
            "X-Cis490-Code-Commit": "0" * 40,
        },
    )
    try:
        urllib.request.urlopen(req, timeout=2)
        s.failed("receiver accepted a bogus commit; gate is OFF")
    except urllib.error.HTTPError as e:
        if e.code == 412:
            body = json.loads(e.read())
            head = body.get("head_commit", "?")
            s.passed(f"gate rejected bogus commit; head={head[:12] if head else '?'}, "
                     f"window_size={body.get('valid_window_size')}")
        else:
            s.failed(f"unexpected status {e.code}: {e.read().decode()[:200]}")
    except Exception as e:
        # Timeouts, resets and other non-HTTP errors must not crash the verifier.
        s.failed(f"gate probe failed before any HTTP status: {e}")
    return s
# ---------------------------------------------------------------------------
# Driver
# ---------------------------------------------------------------------------
def main() -> int:
    p = argparse.ArgumentParser(prog="cis490-verify-tier3-local")
    p.add_argument("--store-root",
                   default=str(REPO_ROOT / "samples" / "store"),
                   help="Where staged real samples live")
    p.add_argument("--work-dir",
                   default="/tmp/cis490-verify-tier3",
                   help="Scratch dir for the chunked-upload test")
    args = p.parse_args()
    logging.basicConfig(level=logging.WARNING,
                        format="%(asctime)s %(levelname)s %(name)s %(message)s")
    work = Path(args.work_dir)
    work.mkdir(parents=True, exist_ok=True)
    print("CIS490 Tier-3 local verifier: exercises every component that")
    print("CAN run on this host without x86 KVM.")
    print()
    steps: list[Step] = []
    steps.append(step_module_configs_parse())
    steps.append(step_manifest_distribution())
    steps.append(step_chunked_upload_via_local_sh(work))
    steps.append(step_staged_samples_have_correct_arch(Path(args.store_root)))
    steps.append(step_msfrpcd_live_probe())
    steps.append(step_receiver_gate_responds())
    for s in steps:
        s.emit()
    failed = [s for s in steps if s.outcome == "FAIL"]
    skipped = [s for s in steps if s.outcome == "SKIP"]
    passed = [s for s in steps if s.outcome == "PASS"]
    print()
    print(f"summary: {len(passed)} passed, {len(failed)} failed, "
          f"{len(skipped)} skipped (of {len(steps)} steps)")
    if failed:
        print()
        print(f"{_RED}NOT all components verified.{_RESET} Tier-3 deploy will "
              f"fail at the live-fire step on a lab host.")
        return 1
    if skipped:
        print(f"{_YELLOW}Some steps skipped (expected for a Pi-only verifier).{_RESET}")
        print("Run on a lab host after install-tier-3-4.sh for full coverage.")
    return 0


if __name__ == "__main__":
    sys.exit(main())