Skip to content

Instantly share code, notes, and snippets.

@72squared
Created January 5, 2016 22:50
Show Gist options
  • Save 72squared/6e6f81f16caec186255f to your computer and use it in GitHub Desktop.
Save 72squared/6e6f81f16caec186255f to your computer and use it in GitHub Desktop.
import random
import sys
import ws
from connectors import redis_connection_settings
from txredis.client import RedisSubscriber
from autobahn.twisted.websocket import WebSocketServerFactory, listenWS
from twisted.python import log
from twisted.internet import reactor, protocol
# module-level slot for the Notifier singleton. It starts out as None and is
# created lazily the first time notifier() is called.
_notifier = None
# when we can't connect to redis right away, we need to keep trying but
# don't want to spam the network. This sets how many seconds before another try.
REDIS_CONNECT_FAIL_RETRY_INTERVAL = 1
def random_redis_connection_addr():
    """
    Pick the (host, port) of a redis node to connect to.

    The 'mp' connection settings are looked up; when they carry a 'startup_nodes'
    list, one of those nodes is chosen at random, otherwise the settings themselves
    are used as the node description.

    Nothing clever happens here, such as asking the cluster for its list of masters,
    although that could be added later. On production we use short cnames that should
    always point to an active pair, and during a planned migration the old masters are
    kept around as slaves in the cluster long after the cname changes.

    Which node we land on does not matter: every node in the cluster relays all pubsub
    messages to every other node over the cluster bus. That may change in future
    implementations of redis cluster, so it is something to watch for.

    :return: tuple of (host, port)
    """
    config = redis_connection_settings('mp')
    node = random.choice(config['startup_nodes']) if 'startup_nodes' in config else config
    host, port = node['host'], node['port']
    return host, port
class WatchChannels(RedisSubscriber):
    """
    Async wrapper around what is ideally a single pubsub connection to redis.

    When websocket clients connect to our server, the notifier tells this connection
    to subscribe to that user's channel in redis. Published messages are relayed to
    the callbacks the notifier holds for that channel, and a lost connection triggers
    a reconnect through the notifier.
    """

    def messageReceived(self, channel, message):
        """
        Relay a pubsub notification from redis to the clients registered for it.

        Multiple clients for the same user can be connected at once, so the notifier
        keeps a nested mapping of channel -> client_id -> callback; every callback
        registered for the channel receives the message.

        :param channel: redis channel the message was published on
        :param message: payload handed to each registered callback
        """
        n = notifier()
        # log.msg joins its positional arguments with spaces instead of
        # %-interpolating them, so the text has to be formatted up front.
        log.msg("sub: %s %s" % (channel, message))
        # Work on a snapshot: a callback that ends up registering or removing a
        # client must not resize the dict while we are iterating over it. A channel
        # nobody is registered for simply yields no callbacks.
        callbacks = list(n.channels.get(channel, {}).values())
        for cb in callbacks:
            try:
                cb(message)
            except Exception as e:
                # One failing client must not stop delivery to the others. The
                # callback stays registered until Notifier.unsubscribe removes it.
                log.msg("exception %s" % (e,))

    def connectionLost(self, reason):
        """
        Override the parent hook so that losing the redis backend triggers a reconnect.

        :param reason: failure describing why the connection went away
        """
        super(WatchChannels, self).connectionLost(reason)
        log.msg("connection was lost")
        notifier().init()
class Notifier(object):
    """
    Registry of connected, authenticated clients and the redis connection feeding them.

    Callbacks are stored per channel and per client so that events published in redis
    can be relayed to every connection of the matching user. Most of the logic here is
    error handling for what to do when unexpected async things happen.
    """

    def __init__(self):
        """
        Set up empty state; no connection to redis is made here (see init()).
        """
        # current redis subscriber connection, or None while disconnected
        self.redis = None
        # channel name -> {client_id -> callback taking the message}
        self.channels = {}

    def init(self):
        """
        Connect (or reconnect) to redis and resubscribe to every known channel.

        Called once the reactor is running and again whenever the connection is lost,
        which can happen at any point since redis nodes can and do die on us. A failed
        attempt is retried after REDIS_CONNECT_FAIL_RETRY_INTERVAL seconds.

        :return: the deferred of the connection attempt
        """
        def got_conn(redis):
            log.msg("connected to redis")
            self.redis = redis
            # everything registered so far, including clients that showed up while
            # we were disconnected, gets (re)subscribed on the fresh connection
            for channel in list(self.channels):
                redis.subscribe(channel)

        def cannot_conn(res):
            # log.msg does not %-interpolate its arguments; format up front
            log.msg("cannot connect to redis %s" % (res,))
            reactor.callLater(REDIS_CONNECT_FAIL_RETRY_INTERVAL, self.init)

        # Whatever connection we had is gone or was never made; drop the reference so
        # subscribe/unsubscribe do not talk to a dead connection while reconnecting.
        self.redis = None
        c = protocol.ClientCreator(reactor, WatchChannels)
        d = c.connectTCP(*random_redis_connection_addr())
        d.addCallback(got_conn)
        d.addErrback(cannot_conn)
        return d

    def subscribe(self, client):
        """
        Register a freshly authenticated client and subscribe to its channel in redis.

        The callback is always recorded, even while redis is not connected: init()
        subscribes to every recorded channel once a connection is established, so a
        client that arrives during an outage is not silently dropped.

        :param client: object exposing channel, client_id and sendTextMessage
        """
        channel = client.channel
        self.channels.setdefault(channel, {})[client.client_id] = client.sendTextMessage
        if self.redis is not None:
            self.redis.subscribe(channel)

    def unsubscribe(self, client):
        """
        Forget a client that disconnected for any reason.

        Its callback is removed; when no client is left on the channel the channel
        entry is cleaned up and, if connected, redis is told to unsubscribe.
        Unknown clients are ignored.

        :param client: object exposing channel and client_id
        """
        try:
            channel = client.channel
            callbacks = self.channels[channel]
            del callbacks[client.client_id]
            if not callbacks:
                del self.channels[channel]
                if self.redis is not None:
                    self.redis.unsubscribe(channel)
        except (KeyError, TypeError):
            # KeyError: the client was never registered. TypeError: presumably raised
            # while computing client.channel for a client that never finished auth
            # (TODO confirm against the protocol's userinfo handling).
            pass
# pylint: disable=too-many-ancestors
class NotificationProtocol(ws.AuthWebSocketServerProtocol):
    """
    Websocket protocol that hooks each authenticated connection up to the notifier.
    """

    @property
    def channel(self):
        """
        Name of the redis channel for this connection's user, built from userinfo.
        """
        user_id = self.userinfo['user_id']
        return '#U{%s}' % user_id

    def onConnect(self, request):
        """
        Let the parent class handle the connect (auth) step, then register with the
        notifier, which subscribes to the user's channel in redis and relays whatever
        is published there back to this client.
        """
        # NOTE(review): whatever the parent's onConnect returns is discarded here.
        super(NotificationProtocol, self).onConnect(request)
        registry = notifier()
        registry.subscribe(self)

    def onClose(self, wasClean, code, reason):
        """
        Tell the notifier this client is gone, whatever the reason for the disconnect,
        before handing the close event on to the parent class.
        """
        registry = notifier()
        registry.unsubscribe(self)
        super(NotificationProtocol, self).onClose(wasClean, code, reason)
def notifier():
    """
    Return the process-wide Notifier, creating it on first use.

    Admittedly a bit hacky: it is a plain module-level singleton. Creating it does not
    connect to anything; Notifier.init() does that, and run() schedules it for when the
    reactor starts.
    """
    global _notifier
    if _notifier is not None:
        return _notifier
    _notifier = Notifier()
    return _notifier
def run(port=3333, debug=False):
    """
    Start the notification server; the one entry point meant to be called externally.

    Blocks inside the reactor loop until it exits.

    :param port: TCP port the websocket server listens on
    :param debug: when true, log to stdout; also passed on to the websocket factory
    :return: None, once the reactor has stopped
    """
    if debug:
        log.startLogging(sys.stdout)

    # websocket side: one factory producing a NotificationProtocol per connection
    listen_url = "ws://0.0.0.0:%d" % port
    factory = WebSocketServerFactory(listen_url, debug=debug)
    factory.protocol = NotificationProtocol
    factory.setProtocolOptions(autoPingInterval=30, autoPingTimeout=10)
    listenWS(factory)

    # redis side: the connection is only attempted once the reactor loop is running
    connect_to_redis = notifier().init
    reactor.callWhenRunning(connect_to_redis)

    # hand control to the event loop; when it returns the program is done
    reactor.run()
@v6
Copy link

v6 commented Jan 12, 2016

// , I came across this on https://gitter.im/Grokzen/redis-py-cluster.

Forgive me if these are obvious, but beyond establishing a connection to the Redis cluster, what are some applications for this code?

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment