-
-
Save miadz/9925a79fc766203395e6816e88ed6dbd to your computer and use it in GitHub Desktop.
PyZMQ Threaded Load-Balancing Broker
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
""" | |
Threaded Load-balancing Broker | |
""" | |
from __future__ import print_function

from collections import deque
from threading import Thread

import zmq
# Number of client threads main() starts; main() also uses it as the number
# of replies to forward before leaving its loop.
NBR_CLIENTS = 10
# Number of worker threads main() starts.
NBR_WORKERS = 3
# Endpoint the broker's backend ROUTER socket binds to and workers connect to.
BACKEND_ADDRESS = "tcp://127.0.0.1:8001"
# NOTE(review): alternative in-process endpoint. worker_task() creates its own
# zmq.Context() while main() uses Context.instance(); inproc presumably needs
# both ends on the same context -- confirm before enabling.
# BACKEND_ADDRESS = "inproc://backend"
def client_task(ident):
    """Run one request-reply client over a REQ socket.

    Connects to the broker frontend at tcp://127.0.0.1:8000, sends a single
    ``HELLO`` request, prints the reply prefixed with this client's identity,
    and then releases the socket and the private context it created.

    Args:
        ident: Value formatted into the socket identity ``Client-<ident>``.
    """
    identity = u"Client-{}".format(ident).encode("ascii")
    context = zmq.Context()
    try:
        socket = context.socket(zmq.REQ)
        try:
            socket.identity = identity
            socket.connect("tcp://127.0.0.1:8000")

            # Send request, get reply
            socket.send(b"HELLO")
            reply = socket.recv()
            print("{}: {}".format(identity.decode("ascii"),
                                  reply.decode("ascii")))
        finally:
            # On success nothing is queued any more (the reply has arrived);
            # on failure linger=0 discards an undelivered request so that
            # terminating the context below cannot block.
            socket.close(linger=0)
    finally:
        context.term()
def worker_task(ident):
    """Serve broker requests forever over a REQ socket.

    The worker connects to BACKEND_ADDRESS, announces itself with a ``READY``
    message, and then loops: receive a three-frame message (client address,
    empty delimiter, request), print the request, and answer ``OK`` addressed
    to the same client. The loop never returns.

    Args:
        ident: Value formatted into the socket identity ``Worker-<ident>``.
    """
    name = u"Worker-{}".format(ident).encode("ascii")
    sock = zmq.Context().socket(zmq.REQ)
    sock.identity = name
    sock.connect(BACKEND_ADDRESS)

    # Announce to the broker that this worker can take a request
    sock.send(b"READY")

    label = name.decode("ascii")
    while True:
        frames = sock.recv_multipart()
        # Exactly three frames are expected: client address, delimiter, body
        client_address, _delimiter, body = frames
        print("{}: {}".format(label, body.decode("ascii")))
        # Echo the client address back so the reply can be routed to it
        reply_frames = [client_address, b"", b"OK"]
        sock.send_multipart(reply_frames)
def main():
    """Run the load-balancing broker until every client has been answered.

    Binds a frontend ROUTER socket (clients, tcp://127.0.0.1:8000) and a
    backend ROUTER socket (workers, BACKEND_ADDRESS), starts NBR_CLIENTS
    client threads and NBR_WORKERS worker threads as daemons, and then
    shuttles messages between the two sockets:

    * every backend message marks its sender as idle again; if it carries a
      client reply, that reply is forwarded to the frontend;
    * every frontend request is handed to the worker that has been idle the
      longest.

    The loop ends after NBR_CLIENTS replies have been forwarded. Both sockets
    are closed and the context terminated on the way out, including when the
    loop is aborted by an exception.
    """
    # Prepare context and sockets
    context = zmq.Context.instance()
    frontend = context.socket(zmq.ROUTER)
    backend = context.socket(zmq.ROUTER)
    try:
        frontend.bind("tcp://127.0.0.1:8000")
        backend.bind(BACKEND_ADDRESS)

        # Start background tasks
        def start(task, *args):
            """Run task(*args) in a daemon thread."""
            thread = Thread(target=task, args=args)
            # Daemon threads do not keep the process alive once main() returns
            thread.daemon = True
            thread.start()

        for i in range(NBR_CLIENTS):
            start(client_task, i)
        for i in range(NBR_WORKERS):
            start(worker_task, i)

        # Initialize main loop state
        count = NBR_CLIENTS  # replies still to be forwarded to clients
        workers = deque()    # idle worker identities, oldest first
        poller = zmq.Poller()
        # Only poll for requests from backend until workers are available
        poller.register(backend, zmq.POLLIN)

        while True:
            sockets = dict(poller.poll())

            if backend in sockets:
                # Handle worker activity on the backend
                request = backend.recv_multipart()
                # Frames: worker identity, empty delimiter, then either
                # b"READY" or the address of the client being answered
                worker, empty, client = request[:3]
                if not workers:
                    # Poll for clients now that a worker is available
                    poller.register(frontend, zmq.POLLIN)
                workers.append(worker)
                if client != b"READY" and len(request) > 3:
                    # If client reply, send rest back to frontend
                    empty, reply = request[3:]
                    frontend.send_multipart([client, b"", reply])
                    count -= 1
                    if not count:
                        break

            if frontend in sockets:
                # Get next client request, route to the least recently used
                # worker (the one at the head of the idle queue)
                client, empty, request = frontend.recv_multipart()
                worker = workers.popleft()
                backend.send_multipart([worker, b"", client, b"", request])
                if not workers:
                    # Don't poll clients if no workers are available
                    poller.unregister(frontend)
    finally:
        # Clean up, also when the loop was left through an exception
        backend.close()
        frontend.close()
        context.term()
# Start the broker only when this file is executed as a script.
if __name__ == "__main__":
    main()
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment