Last active
May 23, 2020 01:06
-
-
Save hexabeast/6cee6eb33ccec3697abd45331b2d0cce to your computer and use it in GitHub Desktop.
TLS socket interception, compatible with HTTP2
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
#pip3 install certauth | |
#certauth myrootca.pem --certname "My Test CA" | |
#For http2 : mkdir streams | |
#Get website real IP then put website.com as localhost in /etc/hosts | |
#python3 portfwd_SSL_all.py 443 website.com 443 WEBSITE_REAL_IP | |
import socket | |
import ssl | |
import sys | |
import threading | |
import time | |
import sys | |
import os | |
import struct | |
context = ssl.SSLContext(ssl.PROTOCOL_SSLv23) | |
http2 = False | |
if http2: | |
context.set_alpn_protocols(["h2","http/1.1"]) | |
def main(): | |
settings = list(sys.argv)[1:] | |
settings[0] = int(settings[0]) | |
settings[2] = int(settings[2]) | |
os.system(f'certauth myrootca.pem --hostname "{settings[1]}" -d ./certs_dir') | |
threading.Thread(target=server, args=tuple(settings)).start() | |
# wait for <ctrl-c> | |
while True: | |
time.sleep(60) | |
def server(bindport,hostname,destport,realip): | |
try: | |
dock_socket = socket.socket(socket.AF_INET, socket.SOCK_STREAM) | |
dock_socket.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1) | |
dock_socket.bind(('', bindport)) | |
dock_socket.listen(5) | |
while True: | |
print("soket") | |
client_socket = dock_socket.accept()[0] | |
scontext = ssl.SSLContext(ssl.PROTOCOL_SSLv23) | |
if http2: | |
scontext.set_alpn_protocols(["h2","http/1.1"]) | |
scontext.load_cert_chain(certfile=f"./certs_dir/{hostname}.pem", keyfile=f"./certs_dir/{hostname}.pem") | |
ssl_client_socket = scontext.wrap_socket(client_socket, server_side=True) | |
print(ssl_client_socket.version()) | |
server_socket = socket.create_connection((realip, destport)) | |
print("soket2") | |
ssl_server_socket = context.wrap_socket(server_socket, server_hostname=hostname) | |
print("soket3") | |
threading.Thread(target=forward, args=(ssl_client_socket, ssl_server_socket)).start() | |
threading.Thread(target=forward, args=(ssl_server_socket, ssl_client_socket)).start() | |
finally: | |
pass | |
#threading.Thread(target=server, args=(bindport,hostname,destport)).start() | |
streams = set() | |
def forward(source, destination): | |
string = ' ' | |
while string: | |
string = source.recv(1000000) | |
f = None | |
if http2: | |
try: | |
string1 = string[:3] | |
siz = struct.unpack(">I", b"\x00"+string1)[0] | |
print("SIZ",siz) | |
print(string[:3]) | |
string2 = string[4:5] | |
typ = string2 | |
string3 = string[5:6] | |
flags = string3 | |
string4 = string[6:10] | |
stream_id = struct.unpack(">I", string4)[0] | |
print("stream_id",stream_id) | |
string5 = string[10:10+siz] | |
payload = string5 | |
if not stream_id in streams: | |
f = open(f"streams/stream{stream_id}","wb") | |
streams.add(stream_id) | |
else: | |
f = open(f"streams/stream{stream_id}","ab") | |
streams.add(stream_id) | |
except: | |
print("not http2 :(") | |
if string: | |
if f: | |
f.write(payload) | |
print(string) | |
destination.sendall(string) | |
else: | |
try: | |
source.shutdown(socket.SHUT_RD) | |
destination.shutdown(socket.SHUT_WR) | |
except: | |
pass | |
if __name__ == '__main__': | |
main() |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment