Skip to content

Instantly share code, notes, and snippets.

@VaguinerG
Last active July 23, 2024 18:41
Show Gist options
  • Save VaguinerG/49b57abc026dd2cb06b214c87ce2ff51 to your computer and use it in GitHub Desktop.
Save VaguinerG/49b57abc026dd2cb06b214c87ce2ff51 to your computer and use it in GitHub Desktop.
Simple MicroPython secure (wss) multithreaded WebSocket server
import binascii, socket, ssl, _thread, hashlib
# Private key bytes (hex-decoded) passed as `key=` to ssl.wrap_socket() in connection().
# Presumably a DER-encoded RSA key -- TODO confirm.
# NOTE(review): this key is embedded in source, so it should be treated as public;
# replace the key/cert pair before using the server for anything beyond testing.
certkey = binascii.unhexlify(b"3082013b020100024100cc20643fd3d9c21a0acba4f48f61aadd675f52175a9dcf07fbef610a6a6ba14abb891745cd18a1d4c056580d8ff1a639460f867013c8391cdc9f2e573b0f872d0203010001024100bb17a54aeb3dd7ae4edec05e775ca9632cf02d29c2a089b563b0d05cdf95aeca507de674553f28b4eadaca82d5549a86058f9996b07768686a5b02cb240dd9f1022100f4a63f5549e817547dca97b5c658038e8593cb78c5aba3c4642cc4cd031d868f022100d598d870ffe4a34df8de57047a50b97b71f4d23e323f527837c9edae88c7948302210098560c89a70385c36eb07fd7083235c4c1184e525d838aedf7128958bedfdbb1022051c0dab7057a8176ca966f3feb81123d4974a733df0f958525f547dfd1c271f90220446c2cafad455a671a8cf398e642e1be3b18a3d3aec2e67a9478f83c964c4f1f")
# Certificate bytes (hex-decoded) passed as `cert=` to ssl.wrap_socket() in connection().
# Presumably the DER-encoded X.509 certificate matching `certkey` -- TODO confirm.
cert = binascii.unhexlify(b"308201d53082017f020203e8300d06092a864886f70d01010505003075310b30090603550406130258583114301206035504080c0b54686550726f76696e63653110300e06035504070c075468654369747931133011060355040a0c0a436f6d70616e7958595a31133011060355040b0c0a436f6d70616e7958595a3114301206035504030c0b546865486f73744e616d65301e170d3139313231383033333935355a170d3239313231353033333935355a3075310b30090603550406130258583114301206035504080c0b54686550726f76696e63653110300e06035504070c075468654369747931133011060355040a0c0a436f6d70616e7958595a31133011060355040b0c0a436f6d70616e7958595a3114301206035504030c0b546865486f73744e616d65305c300d06092a864886f70d0101010500034b003048024100cc20643fd3d9c21a0acba4f48f61aadd675f52175a9dcf07fbef610a6a6ba14abb891745cd18a1d4c056580d8ff1a639460f867013c8391cdc9f2e573b0f872d0203010001300d06092a864886f70d0101050500034100b0513fe2829e9ecbe55b6dd14c0ede7502bde5d46153c8e960ae3ebc247371b525caeb41bbcf34686015a44c50d226e66aef0a97a63874ca5944ef979b57f0b3")
# Sockets of clients that completed the WebSocket handshake; connection()
# appends to this list and close() removes from it.
clients = []


def generate_accept(key):
    """Return the Sec-WebSocket-Accept value for a client handshake key.

    Args:
        key: The client's ``Sec-WebSocket-Key`` header value, as bytes.

    Returns:
        The base64-encoded SHA-1 digest of ``key`` concatenated with the
        fixed WebSocket GUID, as bytes with surrounding whitespace stripped.
    """
    digest = hashlib.sha1(key + b'258EAFA5-E914-47DA-95CA-C5AB0DC85B11').digest()
    # b2a_base64 appends a newline; strip() removes it.
    return binascii.b2a_base64(digest).strip()
def BtH(n):
    """Format a byte count as a human-readable string using decimal units.

    The value is divided by 1000 until it drops below 1000 or the largest
    known unit (YB) is reached, then rendered with one decimal place.

    Args:
        n: Number of bytes (int or float).

    Returns:
        A string such as ``"999.0 B"``, ``"1.5 kB"`` or ``"2.5 GB"``.
    """
    units = ('B', 'kB', 'MB', 'GB', 'TB', 'PB', 'EB', 'ZB', 'YB')
    idx = 0
    # Stop at the last unit so huge values still get a label instead of
    # running off the end of the table.
    while n >= 1000 and idx < len(units) - 1:
        n /= 1000
        idx += 1
    return "{:.1f} {}".format(n, units[idx])
def _read_exact(stream, count):
    """Read exactly ``count`` bytes from ``stream``, or return None on early EOF."""
    buf = b''
    while len(buf) < count:
        chunk = stream.read(count - len(buf))
        if not chunk:
            # Empty bytes or None: the peer went away before sending everything.
            return None
        buf += chunk
    return buf


def read_frame(client_s, max_payload=1024):
    """Read one client-to-server WebSocket frame from ``client_s``.

    Args:
        client_s: Stream-like object with a ``read(n)`` method.
        max_payload: Largest payload (in bytes) that is kept; bigger payloads
            are consumed from the stream and dropped.

    Returns:
        * ``bytearray`` with the unmasked payload for a normal frame;
        * ``('DISCARDED', length)`` when the payload exceeded ``max_payload``;
        * ``None`` when the connection should end: EOF or truncated frame,
          a Close frame, or an unmasked frame.
    """
    hdr = _read_exact(client_s, 2)
    if hdr is None:
        return None
    # Opcode 0x8 is a Close frame: report end-of-connection to the caller.
    if (hdr[0] & 0x0F) == 0x08:
        return None
    # Client frames must carry the MASK bit; without it the next four bytes
    # would be payload, not a mask, and the stream would be misparsed.
    if not (hdr[1] & 0x80):
        return None

    length = hdr[1] & 0x7F
    if length == 126:
        ext = _read_exact(client_s, 2)
    elif length == 127:
        ext = _read_exact(client_s, 8)
    else:
        ext = b''
    if ext is None:
        return None
    if ext:
        length = int.from_bytes(ext, 'big')

    # The masking key precedes the payload on the wire.
    mask = _read_exact(client_s, 4)
    if mask is None:
        return None

    # Discards payload if greater than max_payload
    if length > max_payload:
        # Split in blocks of 1024 bytes to avoid memory overflow
        block_size = 1024
        remaining = length
        while remaining > 0:
            read_size = min(block_size, remaining)
            if _read_exact(client_s, read_size) is None:
                return None
            remaining -= read_size
        return ('DISCARDED', length)

    payload = _read_exact(client_s, length)
    if payload is None:
        return None
    return bytearray(b ^ mask[i % 4] for i, b in enumerate(payload))
def encode_frame(msg):
    """Build an unmasked, final WebSocket text frame carrying ``msg``.

    Args:
        msg: Text to send (str); it is encoded as UTF-8.

    Returns:
        A ``bytearray`` holding the frame header followed by the payload.
    """
    msg_bytes = msg.encode('utf-8')
    length = len(msg_bytes)
    # 129 == 0x81: FIN bit set, opcode 0x1 (text).
    frame = bytearray([129])
    if length <= 125:
        frame.append(length)
    elif length <= 65535:
        # 126 announces a 2-byte big-endian length.
        frame.extend(bytearray([126, (length >> 8) & 255, length & 255]))
    else:
        # 127 announces an 8-byte big-endian length (exactly eight bytes).
        frame.append(127)
        for shift in range(56, -8, -8):
            frame.append((length >> shift) & 255)
    frame.extend(msg_bytes)
    return frame
def close(client_s):
    """Unregister ``client_s`` from ``clients`` and close it.

    The socket is closed even when it was never registered (or was already
    removed), so a cleanup path can call this unconditionally.

    Args:
        client_s: The client socket to drop.
    """
    try:
        clients.remove(client_s)
    except ValueError:
        # Not in the list: nothing to unregister, but still close below.
        pass
    client_s.close()
def connection(client_s, addr):
    """Serve one client: TLS wrap, WebSocket handshake, then relay its messages.

    Every text payload received from this client is broadcast to all sockets
    in ``clients`` (including the sender), prefixed with ``hash(ip_addr)``.
    Oversized payloads are reported back to the sender only. The socket is
    always closed, and unregistered if it was registered, when this returns
    or raises.

    Args:
        client_s: Freshly accepted, not yet TLS-wrapped socket.
        addr: Peer address as returned by ``socket.accept()``.
    """
    try:
        client_s = ssl.wrap_socket(client_s, server_side=True, key=certkey, cert=cert)
    except OSError:
        # TLS setup failed; release the raw socket instead of leaking it.
        client_s.close()
        return

    # NOTE(review): addr[4:8] presumably picks the IPv4 octets out of a raw
    # sockaddr bytes object; with an (ip, port) tuple this yields '' -- confirm
    # against the target port's accept() return type.
    ip_addr = '.'.join(map(str, addr[4:8]))
    sender = hash(ip_addr)
    registered = False
    try:
        # Request line; its contents are not used, but EOF here means no request.
        if not client_s.readline():
            return

        headers = {}
        while True:
            raw = client_s.readline()
            if not raw:
                break
            line = raw.decode().strip()
            if line == "":
                break
            if ":" in line:
                key, value = line.split(":", 1)
                # HTTP header names are case-insensitive.
                headers[key.strip().lower()] = value.strip()

        ws_key = headers.get("sec-websocket-key")
        if ws_key is None:
            # Not a WebSocket upgrade request.
            client_s.write(b'HTTP/1.1 400 Bad Request\r\n\r\n')
            return

        accept = generate_accept(ws_key.encode('utf-8'))
        client_s.write(f'HTTP/1.1 101 Switching Protocols\r\nUpgrade: websocket\r\nConnection: Upgrade\r\nSec-WebSocket-Accept: {accept.decode()}\r\n\r\n'.encode())
        clients.append(client_s)
        registered = True

        while True:
            payload = read_frame(client_s)
            if payload is None:
                break
            if isinstance(payload, tuple) and payload[0] == 'DISCARDED':
                # Tell only the sender that the oversized message was dropped.
                client_s.write(encode_frame(f"{sender}: {BtH(payload[1])} discarded"))
                continue
            try:
                text = payload.decode('utf-8')
            except UnicodeError:
                # Invalid UTF-8 in a text message: drop the connection.
                break
            # Encode once, then send the same frame to everyone.
            frame = encode_frame(f"{sender}: {text}")
            # Iterate over a snapshot: other threads add/remove clients.
            for peer in list(clients):
                try:
                    peer.write(frame)
                except OSError:
                    # A broken peer must not take down the sender; the
                    # peer's own thread cleans it up when its read fails.
                    pass
    finally:
        if registered:
            close(client_s)
        else:
            client_s.close()
def main():
    """Listen on 0.0.0.0:443 and start one ``connection`` thread per accepted client.

    Runs the accept loop forever; it only ends if an exception propagates.
    """
    listener = socket.socket()
    candidates = socket.getaddrinfo("0.0.0.0", 443)
    listener.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
    first = candidates[0]
    # The last element of a getaddrinfo entry is the address to bind to.
    listener.bind(first[-1])
    listener.listen(0)
    while True:
        # accept() returns (socket, address), which is passed as-is as the
        # argument tuple for connection(client_s, addr).
        accepted = listener.accept()
        _thread.start_new_thread(connection, accepted)
# Start the server as soon as the script is loaded; main() loops on accept()
# and does not return unless an exception propagates.
main()
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment