Skip to content

Instantly share code, notes, and snippets.

@chris3k
Last active October 19, 2016 23:17
Show Gist options
  • Save chris3k/001d7acc98917af37ce29b8df8c9c97c to your computer and use it in GitHub Desktop.
Save chris3k/001d7acc98917af37ce29b8df8c9c97c to your computer and use it in GitHub Desktop.
Send UDP packet over linux raw socket
#!/usr/bin/env python2.7
import socket
import struct
import random
def int_to_ip(i):
return ".".join(map(lambda x: str(x & 0xff), (i >> 24, i >> 16, i >> 8, i)))
def ip_to_int(i):
x = map(lambda k: int(k) & 0xff, i.split("."))
return x[0] << 24 | x[1] << 16 | x[2] << 8 | x[3]
def decode_ipv4_header(d):
ver_ihl, _, tot_len, _, _, ttl, proto, chksum, src_ip, dst_ip = struct.unpack(">BBHHHBBHII", d[:20])
return ver_ihl, tot_len, ttl, proto, chksum, int_to_ip(src_ip), int_to_ip(dst_ip)
def encode_ipv4_header(ttl, proto, src_ip, dst_ip, ip_payload_len=0):
# IPv4 options are not supported.
s = struct.pack(">BBHHHBBHII", 0x45, 0, 20 + ip_payload_len, random.randint(1, 0xfffe), 0, ttl, proto, 0,
ip_to_int(src_ip), ip_to_int(dst_ip))
return struct.pack(">BBHHHBBHII", 0x45, 0, 20 + ip_payload_len, 0, 0, ttl, proto, calc_checksum(s[:20]),
ip_to_int(src_ip), ip_to_int(dst_ip))
def decode_udp_header(d):
src_port, dest_port, pkt_len, chksum = struct.unpack(">HHHH", d[:8])
payload, = struct.unpack("{}s".format(len(d[8:])), d[8:])
return src_port, dest_port, pkt_len, chksum, payload
def encode_udp_header(src_port, dest_port, payload=""):
s = struct.pack(">HHHH", src_port, dest_port, 8 + len(payload), 0)
if payload:
s += payload
return s
def encode_udp_pseudo_header(src_addr, dest_addr, udp_len):
return struct.pack(">IIBBH", ip_to_int(src_addr), ip_to_int(dest_addr), 0, 17, udp_len)
def update_udp_checksum(d, chksum):
p = bytearray(d)
struct.pack_into(">H", p, 6, chksum)
return p
def calc_checksum(d):
"""
calc_checksum(bytearray("45000073000040004011b861c0a80001c0a800c7".decode("hex")))
>>> 0
calc_checksum(bytearray("c0a8000f640efa5700110044dd83c16c0044dadd6000000000140680200100009d386ab81048227ca6b9e327200100009d386ab8209c3e939bf105a8dd54b599a5cca0ba54c08034501000c1f3560000".decode("hex")))
>>> 0
"""
p = bytearray(d + "\x00") if len(d) % 2 else bytearray(d) # pad to even length
a = 0
b = 0
for i in xrange(0, len(p) - 1, 2):
a += p[i]
b += p[i + 1]
c = (a << 8) + b
while c >> 16:
c = (c & 0xffff) + (c >> 16)
return (-c - 1) & 0xffff # ~c <=> -c - 1
assert int_to_ip(ip_to_int("192.168.0.111")) == "192.168.0.111"
source_ip = "192.168.0.15"
dest_ip = "192.168.0.17"
udp_h = encode_udp_header(1205, 4501, "Heartbeat from the other side.")
print "udp header:", udp_h.encode("hex")
print decode_udp_header(udp_h)
udp_ph = encode_udp_pseudo_header(source_ip, dest_ip, len(udp_h))
print "udp_ph:", (udp_ph + udp_h).encode("hex")
udp_packet = update_udp_checksum(udp_h, calc_checksum(udp_ph + udp_h))
print "udp w/ chksum:", str(udp_packet).encode("hex")
print "Packet checksum validation:", hex(calc_checksum(udp_ph + udp_packet))
s = socket.socket(socket.AF_INET, socket.SOCK_RAW, socket.IPPROTO_UDP)
r = s.sendto(udp_packet, (dest_ip, 0))
print "UDP: Sent {} bytes".format(r)
ipv4_h = encode_ipv4_header(6, 17, source_ip, dest_ip, len(udp_h))
print "IPv4 header:", ipv4_h.encode("hex")
s = socket.socket(socket.AF_INET, socket.SOCK_RAW, socket.IPPROTO_RAW)
r = s.sendto(ipv4_h + udp_h, (dest_ip, 0))
print "IP/UDP: Sent {} bytes".format(r)
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment