Created
June 17, 2022 03:34
-
-
Save gamblor21/e9681057222f6a70fca0608ae3054b3e to your computer and use it in GitHub Desktop.
CircuitPython GPS Async testing
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
import time | |
import board | |
import busio | |
import asyncio | |
import adafruit_gps | |
import gps_asyncio | |
import UART_asyncio | |
# switch to 115200bps if we are at the default 9600 at startup | |
uart = busio.UART(board.TX, board.RX, baudrate=9600, timeout=10) | |
gps = adafruit_gps.GPS(uart, debug=False) # Use UART/pyserial | |
gps.send_command(b"PMTK251,115200") | |
uart.deinit() | |
#uart = busio.UART(board.TX, board.RX, baudrate=115200, receiver_buffer_size=500, timeout=10) | |
uart = UART_asyncio.UART_asyncio(board.TX, board.RX, baudrate=115200, receiver_buffer_size=500, timeout=10) | |
uart.reset_input_buffer() | |
# Create a GPS module instance. | |
#gps = adafruit_gps.GPS(uart, debug=False) # Use UART/pyserial | |
gps = gps_asyncio.GPS_asyncio(uart, debug=False) # Use UART/pyserial | |
gps.send_command(b"PMTK313,1") # SBAS Enable | |
gps.send_command(b"PMTK301,2") # DGPS Mode WAAS | |
gps.send_command(b"PMTK386,0") # do not update position if m/s less | |
# Turn on the basic GGA and RMC info (what you typically want) | |
gps.send_command(b"PMTK314,0,10,0,10,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0") | |
# Set update rate to 10hz | |
gps.send_command(b"PMTK220,100") | |
print("Go") | |
async def gps_update(): | |
while True: | |
await gps.update() | |
async def display_update(): | |
while True: | |
if gps.has_fix is False: | |
await asyncio.sleep(0) | |
continue | |
print("Lat: {0:.6f} degrees Lon: {1:.6f} degrees".format(gps.latitude, gps.longitude)) | |
print("Lat: {:2.}.{} degrees Lon: {:2.}.{} degrees".format(gps.latitude_degrees, str(gps.latitude_minutes/100)[2:], gps.longitude_degrees, str(gps.longitude_minutes/100)[2:])) | |
gps_speed = gps.speed_knots | |
if gps_speed is None: | |
gps_speed = 0.0 | |
gps_speed *= 1.852 | |
gps_course = gps.track_angle_deg | |
if gps_course is None: | |
gps_course = 0 | |
print("Speed: {} kph Course: {} degrees".format(gps_speed, gps_course)) | |
if gps.satellites is not None: | |
print("# satellites: {} fix: {} hdil: {}".format(gps.satellites, gps.fix_quality, gps.horizontal_dilution)) | |
await asyncio.sleep(1.0) | |
async def main(): | |
print("Creating tasks") | |
update_task = asyncio.create_task(gps_update()) | |
display_task = asyncio.create_task(display_update()) | |
while True: | |
print(".", end='') # to see that something else is running between prints and gps.update() | |
await asyncio.sleep(0.1) | |
print("Done") | |
asyncio.run(main()) |
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
There are two main files here to give GPS asyncio ability. | |
UART_asyncio - wrap the core busio.UART read and readline methods with methods that will not block. | |
I am not sure how many core classes this type of wrapping would work for. But for UART seemed fine. | |
Things could be expanded more (write only writes X bytes before letting something else go). CPython's | |
serial_asyncio is somewhat close to what I'm doing, though CPython has Transport and Protocols with asyncoio | |
that CP does not have. | |
gps_asyncio - Right now a subclass of the adafruit_gps.GPS class but as someone suggested may be better as a | |
fork project as a lot of code had to be copied so not really OO. In theory could refactor the GPS class too | |
so the non-async methods all have their own functions and you can override just the async ones. But not even sure | |
that would work. To make gps.update() async it then went to _parse_message(), _read_message() and readline() that | |
all had to be modified. And then make sure the new class gets a UART_asyncio class or the readline just blocks | |
anyways (and then why bother). You could rewrite GPS to use UART.read() and .in_waiting only and asyncio.sleep(0.0001) | |
if nothing is waiting. I just did UART fully async to learn. | |
code.py - simple GPS example based on the adafruit_gps example file I was pulling apart for my own use. Simplified it | |
as an example here. |
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
# good stuff starts about line 180 | |
import adafruit_gps | |
import asyncio | |
_GPSI2C_DEFAULT_ADDRESS = const(0x10) | |
_GLL = 0 | |
_RMC = 1 | |
_GGA = 2 | |
_GSA = 3 | |
_GSA_4_11 = 4 | |
_GSV7 = 5 | |
_GSV11 = 6 | |
_GSV15 = 7 | |
_GSV19 = 8 | |
_RMC_4_1 = 9 | |
_ST_MIN = _GLL | |
_ST_MAX = _RMC_4_1 | |
_SENTENCE_PARAMS = ( | |
# 0 - _GLL | |
"dcdcscC", | |
# 1 - _RMC | |
"scdcdcffsDCC", | |
# 2 - _GGA | |
"sdcdciiffsfsIS", | |
# 3 - _GSA | |
"ciIIIIIIIIIIIIfff", | |
# 4 - _GSA_4_11 | |
"ciIIIIIIIIIIIIfffS", | |
# 5 - _GSV7 | |
"iiiiiiI", | |
# 6 - _GSV11 | |
"iiiiiiIiiiI", | |
# 7 - _GSV15 | |
"iiiiiiIiiiIiiiI", | |
# 8 - _GSV19 | |
"iiiiiiIiiiIiiiIiiiI", | |
# 9 - _RMC_4_1 | |
"scdcdcffsDCCC", | |
) | |
# Internal helper parsing functions. | |
# These handle input that might be none or null and return none instead of | |
# throwing errors. | |
def _parse_degrees(nmea_data): | |
# Parse a NMEA lat/long data pair 'dddmm.mmmm' into a pure degrees value. | |
# Where ddd is the degrees, mm.mmmm is the minutes. | |
if nmea_data is None or len(nmea_data) < 3: | |
return None | |
# To avoid losing precision handle degrees and minutes separately | |
# Return the final value as an integer. Further functions can parse | |
# this into a float or separate parts to retain the precision | |
raw = nmea_data.split(".") | |
degrees = int(raw[0]) // 100 * 1000000 # the ddd | |
minutes = int(raw[0]) % 100 # the mm. | |
minutes += int(f"{raw[1][:4]:0<4}") / 10000 | |
minutes = int(minutes / 60 * 1000000) | |
return degrees + minutes # return parsed string in the format dddmmmmmm | |
def _parse_int(nmea_data): | |
if nmea_data is None or nmea_data == "": | |
return None | |
return int(nmea_data) | |
def _parse_float(nmea_data): | |
if nmea_data is None or nmea_data == "": | |
return None | |
return float(nmea_data) | |
def _parse_str(nmea_data): | |
if nmea_data is None or nmea_data == "": | |
return None | |
return str(nmea_data) | |
def _read_degrees(data, index, neg): | |
# This function loses precision with float32 | |
x = data[index] / 1000000 | |
if data[index + 1].lower() == neg: | |
x *= -1.0 | |
return x | |
def _read_int_degrees(data, index, neg): | |
deg = data[index] // 1000000 | |
minutes = data[index] % 1000000 / 10000 | |
if data[index + 1].lower() == neg: | |
deg *= -1 | |
return (deg, minutes) | |
def _parse_talker(data_type): | |
# Split the data_type into talker and sentence_type | |
if data_type[:1] == b"P": # Proprietary codes | |
return (data_type[:1], data_type[1:]) | |
return (data_type[:2], data_type[2:]) | |
def _parse_data(sentence_type, data): | |
"""Parse sentence data for the specified sentence type and | |
return a list of parameters in the correct format, or return None. | |
""" | |
# pylint: disable=too-many-branches | |
if not _ST_MIN <= sentence_type <= _ST_MAX: | |
# The sentence_type is unknown | |
return None | |
param_types = _SENTENCE_PARAMS[sentence_type] | |
if len(param_types) != len(data): | |
# The expected number does not match the number of data items | |
return None | |
params = [] | |
try: | |
for i, dti in enumerate(data): | |
pti = param_types[i] | |
len_dti = len(dti) | |
nothing = dti is None or len_dti == 0 | |
if pti == "c": | |
# A single character | |
if len_dti != 1: | |
return None | |
params.append(dti) | |
elif pti == "C": | |
# A single character or Nothing | |
if nothing: | |
params.append(None) | |
elif len_dti != 1: | |
return None | |
else: | |
params.append(dti) | |
elif pti == "d": | |
# A number parseable as degrees | |
params.append(_parse_degrees(dti)) | |
elif pti == "D": | |
# A number parseable as degrees or Nothing | |
if nothing: | |
params.append(None) | |
else: | |
params.append(_parse_degrees(dti)) | |
elif pti == "f": | |
# A floating point number | |
params.append(_parse_float(dti)) | |
elif pti == "i": | |
# An integer | |
params.append(_parse_int(dti)) | |
elif pti == "I": | |
# An integer or Nothing | |
if nothing: | |
params.append(None) | |
else: | |
params.append(_parse_int(dti)) | |
elif pti == "s": | |
# A string | |
params.append(dti) | |
elif pti == "S": | |
# A string or Nothing | |
if nothing: | |
params.append(None) | |
else: | |
params.append(dti) | |
else: | |
raise TypeError(f"GPS: Unexpected parameter type '{pti}'") | |
except ValueError: | |
# Something didn't parse, abort | |
return None | |
# Return the parsed data | |
return params | |
class GPS_asyncio(adafruit_gps.GPS): | |
def __init__(self, *args, **kwargs): | |
super().__init__(*args, **kwargs) | |
async def read(self, num_bytes): | |
"""Read up to num_bytes of data from the GPS directly, without parsing. | |
Returns a bytearray with up to num_bytes or None if nothing was read""" | |
return await self._uart.read(num_bytes) | |
async def readline(self): | |
"""Returns a newline terminated bytearray, must have timeout set for | |
the underlying UART or this will block forever!""" | |
return await self._uart.readline() | |
async def _read_sentence(self): | |
# Parse any NMEA sentence that is available. | |
# pylint: disable=len-as-condition | |
# This needs to be refactored when it can be tested. | |
sentence = await self.readline() | |
if sentence is None or sentence == b"" or len(sentence) < 1: | |
return None | |
try: | |
sentence = str(sentence, "ascii").strip() | |
except UnicodeError: | |
return None | |
# Look for a checksum and validate it if present. | |
if len(sentence) > 7 and sentence[-3] == "*": | |
# Get included checksum, then calculate it and compare. | |
expected = int(sentence[-2:], 16) | |
actual = 0 | |
for i in range(1, len(sentence) - 3): | |
actual ^= ord(sentence[i]) | |
if actual != expected: | |
return None # Failed to validate checksum. | |
# copy the raw sentence | |
self._raw_sentence = sentence | |
return sentence | |
# At this point we don't have a valid sentence | |
return None | |
async def _parse_sentence(self): | |
sentence = await self._read_sentence() | |
# sentence is a valid NMEA with a valid checksum | |
if sentence is None: | |
return None | |
# Remove checksum once validated. | |
sentence = sentence[:-3] | |
# Parse out the type of sentence (first string after $ up to comma) | |
# and then grab the rest as data within the sentence. | |
delimiter = sentence.find(",") | |
if delimiter == -1: | |
return None # Invalid sentence, no comma after data type. | |
data_type = sentence[1:delimiter] | |
return (data_type, sentence[delimiter + 1 :]) | |
async def update(self): | |
"""Check for updated data from the GPS module and process it | |
accordingly. Returns True if new data was processed, and False if | |
nothing new was received. | |
""" | |
# Grab a sentence and check its data type to call the appropriate | |
# parsing function. | |
try: | |
sentence = await self._parse_sentence() | |
except UnicodeError: | |
return None | |
if sentence is None: | |
return False | |
if self.debug: | |
print(sentence) | |
data_type, args = sentence | |
if len(data_type) < 5: | |
return False | |
data_type = bytes(data_type.upper(), "ascii") | |
(talker, sentence_type) = _parse_talker(data_type) | |
# Check for all currently known GNSS talkers | |
# GA - Galileo | |
# GB - BeiDou Systems | |
# GI - NavIC | |
# GL - GLONASS | |
# GP - GPS | |
# GQ - QZSS | |
# GN - GNSS / More than one of the above | |
if talker not in (b"GA", b"GB", b"GI", b"GL", b"GP", b"GQ", b"GN"): | |
# It's not a known GNSS source of data | |
# Assume it's a valid packet anyway | |
return True | |
result = True | |
args = args.split(",") | |
if sentence_type == b"GLL": # Geographic position - Latitude/Longitude | |
result = self._parse_gll(args) | |
elif sentence_type == b"RMC": # Minimum location info | |
result = self._parse_rmc(args) | |
elif sentence_type == b"GGA": # 3D location fix | |
result = self._parse_gga(args) | |
elif sentence_type == b"GSV": # Satellites in view | |
result = self._parse_gsv(talker, args) | |
elif sentence_type == b"GSA": # GPS DOP and active satellites | |
result = self._parse_gsa(talker, args) | |
return result |
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
import asyncio | |
import busio | |
class UART_asyncio(busio.UART): | |
def __init__(self, *args, **kwargs): | |
self._poll_wait = 0.005 | |
super().__init__(*args, **kwargs) | |
async def read(self, nbytes): | |
to_go = nbytes | |
buffer = bytearray(0) | |
while to_go != 0: | |
to_read = min(to_go, self.in_waiting) | |
if to_read != 0: | |
buffer.extend(super().read(to_read)) | |
to_go -= to_read | |
else: | |
await asyncio.sleep(self._poll_wait) | |
return bytes(buffer) | |
async def readline(self): | |
buffer = bytearray(0) | |
b = '' | |
while b != b'\n': | |
b = await self.read(1) | |
buffer.extend(b) | |
return bytes(buffer) |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment