Created
May 12, 2020 10:02
-
-
Save showa-yojyo/4ed200d4c41f496a45a7af2612912df3 to your computer and use it in GitHub Desktop.
Implement Producer/Consumer pattern with asyncio.Queue
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
#!/usr/bin/env python | |
""" | |
A simple producer/consumer example, using Queue.task_done and Queue.join | |
From https://asyncio.readthedocs.io/en/latest/producer_consumer.html | |
""" | |
import asyncio | |
import random | |
async def produce(
    queue: asyncio.Queue, n: int, *, max_delay: float = 1.0
) -> None:
    """Put the integers 1..n on *queue*, one at a time.

    Before each item is queued the coroutine sleeps for a random interval
    to simulate an I/O-bound producer.

    Args:
        queue: Queue that receives the produced items.
        n: Number of items to produce; nothing is produced when ``n < 1``.
        max_delay: Upper bound, in seconds, of the random pause taken
            before each item is queued.  The default of 1.0 keeps the
            same delay range as a plain ``random.random()`` call.
    """
    for x in range(1, n + 1):
        # produce an item
        print(f'producing {x}/{n}')
        # simulate i/o operation using sleep
        await asyncio.sleep(random.uniform(0.0, max_delay))
        # put the item in the queue
        await queue.put(x)
async def consume(queue: asyncio.Queue, *, max_delay: float = 1.0) -> None:
    """Process items from *queue* in an endless loop.

    The coroutine never returns on its own; the caller is expected to
    cancel the task once the queue has been drained.

    Args:
        queue: Queue to take items from.  ``task_done()`` is called once
            for every item taken, so ``queue.join()`` can be used to wait
            for completion.
        max_delay: Upper bound, in seconds, of the random pause that
            simulates processing time.  The default of 1.0 keeps the same
            delay range as a plain ``random.random()`` call.
    """
    while True:
        # Wait for an item from the producer.  This stays outside the try
        # block: if get() itself is cancelled, no item was taken and
        # task_done() must not be called for it.
        item = await queue.get()
        try:
            # process the item
            print(f'consuming {item}...')
            # simulate i/o operation using sleep
            await asyncio.sleep(random.uniform(0.0, max_delay))
        finally:
            # Always balance the get() above, even when processing raises
            # or is cancelled; otherwise queue.join() would block forever.
            queue.task_done()
async def run(n: int, *, num_consumers: int = 3) -> None:
    """Run one producer and several consumers over a shared queue.

    The producer generates *n* items; the function returns once every
    item has been processed and all consumer tasks have been cancelled.

    Args:
        n: Number of items the producer generates.
        num_consumers: Number of concurrent consumer tasks to start.

    Raises:
        ValueError: If ``num_consumers`` is less than 1, since with no
            consumer ``queue.join()`` could never complete.
    """
    if num_consumers < 1:
        raise ValueError("num_consumers must be at least 1")

    queue = asyncio.Queue()
    # schedule consumers
    consumers = [
        asyncio.create_task(consume(queue)) for _ in range(num_consumers)
    ]
    try:
        # run the producer and wait for completion
        await produce(queue, n)
        # wait until the consumers have processed all items
        await queue.join()
    finally:
        # The consumers loop forever waiting for items, so cancel them.
        # Doing this in ``finally`` avoids leaving tasks pending when the
        # producer or the join raises.
        for consumer in consumers:
            consumer.cancel()
        # wait until all worker tasks are cancelled
        await asyncio.gather(*consumers, return_exceptions=True)
# Run the demo only when executed as a script, so that importing this
# module does not start an event loop as a side effect.
if __name__ == "__main__":
    asyncio.run(run(10))
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment