Created
April 10, 2020 13:17
-
-
Save romuald/d32c40c5bcf955ef3efa0e2a53ffda46 to your computer and use it in GitHub Desktop.
aioredis multiple subscription reader workaround
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
class ChannelReader:
    """
    Wrapper against aioredis.Channel used to allow multiple readers
    on the same subscription.

    A single dispatcher task per channel reads the messages and hands each
    one to every reader currently iterating over that channel.

    Example::

        channels = await redis_pool.subscribe('subscribeme')
        reader = ChannelReader(channels[0])
        async for message in reader:
            print(message)
    """

    # channel -> WeakSet holding the pending future of each active reader;
    # shared by every ChannelReader instance so that a channel gets exactly
    # one dispatcher task
    loops = weakref.WeakKeyDictionary()

    # time (in seconds) to wait before failing when readers get stuck
    ack_wait_time = 1.0

    def __init__(self, channel):
        """
        Attach to *channel*, starting its dispatcher task on first use.

        The dispatcher is scheduled with ``asyncio.ensure_future``, so the
        first reader of a channel has to be created while an event loop
        is available.
        """
        self.channel = channel

        # if channel was not previously seen, create a new reader task
        if channel not in self.loops:
            self.loops[channel] = weakref.WeakSet()
            asyncio.ensure_future(self._main_loop())

        # subscribers: set of futures shared with the dispatcher task
        self.subscribers = self.loops[channel]

    async def __aiter__(self):
        """
        Iterate over subscription messages
        """
        future = asyncio.Future()
        self.subscribers.add(future)

        while True:
            value = await future

            # NOTICE: previous future object is removed from the weakset
            # when this new reference is created
            future = asyncio.Future()
            self.subscribers.add(future)

            yield value

    async def _main_loop(self):
        """
        Task reading the actual channel messages and re-dispatching them
        to the different readers.

        Raises RuntimeError when a dispatched message has still not been
        picked up by one of the readers after ``ack_wait_time`` seconds.
        """
        time = asyncio.get_running_loop().time

        async for message in self.channel.iter():
            for subscriber in list(self.subscribers):
                subscriber.set_result(message)

                # Deleting reference to the last weakset entry
                # so it is correctly removed from the set when consumed
                del subscriber

            start = time()

            # Wait for all done tasks to be acknowledged: a reader
            # acknowledges by replacing its resolved future with a new one,
            # which drops the resolved future from the weak set
            while any(fut.done() for fut in self.subscribers):
                await asyncio.sleep(0)

                # … but don't wait too long (should not happen ©)
                if time() - start > self.ack_wait_time:
                    raise RuntimeError('Task result not consumed')
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment