-
-
Save mikeshardmind/bd17b3f583c3e7b49b9f8c735f5693d3 to your computer and use it in GitHub Desktop.
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
import trio | |
async def main(): | |
async with ws_connect("ws://127.0.0.1:8765") as websockets: | |
await websockets.send("Hello, world.") | |
message = await websockets.recv() | |
print(message) | |
trio.run(main) |
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
import base64 | |
import contextlib | |
import os | |
import httpx | |
import wsproto | |
class ConnectionClosed(Exception): | |
pass | |
class WebsocketConnection: | |
def __init__(self, network_steam): | |
self._ws_connection_state = wsproto.Connection(wsproto.ConnectionType.CLIENT) | |
self._network_stream = network_steam | |
self._events = [] | |
async def send(self, text): | |
""" | |
Send a text frame over the websocket connection. | |
""" | |
event = wsproto.events.TextMessage(text) | |
data = self._ws_connection_state.send(event) | |
await self._network_stream.write(data) | |
async def recv(self): | |
""" | |
Receive the next text frame from the websocket connection. | |
""" | |
while not self._events: | |
data = await self._network_stream.read(max_bytes=4096) | |
self._ws_connection_state.receive_data(data) | |
self._events = list(self._ws_connection_state.events()) | |
event = self._events.pop(0) | |
if isinstance(event, wsproto.events.TextMessage): | |
return event.data | |
elif isinstance(event, wsproto.events.CloseConnection): | |
raise ConnectionClosed() | |
@contextlib.asynccontextmanager | |
async def ws_connect(url): | |
headers = { | |
"connection": "upgrade", | |
"upgrade": "websocket", | |
"sec-websocket-key": base64.b64encode(os.urandom(16)), | |
"sec-websocket-version": "13", | |
} | |
async with httpx.AsyncClient() as client: | |
async with client.stream("GET", url, headers=headers) as response: | |
network_steam = response.extensions["network_stream"] | |
yield WebsocketConnection(network_steam) |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment