Last active
December 23, 2022 16:18
-
-
Save sourceperl/756fbbfb67df23e62dee5f263f0b48f4 to your computer and use it in GitHub Desktop.
A MicroPython example of HTTP (JSON) data retrieval with handling of Wi-Fi connection errors on the Pico W board.
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
import uasyncio as aio | |
from machine import Pin | |
import network | |
import rp2 | |
import errno | |
import urequests | |
import gc | |
from private_data import WIFI_SSID, WIFI_KEY | |
# global switch: when True the logger created in __main__ also prints debug messages
MODE_DEBUG = False

# log severities, from least to most severe (larger value = more severe)
LVL_NOTSET = 0
LVL_DEBUG = 10
LVL_INFO = 20
LVL_WARNING = 30
LVL_ERROR = 40
# some class | |
class BasicLog:
    """Tiny print-based logger that filters messages by severity level."""

    def __init__(self, lvl=LVL_DEBUG):
        """Store *lvl* as the minimum severity a message needs to be printed."""
        self.lvl = lvl

    def log(self, msg, lvl=LVL_NOTSET):
        """Print *msg* unless its severity *lvl* is below the logger threshold."""
        if lvl < self.lvl:
            return
        print(msg)

    def debug(self, msg):
        """Emit *msg* with a 'DEBUG:' prefix at debug severity."""
        self.log(f'DEBUG:{msg}', LVL_DEBUG)

    def info(self, msg):
        """Emit *msg* with an 'INFO:' prefix at info severity."""
        self.log(f'INFO:{msg}', LVL_INFO)

    def warning(self, msg):
        """Emit *msg* with a 'WARN:' prefix at warning severity."""
        self.log(f'WARN:{msg}', LVL_WARNING)

    def error(self, msg):
        """Emit *msg* with an 'ERROR:' prefix at error severity."""
        self.log(f'ERROR:{msg}', LVL_ERROR)
# asyncio tasks | |
async def radio_task():
    """Keep the wlan link up: (re)connect, then watch the link until it drops.

    Runs forever. Each cycle disconnects, starts a new connection, polls the
    wlan status up to 10 times (1 s apart) for an IP address, then checks the
    status once per second for as long as the interface reports STAT_GOT_IP.
    """
    while True:
        # always start from a disconnected state before asking for a new link
        log.info('wlan connect')
        wlan.disconnect()
        wlan.connect(WIFI_SSID, WIFI_KEY)

        # poll for an IP address, giving up after 10 attempts
        log.info('waiting for connection')
        for connect_try in range(10):
            log.debug(f'{connect_try=}')
            if wlan.status() == network.STAT_GOT_IP:
                log.info(f'wlan connected, @IP={wlan.ifconfig()[0]}')
                break
            await aio.sleep_ms(1000)

        # supervise the link; leaving this loop triggers a new connect cycle
        while True:
            if wlan.status() != network.STAT_GOT_IP:
                break
            await aio.sleep_ms(1000)
        log.warning('wlan seem disconnected, restart connect loop')
async def get_task():
    """Fetch and log the ISS position as JSON every 5 seconds while wlan is up.

    Runs forever. When the interface is not connected the request is skipped
    and the task just sleeps until the next cycle.

    Error handling:
    - OSError (network failure): logged; on EINPROGRESS the wlan is
      disconnected so that radio_task's status check fails and a full
      reconnect is started.
    - ValueError (reply body is not valid JSON): logged, so that a bad reply
      does not terminate this task.
    The response is closed on every path to release its socket.
    """
    while True:
        r = None
        try:
            if wlan.isconnected():
                log.info('get ISS data')
                # urequests is not written for asyncio, so this call blocks the
                # event loop (max time = timeout delay)
                r = urequests.get('http://api.open-notify.org/iss-now.json', timeout=4.0)
                log.info(f'ISS data: {r.json()}')
        except OSError as e:
            log.warning(f'error occur: {e}')
            if e.errno == errno.EINPROGRESS:
                # force a reconnect: radio_task restarts its connect loop once
                # the wlan status is no longer STAT_GOT_IP
                wlan.disconnect()
        except ValueError as e:
            # malformed/non-JSON body: report it instead of killing the task
            log.warning(f'invalid JSON received: {e}')
        finally:
            # release the underlying socket whatever happened above
            if r is not None:
                r.close()
            # reclaim request buffers, including after a failed request
            gc.collect()
        await aio.sleep_ms(5000)
if __name__ == '__main__':
    # I/O setup: pin named 'LED' configured as an output
    led = Pin('LED', Pin.OUT)

    # logger threshold follows the MODE_DEBUG switch
    if MODE_DEBUG:
        log = BasicLog(lvl=LVL_DEBUG)
    else:
        log = BasicLog(lvl=LVL_INFO)

    # wlan setup: set the country code, then bring up the station interface
    rp2.country('FR')
    wlan = network.WLAN(network.STA_IF)
    wlan.active(True)
    # disable powersaving mode
    wlan.config(pm=0xa11140)

    # schedule both asyncio tasks, then hand control to the event loop
    loop = aio.get_event_loop()
    for coro in (radio_task(), get_task()):
        loop.create_task(coro)
    loop.run_forever()
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment