-
-
Save mrpapercut/92422ecf06b5ab8e64e502da5e33b9f7 to your computer and use it in GitHub Desktop.
| #!/usr/bin/env python3 | |
| import binascii | |
| import socket | |
| import sys | |
| from collections import OrderedDict | |
| # See https://web.archive.org/web/20180919041301/https://routley.io/tech/2017/12/28/hand-writing-dns-messages.html | |
| # See https://tools.ietf.org/html/rfc1035 | |
| def send_udp_message(message, address, port): | |
| """send_udp_message sends a message to UDP server | |
| message should be a hexadecimal encoded string | |
| """ | |
| message = message.replace(" ", "").replace("\n", "") | |
| server_address = (address, port) | |
| sock = socket.socket(socket.AF_INET, socket.SOCK_DGRAM) | |
| try: | |
| sock.sendto(binascii.unhexlify(message), server_address) | |
| data, _ = sock.recvfrom(4096) | |
| finally: | |
| sock.close() | |
| return binascii.hexlify(data).decode("utf-8") | |
| def build_message(type="A", address=""): | |
| ID = 43690 # 16-bit identifier (0-65535) # 43690 equals 'aaaa' | |
| QR = 0 # Query: 0, Response: 1 1bit | |
| OPCODE = 0 # Standard query 4bit | |
| AA = 0 # ? 1bit | |
| TC = 0 # Message is truncated? 1bit | |
| RD = 1 # Recursion? 1bit | |
| RA = 0 # ? 1bit | |
| Z = 0 # ? 3bit | |
| RCODE = 0 # ? 4bit | |
| query_params = str(QR) | |
| query_params += str(OPCODE).zfill(4) | |
| query_params += str(AA) + str(TC) + str(RD) + str(RA) | |
| query_params += str(Z).zfill(3) | |
| query_params += str(RCODE).zfill(4) | |
| query_params = "{:04x}".format(int(query_params, 2)) | |
| QDCOUNT = 1 # Number of questions 4bit | |
| ANCOUNT = 0 # Number of answers 4bit | |
| NSCOUNT = 0 # Number of authority records 4bit | |
| ARCOUNT = 0 # Number of additional records 4bit | |
| message = "" | |
| message += "{:04x}".format(ID) | |
| message += query_params | |
| message += "{:04x}".format(QDCOUNT) | |
| message += "{:04x}".format(ANCOUNT) | |
| message += "{:04x}".format(NSCOUNT) | |
| message += "{:04x}".format(ARCOUNT) | |
| # QNAME is url split up by '.', preceded by int indicating length of part | |
| addr_parts = address.split(".") | |
| for part in addr_parts: | |
| addr_len = "{:02x}".format(len(part)) | |
| addr_part = binascii.hexlify(part.encode()) | |
| message += addr_len | |
| message += addr_part.decode() | |
| message += "00" # Terminating bit for QNAME | |
| # Type of request | |
| QTYPE = get_type(type) | |
| message += QTYPE | |
| # Class for lookup. 1 is Internet | |
| QCLASS = 1 | |
| message += "{:04x}".format(QCLASS) | |
| return message | |
| def decode_message(message): | |
| res = [] | |
| ID = message[0:4] | |
| query_params = message[4:8] | |
| QDCOUNT = message[8:12] | |
| ANCOUNT = message[12:16] | |
| NSCOUNT = message[16:20] | |
| ARCOUNT = message[20:24] | |
| params = "{:b}".format(int(query_params, 16)).zfill(16) | |
| QPARAMS = OrderedDict([ | |
| ("QR", params[0:1]), | |
| ("OPCODE", params[1:5]), | |
| ("AA", params[5:6]), | |
| ("TC", params[6:7]), | |
| ("RD", params[7:8]), | |
| ("RA", params[8:9]), | |
| ("Z", params[9:12]), | |
| ("RCODE", params[12:16]) | |
| ]) | |
| # Question section | |
| QUESTION_SECTION_STARTS = 24 | |
| question_parts = parse_parts(message, QUESTION_SECTION_STARTS, []) | |
| QNAME = ".".join(map(lambda p: binascii.unhexlify(p).decode(), question_parts)) | |
| QTYPE_STARTS = QUESTION_SECTION_STARTS + (len("".join(question_parts))) + (len(question_parts) * 2) + 2 | |
| QCLASS_STARTS = QTYPE_STARTS + 4 | |
| QTYPE = message[QTYPE_STARTS:QCLASS_STARTS] | |
| QCLASS = message[QCLASS_STARTS:QCLASS_STARTS + 4] | |
| res.append("\n# HEADER") | |
| res.append("ID: " + ID) | |
| res.append("QUERYPARAMS: ") | |
| for qp in QPARAMS: | |
| res.append(" - " + qp + ": " + QPARAMS[qp]) | |
| res.append("\n# QUESTION SECTION") | |
| res.append("QNAME: " + QNAME) | |
| res.append("QTYPE: " + QTYPE + " (\"" + get_type(int(QTYPE, 16)) + "\")") | |
| res.append("QCLASS: " + QCLASS) | |
| # Answer section | |
| ANSWER_SECTION_STARTS = QCLASS_STARTS + 4 | |
| NUM_ANSWERS = max([int(ANCOUNT, 16), int(NSCOUNT, 16), int(ARCOUNT, 16)]) | |
| if NUM_ANSWERS > 0: | |
| res.append("\n# ANSWER SECTION") | |
| for ANSWER_COUNT in range(NUM_ANSWERS): | |
| if (ANSWER_SECTION_STARTS < len(message)): | |
| ANAME = message[ANSWER_SECTION_STARTS:ANSWER_SECTION_STARTS + 4] # Refers to Question | |
| ATYPE = message[ANSWER_SECTION_STARTS + 4:ANSWER_SECTION_STARTS + 8] | |
| ACLASS = message[ANSWER_SECTION_STARTS + 8:ANSWER_SECTION_STARTS + 12] | |
| TTL = int(message[ANSWER_SECTION_STARTS + 12:ANSWER_SECTION_STARTS + 20], 16) | |
| RDLENGTH = int(message[ANSWER_SECTION_STARTS + 20:ANSWER_SECTION_STARTS + 24], 16) | |
| RDDATA = message[ANSWER_SECTION_STARTS + 24:ANSWER_SECTION_STARTS + 24 + (RDLENGTH * 2)] | |
| if ATYPE == get_type("A"): | |
| octets = [RDDATA[i:i+2] for i in range(0, len(RDDATA), 2)] | |
| RDDATA_decoded = ".".join(list(map(lambda x: str(int(x, 16)), octets))) | |
| else: | |
| RDDATA_decoded = ".".join(map(lambda p: binascii.unhexlify(p).decode('iso8859-1'), parse_parts(RDDATA, 0, []))) | |
| ANSWER_SECTION_STARTS = ANSWER_SECTION_STARTS + 24 + (RDLENGTH * 2) | |
| try: ATYPE | |
| except NameError: None | |
| else: | |
| res.append("# ANSWER " + str(ANSWER_COUNT + 1)) | |
| res.append("QDCOUNT: " + str(int(QDCOUNT, 16))) | |
| res.append("ANCOUNT: " + str(int(ANCOUNT, 16))) | |
| res.append("NSCOUNT: " + str(int(NSCOUNT, 16))) | |
| res.append("ARCOUNT: " + str(int(ARCOUNT, 16))) | |
| res.append("ANAME: " + ANAME) | |
| res.append("ATYPE: " + ATYPE + " (\"" + get_type(int(ATYPE, 16)) + "\")") | |
| res.append("ACLASS: " + ACLASS) | |
| res.append("\nTTL: " + str(TTL)) | |
| res.append("RDLENGTH: " + str(RDLENGTH)) | |
| res.append("RDDATA: " + RDDATA) | |
| res.append("RDDATA decoded (result): " + RDDATA_decoded + "\n") | |
| return "\n".join(res) | |
| def get_type(type): | |
| types = [ | |
| "ERROR", # type 0 does not exist | |
| "A", | |
| "NS", | |
| "MD", | |
| "MF", | |
| "CNAME", | |
| "SOA", | |
| "MB", | |
| "MG", | |
| "MR", | |
| "NULL", | |
| "WKS", | |
| "PTS", | |
| "HINFO", | |
| "MINFO", | |
| "MX", | |
| "TXT" | |
| ] | |
| return "{:04x}".format(types.index(type)) if isinstance(type, str) else types[type] | |
| def parse_parts(message, start, parts): | |
| part_start = start + 2 | |
| part_len = message[start:part_start] | |
| if len(part_len) == 0: | |
| return parts | |
| part_end = part_start + (int(part_len, 16) * 2) | |
| parts.append(message[part_start:part_end]) | |
| if message[part_end:part_end + 2] == "00" or part_end > len(message): | |
| return parts | |
| else: | |
| return parse_parts(message, part_end, parts) | |
| # Usage: python3 raw-dns-req.py github.com | |
| if len(sys.argv) > 1: | |
| url = sys.argv[1] | |
| else: | |
| url = "github.com" | |
| # See get_type function for other possibilities for first argument | |
| message = build_message("A", url) | |
| print("Request:\n" + message) | |
| print("\nRequest (decoded):" + decode_message(message)) | |
| # second argument is external DNS server, third argument is port | |
| response = send_udp_message(message, "1.1.1.1", 53) | |
| print("\nResponse:\n" + response) | |
| print("\nResponse (decoded):" + decode_message(response)) |
Please feel free to use the code, it's public for a reason :) I added a GNU Public License here: https://gist.github.com/mrpapercut/b73e8748c4cf22a541f442622f5672ff
Hi Ian, I've changed the license from GPL to WTFPL, meaning you can do whatever you want with the code :)
I've also updated the link to the original article that inspired this gist.
Hi, many thanks!
Ian
(my original comment seems to have gone - for the sake of others, I was asking about GPL V3 restrictions concerning publishing source code).
@mrpapercut hey there, I wanted to say thanks for the nice DNS code. It's very elegant and gives you exactly what you need. You've saved me a lot of work and I appreciate it. Good job!
hello i just wanted to say, thank you so much for this code, i'm not looking to use it anywhere, i was just curious about the workings of dns and looking at the code helps me so much more than any explanation.
Hi, can I use a subset of this code in my thesis? This is exactly what I was looking for. I will leave credits.
What license do you use? If none, can you add a license like GNU Public License 3.0 to this?
Great Regards. Oscar Andersson, Karlstad University.