Last active
September 11, 2025 19:25
-
-
Save anecdata/89b4458fd7c0c01c7fc0a9ff4be5b4e6 to your computer and use it in GitHub Desktop.
CircuitPython asyncio TCP multi-server multi-client
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
import time | |
import random | |
import os | |
import wifi | |
import socketpool | |
import asyncio | |
NUM_CLIENTS = 4  # number of client tasks started by main(); also used as the server listen backlog
NUM_SERVERS = 2  # number of server tasks started by main()
BASE_PORT = 5000  # server N listens on port BASE_PORT + N
MAXBUF = 64  # size in bytes of each receive buffer
class Socket:
    """Awaitable wrapper around a non-blocking socket.

    Each I/O method first parks the calling task on the asyncio I/O queue
    until the socket is reported ready, then performs the actual call.

    See https://github.com/adafruit/circuitpython/pull/7173
    """

    def __init__(self, s):
        """Wrap socket ``s`` and switch it to non-blocking mode."""
        self.s = s
        s.setblocking(False)

    async def recv_into(self, buf):
        """Wait until the socket is readable, then receive into ``buf``.

        Returns whatever the underlying ``recv_into`` returns.
        """
        await asyncio.core._io_queue.queue_read(self.s)
        return self.s.recv_into(buf)

    async def send(self, buf):
        """Wait until the socket is writable, then send ``buf``.

        Returns whatever the underlying ``send`` returns.
        """
        await asyncio.core._io_queue.queue_write(self.s)
        return self.s.send(buf)

    def close(self):
        """Close the wrapped socket.

        Lets callers release the connection through the wrapper instead of
        having to keep a separate reference to the raw socket.
        """
        self.s.close()
async def accepted_conn(server_num, conn, event):
    """Handle a single server-side connection from a client.

    Waits for one message from the client, replies with a greeting, then
    closes the connection and signals completion.

    Args:
        server_num: Index of the owning server; used only in log output.
        conn: Connected socket returned by ``accept()``. It is wrapped in
            ``Socket`` (which makes it non-blocking) and is always closed
            before this coroutine finishes.
        event: ``asyncio.Event`` that is set once the connection has been
            dealt with, whether or not the exchange succeeded.
    """
    try:
        inbuf = bytearray(MAXBUF)
        sconn = Socket(conn)
        size = 0
        # Retry until some bytes arrive; OSError here is treated as "not ready yet".
        while not size:
            try:
                size = await sconn.recv_into(inbuf)  # OSError: [Errno 128] ENOTCONN
            except OSError:
                await asyncio.sleep(0)
        print(f"{time.monotonic():.1f}s SERVER {server_num} received {size} bytes {inbuf[:size]}")
        outbuf = f"{time.monotonic():.1f}s Hello client from server {server_num}".encode()
        await sconn.send(outbuf)
        print(f"{time.monotonic():.1f}s SERVER {server_num} sent {len(outbuf)} bytes {outbuf}")
    finally:
        # Always release the socket and wake the waiting server, even if the
        # exchange failed; otherwise the socket leaks and the server would
        # wait on the event forever.
        conn.close()
        event.set()
async def server(server_num):
    """Run a persistent TCP server that handles one connection at a time.

    Binds to ``host`` on port ``BASE_PORT + server_num`` and then loops
    forever: polls the non-blocking listening socket until a client connects,
    hands the connection to ``accepted_conn`` in a new task, and waits for
    that handler to signal completion before accepting the next client.
    """
    listener = pool.socket(pool.AF_INET, pool.SOCK_STREAM)
    listener.setsockopt(pool.SOL_SOCKET, pool.SO_REUSEADDR, 1)
    port = BASE_PORT + server_num
    listener.bind((host, port))
    listener.listen(NUM_CLIENTS)  # backlog is sized to the number of clients
    print(f"{time.monotonic():.1f}s SERVER {server_num} listening on port {port}")

    handled = 0
    while True:
        print(f"{time.monotonic():.1f}s SERVER {server_num} accepting connections")
        listener.setblocking(False)
        peer = False
        # Poll accept(); an OSError yields to other tasks before the next attempt.
        while not peer:
            try:
                conn, peer = listener.accept()  # OSError: [Errno 11] EAGAIN
                print(f"{time.monotonic():.1f}s SERVER {server_num} accepted connection from {peer}")
                done = asyncio.Event()
                handler = asyncio.create_task(accepted_conn(server_num, conn, done))
                await done.wait()
                handler.cancel()
                handled += 1
                print(f"{time.monotonic():.1f}s SERVER {server_num} has processed {handled} total connections")
            except OSError:
                await asyncio.sleep(0)
        await asyncio.sleep(0)
async def connected_client(client_num, s, event):
    """Handle a single client-side connection to a server.

    Sends one greeting, waits for the server's reply, and signals completion.
    The socket is not closed here; closing it is left to the caller.

    Args:
        client_num: Index of the owning client; used only in log output.
        s: Connected socket. It is wrapped in ``Socket``, which switches it
            to non-blocking mode.
        event: ``asyncio.Event`` that is set once the exchange is over,
            whether or not it succeeded.
    """
    try:
        inbuf = bytearray(MAXBUF)
        ss = Socket(s)
        outbuf = f"{time.monotonic():.1f}s Hello server from client {client_num}".encode()
        size = await ss.send(outbuf)
        print(f"{time.monotonic():.1f}s CLIENT {client_num} sent {size} bytes {outbuf}")
        size = 0
        # Retry until some bytes arrive; OSError here is treated as "not ready yet".
        while not size:
            try:
                size = await ss.recv_into(inbuf)  # OSError: [Errno 11] EAGAIN
            except OSError:
                await asyncio.sleep(0)
        print(f"{time.monotonic():.1f}s CLIENT {client_num} received {size} bytes {inbuf[:size]}")
    finally:
        # Always wake the waiting caller; if send() raised and the event were
        # never set, the caller would wait on it forever.
        event.set()
async def client(client_num):
    """Run a TCP client that repeatedly connects to a randomly chosen server.

    Each iteration opens a fresh socket, connects to one of the server ports,
    runs ``connected_client`` in a new task and waits for it to signal
    completion, then sleeps for a random fraction of a second. The socket is
    closed by the ``with`` block at the end of every iteration.

    Args:
        client_num: Index of this client; used only in log output.
    """
    total = 0
    while True:
        with pool.socket(pool.AF_INET, pool.SOCK_STREAM) as s:
            # randint() includes both endpoints, so the upper bound must be
            # the last server's port, not one past it.
            port = random.randint(BASE_PORT, BASE_PORT + NUM_SERVERS - 1)
            print(f"{time.monotonic():.1f}s CLIENT {client_num} connecting to ({host}, {port})")
            try:
                s.connect((host, port))  # OSError: [Errno 119] EINPROGRESS
                print(f"{time.monotonic():.1f}s CLIENT {client_num} connected to ({host}, {port})")
                event = asyncio.Event()
                t = asyncio.create_task(connected_client(client_num, s, event))
                await event.wait()
                t.cancel()
                total += 1
                print(f"{time.monotonic():.1f}s CLIENT {client_num} has processed {total} total connections")
            except OSError as ex:
                # Report the failure instead of dropping it silently, then retry.
                print(f"{time.monotonic():.1f}s CLIENT {client_num} connection to ({host}, {port}) failed: {ex}")
                await asyncio.sleep(0)
        # Random pause so the clients do not reconnect in lockstep.
        await asyncio.sleep(random.random())
async def main():
    """Start every server and client task, then keep the event loop alive.

    Servers are created before clients. After that the coroutine yields to
    the scheduler forever and never returns.
    """
    print(f"{time.monotonic():.1f}s creating {NUM_SERVERS} servers, {NUM_CLIENTS} clients")
    tasks = [asyncio.create_task(server(num)) for num in range(NUM_SERVERS)]
    tasks += [asyncio.create_task(client(num)) for num in range(NUM_CLIENTS)]
    print(f"{time.monotonic():.1f}s {len(tasks)} tasks created")
    # Yield forever so the server and client tasks keep being scheduled.
    while True:
        await asyncio.sleep(0)
# Wait for the serial console after a reset.
time.sleep(3)

# Join the Wi-Fi network using credentials read through os.getenv().
wifi.radio.connect(
    os.getenv('WIFI_SSID'),
    os.getenv('WIFI_PASSWORD'),
)

# Module-level globals used by the server and client coroutines.
pool = socketpool.SocketPool(wifi.radio)
host = str(wifi.radio.ipv4_address)

asyncio.run(main())
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment
The hard fault was due to using 9.2.8, which still had this issue that had been largely attributed to socket servers. Addendum: the minimal example was fixed by 10.0.0-beta.3, but the full code still hard faults... back to the drawing board.
Note that in CP 10 (10.0.0-beta.3), there are now only 4 TCP sockets available on espressif, rather than 8. Update: back to 8 sockets with adafruit/circuitpython#10609.