Skip to content

Instantly share code, notes, and snippets.

@anecdata
Last active August 18, 2025 16:02
Show Gist options
  • Save anecdata/77d9afc24ae88765c78c18e6328fc486 to your computer and use it in GitHub Desktop.
Save anecdata/77d9afc24ae88765c78c18e6328fc486 to your computer and use it in GitHub Desktop.
CircuitPython ESP-NOW Sender + async ESP-NOW Receiver with Ethernet Requests
# SPDX-FileCopyrightText: 2025 anecdata
# SPDX-License-Identifier: MIT
# async ESP-NOW Receiver with Ethernet Requests
import time
import traceback
import board
import busio
import digitalio
import espnow
import asyncio
from adafruit_wiznet5k.adafruit_wiznet5k import WIZNET5K
import adafruit_connection_manager
import adafruit_requests
from sekrets import * # PMK, LMK, MAC
# Wi-Fi channel used for the sender peer. Per the CircuitPython espnow.Peer
# documentation, 0 selects the radio's current channel rather than a fixed one.
SNDR_CH = 0 # channel 1 (unless connected to an AP or acting as an AP)
# Endpoint that eth_request() fetches on every cycle.
URL = "https://httpbin.org/get"
def eth_config():
    """Create the WIZnet5k Ethernet interface on the board's ETH_* pins.

    Returns:
        The ``WIZNET5K`` object built from the SPI bus, the chip-select pin
        and the reset pin.
    """
    spi = busio.SPI(board.ETH_CLK, MOSI=board.ETH_MOSI, MISO=board.ETH_MISO)
    cs = digitalio.DigitalInOut(board.ETH_CS)
    rst = digitalio.DigitalInOut(board.ETH_RST)
    return WIZNET5K(spi, cs, reset=rst)
def requests_config(eth):
    """Build an ``adafruit_requests.Session`` that uses *eth* as its radio.

    Args:
        eth: Network interface handed to the connection manager to obtain
            both the socket pool and the SSL context.

    Returns:
        A ``Session`` created from that pool and SSL context.
    """
    pool = adafruit_connection_manager.get_radio_socketpool(eth)
    ssl_context = adafruit_connection_manager.get_radio_ssl_context(eth)
    return adafruit_requests.Session(pool, ssl_context)
async def esp_now_recv():
    """Receive ESP-NOW packets forever, printing each with link counters.

    Registers the encrypted sender peer, prints a one-line peer report, then
    loops: when the ESP-NOW object is truthy it reads one message and prints
    it together with the send/read counters; after every pass it yields to
    the asyncio scheduler. Never returns.
    """
    peers = [espnow.Peer(mac=SNDR_MAC, lmk=SNDR_LMK, encrypted=True, channel=SNDR_CH)]
    with espnow.ESPNow() as e:
        e.set_pmk(RCVR_PMK)

        peers_report = ""
        for peer in peers:
            e.peers.append(peer)
            # NOTE(review): the report includes the peer's LMK, so key
            # material ends up in the console output.
            peers_report += f"mac={peer.mac} lmk={peer.lmk} ch={peer.channel} if={peer.interface} enc={peer.encrypted}, "
        print(f"ESP-NOW peers=[{peers_report}]")

        while True:
            # Truthiness of the ESPNow object gates the read; presumably it
            # is true only when a packet is buffered (confirm in espnow docs).
            if e:
                try:
                    print("ESP-NOW Receiving...", end=" ")
                    msg = e.read()
                    print(f"send=[{e.send_success} {e.send_failure}] read=[{e.read_success} {e.read_failure}] buf={e.buffer_size} phy={e.phy_rate} msg={msg}")
                except ValueError as ex:  # Invalid buffer
                    traceback.print_exception(ex, ex, ex.__traceback__)
            # Yield on every pass, packet or not, so the other task gets to run.
            await asyncio.sleep(0)
async def eth_request():
    """Fetch ``URL`` over Ethernet in an endless loop and print the status.

    Sets up the Ethernet interface and a requests session once, then
    repeatedly issues a GET, prints the status code and reason, and sleeps
    5 seconds. Any exception from a request is printed as a traceback and
    the loop continues. Never returns.
    """
    eth = eth_config()
    print(f"ETH {eth.pretty_mac(eth.mac_address)} {eth.pretty_ip(eth.ip_address)}")
    requests = requests_config(eth)
    while True:
        try:
            # The context manager closes the response after the status is printed.
            with requests.get(URL) as resp:
                print(f"ETH {resp.status_code} {resp.reason.decode()}")
        except Exception as ex:
            traceback.print_exception(ex, ex, ex.__traceback__)
        # Sleep outside the try so a failed request still backs off and yields.
        await asyncio.sleep(5)
async def main():
    """Start the ESP-NOW receiver and the Ethernet requester as two tasks.

    Both coroutines loop forever, so the ``gather`` below is not expected to
    complete in normal operation.
    """
    esp_task = asyncio.create_task(esp_now_recv())
    eth_task = asyncio.create_task(eth_request())
    await asyncio.gather(esp_task, eth_task)
# Pause briefly before any output is produced.
time.sleep(3) # wait for serial
# Hand control to the asyncio scheduler with main() as the entry coroutine.
asyncio.run(main())
@anecdata
Copy link
Author

anecdata commented Aug 16, 2025

# SPDX-FileCopyrightText: 2025 anecdata
# SPDX-License-Identifier: MIT

# ESP-NOW Sender
import time
import random
import traceback
import supervisor
import espnow

from sekrets import *  # PMK, LMK, MAC


# Wi-Fi channel used for the receiver peer. Per the CircuitPython espnow.Peer
# documentation, 0 selects the radio's current channel rather than a fixed one.
RCVR_CH = 0  # channel 1 (unless connected to an AP or acting as an AP)


def random_ascii_printable_bytes(length=17):
    """Return a ``bytearray`` of *length* random printable 7-bit ASCII bytes.

    Each byte is drawn from 0x20 (space) through 0x7E (``~``); 0x7F (DEL) is
    never produced because ``randrange`` excludes its stop value.
    """
    return bytearray([random.randrange(0x20, 0x7F) for _ in range(length)])

def espnow_send(e, msg):
    """Send *msg* through the ESP-NOW object *e* and print its counters.

    Prints a progress prefix, sends the message, then prints the send/read
    counters, buffer size, PHY rate and the message on the same line. Any
    exception raised along the way is printed as a traceback and swallowed,
    so the caller's loop keeps running.
    """
    try:
        print("ESP-NOW Sending...", end=" ")
        e.send(msg)
        report = f"send=[{e.send_success} {e.send_failure}] read=[{e.read_success} {e.read_failure}] buf={e.buffer_size} phy={e.phy_rate} msg={msg}"
        print(report)
    except Exception as err:  # ESP-NOW error 0x306a
        traceback.print_exception(err, err, err.__traceback__)


# Pause briefly before any output is produced.
time.sleep(3)  # wait for serial

# The only peer is the receiver, reached over an encrypted link.
peers = [espnow.Peer(mac=RCVR_MAC, lmk=RCVR_LMK, encrypted=True, channel=RCVR_CH)]
with espnow.ESPNow() as e:
    e.set_pmk(SNDR_PMK)

    # Register every peer first, then describe them all in one report line.
    for peer in peers:
        e.peers.append(peer)
    peers_report = "".join(
        f"mac={peer.mac} lmk={peer.lmk} ch={peer.channel} if={peer.interface} enc={peer.encrypted}, "
        for peer in peers
    )
    print(f"ESP-NOW peers=[{peers_report}]")

    # Send a fresh random payload once per second, forever.
    while True:
        espnow_send(e, random_ascii_printable_bytes())
        time.sleep(1)

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment