@martinkozle
Last active July 14, 2023 13:09
Asynchronously iterate a synchronous IO blocking iterator using a thread pool
import asyncio
from concurrent.futures import ThreadPoolExecutor
from functools import partial
from typing import AsyncIterator, Iterator, TypeVar

T = TypeVar("T")


def checked_next(iterator: Iterator[T]) -> T | None:
    try:
        return next(iterator)
    except StopIteration:
        return None


async def sync_to_async_iter(iterator: Iterator[T]) -> AsyncIterator[T]:
    with ThreadPoolExecutor(max_workers=1) as executor:
        while (
            value := await asyncio.get_running_loop().run_in_executor(
                executor, partial(checked_next, iterator)
            )
        ) is not None:
            yield value
@martinkozle
Author

The sync_to_async_iter function runs each next call on a synchronous iterator in a separate worker thread and awaits the result, so the event loop is not blocked while the iterator is waiting.
It is meant to be used on an iterator that performs a long, blocking IO operation on every next call.
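As a rough usage sketch, the snippet below wraps a blocking generator and runs a second task alongside it to check that the event loop keeps turning. The names slow_numbers, heartbeat and main are hypothetical and only serve the illustration, time.sleep stands in for real blocking IO, and Optional[T] replaces T | None so the sketch also runs on Python 3.9.

```python
import asyncio
import time
from concurrent.futures import ThreadPoolExecutor
from functools import partial
from typing import AsyncIterator, Iterator, Optional, TypeVar

T = TypeVar("T")


def checked_next(iterator: Iterator[T]) -> Optional[T]:
    # None marks the end of iteration, as in the gist.
    try:
        return next(iterator)
    except StopIteration:
        return None


async def sync_to_async_iter(iterator: Iterator[T]) -> AsyncIterator[T]:
    with ThreadPoolExecutor(max_workers=1) as executor:
        while (
            value := await asyncio.get_running_loop().run_in_executor(
                executor, partial(checked_next, iterator)
            )
        ) is not None:
            yield value


def slow_numbers(count: int, delay: float) -> Iterator[int]:
    # Hypothetical blocking source: time.sleep stands in for slow IO.
    for number in range(count):
        time.sleep(delay)
        yield number


async def heartbeat(ticks: list, interval: float) -> None:
    # Runs on the event loop, so it only progresses while the loop is free.
    while True:
        ticks.append(time.monotonic())
        await asyncio.sleep(interval)


async def main() -> tuple:
    ticks: list = []
    beat = asyncio.create_task(heartbeat(ticks, 0.01))
    received = [n async for n in sync_to_async_iter(slow_numbers(3, 0.05))]
    beat.cancel()
    return received, len(ticks)


received, tick_count = asyncio.run(main())
print(received, tick_count)
```

If the generator were iterated directly inside the coroutine, the heartbeat task would be starved during every time.sleep call. With the wrapper it should record several ticks while the three numbers arrive.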
In the checked_next function, None is used as a sentinel value to signal that the iteration is over. This is needed because StopIteration cannot be raised into a Future. As a side effect, if your iterator actually yields None, the asynchronous iteration stops early. If you need to support yielding None, something more robust like this can be used:

import asyncio
from concurrent.futures import ThreadPoolExecutor
from enum import Enum, auto
from functools import partial
from typing import AsyncIterator, Iterator, Literal, TypeVar

T = TypeVar("T")


class StopIterationSentinelType(Enum):
    StopIterationSentinel = auto()


def checked_next(
    iterator: Iterator[T],
) -> T | Literal[StopIterationSentinelType.StopIterationSentinel]:
    try:
        return next(iterator)
    except StopIteration:
        return StopIterationSentinelType.StopIterationSentinel


async def sync_to_async_iter(iterator: Iterator[T]) -> AsyncIterator[T]:
    with ThreadPoolExecutor(max_workers=1) as executor:
        while (
            value := await asyncio.get_running_loop().run_in_executor(
                executor, partial(checked_next, iterator)
            )
        ) is not StopIterationSentinelType.StopIterationSentinel:
            yield value

Alternatively, if you don't care about static type checking, you can use StopIterationSentinel = object().
For more information about sentinel values, see PEP 661.
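A minimal sketch of the object() variant mentioned above, assuming the same structure as the earlier code. The return type of checked_next is loosened to Any because a plain object() sentinel cannot be expressed precisely in the type system, and collect is a hypothetical helper added only to drive the example.

```python
import asyncio
from concurrent.futures import ThreadPoolExecutor
from functools import partial
from typing import Any, AsyncIterator, Iterator, TypeVar

T = TypeVar("T")

# A unique object that no iterator can yield by accident.
StopIterationSentinel = object()


def checked_next(iterator: Iterator[T]) -> Any:
    # Returns the next item, or the sentinel once the iterator is exhausted.
    try:
        return next(iterator)
    except StopIteration:
        return StopIterationSentinel


async def sync_to_async_iter(iterator: Iterator[T]) -> AsyncIterator[T]:
    with ThreadPoolExecutor(max_workers=1) as executor:
        while (
            value := await asyncio.get_running_loop().run_in_executor(
                executor, partial(checked_next, iterator)
            )
        ) is not StopIterationSentinel:
            yield value


async def collect(iterator: Iterator[T]) -> list:
    # Hypothetical helper: gathers every item into a list.
    return [item async for item in sync_to_async_iter(iterator)]


items = asyncio.run(collect(iter([1, None, 2])))
print(items)  # [1, None, 2]
```

Because the comparison uses identity (is not), a None in the middle of the stream no longer ends the iteration early.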
