@graingert
Last active March 13, 2025 16:09
import socket

import anyio


async def demo():
    # First pair: start a task waiting for s2 to become readable, then cancel it.
    s1, s2 = socket.socketpair()
    with s1, s2:
        s1.setblocking(False)
        s2.setblocking(False)
        async with anyio.create_task_group() as tg:
            tg.start_soon(anyio.wait_readable, s2)
            await anyio.wait_all_tasks_blocked()
            await anyio.sleep(0.1)
            tg.cancel_scope.cancel()

    # Second pair: s2 has a byte pending, so wait_readable should return promptly.
    s1, s2 = socket.socketpair()
    with s1, s2:
        s1.setblocking(False)
        s2.setblocking(False)
        s1.send(b"\x00")
        await anyio.wait_readable(s2)
        print("done")


anyio.run(demo)