Last active
July 14, 2017 17:15
-
-
Save pklaus/4039175 to your computer and use it in GitHub Desktop.
This module has now evolved into a proper Python package, that you can install with pip: https://github.com/pklaus/serialman
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
| #!/usr/bin/env python | |
| import serial | |
| import threading | |
| import time | |
| class SerialReceiver(threading.Thread): | |
| """ This class has been written by | |
| Philipp Klaus and can be found on | |
| https://gist.github.com/4039175 . """ | |
| def __init__(self, device, *args): | |
| self._target = self.read | |
| self._args = args | |
| self.__lock = threading.Lock() | |
| self.ser = serial.Serial(device) | |
| self.data_buffer = "" | |
| self.closing = False # A flag to indicate thread shutdown | |
| self.sleeptime = 0.00005 | |
| threading.Thread.__init__(self) | |
| def run(self): | |
| self._target(*self._args) | |
| def read(self): | |
| while not self.closing: | |
| time.sleep(self.sleeptime) | |
| if not self.__lock.acquire(False): | |
| continue | |
| try: | |
| self.data_buffer += self.ser.read(6) | |
| finally: | |
| self.__lock.release() | |
| self.ser.close() | |
| def pop_buffer(self): | |
| # If a request is pending, we don't access the buffer | |
| if not self.__lock.acquire(False): | |
| return "" | |
| buf = self.data_buffer | |
| self.data_buffer = "" | |
| self.__lock.release() | |
| return buf | |
| def write(data): | |
| self.ser.write(data) | |
| def close(self): | |
| self.closing = True | |
| if __name__ == "__main__": | |
| device = '/dev/tty.usbserial' | |
| s1 = SerialReceiver(device) | |
| s1.start() | |
| try: | |
| while True: | |
| data = s1.pop_buffer() | |
| if data != "": print repr(data) | |
| except KeyboardInterrupt: | |
| s1.close() | |
| finally: | |
| s1.close() | |
| s1.join() |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment