Skip to content

Instantly share code, notes, and snippets.

@abhinavsingh
Created August 29, 2013 13:29

Revisions

  1. abhinavsingh created this gist Aug 29, 2013.
    26 changes: 26 additions & 0 deletions forwarder.py
    Original file line number Diff line number Diff line change
    @@ -0,0 +1,26 @@
    import zmq

    def main():
    try:
    context = zmq.Context(1)

    frontend = context.socket(zmq.SUB)
    frontend.bind('tcp://*:5559')
    frontend.setsockopt(zmq.SUBSCRIBE, '')

    backend = context.socket(zmq.PUB)
    backend.bind('tcp://*:5560')

    print 'starting zmq forwarder'
    zmq.device(zmq.FORWARDER, frontend, backend)
    except KeyboardInterrupt:
    pass
    except Exception as e:
    logger.exception(e)
    finally:
    frontend.close()
    backend.close()
    context.term()

    if __name__ == '__main__':
    main()
    8 changes: 8 additions & 0 deletions publish.py
    Original file line number Diff line number Diff line change
    @@ -0,0 +1,8 @@
    import zmq

    if __name__ == '__main__':
    context = zmq.Context()
    socket = context.socket(zmq.PUB)
    socket.connect('tcp://127.0.0.1:5559')
    socket.send('session_id helloworld')
    print 'sent data for channel session_id'
    50 changes: 50 additions & 0 deletions ws.py
    Original file line number Diff line number Diff line change
    @@ -0,0 +1,50 @@
    import zmq
    from zmq.eventloop import ioloop
    from zmq.eventloop.zmqstream import ZMQStream
    ioloop.install()

    from tornado.websocket import WebSocketHandler
    from tornado.web import Application
    from tornado.ioloop import IOLoop
    ioloop = IOLoop.instance()

    class ZMQPubSub(object):

    def __init__(self, callback):
    self.callback = callback

    def connect(self):
    self.context = zmq.Context()
    self.socket = self.context.socket(zmq.SUB)
    self.socket.connect('tcp://127.0.0.1:5560')
    self.stream = ZMQStream(self.socket)
    self.stream.on_recv(self.callback)

    def subscribe(self, channel_id):
    self.socket.setsockopt(zmq.SUBSCRIBE, channel_id)

    class MyWebSocket(WebSocketHandler):

    def open(self):
    self.pubsub = ZMQPubSub(self.on_data)
    self.pubsub.connect()
    self.pubsub.subscribe("session_id")
    print 'ws opened'

    def on_message(self, message):
    print message

    def on_close(self):
    print 'ws closed'

    def on_data(self, data):
    print data

    def main():
    application = Application([(r'/channel', MyWebSocket)])
    application.listen(10001)
    print 'starting ws on port 10001'
    ioloop.start()

    if __name__ == '__main__':
    main()