"""Per-sample-profile post-exploit workloads (driver v2). The Tier-3 driver lands a session and then needs to drive *something* in that session for the ``infected_running`` phase. Driver v1 ran ``yes > /dev/null`` for every sample, which is fine for proving the pipe but is the wrong shape for ML — every Tier-3 episode produces the same envelope regardless of which malware family we said it was. Driver v2 maps ``sample.profile`` from the manifest to a distinct in-session workload so each profile's envelope is observably different on every collector: cpu-saturate → 1-vCPU saturation, very low IO/net (XMRig shape) scan-and-dial → SYN scans across the bridge IP space + periodic dial-home (Mirai shape) io-walk → fs traversal + random write spikes (ransomware shape) bursty-c2 → long idle, periodic short TCP egress bursts (Dridex) low-and-slow → minimal CPU, periodic memory churn (Kovter) shell-resident → one long-lived TCP socket pinned to a bridge IP, occasional small command bursts (RAT) Each profile returns a small shell command that backgrounds a loop inside the session. The driver can stop them by killing the loop's PID file or via a profile-specific kill command. This module is intentionally *behaviorally diverse but harmless* — it does NOT execute real malware. Real binaries land via the Tier-4 fetch+run path (separate work). What this gives us today is six distinguishable in-guest envelopes the ML model can learn to discriminate between *and* fall back to when a real sample isn't yet staged. """ from __future__ import annotations import logging from dataclasses import dataclass from samples.manifest import Sample log = logging.getLogger("cis490.exploits.workloads") @dataclass(frozen=True) class Workload: """A pair of shell commands executable in a Metasploit shell session. ``start_cmd`` backgrounds a loop and writes its PID to ``pid_path``. ``stop_cmd`` kills the loop using that PID file. Both commands are expected to be POSIX-shell compatible and to leave the session in a usable state on completion (return code 0 on the prompt).""" profile: str start_cmd: str stop_cmd: str description: str @property def pid_path(self) -> str: return f"/tmp/.cis490-workload-{self.profile}.pid" def _wrap_loop(name: str, body: str) -> Workload: """Common pattern: write a small wrapper script that loops ``body``, background it, and stash the wrapper's PID. Stop kills that PID + its child group.""" pid_path = f"/tmp/.cis490-workload-{name}.pid" script_path = f"/tmp/.cis490-workload-{name}.sh" # Triple-quote the body into a heredoc so single-quotes inside the # body don't conflict with our outer single-quoting. # No `disown` here: it isn't a builtin in busybox sh / ash, so on # Alpine guests the `disown` line printed `sh: disown: not found` # into the captured output of every infected_running phase. nohup # already gives SIGHUP immunity, which is the only thing disown # was for. See spectral/CIS490#15. start = ( f"cat > {script_path} <<'CIS490_EOF'\n" f"#!/bin/sh\n" f"trap 'exit 0' TERM INT\n" f"while :; do\n" f"{body}\n" f"done\n" f"CIS490_EOF\n" f"chmod +x {script_path}; " f"nohup sh {script_path} /dev/null 2>&1 &\n" f"echo $! > {pid_path}\n" ) stop = ( f"if [ -f {pid_path} ]; then " f" kill -- -$(cat {pid_path}) 2>/dev/null; " f" kill $(cat {pid_path}) 2>/dev/null; " f" rm -f {pid_path} {script_path}; " f"fi; true\n" ) return Workload(profile=name, start_cmd=start, stop_cmd=stop, description="(generated)") # --------------------------------------------------------------------------- # Profile factories — each returns a Workload tuned to that family # --------------------------------------------------------------------------- def _cpu_saturate() -> Workload: """XMRig-class — sustained single-vCPU saturation, no IO, no net.""" body = " yes > /dev/null 2>&1 &\n wait $!\n" w = _wrap_loop("cpu-saturate", body) return Workload( profile="cpu-saturate", start_cmd=w.start_cmd, stop_cmd=w.stop_cmd, description="100% CPU on 1 vCPU; no IO, no net", ) def _scan_and_dial() -> Workload: """Mirai-class — TCP SYN-style probe of bridge subnet + occasional "dial home" to the gateway. Heavy net, moderate CPU. Uses ``nc`` (netcat) instead of bash's /dev/tcp redirects — the latter is bash-only and silently no-ops on busybox / dash, which is what Metasploitable2 and Alpine guest sessions actually run. Falls back to a TCP-via-python one-liner if nc isn't available.""" body = ( " for i in 1 2 3 4 5 6 7 8 9 10; do\n" " nc -z -w 1 10.200.0.$((i+1)) 23 >/dev/null 2>&1 &\n" " nc -z -w 1 10.200.0.$((i+1)) 2323 >/dev/null 2>&1 &\n" " done\n" " wait\n" " echo dial-home | nc -w 1 10.200.0.1 4444 >/dev/null 2>&1\n" " sleep 2\n" ) w = _wrap_loop("scan-and-dial", body) return Workload( profile="scan-and-dial", start_cmd=w.start_cmd, stop_cmd=w.stop_cmd, description="Periodic SYN-style scan across bridge IPs + dial-home", ) def _io_walk() -> Workload: """Cryptolocker-class — fs traversal + write spikes. Heavy disk.""" body = ( " mkdir -p /tmp/.cis490-victim\n" " for n in 1 2 3 4 5 6 7 8; do\n" " dd if=/dev/urandom of=/tmp/.cis490-victim/f$n bs=4k count=64 2>/dev/null\n" " done\n" " for f in /tmp/.cis490-victim/*; do cat $f > /dev/null; done\n" " sleep 1\n" ) w = _wrap_loop("io-walk", body) return Workload( profile="io-walk", start_cmd=w.start_cmd, stop_cmd=w.stop_cmd, description="FS traversal + random-data writes, periodic re-read", ) def _bursty_c2() -> Workload: """Dridex-class — long idle, periodic small TCP burst to a fixed peer (the bridge gateway). nc-based for busybox compatibility.""" body = ( " sleep 25\n" " for i in 1 2 3; do\n" " echo c2-beacon-$$-$i | nc -w 1 10.200.0.1 4445 >/dev/null 2>&1\n" " sleep 1\n" " done\n" ) w = _wrap_loop("bursty-c2", body) return Workload( profile="bursty-c2", start_cmd=w.start_cmd, stop_cmd=w.stop_cmd, description="Long idle + periodic 3-packet egress burst to gateway", ) def _low_and_slow() -> Workload: """Kovter-class — low CPU, periodic memory churn, no on-disk artifact. The hardest envelope to label from /proc alone.""" body = ( " sleep 8\n" " awk 'BEGIN { for(i=0;i<200000;i++) a[i]=i*i; }' >/dev/null 2>&1\n" " sleep 4\n" ) w = _wrap_loop("low-and-slow", body) return Workload( profile="low-and-slow", start_cmd=w.start_cmd, stop_cmd=w.stop_cmd, description="Periodic memory churn (~200k array allocs) on a slow cycle", ) def _shell_resident() -> Workload: """RAT-style — keep a single TCP connection open to the gateway with occasional command bursts. Long-lived flow, small bytes. Uses ``nc -w`` on the busybox-compatible path. We pipe a slow feed into nc so the connection stays open for ~30 s before the -w idle timeout closes it, matching the long-lived-flow shape. Then we sleep + reconnect, producing the periodic-tick pattern.""" body = ( " ( for i in 1 2 3 4 5 6; do\n" " echo cmd-tick-$i\n" " sleep 5\n" " done ) | nc -w 30 10.200.0.1 4446 >/dev/null 2>&1\n" " sleep 5\n" ) w = _wrap_loop("shell-resident", body) return Workload( profile="shell-resident", start_cmd=w.start_cmd, stop_cmd=w.stop_cmd, description="Resident TCP connection to gateway with periodic ticks", ) _FACTORIES = { "cpu-saturate": _cpu_saturate, "scan-and-dial": _scan_and_dial, "io-walk": _io_walk, "bursty-c2": _bursty_c2, "low-and-slow": _low_and_slow, "shell-resident": _shell_resident, } def workload_for(sample: Sample | None) -> Workload | None: """Return the Workload matching ``sample.profile``, or None when no sample is supplied (driver v1 fallback path).""" if sample is None: return None factory = _FACTORIES.get(sample.profile) if factory is None: log.warning("no workload profile for %r; falling back to cpu-saturate", sample.profile) return _cpu_saturate() return factory() def all_profiles() -> list[str]: return sorted(_FACTORIES.keys()) # --------------------------------------------------------------------------- # Tier-4 path: real-binary upload + execute inside the shell session # --------------------------------------------------------------------------- @dataclass(frozen=True) class ChunkedUpload: """Multi-step upload plan. Each chunk is one ``shell_write`` call; the driver issues them in order, then a final integrity check, then the exec command. The last command runs the binary and writes its PID to ``pid_path``.""" profile: str chunks: tuple[str, ...] # each is a complete shell command finalize_cmd: str # decode + verify sha256 + chmod exec_cmd: str # actually launch the binary stop_cmd: str bin_path: str pid_path: str expected_sha256: str n_chunks: int # Conservative chunk size: msfrpc shell_write payloads are reliable # under ~16 KiB (single TCP write inside the framework). Use 8 KiB of # *base64* (which is 6 KiB of binary) per chunk so we leave room for # the wrapper and stay well under the limit. _CHUNK_B64_BYTES = 8 * 1024 def chunked_real_binary_upload( binary_bytes: bytes, sample: Sample | None = None, ) -> ChunkedUpload: """Plan a chunked upload of ``binary_bytes`` into a shell session. First chunk creates an empty file; subsequent chunks append a base64 segment. ``finalize_cmd`` decodes + sha256-verifies the result; ``exec_cmd`` launches the binary and stashes its PID. The driver issues these as separate shell_writes so we never push more than ~10 KiB through msfrpc in a single call.""" import base64 as _b64 import hashlib as _hashlib profile = (sample.profile if sample else "real-binary") pid_path = f"/tmp/.cis490-real-{profile}.pid" bin_path = f"/tmp/.cis490-real-{profile}.bin" b64_path = f"/tmp/.cis490-real-{profile}.b64" sha = _hashlib.sha256(binary_bytes).hexdigest() encoded = _b64.b64encode(binary_bytes).decode("ascii") chunks: list[str] = [] chunks.append(f"mkdir -p /tmp; : > {b64_path}; echo upload-begin") for i in range(0, len(encoded), _CHUNK_B64_BYTES): seg = encoded[i:i + _CHUNK_B64_BYTES] # printf '%s' avoids interpreting '%' / '\\' inside the b64 chars. chunks.append(f"printf '%s' '{seg}' >> {b64_path}") finalize = ( f"base64 -d {b64_path} > {bin_path} && rm -f {b64_path} && " f"chmod +x {bin_path} && " f"GOT=$(sha256sum {bin_path} | awk '{{print $1}}') && " f"if [ \"$GOT\" = \"{sha}\" ]; then echo sha-ok; " f"else echo sha-mismatch:$GOT; rm -f {bin_path}; false; fi" ) exec_cmd = ( f"nohup {bin_path} /dev/null 2>&1 & " f"echo $! > {pid_path}; disown; echo exec-ok" ) stop = ( f"if [ -f {pid_path} ]; then " f" kill -- -$(cat {pid_path}) 2>/dev/null; " f" kill $(cat {pid_path}) 2>/dev/null; " f" rm -f {pid_path} {bin_path}; " f"fi; true" ) return ChunkedUpload( profile=f"real:{profile}", chunks=tuple(chunks), finalize_cmd=finalize, exec_cmd=exec_cmd, stop_cmd=stop, bin_path=bin_path, pid_path=pid_path, expected_sha256=sha, n_chunks=len(chunks), ) def real_binary_workload(binary_bytes: bytes, sample: Sample | None = None) -> Workload: """Backwards-compat wrapper that produces a single-shot Workload by concatenating a chunked plan into one start_cmd. Kept for callers that drive the v1 single-shell-write flow (e.g. tests). Production path: the driver should call ``chunked_real_binary_upload`` and walk the chunks itself so msfrpc never sees a buffer-busting payload.""" plan = chunked_real_binary_upload(binary_bytes, sample=sample) start = "\n".join(list(plan.chunks) + [plan.finalize_cmd, plan.exec_cmd]) + "\n" return Workload( profile=plan.profile, start_cmd=start, stop_cmd=plan.stop_cmd, description=f"Real binary upload+execute ({len(binary_bytes)} bytes, {plan.n_chunks} chunks)", )