Skip to content

Instantly share code, notes, and snippets.

@vanderw
Created December 11, 2024 08:31
Show Gist options
  • Save vanderw/c47bfbbb8d8635d16c37a270cb1fae3b to your computer and use it in GitHub Desktop.
Save vanderw/c47bfbbb8d8635d16c37a270cb1fae3b to your computer and use it in GitHub Desktop.
Wrap ordinary functions into async style
import asyncio
import time
from concurrent.futures import ThreadPoolExecutor
"""Wrap ordinary func to async style, and catch all console signals"""
def job():
''' Ordinary func to be wrapped '''
time.sleep(5.0)
return 5
async def job2():
''' Normal async function'''
await asyncio.sleep(2)
return 3
async def job_async():
'''
job wrapper
'''
loop = asyncio.get_running_loop()
res = await loop.run_in_executor(None, job)
return res
async def main():
''' main to gather and catch all'''
try:
res = await asyncio.gather(
job_async(),
job2(),
)
print(res) # returns [5, 3] after 5.0 seconds
except asyncio.CancelledError as e:
print('main() cancelled')
except Exception as e:
print(f'main() got: {e}')
if __name__ == '__main__':
asyncio.run(main())
# More closer: Make a decoration to work
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment