Skip to content

Instantly share code, notes, and snippets.

@anecdata
Last active July 29, 2025 02:07
Show Gist options
  • Save anecdata/69c44f20bac2b55546e4438a01af283c to your computer and use it in GitHub Desktop.
Save anecdata/69c44f20bac2b55546e4438a01af283c to your computer and use it in GitHub Desktop.
SSIDNet™️
# SPDX-FileCopyrightText: 2025 anecdata
# SPDX-License-Identifier: MIT
# SSIDNet™️ message sender
import time
import random
import math
import binascii
import traceback
import wifi
# Wi-Fi channel the soft AP is started on (passed to wifi.radio.start_ap below).
CHANNEL = 9
def random_local_mac():
    """Return six random bytes usable as a locally administered unicast MAC.

    Returns:
        bytearray: 6 random bytes; in the first byte, bit 1 (locally
        administered) is set and bit 0 (multicast) is cleared.
    """
    mac = bytearray(random.randrange(0, 256) for _ in range(6))
    mac[0] |= 0b00000010  # local
    mac[0] &= 0b11111110  # multicast not supported
    return mac
def random_ascii_printable(length=17):
    """Return a random string of printable 7-bit ASCII characters.

    Args:
        length: number of characters to generate.

    Returns:
        str: ``length`` characters, each in the range 0x20..0x7E inclusive.
    """
    # NOTE(review): `random` is not a cryptographic generator; only use the
    # result where the secrecy of the value is not relied upon.
    return "".join(chr(random.randrange(0x20, 0x7F)) for _ in range(length))
def obfuscate(imsg):
    """XOR every byte of ``imsg`` with 0x55.

    Args:
        imsg: bytes-like object (or any iterable of ints 0..255).

    Returns:
        bytearray: the masked copy; the input is not modified. Applying the
        function twice yields the original bytes. Note that an input byte of
        0x55 ("U") becomes 0x00.
    """
    return bytearray(byte ^ 0b01010101 for byte in imsg)
def deobfuscate(msg):
    """Undo obfuscate(); the XOR mask is its own inverse, so this just delegates."""
    return obfuscate(msg)
def start_new_ap(ap_ssid):
    """Start a soft AP whose SSID is ``ap_ssid``, with a throwaway random password.

    Args:
        ap_ssid: the SSID to advertise (the caller passes a bytearray frame).

    The AP is started on CHANNEL with ``max_connections=0``; the password is
    generated per call and not kept anywhere.
    """
    ap_password = random_ascii_printable(length=17)
    wifi.radio.start_ap(
        ap_ssid,
        ap_password,
        channel=CHANNEL,
        authmode=[wifi.AuthMode.WPA2, wifi.AuthMode.PSK],
        max_connections=0,
    )
    # NOTE(review): the random MAC is assigned after start_ap(); confirm in the
    # wifi.Radio docs whether it applies to this AP or only after a restart.
    wifi.radio.mac_address_ap = random_local_mac()
    wifi.radio.enabled = True
def stop_ap():
    """Stop the soft AP and then disable the radio."""
    wifi.radio.stop_ap()
    wifi.radio.enabled = False
# Messages to broadcast, as UTF-8 bytes. The list index is used as the MsgID;
# index 0 is an empty placeholder that the sender loop skips.
# The texts contain non-ASCII punctuation, which is not allowed inside a
# b"..." literal on CPython, so they are written as str and encoded here.
messages = [
    text.encode("utf-8")
    for text in (
        "",
        "From the ashes a fire shall be woken, A light from the shadows shall spring; Renewed shall be blade that was broken, The crownless again shall be king. ―J.R.R. Tolkien",
        "If you can't explain it to a six year old, you don't understand it yourself. ―Albert Einstein",
        "I don’t know why people are so keen to put the details of their private life in public; they forget that invisibility is a superpower. —Banksy",
        "I’ve learned that people will forget what you said, people will forget what you did, but people will never forget how you made them feel. —Maya Angelou",
        "There is more to life than increasing its speed. —Mohandas K. Gandhi",
        "Each generation imagines itself to be more intelligent than the one that went before it and wiser than the one that comes after it.—George Orwell",
        "You cannot do a kindness too soon, for you never know how soon it will be too late. —Ralph Waldo Emerson",
    )
]
# no 0x0 in SSID (CP terminates buffer?)
# NOTE(review): nothing below enforces that. A CRC byte of 0x00, or a plaintext
# byte 0x55 ("U") after obfuscation, would put 0x00 into the SSID.
time.sleep(3)  # wait for serial after reset
while True:
    # messages[0] is skipped, so MsgIDs start at 1.
    for m in range(1, len(messages)):
        print(f'✅ {messages[m].decode()}')
        message = obfuscate(messages[m])
        crc = binascii.crc32(message).to_bytes(4)
        # Each SSID is a 4-byte header plus up to 28 payload bytes (32 bytes
        # in total). The 4 CRC bytes are counted as payload and always travel
        # at the end of the last frame.
        num_frames = int(math.ceil((len(message) + 4) / 28))
        for f in range(1, num_frames + 1):
            try:
                ssid = bytearray(4)
                ssid[0] = 0x01  # ␁ [0] ProtoID
                ssid[1] = m  # [1] MsgID
                ssid[2] = f  # [2] frame#
                ssid[3] = num_frames  # [3] #frames
                if f < num_frames:
                    ssid.extend(message[28*(f-1):28*(f-1)+28])
                else:
                    # Last frame: whatever is left of the message, then the CRC.
                    ssid.extend(message[28*(f-1):])
                    ssid.extend(crc)
                print(f'🛜 Starting AP... {bytes(ssid)}')
                start_new_ap(ssid)
                time.sleep(0.75)  # leave the AP up for a while before the next frame
            except Exception as ex:
                traceback.print_exception(ex, ex, ex.__traceback__)
            finally:
                # Always tear the AP down, even if starting it failed.
                stop_ap()
@anecdata
Copy link
Author

anecdata commented Jul 25, 2025

# SPDX-FileCopyrightText: 2025 anecdata
# SPDX-License-Identifier: MIT

# SSIDNet™️ message receiver

import time
import binascii
import traceback
import wifi

CHANNEL = 9  # channel handed to wifi.Monitor
QUEUELEN = 1024  # queue size handed to wifi.Monitor
DEBUG = False  # True enables the extra diagnostic prints

# Labels for the 2-bit frame type, indexed by the type value.
type_names = ("mgmt", "ctrl", "data", "extn")

# Per-subtype info, keyed by the 4-bit subtype value (0..15):
#   'name'  - short label used when printing a frame
#   'fixed' - number of bytes skipped after the 24-byte header before
#             parse_ssid starts scanning information elements
subt = {
    code: {'name': label, 'fixed': fixed_len}
    for code, (label, fixed_len) in enumerate((
        ('AssocReq', 4),
        ('AssocResp', 6),
        ('RAssocReq', 10),
        ('RAssocResp', 6),
        ('ProbeReq', 0),
        ('ProbeResp', 12),
        ('Timing', 0),
        ('Reserved7', 0),
        ('Beacon', 12),
        ('ATIM', 0),
        ('DisAssoc', 2),
        ('Auth', 6),
        ('DeAuth', 2),
        ('Action', 0),
        ('ActionN', 0),
        ('ReservedF', 0),
    ))
}

def check_type(mac):
    """Classify a MAC address string by the two low bits of its first octet.

    Args:
        mac: address text whose first two characters are the hex digits of
            the first octet (e.g. "02:1A:...").

    Returns:
        str: "L_M" (local + multicast), "LOC" (local), "MUL" (multicast),
        "VEN" (neither bit set), or "ERR" if the first octet is not valid hex.
    """
    try:
        # Parse the octet as hexadecimal; int() with the default base 10
        # rejects both a "0x" prefix and the digits A-F.
        first_octet = int(mac[0:2], 16)
    except (ValueError, IndexError):
        return "ERR"

    if (first_octet & 0b0011) == 0b0011:  # 3,7,B,F LOCAL MULTICAST
        return "L_M"
    if (first_octet & 0b0010) == 0b0010:  # 2,3,6,7,A,B,E,F LOCAL
        return "LOC"
    if (first_octet & 0b0001) == 0b0001:  # 1,3,5,7,9,B,D,F MULTICAST
        return "MUL"
    return "VEN"  # 0,4,8,C VENDOR (or unassigned)

def parse_header(fd):
    """Decode the first 24 bytes of ``fd["received"]`` into named fields.

    Args:
        fd: frame dict holding the raw frame under "received"; at least
            24 bytes are read, so a shorter frame raises IndexError.

    Returns:
        The same dict, with these keys added: type/typename, subt/subtname,
        fc0, fc1, dur, a1..a3 (colon-separated upper-case hex),
        a1_type..a3_type, seq and frag.
    """
    buf = fd["received"]
    # Frame Control, first byte: bits 2-3 are the type, bits 4-7 the subtype.
    fd["type"]     = (buf[0] & 0b00001100) >> 2
    fd["typename"] = type_names[fd["type"]]
    fd["subt"]     = (buf[0] & 0b11110000) >> 4
    fd["subtname"] = subt[fd["subt"]]['name']
    fd["fc0"]      = buf[0]
    fd["fc1"]      = buf[1]
    fd["dur"]      = (buf[3] << 8) + buf[2]  # little-endian 16-bit value
    fd["a1"]   = ":".join("%02X" % _ for _ in buf[4:10])  # mgmt frames: DA
    fd["a2"]   = ":".join("%02X" % _ for _ in buf[10:16])  # mgmt frames: SA
    fd["a3"]   = ":".join("%02X" % _ for _ in buf[16:22])  # mgmt frames: BSSID
    fd["a1_type"]  = check_type(fd["a1"])
    fd["a2_type"]  = check_type(fd["a2"])
    fd["a3_type"]  = check_type(fd["a3"])
    # Sequence Control is a little-endian 16-bit field: the low 4 bits are the
    # fragment number and the upper 12 bits are the sequence number.
    fd["seq"]      = (buf[23] << 4) | (buf[22] >> 4)
    fd["frag"]     = buf[22] & 0b00001111
    return fd

def parse_ssid(fd):
    """Find the SSID information element (ID 0) and store it in ``fd["ssid"]``.

    Scanning starts after the 24-byte header plus the subtype's 'fixed' byte
    count and walks the ID/length/value elements in order.

    Args:
        fd: frame dict with "received" (raw bytes), "len" and "subt" set.

    Returns:
        The same dict. "ssid" (a bytearray of at most 32 bytes) is added only
        when a non-empty SSID element is found in a frame of subtype
        1, 4, 5 or 8; otherwise the key is left absent.
    """
    buf = fd["received"]
    # Never read past the bytes actually captured, even if the reported
    # length is larger; this is what keeps the loop from indexing out of range.
    limit = min(fd["len"], len(buf))
    pos = 24 + subt[fd["subt"]]['fixed']
    while pos < limit - 1:
        ie_id = buf[pos]
        ie_len = buf[pos + 1]
        ie_start = pos + 2
        ie_end = ie_start + ie_len

        if ie_id == 0 and ie_len > 0:
            if fd["subt"] in (1, 4, 5, 8):  # AssocResp, ProbeReq, ProbeResp, Beacon
                # max SSID 32 bytes, sometimes longer: bad frame or mis-parse?
                ssid_end = min(ie_end, ie_start + 32)
                fd["ssid"] = bytearray(buf[ie_start:ssid_end])
                break

        # Every element is at least 2 bytes long, so pos always advances.
        pos = ie_end
    return fd

def print_frame(fd):
    """Print a one-line summary of a parsed frame dict.

    Reads the queue statistics, radio metadata and header fields stored in
    ``fd`` plus the monitor's current channel; a missing "ssid" prints as
    empty.
    """
    segments = (
        f'qlen={fd["qlen"]:3d} lost={fd["lost"]:3d}',
        f'len={fd["len"]:4d} ch={fd["ch"]:2d} ch={monitor.channel:2d} rssi={fd["rssi"]}',
        f'{fd["fc0"]:02X} {fd["typename"]} {fd["subtname"]:11s} {fd["fc1"]:02X} {fd["dur"]:04X}',
        f'{fd["a1"]} {fd["a1_type"]} {fd["a2"]} {fd["a2_type"]} {fd["a3"]} {fd["a3_type"]}',
        f'seq={fd["seq"]:3d} frag={fd["frag"]:2d}',
        f'ssid={fd.get("ssid", "")}',
    )
    # Each segment is followed by one space; the line is ended afterwards.
    for segment in segments:
        print(segment, end=" ")
    print()

def get_packet():
    """Poll the monitor once and return a frame dict.

    Returns:
        dict: always has "qlen" and "lost"; "len", "ch", "rssi" and
        "received" are added only when the monitor returned a packet.
    """
    frame = {"qlen": monitor.queued(), "lost": monitor.lost()}
    packet = monitor.packet()
    if not packet:
        return frame

    frame["len"] = packet[wifi.Packet.LEN]
    frame["ch"] = packet[wifi.Packet.CH]
    frame["rssi"] = packet[wifi.Packet.RSSI]
    frame["received"] = packet[wifi.Packet.RAW]
    return frame

def obfuscate(imsg):
    """Return a bytearray holding each byte of ``imsg`` XORed with 0x55."""
    mask = 0x55
    return bytearray(value ^ mask for value in imsg)

def deobfuscate(msg):
    """Reverse obfuscate(): the XOR mask is its own inverse, so reuse it."""
    plain = obfuscate(msg)
    return plain

def aggregate_message(frame):
    """Add one received frame to its message; report when a message completes.

    ``frame`` is the raw SSID: ``[0]`` ProtoID, ``[1]`` MsgID, ``[2]`` frame
    number (1-based), ``[3]`` total number of frames, followed by the payload.
    The payload of the last frame ends with the 4-byte CRC-32 of the (still
    obfuscated) message bytes.

    Only the next expected frame of a message is stored. Repeats of frames
    already stored and frames that arrive after a gap are ignored, so a
    partially received message is continued when the missing frame shows up
    later. If the CRC check fails, the partial state is discarded so the
    message can be collected again from frame 1.

    Args:
        frame: bytes-like SSID contents.

    Returns:
        The MsgID when this frame completed a message whose CRC matches,
        otherwise None. A completed message stays in ``messages`` and is not
        reported a second time.
    """
    if len(frame) < 4:
        return None  # too short to hold the 4-byte header

    msg_id = frame[1]  # [1] MsgID
    frame_num = frame[2]  # [2] frame#
    num_frames = frame[3]  # [3] #frames

    if msg_id not in messages:
        messages[msg_id] = {"num_frames": 0, "frames": [], "message": bytearray()}
    state = messages[msg_id]

    # Accept strictly in order: appending after a gap would corrupt the
    # message, and the same frame is typically seen several times.
    if frame_num != state["num_frames"] + 1 or frame_num > num_frames:
        return None

    print(f'🛜 Monitor frame received... {bytes(frame)}')
    state["num_frames"] += 1
    msg_frag = frame[4:]
    state["frames"].append(msg_frag)
    state["message"].extend(msg_frag)

    if DEBUG: print(f'{frame[0:4]} {state["num_frames"]} {frame[4:]} {state["message"]}')

    if frame_num != num_frames:
        return None  # more frames still to come

    crc_rcvd = frame[-4:]
    crc_calc = binascii.crc32(state["message"][0:-4]).to_bytes(4)
    if crc_calc == crc_rcvd:
        if DEBUG: print(f"🟢 CRC {crc_calc=} {crc_rcvd=}")
        return msg_id

    if DEBUG: print(f"🔴 CRC {crc_calc=} {crc_rcvd=}")
    # Start over; otherwise this MsgID could never be received again.
    state["num_frames"] = 0
    state["frames"] = []
    state["message"] = bytearray()
    return None

time.sleep(3)  # wait for serial after reset
messages = {}  # MsgID -> reassembly state, filled in by aggregate_message()
monitor = wifi.Monitor(channel=CHANNEL, queue=QUEUELEN)
print(f"Monitoring channel {CHANNEL} with queue length {QUEUELEN}...")
while True:
    fd = get_packet()
    if "received" not in fd:
        continue
    if len(fd["received"]) < 24:
        continue  # parse_header reads 24 bytes; skip anything shorter
    fd = parse_header(fd)
    # Beacons are management frames (type 0) with subtype 8. Checking the type
    # as well keeps other frame types that use subtype 8 from being parsed.
    if fd["type"] != 0 or fd["subt"] != 8:
        continue
    fd = parse_ssid(fd)
    # "ssid" is absent when the beacon carries no (or an empty) SSID element.
    ssid = fd.get("ssid")
    if not ssid or ssid[0] != 1:  # [0] ProtoID b'\x01'
        continue
    if DEBUG: print_frame(fd)
    msg_id = aggregate_message(ssid)
    if msg_id is not None:
        # Drop the trailing 4 CRC bytes before decoding the text.
        print(f'✅ {deobfuscate(messages[msg_id]["message"][0:-4]).decode()}')

@anecdata
Copy link
Author

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment