Skip to content

Instantly share code, notes, and snippets.

@pklaus
Last active July 14, 2017 17:15
Show Gist options
  • Save pklaus/4039175 to your computer and use it in GitHub Desktop.
Save pklaus/4039175 to your computer and use it in GitHub Desktop.
This module has now evolved into a proper Python package that you can install with pip: https://github.com/pklaus/serialman
#!/usr/bin/env python
import argparse
import threading
import time
from multiprocessing import Event, Process, Queue

try:
    from queue import Empty  # Python 3
except ImportError:
    from Queue import Empty  # Python 2

import serial
class SerialManager(Process):
    """Service a serial port from a separate process, bridged by two queues.

    Data read from the port is put on ``in_queue``; items placed on
    ``out_queue`` are written to the port. Use ``start()`` to launch the
    worker and ``close()`` followed by ``join()`` to stop it.

    This class has been written by Philipp Klaus and can be found on
    https://gist.github.com/4039175 .

    NOTE(review): the port is opened in ``__init__`` (i.e. in the creating
    process) and then used from ``loop()`` in the worker process. That
    presumably relies on the worker inheriting the open port (fork start
    method) -- confirm before using this on platforms that spawn workers.
    """

    def __init__(self, device, **kwargs):
        """Open *device* and set up the queues and the shutdown signal.

        Args:
            device: Port name passed as first argument to ``serial.Serial``.
            **kwargs: Additional ``serial.Serial`` keyword settings; they
                override the defaults below.
        """
        settings = {
            'baudrate': 9600,
            'bytesize': serial.EIGHTBITS,
            'parity': serial.PARITY_NONE,
            'stopbits': serial.STOPBITS_ONE,
            'timeout': 0,
        }
        settings.update(kwargs)
        self._kwargs = settings
        self.ser = serial.Serial(device, **self._kwargs)
        self.in_queue = Queue()   # data read from the port
        self.out_queue = Queue()  # data waiting to be written to the port
        # Kept for backward compatibility. A plain attribute is copied into
        # the worker process, so changing it afterwards is not seen there.
        self.closing = False
        # Cross-process shutdown signal; this is what actually stops loop().
        self._stop_event = Event()
        self.sleeptime = 0.0005  # pause between polling rounds, in seconds
        Process.__init__(self, target=self.loop)

    def loop(self):
        """Shuttle data between the port and the queues until told to stop.

        Each round sleeps ``sleeptime`` seconds, forwards up to 256 bytes
        from the port to ``in_queue`` and writes at most one pending item
        from ``out_queue`` to the port. ``KeyboardInterrupt`` and
        ``SystemExit`` end the loop quietly; any other exception propagates.
        The port is closed in every case.
        """
        try:
            while not (self.closing or self._stop_event.is_set()):
                time.sleep(self.sleeptime)
                in_data = self.ser.read(256)
                if in_data:
                    self.in_queue.put(in_data)
                try:
                    out_buffer = self.out_queue.get_nowait()
                except Empty:
                    continue  # nothing to send this round
                self.ser.write(out_buffer)
        except (KeyboardInterrupt, SystemExit):
            pass
        finally:
            # Release the port even when read()/write() raised.
            self.ser.close()

    def close(self):
        """Ask the worker loop to finish; does not wait for it to exit.

        Note: this overrides ``Process.close()`` (present in newer Python
        versions) with a different meaning; the name is kept so existing
        callers continue to work.
        """
        self.closing = True
        self._stop_event.set()
if __name__ == "__main__":
    # Command-line demo: open the given port and print every chunk received.
    parser = argparse.ArgumentParser(description='A class to manage reading and writing from and to a serial port.')
    parser.add_argument('--baudrate', '-b', type=int, default=9600, help='Baudrate of serial port.')
    parser.add_argument('device', help='The serial port to use (COM4, /dev/ttyUSB1 or similar).')
    args = parser.parse_args()

    s1 = SerialManager(args.device, baudrate=args.baudrate)
    s1.start()
    try:
        while True:
            # Blocks until the worker has delivered another chunk.
            data = s1.in_queue.get()
            print(repr(data))
    except KeyboardInterrupt:
        # Ctrl+C is the normal way to end the demo; cleanup is in finally.
        pass
    finally:
        s1.close()
        # Bound the wait: if the worker has not exited after the grace
        # period, terminate it instead of hanging forever.
        s1.join(2.0)
        if s1.is_alive():
            s1.terminate()
            s1.join()
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment