Created
November 5, 2020 18:34
-
-
Save armandmcqueen/5c615a2ed5d9d5f121c22a1c5f09f4fd to your computer and use it in GitHub Desktop.
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
import requests | |
import urllib | |
import socket | |
import requests.adapters | |
import logging | |
import http.client | |
import argparse | |
""" | |
A very simple Python traceroute(8) implementation | |
""" | |
import socket | |
import random | |
__all__ = ['Tracer'] | |
class Tracer(object):
    """Minimal traceroute: UDP probes with increasing TTL, ICMP replies.

    For every hop a UDP datagram is sent with the current TTL and a raw
    ICMP socket is read once for a reply. Raw sockets normally require
    elevated privileges.
    """

    def __init__(self, dst, hops=30, port=None, timeout=None):
        """
        Initializes a new tracer object

        Args:
            dst (str): Destination host to probe
            hops (int): Max number of hops to probe
            port (int): Port used for the probes and for binding the
                receiver. When None, a random port in 33434-33534 is picked.
            timeout (float): Seconds to wait for a reply to each probe.
                None (the default) waits indefinitely.
        """
        self.dst = dst
        self.hops = hops
        self.ttl = 1
        self.timeout = timeout

        if port is None:
            # Pick up a random port in the range 33434-33534
            self.port = random.choice(range(33434, 33535))
        else:
            self.port = port

    def run(self):
        """
        Run the tracer

        Prints one line per hop: the TTL followed by the address that
        replied, or ``*`` when the probe timed out.

        Raises:
            IOError: if the destination cannot be resolved or a socket
                operation fails
        """
        try:
            dst_ip = socket.gethostbyname(self.dst)
        except socket.error as e:
            raise IOError(
                'Unable to resolve {}: {}'.format(self.dst, e)
            ) from e

        text = 'traceroute to {} ({}), {} hops max'.format(
            self.dst,
            dst_ip,
            self.hops
        )
        print(text)

        while True:
            addr = None
            # ``with`` closes both sockets even if creating the second one,
            # sending, or receiving raises.
            with self.create_receiver() as receiver, \
                    self.create_sender() as sender:
                # Probe the resolved address so that the reply is compared
                # against the same IP the probe was sent to.
                sender.sendto(b'', (dst_ip, self.port))
                try:
                    _, addr = receiver.recvfrom(1024)
                except socket.timeout:
                    # No reply within ``self.timeout``: report the hop as '*'.
                    addr = None
                except socket.error as e:
                    raise IOError('Socket error: {}'.format(e)) from e

            if addr:
                print('{:<4} {}'.format(self.ttl, addr[0]))
            else:
                print('{:<4} *'.format(self.ttl))

            self.ttl += 1

            reached = addr is not None and addr[0] == dst_ip
            if reached or self.ttl > self.hops:
                break

    def create_receiver(self):
        """
        Creates a receiver socket

        The socket is a raw ICMP socket bound to ``self.port`` and uses
        ``self.timeout`` as its receive timeout.

        Returns:
            A socket instance

        Raises:
            IOError
        """
        s = socket.socket(
            family=socket.AF_INET,
            type=socket.SOCK_RAW,
            proto=socket.IPPROTO_ICMP
        )

        try:
            s.settimeout(self.timeout)
            s.bind(('', self.port))
        except socket.error as e:
            # Do not leak the descriptor when the socket cannot be set up.
            s.close()
            raise IOError(
                'Unable to bind receiver socket: {}'.format(e)
            ) from e

        return s

    def create_sender(self):
        """
        Creates a sender socket

        The socket is a UDP socket whose IP TTL is set to ``self.ttl``.

        Returns:
            A socket instance
        """
        s = socket.socket(
            family=socket.AF_INET,
            type=socket.SOCK_DGRAM,
            proto=socket.IPPROTO_UDP
        )

        try:
            # IPPROTO_IP is the option level for IP_TTL; unlike SOL_IP it is
            # defined on every platform.
            s.setsockopt(socket.IPPROTO_IP, socket.IP_TTL, self.ttl)
        except socket.error:
            s.close()
            raise

        return s
def main(args):
    """Dispatch to the handler selected by ``args.action``.

    ``"ping"`` runs ``ping(args)`` and ``"trace"`` runs ``trace(args)``;
    any other value does nothing.
    """
    action = args.action
    if action == "ping":
        ping(args)
        return
    if action == "trace":
        trace(args)
def trace(args):
    """Trace the route to ``args.ip`` using ``args.port`` for the probes.

    Args:
        args: Namespace with ``ip`` and ``port`` attributes. ``port`` may be
            an int, a numeric string (as produced by argparse when no
            ``type`` is set), or None to let the tracer pick a port.
    """
    print("Tracing Route to", args.ip, "port:", args.port)
    # The tracer binds a socket to this port, which requires an integer.
    port = None if args.port is None else int(args.port)
    tracer = Tracer(
        dst=args.ip,
        hops=100,
        port=port
    )
    tracer.run()
def ping(args):
    """Send an HTTP GET to ``http://<ip>:<port><endpoint>`` with debug logging.

    The ``http.client`` debug output is routed through the ``logging``
    framework, and the response body is printed.

    Args:
        args: Namespace with ``ip``, ``port`` and ``endpoint`` attributes.
            A leading slash is added to the endpoint if missing; ``args``
            itself is not modified.
    """
    endpoint = args.endpoint
    if not endpoint.startswith("/"):
        endpoint = '/' + endpoint

    print("Sending and Inspecting Request")
    print("requests version:", requests.__version__)
    ip = args.ip + ":" + str(args.port)
    url = "http://" + ip + endpoint
    print("Sending request to", url)

    logging.basicConfig(level=logging.DEBUG)
    httpclient_logger = logging.getLogger("http.client")

    def httpclient_logging_patch(level=logging.DEBUG):
        """Enable HTTPConnection debug logging to the logging framework"""

        def httpclient_log(*log_args, **log_kwargs):
            # print() accepts arbitrary objects, so stringify each argument
            # before joining instead of assuming they are all str.
            s = "|".join(str(arg) for arg in log_args)
            s += "|-|" + str(log_kwargs)
            httpclient_logger.log(level, s)

        # mask the print() built-in in the http.client module to use
        # logging instead
        http.client.print = httpclient_log
        # enable debugging
        http.client.HTTPConnection.debuglevel = 1

    httpclient_logging_patch()

    print("Sending GET")
    response = requests.get(url)
    print("Got Response:", response.text)
if __name__ == '__main__':
    # Command-line entry point: parse the action and its options, then
    # hand the parsed namespace to main().
    parser = argparse.ArgumentParser()
    parser.add_argument(
        "action",
        choices=["ping", "trace"])
    parser.add_argument(
        "--ip",
        help="The IP to send a request to",
        type=str,
        required=True
    )
    parser.add_argument(
        "--port",
        help="The port to send the request to",
        # Parse as int so a user-supplied port has the same type as the
        # default and can be used directly for socket binding.
        type=int,
        default=8080,
    )
    parser.add_argument(
        "--endpoint",
        help="The endpoint to ping. Must have a leading slash",
        default="/ping",
    )
    # Unrecognised arguments are collected in ``leftovers`` and ignored.
    args, leftovers = parser.parse_known_args()
    main(args)
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment