"""Phase-driven load mimic — XMRig-shaped envelope without real malware. Reads phase commands on stdin (one per line). Adjusts the work loop's behavior so the host /proc telemetry of THIS process produces a recognizable envelope across phases: clean — idle: long sleeps, ~0% CPU armed — short CPU bursts (key exchange shape) infecting — CPU burst + transient disk writes (sample drop) infected_running — sustained CPU burn + periodic small writes dormant — quieter than clean: long sleeps simulating beacon loss Used by ``tools/run_envelope_demo.py`` to validate the orchestrator and collector pipeline against a known phase schedule, before a real VM is involved. Real telemetry, simulated load. """ from __future__ import annotations import os import sys import tempfile import threading import time VALID_PHASES = {"clean", "armed", "infecting", "infected_running", "dormant"} def _consume_cpu_ms(ms: float) -> None: end = time.monotonic() + ms / 1000.0 x = 0 while time.monotonic() < end: # A few thousand ops; the budget is enforced by the deadline. for _ in range(1000): x = (x * x + 1) % 1_000_000_007 class LoadMimic: def __init__(self, scratch_path: str) -> None: self._phase = "clean" self._stop = threading.Event() self._scratch = scratch_path self._stratum_counter = 0 self._last_stratum = time.monotonic() self._thread = threading.Thread(target=self._loop, daemon=True, name="load") self._thread.start() def set_phase(self, p: str) -> None: self._phase = p def stop(self) -> None: self._stop.set() self._thread.join(timeout=2.0) def _loop(self) -> None: while not self._stop.is_set(): phase = self._phase if phase == "clean": # ~1% CPU: short tick, mostly sleeping. _consume_cpu_ms(1) time.sleep(0.1) elif phase == "dormant": # Quieter than clean; longer sleeps emulate beacon loss. time.sleep(0.25) elif phase == "armed": # Light CPU + a tiny write (TLS-handshake-ish shape). _consume_cpu_ms(15) with open(self._scratch, "ab") as f: f.write(b"armed:" + os.urandom(64) + b"\n") time.sleep(0.05) elif phase == "infecting": # Disk burst (sample landing) + medium CPU. with open(self._scratch, "ab") as f: f.write(os.urandom(8192)) _consume_cpu_ms(40) time.sleep(0.02) elif phase == "infected_running": # Heavy CPU; near saturation on one core. _consume_cpu_ms(95) # Periodic stratum-like small write every ~3s. now = time.monotonic() if now - self._last_stratum > 3.0: with open(self._scratch, "ab") as f: f.write(b"stratum:" + os.urandom(96) + b"\n") self._last_stratum = now self._stratum_counter += 1 time.sleep(0.005) else: # Unknown phase — be safe, idle. time.sleep(0.1) def main() -> int: # Use a per-pid scratch file under /tmp so concurrent runs don't collide. scratch = os.path.join(tempfile.gettempdir(), f"cis490-load-{os.getpid()}.bin") open(scratch, "wb").close() load = LoadMimic(scratch_path=scratch) sys.stdout.write(f"load_mimic pid={os.getpid()} scratch={scratch}\n") sys.stdout.flush() try: for line in sys.stdin: cmd = line.strip() if cmd in VALID_PHASES: load.set_phase(cmd) sys.stdout.write(f"phase={cmd}\n") sys.stdout.flush() elif cmd in ("quit", "exit"): break elif cmd == "": continue else: sys.stdout.write(f"unknown: {cmd}\n") sys.stdout.flush() finally: load.stop() try: os.unlink(scratch) except FileNotFoundError: pass return 0 if __name__ == "__main__": sys.exit(main())