Created
February 25, 2018 13:13
-
-
Save jgrant41475/8f9f5c6101d5700a9dc29b3cb0fb8466 to your computer and use it in GitHub Desktop.
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
import socket | |
import pyautogui as pag | |
__author__ = 'johng'

# Turn off PyAutoGUI's fail-safe switch for this process. Per the PyAutoGUI
# documentation the fail-safe aborts automation when the pointer reaches a
# screen corner; NOTE(review): confirm disabling it is intended, since it
# removes the manual emergency stop.
pag.FAILSAFE = False
def getIPAddr():
    """Creates a UDP Socket to the Google DNS in order to retrieve the host ip address.

    The socket is "connected" to 8.8.8.8 and the local address the OS
    assigned to it is read back with ``getsockname()``.

    Returns:
        The local IP address of the socket, as a string.

    Raises:
        OSError: if the socket cannot be created or connected (for example
            when no network route is available).
    """
    # The context manager guarantees the socket is closed even when
    # connect() or getsockname() raises.
    with socket.socket(socket.AF_INET, socket.SOCK_DGRAM) as s:
        s.connect(('8.8.8.8', 0))
        return s.getsockname()[0]
class MouseServer:
    """Creates a TCP socket listener and handles client requests.

    Constructing an instance binds the listening socket and then blocks in
    an accept loop, serving one client at a time.

    Protocol, as implemented below:
      * on connect the server sends ``RESPONSE_START``;
      * each request is a UTF-8 string whose first space-separated word is
        the command (``TEST`` or ``CLOSE``);
      * the reply is ``RESPONSE_GOOD`` plus an optional message,
        ``RESPONSE_BAD`` plus an error description, or ``RESPONSE_CLOSE``
        when the client asked to close the connection.
    """

    # Variables for handling errors and communicating to/from clients.
    # ``False`` means "no error"; otherwise one of the ERROR_* codes.
    sock_error = False
    input_error = False

    ERROR_BIND = 1
    ERROR_CON_DIED = 2
    ERROR_CON_LOST = 3
    ERROR_BAD_COMMAND = 4
    ERROR_BAD_ARGS = 5

    RESPONSE_START = b'110'
    RESPONSE_GOOD = b'0'
    RESPONSE_BAD = b'1'
    RESPONSE_CLOSE = b'2'

    def __init__(self, host=None, port=5055, mouse_speed=0.1):
        """Entry point of MouseServer, initializes the socket listener and contains the main loop.

        Args:
            host: Interface address to bind to. ``None`` (the default) means
                "detect it now with getIPAddr()".
            port: TCP port to listen on.
            mouse_speed: Stored as ``MOUSE_MOVE_SPEED``.

        If binding fails, ``sock_error`` is set to ``ERROR_BIND`` and the
        constructor returns without listening. Otherwise this call never
        returns: it loops forever accepting clients.
        """
        # Resolve the address per call. A ``host=getIPAddr()`` default would
        # run once, when the class is defined, performing network I/O at
        # import time and freezing whatever address was found then.
        if host is None:
            host = getIPAddr()

        self.MOUSE_MOVE_SPEED = mouse_speed
        self.sock = socket.socket()

        # Attempts to bind socket
        try:
            self.sock.bind((host, port))
        except OSError:
            self.sock_error = self.ERROR_BIND
            return

        # Listens for one connection attempt at a time
        self.sock.listen(1)
        print("Listening on interface: " + host + ":" + str(port))

        # Continuously listen for connection attempts, and calls _on_connect() when one is made
        while True:
            conn, addr = self.sock.accept()
            print("Connection with " + addr[0] + ":" + str(addr[1]) + " established")
            self._on_connect(conn, addr)

    def _on_connect(self, conn, remote):
        """Connection with client has been established.

        Serves requests from ``conn`` until the client sends ``CLOSE``, the
        peer disconnects, or a socket error occurs. The connection is always
        closed before returning, and socket errors are reported rather than
        propagated so that one bad client cannot stop the accept loop.

        Args:
            conn: Connected client socket.
            remote: Client address as returned by ``accept()`` (unused here).
        """
        conn_error = False

        try:
            # Send a connection start response
            conn.sendall(self.RESPONSE_START)

            while True:
                self.input_error = False
                msg_queue = ""

                # Waits for client to send something...
                data = conn.recv(1024)

                # No data received, terminate connection
                if not data:
                    conn_error = self.ERROR_CON_LOST
                    break

                # The command is the first space-separated word. Anything
                # after it is ignored because no command handled here takes
                # arguments. Bytes that are not valid UTF-8 are answered as
                # an unknown command instead of raising.
                try:
                    command = data.decode("utf-8").strip().split(" ")[0]
                except UnicodeDecodeError:
                    command = None

                if command == "TEST":
                    print("Hello World!")
                elif command == "CLOSE":
                    print("Client terminated connection.")
                    conn.sendall(self.RESPONSE_CLOSE)
                    break
                # Invalid command
                else:
                    self.input_error = self.ERROR_BAD_COMMAND

                # Responds to client request with a 0 for OKAY and 1 for ERROR
                if self.input_error is not False:
                    conn.sendall(self.RESPONSE_BAD + bytes(self._get_error(self.input_error), "UTF-8"))
                else:
                    conn.sendall(self.RESPONSE_GOOD + bytes(msg_queue, "UTF-8"))
        except OSError:
            # Any socket-level failure while receiving or replying
            conn_error = self.ERROR_CON_DIED
        finally:
            # Release the client socket on every exit path
            conn.close()

        # Connection ended, if there was an error report it
        if conn_error is not False:
            print("Input error: " + self._get_error(conn_error))

    def _set_input_error(self, errno):
        """Records ``errno`` as the error for the request being processed."""
        self.input_error = errno

    @staticmethod
    def parse_coords(coords):
        """Given a string in format of XXX,YYY returns a tuple of X and Y float coordinates,
        or (None, None) if not valid coordinates"""
        try:
            x, y = coords.split(",")
            return float(x), float(y)
        except (AttributeError, ValueError):
            # AttributeError covers non-string input such as None (a request
            # sent without arguments); ValueError covers a wrong number of
            # parts or parts that are not numbers.
            return None, None

    def _get_error(self, errno):
        """Returns the appropriate error message for the given error number.

        Unknown error numbers produce a generic message instead of raising,
        so error reporting itself cannot fail.
        """
        error_messages = {
            self.ERROR_BIND: "Failed to bind to the socket address.",
            self.ERROR_CON_DIED: "Connection terminated by client.",
            self.ERROR_CON_LOST: "Lost connection to the client.",
            self.ERROR_BAD_COMMAND: "No such command.",
            self.ERROR_BAD_ARGS: "Malformed arguments received, please see HELP."
        }
        return error_messages.get(errno, "Unknown error code: " + str(errno))

    def __del__(self):
        """Server termination house-keeping.

        Reports a recorded socket error, otherwise makes a best-effort
        attempt to send ``RESPONSE_CLOSE``; the socket is closed either way.
        """
        if self.sock_error:
            print("Exiting with error: " + self._get_error(self.sock_error))

        # __init__ may have failed before the socket was created.
        sock = getattr(self, "sock", None)
        if sock is None:
            return

        if not self.sock_error:
            # NOTE(review): self.sock is the listening socket, not a client
            # connection, so this send is expected to fail; kept as a
            # best-effort notification.
            try:
                sock.send(self.RESPONSE_CLOSE)
            except OSError:
                print("Failed to send.")

        sock.close()
if __name__ == "__main__":
    # Constructing the server binds the socket and enters its accept loop;
    # the instance itself is not needed afterwards.
    MouseServer()
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment