Last active
May 2, 2025 21:42
-
-
Save jaeoh2/536eac354665c2f5a1bbe803f8641ef7 to your computer and use it in GitHub Desktop.
NMEA GPRMC Python Parser
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
#!/usr/bin/env python3 | |
import sys | |
import time | |
import serial | |
import threading | |
class GPSRMC(object): | |
def __init__(self, dev='/dev/ttyUSB0', timeout=0.5, interval=0.1): | |
self.dev = dev | |
self.timeout = timeout | |
self.interval = interval | |
self.ser = None | |
self.nmea_sentence = "" | |
self.utc = 0.0 | |
self.pos_status = "" | |
self.lat = 0.0 | |
self.lat_dir = "" | |
self.lon = 0.0 | |
self.lon_dir = "" | |
self.speed_Kn = 0.0 | |
self.track_true = 0.0 | |
self.date = 0 | |
self.mag_var = 0.0 | |
self.var_dir = "" | |
self.mode_ind = "" | |
self.chksum = 0 | |
self.cnt = 0 | |
self.lock = threading.Lock() | |
self.callback() | |
def callback(self): | |
try: | |
if self.ser is None: | |
print("Trying to open gps serial port:{}".format(self.dev)) | |
self.ser = serial.Serial(self.dev, timeout=self.timeout) | |
print("GPS port opened") | |
except OSError: | |
print ('*** GPS ERROR: OS error (possibly serial port is not available)') | |
time.sleep(3) | |
except serial.SerialException: | |
print ('*** GPS ERROR: serial port error (possibly device is reset, powered down or in sleep mode). Restarting reading process...') | |
self.ser = None | |
time.sleep(3) | |
self.cnt += 1 | |
self.thr = threading.Timer(self.interval, self.callback) | |
if self.cnt > 3: | |
print("*** GPS not found") | |
self.thr.cancel() | |
# sys.exit() | |
return False | |
self.thr.start() | |
with self.lock: | |
header = "$GPRMC," | |
# received_data = "$GPRMC,144326.00,A,5107.0017737,N,11402.3291611,W,0.080,323.3,210307,0.0,E,A*20" # test | |
try: | |
received_data = str(self.ser.readline()) | |
except serial.SerialException: | |
self.ser.close() | |
self.ser = None | |
self.callback() | |
if received_data.split(",")[0] == "b'$GPRMC": | |
self.nmea_sentence = received_data.split(header, 1)[1].split(",") | |
# print("GPRMC received", received_data) | |
if not self.nmea_sentence[0]: | |
# print("NMEA:{}".format(self.nmea_sentence)) | |
pass | |
self.parse() | |
def parse(self): | |
nmea = self.nmea_sentence | |
self.utc = nmea[0] | |
self.pos_status = nmea[1] | |
self.lat = self._convert_to_decimal_deg(nmea[2]) | |
self.lat_dir = nmea[3] | |
if self.lat_dir == "S": | |
self.lat = -self.lat | |
self.lon = self._convert_to_decimal_deg(nmea[4]) | |
self.lon_dir = nmea[5] | |
if self.lon_dir == "W": | |
self.lon = -self.lon | |
self.speed_Kn = 0.0 if nmea[6] == "" else float(nmea[6]) | |
self.track_true = nmea[7] | |
self.date = nmea[8] | |
self.mag_var = nmea[9] | |
self.var_dir = nmea[10] | |
self.mode_ind = nmea[11].split("*")[0] | |
self.chksum = '*' + nmea[11].split("*")[1] | |
@staticmethod | |
def _convert_to_decimal_deg(dms_value): | |
if dms_value == "": | |
return -1.0 | |
decimal_value = float(dms_value) / 100.0 | |
degrees = int(decimal_value) | |
mm_mmmm = (decimal_value - degrees) / 0.6 | |
dd = degrees + mm_mmmm | |
return dd | |
def close(self): | |
self.thr.cancel() | |
# self.ser.close() | |
def __del__(self): | |
print("closed") | |
self.close() |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment