Last active
March 29, 2024 07:43
-
-
Save snowwm/df3a0c109001fec0a778b895811a69fc to your computer and use it in GitHub Desktop.
A dirty hack useful if you need to quickly integrate sync and async codebases (or start making some parts async). Not for production!
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
""" | |
This example demonstrates using anysync with httpx. | |
""" | |
import logging | |
from anysync import anysync | |
from anysync.httpx_patch import Client | |
# INFO level makes anysync's worker-thread log messages visible.
logging.basicConfig(level=logging.INFO)

# A single shared client; its request methods are wrapped with anysync,
# so they can be called from both sync and async code below.
client = Client()
@anysync
async def test():
    """Run the demo: a sync hop, then a purely async hop, then a sync hop again."""
    test1()
    await test2()
    test1()
def test1():
    """Synchronous step sitting between two async functions.

    When this runs, test() is already executing in one worker thread, so
    my_ip() has to be launched in another one. The first call of test1()
    creates that extra thread and the second call reuses it; both events
    can be seen in the log.
    """
    print("Calling async -> sync -> async")
    my_ip()
async def test2():
    """Async step: my_ip() is awaited directly, so no thread is launched."""
    print("Calling async -> async -> async")
    await my_ip()
@anysync
async def my_ip():
    """Query ip-api.com and print the stripped response text as our IP."""
    response = await client.get("http://ip-api.com/line/?fields=8192")
    address = response.text.strip()
    print(f"My IP is {address}")
# test() is async, yet a plain call is enough: anysync submits it to the
# thread pool and blocks until it finishes. The worker thread created for
# it is reported in the log.
test()
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
""" | |
This module exports a decorator `anysync` that can be used to transparently | |
call async functions from sync context. Moreover, you can arbitrarily mix and | |
nest sync and async function calls. | |
Under the hood it uses a pool of threads each of which runs an event loop. | |
Beware that it will deadlock when the pool becomes full (though it's unlikely). | |
""" | |
import asyncio | |
from concurrent.futures import ThreadPoolExecutor | |
import functools | |
import inspect | |
import logging | |
__all__ = ["anysync"] | |
def _init_thread():
    """Pool initializer: install a fresh event loop in the new worker thread."""
    _logger.info("New async worker thread")
    loop = asyncio.new_event_loop()
    asyncio.set_event_loop(loop)
def _exec_async(future):
    """Run *future* to completion on the current thread's event loop.

    Executed inside a pool worker. Returns the result of
    ``run_until_complete``; an exception raised by it propagates to the
    caller.
    """
    _logger.info("Entering async thread")
    loop = asyncio.get_event_loop()
    return loop.run_until_complete(future)
def anysync(func):
    """Decorate async *func* so it can also be called from synchronous code.

    The wrapper inspects the code flags of its immediate caller:

    * caller is a coroutine or an async generator -> ``func`` is called
      as usual and its result (the awaitable) is returned for the caller
      to ``await``;
    * any other caller -> the awaitable is submitted to the module's
      thread pool, and the wrapper blocks until the result is available.
    """
    async_caller_mask = inspect.CO_COROUTINE | inspect.CO_ASYNC_GENERATOR

    @functools.wraps(func)
    def inner(*args, **kwargs):
        # Must be read right here: f_back is only the real caller while we
        # are in the wrapper's own frame.
        caller_flags = inspect.currentframe().f_back.f_code.co_flags

        if not (caller_flags & async_caller_mask):
            # Sync caller: run in the thread pool and wait synchronously.
            awaitable = func(*args, **kwargs)
            pending = _executor.submit(_exec_async, awaitable)
            return pending.result()

        # Async caller: nothing to do, hand the awaitable straight back.
        return func(*args, **kwargs)

    return inner
_logger = logging.getLogger(__name__)

# Shared worker pool; _init_thread gives every new worker its own event loop.
_executor = ThreadPoolExecutor(initializer=_init_thread)

# Expose the pool's shutdown to users as ``anysync.shutdown``.
anysync.shutdown = _executor.shutdown
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
""" | |
This module defines a patched httpx AsyncClient which can be transparently used | |
by sync functions. This requires some tricks because the same AsyncTransport | |
cannot be used in different asyncio event loops. We overcome this by creating | |
a new copy of the transport for each loop in which it's about to be used. However,
it's difficult to properly close these copies (I suppose each must be closed in
its own event loop), so for now they are simply not being closed.
""" | |
import asyncio | |
from collections import defaultdict | |
from functools import partial | |
import httpx | |
from . import anysync | |
__all__ = ["Client"] | |
class Client(httpx.AsyncClient):
    """``httpx.AsyncClient`` that hands out a separate transport per event loop.

    Every transport built through the overridden ``_init_*`` hooks remembers
    the call that created it. ``_transport_for_url`` then uses that call to
    build one additional copy for each new running loop that asks for the
    transport. The copies are never closed.
    """

    @staticmethod
    def _patch_build(factory, args, kwargs):
        """Build a transport via *factory* and record how to build it again."""
        constructor = partial(factory, *args, **kwargs)
        tr = constructor()
        tr._patch_constructor = constructor
        return tr

    def _init_transport(self, *args, **kwargs):
        """Create the transport as the parent does, remembering its recipe."""
        return self._patch_build(super()._init_transport, args, kwargs)

    def _init_proxy_transport(self, *args, **kwargs):
        """Create the proxy transport as the parent does, remembering its recipe."""
        return self._patch_build(super()._init_proxy_transport, args, kwargs)

    def _transport_for_url(self, url):
        """Return the copy of the transport for *url* bound to the running loop.

        Must be called while an event loop is running, because
        ``asyncio.get_running_loop()`` raises ``RuntimeError`` otherwise.
        """
        cur_loop = asyncio.get_running_loop()
        tr = super()._transport_for_url(url)

        constructor = getattr(tr, "_patch_constructor", None)
        if constructor is None:
            # This transport was not created by the hooks above (presumably
            # it was supplied ready-made, e.g. via ``mounts=`` -- verify
            # against httpx). There is no way to copy it, so use it as is
            # instead of failing with AttributeError.
            return tr

        copies = getattr(tr, "_patch_copies_by_loop", None)
        if copies is None:
            # First use: the original instance serves the current loop;
            # any other loop gets a fresh copy on first lookup.
            copies = tr._patch_copies_by_loop = defaultdict(constructor)
            copies[cur_loop] = tr
        return copies[cur_loop]
# Wrap the request methods with anysync so they work from sync callers too.
# NOTE(review): httpx appears to define ``AsyncClient.stream`` as an async
# context manager rather than a coroutine function -- confirm that the
# wrapped ``stream`` is actually usable from synchronous code.
for _method in (
    "request",
    "stream",
    "send",
    "get",
    "options",
    "head",
    "post",
    "put",
    "patch",
    "delete",
):
    setattr(Client, _method, anysync(getattr(Client, _method)))
del _method

# ``close`` is the anysync-wrapped form of the async ``aclose``.
Client.close = anysync(Client.aclose)
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment