Skip to content

Instantly share code, notes, and snippets.

@feythin
Forked from felipecruz/asyncsrv.py
Created March 25, 2013 14:21

Revisions

  1. Felipe cruz renamed this gist Mar 23, 2011. 1 changed file with 0 additions and 0 deletions.
    File renamed without changes.
  2. Felipe cruz created this gist Mar 23, 2011.
    109 changes: 109 additions & 0 deletions asyncsrv
    Original file line number Diff line number Diff line change
    @@ -0,0 +1,109 @@
    import zmq
    import threading
    import time
    from random import choice

    class ClientTask(threading.Thread):
    """ClientTask"""
    def __init__(self):
    threading.Thread.__init__ (self)

    def run(self):
    context = zmq.Context()
    socket = context.socket(zmq.XREQ)
    identity = 'worker-%d' % (choice([0,1,2,3,4,5,6,7,8,9]))
    socket.setsockopt(zmq.IDENTITY, identity )
    socket.connect('tcp://localhost:5570')
    print 'Client %s started' % (identity)
    poll = zmq.Poller()
    poll.register(socket, zmq.POLLIN)
    reqs = 0
    while True:
    for i in xrange(5):
    sockets = dict(poll.poll(1000))
    if socket in sockets:
    if sockets[socket] == zmq.POLLIN:
    msg = socket.recv()
    print '%s: %s\n' % (identity, msg)
    del msg
    reqs = reqs + 1
    print 'Req #%d sent..' % (reqs)
    socket.send('request #%d' % (reqs))

    socket.close()
    context.term()

    class ServerTask(threading.Thread):
    """ServerTask"""
    def __init__(self):
    threading.Thread.__init__ (self)

    def run(self):
    context = zmq.Context()
    frontend = context.socket(zmq.XREP)
    frontend.bind('tcp://*:5570')

    backend = context.socket(zmq.XREQ)
    backend.bind('inproc://backend')

    workers = []
    for i in xrange(5):
    worker = ServerWorker(context)
    worker.start()
    workers.append(worker)

    poll = zmq.Poller()
    poll.register(frontend, zmq.POLLIN)
    poll.register(backend, zmq.POLLIN)

    while True:
    sockets = dict(poll.poll())
    if frontend in sockets:
    if sockets[frontend] == zmq.POLLIN:
    msg = frontend.recv()
    print 'Server received %s' % (msg)
    backend.send(msg)
    if backend in sockets:
    if sockets[backend] == zmq.POLLIN:
    msg = backend.recv()
    frontend.send(msg)

    frontend.close()
    backend.close()
    context.term()

    class ServerWorker(threading.Thread):
    """ServerWorker"""
    def __init__(self, context):
    threading.Thread.__init__ (self)
    self.context = context

    def run(self):
    worker = self.context.socket(zmq.XREQ)
    worker.connect('inproc://backend')
    print 'Worker started'
    while True:
    msg = worker.recv()
    print 'Worker received %s' % (msg)
    replies = choice(xrange(5))
    for i in xrange(replies):
    time.sleep(1/choice(range(1,10)))
    worker.send(msg)
    del msg

    worker.close()


    def main():
    """main function"""
    server = ServerTask()
    server.start()
    for i in xrange(3):
    client = ClientTask()
    client.start()

    server.join()


    if __name__ == "__main__":
    main()