Skip to content

Instantly share code, notes, and snippets.

@paulwinex
Created December 3, 2025 06:15
Show Gist options
  • Select an option

  • Save paulwinex/258f4d64bfca99946281f6d10a1b804d to your computer and use it in GitHub Desktop.

Select an option

Save paulwinex/258f4d64bfca99946281f6d10a1b804d to your computer and use it in GitHub Desktop.
import asyncio
import random
async def do_it(n, fail_rate=0.1, min_delay=0.5, max_delay=2):
    """Simulate a unit of asynchronous work that may fail at random.

    Args:
        n: Value handed back unchanged when the work succeeds.
        fail_rate: Probability (0..1) of raising instead of doing the work.
            ``0`` never fails, ``1`` always fails.
        min_delay: Lower bound, in seconds, of the simulated work duration.
        max_delay: Upper bound, in seconds, of the simulated work duration.

    Returns:
        ``n``, after sleeping for a random duration in
        ``[min_delay, max_delay]``.

    Raises:
        Exception: With the message ``'Failed test'`` when the random draw
            falls below ``fail_rate``. The failure happens before any sleep.
    """
    # The failure draw comes first so a failing task finishes immediately.
    if random.random() < fail_rate:
        raise Exception('Failed test')
    await asyncio.sleep(random.uniform(min_delay, max_delay))
    return n
async def all_completed(count):
    """Run ``count`` ``do_it`` tasks and report each one once all have finished.

    Demonstrates ``asyncio.wait(..., return_when=asyncio.ALL_COMPLETED)``.
    Each task is named ``Task-<i>``; for every task either its result or the
    exception it raised is printed.

    Args:
        count: Number of tasks to start. Nothing is started or printed when
            ``count`` is zero or negative.
    """
    tasks_all = [asyncio.create_task(do_it(i), name=f'Task-{i}') for i in range(count)]
    if not tasks_all:
        # asyncio.wait() raises ValueError on an empty collection.
        return

    # With ALL_COMPLETED and no timeout nothing is left pending.
    await asyncio.wait(tasks_all, return_when=asyncio.ALL_COMPLETED)

    # Walk the creation-ordered list rather than the unordered `done` set so
    # the report order is deterministic.
    for task in tasks_all:
        try:
            print(f"Task {task.get_name()} done: {task.result()}")
        except Exception as e:
            print(f"Task {task.get_name()} failed: {e}")
async def first_completed(count):
    """Run ``count`` ``do_it`` tasks and stop as soon as any of them finishes.

    Demonstrates ``asyncio.wait(..., return_when=asyncio.FIRST_COMPLETED)``.
    Every task that is already done when the wait returns is reported (result
    or raised exception); all remaining tasks are cancelled.

    Args:
        count: Number of tasks to start. Nothing is started or printed when
            ``count`` is zero or negative.
    """
    tasks_all = [asyncio.create_task(do_it(i), name=f'Task-{i}') for i in range(count)]
    if not tasks_all:
        # asyncio.wait() raises ValueError on an empty collection.
        return

    done, pending = await asyncio.wait(
        tasks_all,
        return_when=asyncio.FIRST_COMPLETED,
    )

    # Report in creation order; `done` itself is an unordered set.
    for task in tasks_all:
        if task not in done:
            continue
        try:
            print(f"Task {task.get_name()} done: {task.result()}")
        except Exception as e:
            print(f"Task {task.get_name()} failed: {e}")

    for task in pending:
        task.cancel()
    if pending:
        # cancel() only requests cancellation; wait for the tasks to actually
        # unwind so none of them is still running when this coroutine returns.
        await asyncio.gather(*pending, return_exceptions=True)
async def first_exception(count):
    """Run ``count`` ``do_it`` tasks and stop as soon as any of them raises.

    Demonstrates ``asyncio.wait(..., return_when=asyncio.FIRST_EXCEPTION)``.
    If no task raises, the wait only returns once every task has finished.
    Every task that is done when the wait returns is reported (result or
    raised exception); all remaining tasks are cancelled.

    Args:
        count: Number of tasks to start. Nothing is started or printed when
            ``count`` is zero or negative.
    """
    tasks_all = [asyncio.create_task(do_it(i), name=f'Task-{i}') for i in range(count)]
    if not tasks_all:
        # asyncio.wait() raises ValueError on an empty collection.
        return

    done, pending = await asyncio.wait(
        tasks_all,
        return_when=asyncio.FIRST_EXCEPTION,
    )

    # Report in creation order; `done` itself is an unordered set.
    for task in tasks_all:
        if task not in done:
            continue
        try:
            print(f"Task {task.get_name()} done: {task.result()}")
        except Exception as e:
            print(f"Task {task.get_name()} failed: {e}")

    for task in pending:
        task.cancel()
    if pending:
        # cancel() only requests cancellation; wait for the tasks to actually
        # unwind so none of them is still running when this coroutine returns.
        await asyncio.gather(*pending, return_exceptions=True)
# Number of tasks started by each demonstration below.
task_count = 10

# Run the three asyncio.wait() scenarios one after another, each in its own
# event loop, printing a banner before each.
for banner, scenario in (
    ('ALL COMPLETED', all_completed),
    ('FIRST COMPLETED', first_completed),
    ('FIRST EXCEPTION', first_exception),
):
    print(banner)
    asyncio.run(scenario(task_count))
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment