Last active
April 13, 2020 12:10
-
-
Save harrisont/146f6dcb1270a0f72adbb905d0ab7ee8 to your computer and use it in GitHub Desktop.
as_completed_with_max_concurrent: similar to asyncio.as_completed, but limits the number of futures that run concurrently
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
import asyncio | |
import itertools | |
def as_completed_with_max_concurrent(futures, max_concurrent, loop=None, timeout=None):
    """Yield awaitables for `futures` in completion order, with bounded concurrency.

    Tweaked version of `asyncio.as_completed` with the addition of the
    `max_concurrent` param. Only the first `max_concurrent` futures are
    scheduled up front (`_queue_future`); each time one of them finishes,
    `_on_completion` schedules the next one.

    This is a generator function, so nothing is scheduled (and no argument
    error is raised) until the first iteration.

    Args:
        futures: Iterable of hashable futures/coroutines. Duplicates are
            dropped; the rest are started in iteration order.
        max_concurrent: Maximum number of futures in flight at once.
            `None` schedules everything immediately.
        loop: Event loop to schedule on; defaults to
            `asyncio.get_event_loop()`.
        timeout: Optional delay in seconds, counted from the first
            iteration, after which the next awaited result raises
            `asyncio.TimeoutError`.

    Yields:
        One coroutine per distinct future. Awaiting it returns the result
        (or raises the exception) of the next future to finish.

    Raises:
        TypeError: If `futures` is a single future or coroutine.
        ValueError: If there is work to do but `max_concurrent` is below 1.
    """
    if isinstance(futures, asyncio.Future) or asyncio.iscoroutine(futures):
        raise TypeError("expect a list of futures, not %s" % type(futures).__name__)

    # De-duplicate (as `asyncio.as_completed` does) while keeping the caller's
    # order, so futures start first-come, first-served instead of in
    # arbitrary set order.
    pending = list(dict.fromkeys(futures))
    if pending and max_concurrent is not None and max_concurrent < 1:
        # Nothing would ever be scheduled, so every wait would hang forever.
        raise ValueError("max_concurrent must be at least 1, not %r" % (max_concurrent,))

    loop = loop if loop is not None else asyncio.get_event_loop()
    todo = set()  # Wrapped futures that are currently in flight.
    # No `loop=` argument: it was removed from asyncio.Queue in Python 3.10.
    done = asyncio.Queue()
    timeout_handle = None
    futures_to_queue_iter = iter(pending)

    def _on_timeout():
        for f in todo:
            f.remove_done_callback(_on_completion)
        done.put_nowait(None)  # Queue a dummy value for _wait_for_one().
        todo.clear()  # Can't do todo.remove(f) in the loop.

    def _on_completion(f):
        if not todo:
            return  # _on_timeout() was here first.
        todo.remove(f)
        try:
            future_to_queue = next(futures_to_queue_iter)
        except StopIteration:
            # Finished adding futures to todo.
            pass
        else:
            _queue_future(future_to_queue)
        done.put_nowait(f)
        if not todo and timeout_handle is not None:
            timeout_handle.cancel()

    def _queue_future(f):
        wrapped = asyncio.ensure_future(f, loop=loop)
        wrapped.add_done_callback(_on_completion)
        todo.add(wrapped)

    # Native coroutine: `@asyncio.coroutine` was removed in Python 3.11.
    async def _wait_for_one():
        f = await done.get()
        if f is None:
            # Dummy value from _on_timeout().
            raise asyncio.TimeoutError
        return f.result()  # May raise f.exception().

    # Only queue the first `max_concurrent` futures initially.
    # The rest will be queued in `_on_completion`.
    for future in itertools.islice(futures_to_queue_iter, max_concurrent):
        _queue_future(future)
    if todo and timeout is not None:
        timeout_handle = loop.call_later(timeout, _on_timeout)
    for _ in range(len(pending)):
        yield _wait_for_one()
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
import asyncio | |
from contextlib import closing | |
import random | |
import asyncio_helper | |
async def async_index_printer(index: int):
    """Announce `index`, pause for a random 1-3 seconds, then return `index`."""
    print('start', index)
    delay = random.uniform(1, 3)
    await asyncio.sleep(delay)
    return index
async def async_index_printer_with_finisher():
    """Run five index printers, two at a time, printing each result as it finishes."""
    pending = (async_index_printer(i) for i in range(5))
    # Use as_completed_with_max_concurrent instead of asyncio.as_completed.
    limited = asyncio_helper.as_completed_with_max_concurrent(pending, max_concurrent=2)
    for next_done in limited:
        print('end', await next_done)
def main():
    """Run the demo coroutine to completion with asyncio debug mode enabled."""
    # asyncio.run() creates a fresh event loop, turns on debug mode, and
    # closes the loop when done. Calling asyncio.get_event_loop() with no
    # running loop is deprecated and fails on recent Python versions.
    asyncio.run(async_index_printer_with_finisher(), debug=True)
# Run the demo; this executes at module load (there is no `__main__` guard).
main()
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
import asyncio | |
from contextlib import closing | |
import random | |
async def async_index_printer(index: int):
    """Print a start marker for `index`, sleep a random 1-3 seconds, and hand `index` back."""
    print('start', index)
    pause_seconds = random.uniform(1, 3)
    await asyncio.sleep(pause_seconds)
    return index
async def do_with_semaphore(semaphore: asyncio.Semaphore, future):
    """Await `future` while holding one slot of `semaphore` and return its result.

    The slot is released whether `future` returns or raises.
    """
    await semaphore.acquire()
    try:
        outcome = await future
    finally:
        semaphore.release()
    return outcome
async def async_index_printer_with_finisher():
    """Run five index printers behind a two-slot semaphore, printing results as they finish."""
    limiter = asyncio.Semaphore(2)
    guarded = []
    for i in range(5):
        # Each printer only runs once it holds a semaphore slot.
        guarded.append(do_with_semaphore(limiter, async_index_printer(i)))
    for next_done in asyncio.as_completed(guarded):
        print('end', await next_done)
def main():
    """Run the semaphore-based demo coroutine to completion with asyncio debug mode enabled."""
    # asyncio.run() creates a fresh event loop, turns on debug mode, and
    # closes the loop when done. Calling asyncio.get_event_loop() with no
    # running loop is deprecated and fails on recent Python versions.
    asyncio.run(async_index_printer_with_finisher(), debug=True)
# Run the demo; this executes at module load (there is no `__main__` guard).
main()
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment