-
Star
(109)
You must be signed in to star a gist -
Fork
(41)
You must be signed in to fork a gist
-
-
Save pklaus/b5a7876d4d2cf7271873 to your computer and use it in GitHub Desktop.
| #!/usr/bin/env python | |
| """ | |
| LICENSE http://www.apache.org/licenses/LICENSE-2.0 | |
| """ | |
| import argparse | |
| import datetime | |
| import sys | |
| import time | |
| import threading | |
| import traceback | |
| import socketserver | |
| import struct | |
| try: | |
| from dnslib import * | |
| except ImportError: | |
| print("Missing dependency dnslib: <https://pypi.python.org/pypi/dnslib>. Please install it with `pip`.") | |
| sys.exit(2) | |
| class DomainName(str): | |
| def __getattr__(self, item): | |
| return DomainName(item + '.' + self) | |
| D = DomainName('example.com.') | |
| IP = '127.0.0.1' | |
| TTL = 60 * 5 | |
| soa_record = SOA( | |
| mname=D.ns1, # primary name server | |
| rname=D.andrei, # email of the domain administrator | |
| times=( | |
| 201307231, # serial number | |
| 60 * 60 * 1, # refresh | |
| 60 * 60 * 3, # retry | |
| 60 * 60 * 24, # expire | |
| 60 * 60 * 1, # minimum | |
| ) | |
| ) | |
| ns_records = [NS(D.ns1), NS(D.ns2)] | |
| records = { | |
| D: [A(IP), AAAA((0,) * 16), MX(D.mail), soa_record] + ns_records, | |
| D.ns1: [A(IP)], # MX and NS records must never point to a CNAME alias (RFC 2181 section 10.3) | |
| D.ns2: [A(IP)], | |
| D.mail: [A(IP)], | |
| D.andrei: [CNAME(D)], | |
| } | |
| def dns_response(data): | |
| request = DNSRecord.parse(data) | |
| print(request) | |
| reply = DNSRecord(DNSHeader(id=request.header.id, qr=1, aa=1, ra=1), q=request.q) | |
| qname = request.q.qname | |
| qn = str(qname) | |
| qtype = request.q.qtype | |
| qt = QTYPE[qtype] | |
| if qn == D or qn.endswith('.' + D): | |
| for name, rrs in records.items(): | |
| if name == qn: | |
| for rdata in rrs: | |
| rqt = rdata.__class__.__name__ | |
| if qt in ['*', rqt]: | |
| reply.add_answer(RR(rname=qname, rtype=getattr(QTYPE, rqt), rclass=1, ttl=TTL, rdata=rdata)) | |
| for rdata in ns_records: | |
| reply.add_ar(RR(rname=D, rtype=QTYPE.NS, rclass=1, ttl=TTL, rdata=rdata)) | |
| reply.add_auth(RR(rname=D, rtype=QTYPE.SOA, rclass=1, ttl=TTL, rdata=soa_record)) | |
| print("---- Reply:\n", reply) | |
| return reply.pack() | |
| class BaseRequestHandler(socketserver.BaseRequestHandler): | |
| def get_data(self): | |
| raise NotImplementedError | |
| def send_data(self, data): | |
| raise NotImplementedError | |
| def handle(self): | |
| now = datetime.datetime.utcnow().strftime('%Y-%m-%d %H:%M:%S.%f') | |
| print("\n\n%s request %s (%s %s):" % (self.__class__.__name__[:3], now, self.client_address[0], | |
| self.client_address[1])) | |
| try: | |
| data = self.get_data() | |
| print(len(data), data) # repr(data).replace('\\x', '')[1:-1] | |
| self.send_data(dns_response(data)) | |
| except Exception: | |
| traceback.print_exc(file=sys.stderr) | |
| class TCPRequestHandler(BaseRequestHandler): | |
| def get_data(self): | |
| data = self.request.recv(8192).strip() | |
| sz = struct.unpack('>H', data[:2])[0] | |
| if sz < len(data) - 2: | |
| raise Exception("Wrong size of TCP packet") | |
| elif sz > len(data) - 2: | |
| raise Exception("Too big TCP packet") | |
| return data[2:] | |
| def send_data(self, data): | |
| sz = struct.pack('>H', len(data)) | |
| return self.request.sendall(sz + data) | |
| class UDPRequestHandler(BaseRequestHandler): | |
| def get_data(self): | |
| return self.request[0].strip() | |
| def send_data(self, data): | |
| return self.request[1].sendto(data, self.client_address) | |
| def main(): | |
| parser = argparse.ArgumentParser(description='Start a DNS implemented in Python.') | |
| parser = argparse.ArgumentParser(description='Start a DNS implemented in Python. Usually DNSs use UDP on port 53.') | |
| parser.add_argument('--port', default=5053, type=int, help='The port to listen on.') | |
| parser.add_argument('--tcp', action='store_true', help='Listen to TCP connections.') | |
| parser.add_argument('--udp', action='store_true', help='Listen to UDP datagrams.') | |
| args = parser.parse_args() | |
| if not (args.udp or args.tcp): parser.error("Please select at least one of --udp or --tcp.") | |
| print("Starting nameserver...") | |
| servers = [] | |
| if args.udp: servers.append(socketserver.ThreadingUDPServer(('', args.port), UDPRequestHandler)) | |
| if args.tcp: servers.append(socketserver.ThreadingTCPServer(('', args.port), TCPRequestHandler)) | |
| for s in servers: | |
| thread = threading.Thread(target=s.serve_forever) # that thread will start one more thread for each request | |
| thread.daemon = True # exit the server thread when the main thread terminates | |
| thread.start() | |
| print("%s server loop running in thread: %s" % (s.RequestHandlerClass.__name__[:3], thread.name)) | |
| try: | |
| while 1: | |
| time.sleep(1) | |
| sys.stderr.flush() | |
| sys.stdout.flush() | |
| except KeyboardInterrupt: | |
| pass | |
| finally: | |
| for s in servers: | |
| s.shutdown() | |
| if __name__ == '__main__': | |
| main() |
The original code has a LICENSE
i had an error sometimes running the server
dnslib.buffer.BufferError: Not enough bytes [offset=13,remaining=15,requested=101]
and i solved it by changing
def get_data(self): return self.request[0].strip()to
def get_data(self): return self.request[0]
Thanks!
Traceback (most recent call last):
File "D:\source\python\other\temp\1.py", line 69, in <module>
D: [A(IP), AAAA((0,) * 16), MX(D.mail), soa_record] + ns_records,
File "C:\Users\User\AppData\Roaming\Python\Python37\site-packages\dnslib\dns.py", line 1150, in __init__
self.data = tuple(map(int,data.rstrip(".").split(".")))
ValueError: invalid literal for int() with base 10: 'tcp'
Would you please attach a license to this? I would like to use this as a base for my own DNS server, but I don't want to do it without a license.
@kj7rrv thanks for your comment. andreif's original gist, has been re-licensed under the Apache license, so I updated this code here, too.
Alternatives:
What about checking out @samuelcolvin's comment including a hint to his software https://github.com/samuelcolvin/dnserver. This is licensed under the MIT license and loosely based on the ideas here.
Nice man!!!
Can we make this support NAPTR query as well?
This code is great!!! Awesome!!!
i had an error sometimes running the server
and i solved it by changing
to