@dhirschfeld
Last active March 24, 2026 00:35
Async Python
name: async-python
description: USE FOR: writing, reviewing, or refactoring any Python async/await code — coroutines, task groups, cancel scopes, timeouts, producer/consumer channels, or concurrent I/O. Also use when migrating from raw asyncio to structured concurrency, debugging async deadlocks or cancellation bugs, replacing asyncio.create_task with task groups, choosing between asyncio.Queue and memory object streams, or asking about fire-and-forget alternatives. Applies even if the user mentions only asyncio, aiohttp, or httpx — this skill enforces strict structured concurrency with anyio. DO NOT USE FOR: synchronous threading or multiprocessing without async/await.

Async Python Best Practices — Structured Concurrency with AnyIO

When to use this skill

Use this skill when:

  • Writing new async/await Python code
  • Refactoring synchronous code to async
  • Reviewing async Python for correctness, cancellation safety, or performance
  • Debugging deadlocks, race conditions, or task cancellation issues
  • Migrating from raw asyncio to structured concurrency

Why AnyIO, not raw asyncio

async/await is Python syntax built on generators — it is not tied to any specific event loop. AnyIO is a structured concurrency library that works on top of both asyncio and Trio, providing:

  • Level-triggered cancellation — every await inside a cancelled CancelScope raises CancelledError, preventing the silent swallowing of cancellation that causes deadlocks in raw asyncio (which uses edge-triggered, one-shot cancellation).
  • Cancel scopes — nested, composable timeout and cancellation regions that asyncio lacks entirely.
  • tg.start() — wait for a child task to signal readiness before continuing.
  • Memory object streams — structured producer/consumer channels with backpressure by default.
  • Portable — code runs identically on asyncio and Trio with no changes.

AnyIO is already a dependency of httpx, FastAPI/Starlette, Jupyter, and many other libraries — you likely already have it installed.

Core principles

  1. Every concurrent task must live inside a task group. Use anyio.create_task_group() for all concurrent work. The task group block does not exit until every child task has finished. This is the fundamental invariant of structured concurrency.
  2. Never use asyncio.create_task(). It is the concurrent equivalent of goto — a one-way jump that breaks function abstraction, resource cleanup, and error propagation. There is no safe "fire-and-forget" wrapper that fixes this.
  3. Never block the event loop. Offload blocking I/O with await anyio.to_thread.run_sync(). Never call time.sleep() — use await anyio.sleep().
  4. Pass callables, not coroutine objects. tg.start_soon(myfunc) not tg.start_soon(myfunc()). This avoids "coroutine was never awaited" warnings entirely and allows task restart by supervisors.

Task creation and management

Use create_task_group for concurrent work

import anyio

async def fetch_all(urls: list[str]) -> list[str]:
    results: list[str] = [None] * len(urls)  # type: ignore[list-item]

    async def _fetch_one(i: int, url: str) -> None:
        results[i] = await fetch(url)

    async with anyio.create_task_group() as tg:
        for i, url in enumerate(urls):
            tg.start_soon(_fetch_one, i, url)
    return results

If any task raises, the group cancels the remaining sibling tasks and re-raises as an ExceptionGroup. Handle with except*.

Get a result from a task via nonlocal

Tasks in a task group cannot return values directly. Use closure over a nonlocal variable:

async def fetch_both(url1: str, url2: str) -> tuple[str, str]:
    result1: str | None = None
    result2: str | None = None

    async def get1() -> None:
        nonlocal result1
        result1 = await fetch(url1)

    async def get2() -> None:
        nonlocal result2
        result2 = await fetch(url2)

    async with anyio.create_task_group() as tg:
        tg.start_soon(get1)
        tg.start_soon(get2)
    # Both tasks finished — results guaranteed to be set
    assert result1 is not None and result2 is not None
    return result1, result2

Wait for a task to be ready with tg.start()

Use tg.start() when a child task needs to perform initialization before the parent continues:

import anyio
from anyio.abc import TaskStatus

async def server_task(*, task_status: TaskStatus[None] = anyio.TASK_STATUS_IGNORED) -> None:
    listener = await setup_listener()
    task_status.started()  # Signal readiness to parent
    await serve_forever(listener)

async def main() -> None:
    async with anyio.create_task_group() as tg:
        await tg.start(server_task)  # Blocks until started() is called
        # Server is now guaranteed to be listening
        await interact_with_server()

Cancellation and timeouts

AnyIO uses cancel scopes — nested, composable regions that control cancellation. A task group contains its own cancel scope automatically.

Enforce deadlines with fail_after / move_on_after

import anyio

# Raise TimeoutError if the block takes longer than 5 seconds
with anyio.fail_after(5):
    await do_slow_work()

# Silently move on (no exception) if the block takes too long
with anyio.move_on_after(5) as scope:
    await do_slow_work()
if scope.cancelled_caught:
    print("Timed out, using fallback")

Cancel an entire task group

async with anyio.create_task_group() as tg:
    tg.start_soon(worker_a)
    tg.start_soon(worker_b)
    tg.cancel_scope.cancel()  # Cancel all children immediately

Cleanup under cancellation

Write cancellation-safe code: assume any await can raise CancelledError. Use try/finally for cleanup, and shield the cleanup code so that level-triggered cancellation does not prevent the cleanup awaits from completing:

from anyio import CancelScope

async def resilient_operation() -> None:
    resource = await acquire()
    try:
        await do_work(resource)
    finally:
        with CancelScope(shield=True):
            await release(resource)  # Runs even on cancellation

Without the shielded scope, await release(resource) would immediately raise CancelledError under level-triggered cancellation, skipping the cleanup entirely.

Shield work from cancellation

Use anyio.CancelScope(shield=True) to protect a block of cleanup code from external cancellation. Unlike asyncio.shield(), this is a scope (not a single-expression wrapper), composes with task groups, and never creates orphaned tasks:

from anyio import CancelScope

async def graceful_shutdown(resource: Resource) -> None:
    with CancelScope(shield=True):
        # Everything in this block is protected from external cancellation.
        # Multiple awaits are safe here.
        await resource.flush()
        await resource.close()

Synchronization

Use anyio synchronization primitives — never threading primitives in async code.

  • anyio.Lock — mutual exclusion
  • anyio.Event — one-shot signalling
  • anyio.Semaphore — limit concurrency
  • anyio.CapacityLimiter — limit concurrent access to a resource (like a semaphore, but with ownership tracking)

import anyio

limiter = anyio.CapacityLimiter(10)

async def rate_limited_fetch(url: str) -> str:
    async with limiter:
        return await fetch(url)

Producer/consumer with memory object streams

Use anyio.create_memory_object_stream instead of asyncio.Queue. Streams apply backpressure by default (max_buffer_size=0), support multiple producers or consumers via clone(), and end iteration cleanly once every sender (or receiver) has closed — no sentinel values or manual counting needed.

import anyio
from anyio.streams.memory import MemoryObjectReceiveStream, MemoryObjectSendStream

async def producer(tx: MemoryObjectSendStream[int]) -> None:
    async with tx:
        for i in range(10):
            await tx.send(i)

async def consumer(label: str, rx: MemoryObjectReceiveStream[int]) -> None:
    async with rx:
        async for item in rx:
            print(f"{label}: {item}")

async def main() -> None:
    tx, rx = anyio.create_memory_object_stream[int]()
    async with anyio.create_task_group() as tg:
        tg.start_soon(producer, tx)
        tg.start_soon(consumer, "worker", rx)

Blocking code integration

Wrap blocking calls so they don't stall the event loop:

import anyio

# I/O-bound blocking code
result = await anyio.to_thread.run_sync(blocking_io_function, arg1, arg2)

# CPU-bound code — offload to a worker process
result = await anyio.to_process.run_sync(cpu_heavy_function, arg1)
# (On Python 3.13+, anyio.to_interpreter.run_sync offers a lighter-weight
# subinterpreter-based alternative.)

Never call time.sleep() in async code — use await anyio.sleep().

Error handling

  • With task groups, catch ExceptionGroup using except* syntax.
  • Errors in child tasks propagate automatically — this is a core benefit of structured concurrency. No special error-forwarding code is needed.
  • Never use asyncio.shield() — it creates an orphaned inner task whose result is silently discarded. Use CancelScope(shield=True) instead.

import anyio
import logging

async def safe_main() -> None:
    try:
        async with anyio.create_task_group() as tg:
            tg.start_soon(may_fail)
            tg.start_soon(may_also_fail)
    except* ValueError as eg:
        for exc in eg.exceptions:
            logging.error("ValueError: %s", exc)
    except* OSError as eg:
        for exc in eg.exceptions:
            logging.error("OSError: %s", exc)

Testing

AnyIO includes a built-in pytest plugin. No need for pytest-asyncio.

The simplest setup is anyio_mode = "auto" — all async test functions and fixtures are handled automatically with no per-test markers needed:

# pyproject.toml
[tool.pytest.ini_options]
anyio_mode = "auto"

import anyio

async def test_fetch() -> None:
    with anyio.fail_after(5):
        result = await fetch("https://example.com")
    assert result.status == 200

If auto mode conflicts with another plugin (e.g. pytest-asyncio also in auto mode), fall back to explicit markers instead:

import pytest

@pytest.mark.anyio
async def test_fetch() -> None:
    ...

To test on multiple backends, the plugin parameterizes automatically. To restrict to asyncio only:

# pyproject.toml
[tool.pytest.ini_options]
anyio_default_backend = "asyncio"

Application-scoped background tasks

The most common question in structured concurrency is: "How do I run a background task that outlives a single request?" The answer is not fire-and-forget. Instead, scope the task group to the application lifetime:

from contextlib import asynccontextmanager
from collections.abc import AsyncGenerator
import anyio

@asynccontextmanager
async def lifespan(app) -> AsyncGenerator[dict, None]:
    async with anyio.create_task_group() as tg:
        yield {"tg": tg}
        tg.cancel_scope.cancel()

Any handler can now start background work via request.state.tg.start_soon(...). The tasks are supervised, errors propagate, and everything is cancelled cleanly on shutdown.

Running the program

import anyio

async def main() -> None:
    async with anyio.create_task_group() as tg:
        tg.start_soon(worker)

anyio.run(main)  # Pass the callable, not main()

Common mistakes to avoid

| Mistake | Fix |
| --- | --- |
| Using asyncio.create_task() (unstructured) | Use anyio.create_task_group() and tg.start_soon() |
| Using threading.Lock in async code | Use anyio.Lock |
| Running sync HTTP libraries (e.g., requests) in coroutines | Use httpx (async) or await anyio.to_thread.run_sync() |
| Creating too many concurrent tasks without bounds | Use anyio.CapacityLimiter or anyio.Semaphore |
| Using asyncio.shield() to protect cleanup | Use anyio.CancelScope(shield=True) — a scope, not a wrapper |
| Using asyncio.Queue for channels | Use anyio.create_memory_object_stream (backpressure by default) |
| Passing coroutine objects: tg.start_soon(coro()) | Pass callables: tg.start_soon(coro) |
| Using asyncio.timeout() / asyncio.wait_for() | Use anyio.fail_after() / anyio.move_on_after() |
| Using asyncio.sleep() | Use anyio.sleep() |
| Using time.sleep() in async code | Use await anyio.sleep() |
| Using pytest-asyncio | Use anyio's built-in plugin: @pytest.mark.anyio |

Gotchas: misconceptions from asyncio habits

These are specific bad practices and misconceptions that developers coming from raw asyncio commonly carry into structured concurrency code.

1. "Fire-and-forget with a _background_tasks set is fine"

Wrong. The common asyncio pattern of storing tasks in a global set with add_done_callback to discard them is exactly the unstructured goto-style concurrency that structured concurrency eliminates. Tasks in that set have no parent, no guaranteed cleanup, and silently swallow exceptions. The fix is always a task group scoped to the appropriate lifetime.

2. "asyncio.TaskGroup is structured concurrency"

Partially. asyncio.TaskGroup is a step forward, but it still uses edge-triggered cancellation: CancelledError is a one-shot event that can be caught and swallowed, causing deadlocks. In Python 3.11/3.12, nested task groups with simultaneous child failures can hang forever. AnyIO's task groups use level-triggered cancellation — every checkpoint inside a cancelled scope re-raises, making cancellation impossible to accidentally swallow. AnyIO also backports fixes that only landed in CPython 3.13.

3. "asyncio.shield() protects my cleanup code"

Wrong. asyncio.shield() is a wrapper around a single awaitable that creates an orphaned inner task. The outer coroutine receives CancelledError immediately, but the inner task runs unsupervised with no parent — its result is silently discarded. This is unstructured concurrency. Use anyio.CancelScope(shield=True) which protects an entire block of code, nests properly with task groups, and never orphans tasks.

4. "I should catch CancelledError to handle cancellation"

Wrong in most cases. Use try/finally for cleanup, not except CancelledError. In asyncio's edge-triggered model, catching CancelledError "uses up" the cancellation — subsequent awaits succeed normally, which is a source of deadlocks. In AnyIO's level-triggered model, cancellation persists across the entire cancelled scope, so catching it is less dangerous but still usually wrong — finally is the correct tool for cleanup.

5. "asyncio.wait_for() is equivalent to a timeout"

Not safely. asyncio.wait_for() cancels the task on timeout but uses edge-triggered cancellation, so the task's cleanup code may hang on subsequent awaits. anyio.fail_after() wraps a cancel scope with level-triggered cancellation — all awaits inside the scope continue to raise after the deadline.

6. "I can use asyncio.Queue for producer/consumer"

It works, but poorly. asyncio.Queue is unbounded by default (no backpressure), has no structured close mechanism (prior to 3.13's shutdown()), requires manual producer/consumer counting, and needs sentinel values for termination. anyio.create_memory_object_stream provides backpressure by default (max_buffer_size=0), uses clone() for fan-out, and automatically signals end-of-stream when all senders close — zero sentinel values needed.

7. "Structured concurrency means I can never have long-running tasks"

Wrong. You scope the task group to the appropriate lifetime. For a web server, that's the application lifespan. For a connection, it's the connection context. The rule is not "tasks must be short" — it's "tasks must have an owner". Pass the task group (or a reference to it) as an argument when you need to spawn from deeper call sites.

  • under "common mistakes" I would also add the use of time.sleep()
  • as for the test plugin, I would recommend the use of anyio_mode = "auto" in pytest config
  • the "Cleanup under cancellation" is lacking a shielded cancel scope

I think that should be addressed in the latest version.

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment