Last active
October 23, 2015 14:37
-
-
Save jam182/45e0825f8bbe1874a3df to your computer and use it in GitHub Desktop.
Echo server that uses only sockets and selects. No threads, asyncio etc.
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
"""Daniele Giancola 2015 | |
This program is implemented in python3.5. | |
The idea behind this server is to use non-blocking sockets, | |
and to schedule callbacks with 'select' when the sockets are ready | |
to be read or written. | |
According to the documentation, DefaultSelector() is an alias | |
for the most efficient implementation (epoll, select, kqueue, iocp, etc.) | |
based on the current platform. | |
https://docs.python.org/3/library/selectors.html#selectors.DefaultSelector | |
The default selector is used to register a mapping, objects to callbacks: | |
(file descriptor, event) -> (callback). | |
When one of the objects (file descriptor or object with fileno() method) | |
becomes ready for the event it was registered for, the 'select' method will | |
return it in a list of (key, events) tuples (one for each ready object). | |
For each of these object the callback can be retrieved and finally | |
be executed being sure it can properly use the sockets for the operations | |
it needs. | |
In order to keep the registry clean, when a connection is closed or is lost, | |
the socket will be unregistered. | |
In order to send complete lines (that end with \n), a buffer to store | |
incomplete lines is kept for each client. | |
This buffer is used to attach to its content the next chunk of data received | |
until there is a \n. Then the message can be finally sent. | |
For instance when the server is first initialized, the socket on which | |
it will listen to, for incoming connections, is set to non-blocking. | |
However it is only registered in the selector to wait until it is ready | |
to be read. Only when the select decides that the socket is ready, the event | |
is returned so the callback to accept an incoming connection is executed. | |
Basically the event loop returns the socket with the callback for each event, | |
and it executes the callback passing the socket as an argument. | |
Simple Usage: | |
Server: | |
$ python3.5 echo_server.py | |
Echo Server Started | |
Client: | |
$ ncat localhost 8888 | |
hello | |
hello | |
""" | |
import selectors | |
import socket | |
from functools import partial | |
from collections import deque | |
class EchoServer: | |
"""Asynchronous server that echos every message it receives. | |
It echos line by line. | |
It works without threads, only sockets and selects. | |
It can concurrently accept and receive messages from multiple clients. | |
Examples - Start the server and connect with multiple clients: | |
Server: | |
>>> server = EchoServer() | |
>>> server | |
EchoServer(host='localhost', port=8888) | |
>>> server.run() | |
Echo Server Started | |
Client: | |
$ ncat localhost 8888 | |
hello | |
hello | |
""" | |
def __init__(self, host='localhost', port=8888, backlog=5): | |
"""Start up the server on the specified address:port. | |
Get the most efficient implementation for the current platform of | |
the selector. | |
Set the socket to non blocking mode and register it | |
to the select system-call for the EVENT_READ event, so to be notified | |
when the socket is ready to be read and the server | |
can accept connections""" | |
self.sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM) | |
self.sock.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1) | |
self.sock.setblocking(False) | |
self.sock.bind((host, port)) | |
self.sock.listen(backlog) | |
self.sel = selectors.DefaultSelector() | |
self.sel.register(self.sock, selectors.EVENT_READ, self._accept) | |
self.clients_buffered_line = dict() | |
self.clients_echos = dict() | |
def __repr__(self): | |
host, port = self.sock.getsockname() | |
cls = self.__class__.__name__ | |
return '{0}(host={1}, port={2})'.format(cls, host, port) | |
def run(self): | |
"""Simple event loop that calls all the callbacks | |
for those sockets who registered to require some action | |
on a particular event happening.""" | |
while True: | |
events = self.sel.select() | |
for key, mask in events: | |
callback = key.data | |
callback(key.fileobj) | |
def _accept(self, sock): | |
"""When a socket is ready to accept a new connection, | |
this callback is executed. For each client a buffer to receive | |
incomplete lines is initialized. Finally register the socket | |
to be called back whenever it is ready to receive with the _read()""" | |
conn, addr = sock.accept() | |
conn.setblocking(False) | |
self.clients_buffered_line[conn] = '' | |
self.sel.register(conn, selectors.EVENT_READ, self._read) | |
def _read(self, sock, size=4096): | |
"""When the socket is ready to receive, get data as long as | |
there is any and make sure to echo every line back to the client.""" | |
try: | |
data = sock.recv(size) | |
if data: | |
self._echo(sock, data.decode()) | |
else: | |
self._unregister_client(sock) | |
except ConnectionResetError: # Unexpected connection loss | |
self._unregister_client(sock) | |
def _echo(self, sock, data): | |
"""This function makes sure to compose a complete line | |
for each client and sends it back to it.""" | |
lines = data.split('\n') | |
lines[0] = ''.join([self.clients_buffered_line[sock], lines[0]]) | |
for line in lines[:-1]: | |
msg = ''.join([line, '\n']).encode() | |
self._add_writer_callback(msg, sock) | |
if len(lines) > 1: | |
self.sel.modify(sock, selectors.EVENT_WRITE, self._schedule_send) | |
self.clients_buffered_line[sock] = lines.pop() | |
def _add_writer_callback(self, msg, sock): | |
"""Create a callback to send a line and add it | |
to the queue""" | |
callback = partial(self._send_line, data=msg) | |
if sock not in self.clients_echos: | |
self.clients_echos[sock] = deque() | |
self.clients_echos[sock].append(callback) | |
def _send_line(self, sock, data): | |
"""Send a full line. If the client cannot receive the | |
whole packet, schedule the remaining data to be sent for | |
when the client socket is ready to be written on again. | |
When done start listening again for new data to echo.""" | |
data_sent = sock.send(data) | |
if data_sent < len(data): | |
callback = partial(self._send_line, data=data[data_sent:]) | |
self.sel.modify(sock, selectors.EVENT_WRITE, callback) | |
else: | |
self.sel.modify(sock, selectors.EVENT_READ, self._read) | |
def _schedule_send(self, sock): | |
"""Whenever the socket is ready for write operations, | |
call send. Register for writing event if there is more than | |
one line to be sent""" | |
callback = self.clients_echos[sock].popleft() | |
try: | |
callback(sock) | |
if self.clients_echos[sock]: | |
self.sel.modify(sock, selectors.EVENT_WRITE, | |
self._schedule_send) | |
else: | |
del self.clients_echos[sock] | |
except (BlockingIOError, ConnectionResetError): | |
self._unregister_client(sock) | |
def _unregister_client(self, sock): | |
"""When a client is done sending data, the server can clean up by | |
removing its buffer and unregistering its socket from the | |
select""" | |
if sock in self.clients_buffered_line: | |
del self.clients_buffered_line[sock] | |
if sock in self.clients_echos: | |
del self.clients_echos[sock] | |
try: | |
self.sel.unregister(sock) | |
except ValueError: | |
pass # File Descriptor -1. Error, close connection. | |
finally: | |
sock.close() | |
def close(self): | |
"""Close nicely when the server exits""" | |
clients = self.clients_buffered_line.copy() | |
for client in clients: | |
self._unregister_client(client) | |
self.sel.close() | |
self.sock.close() | |
def start_sever(): | |
"""Start the server from the command line""" | |
server = EchoServer() | |
try: | |
print('Echo Server Started') | |
server.run() | |
except KeyboardInterrupt: | |
print('Server Closing') | |
finally: | |
server.close() | |
if __name__ == '__main__': | |
start_sever() |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment