Last active
February 10, 2017 23:04
-
-
Save bowbahdoe/197615ae9c46d674c62b8c6d0f3a5c85 to your computer and use it in GitHub Desktop.
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
''' | |
Live plots data received over serial | |
''' | |
import collections | |
import matplotlib as mpl | |
import matplotlib.pyplot as plt | |
import serial | |
import serial.tools.list_ports as list_ports | |
import threading | |
import atexit | |
import time | |
class LivePlot:
    '''
    Thin wrapper over the default plot to provide an interface for live plotting

    A background thread pulls (x, y) pairs from ``data_source`` and stores
    them in two bounded buffers; ``plot_forever`` redraws the most recent
    ``max_points`` pairs on a single matplotlib axis.
    '''
    def __init__(self, data_source,
                 *, title = None,
                 xlabel = None,
                 ylabel = None,
                 max_points = 1000):
        '''
        data_source -- iterable yielding (x, y) pairs
        title, xlabel, ylabel -- optional text applied to the axis
        max_points -- number of most recent points kept and displayed
        '''
        self._data_source = data_source
        self._max_points = max_points
        # Both buffers are pre-filled with None placeholders and bounded by
        # maxlen, so they always hold exactly max_points entries.
        # self.data holds the x values, self.y_data the matching y values.
        self.data = collections.deque([None] * max_points, maxlen=max_points)
        self.y_data = collections.deque([None] * max_points, maxlen=max_points)
        # Guards the two buffers so the plotting thread never observes an x
        # value whose matching y value has not been appended yet.
        self._data_lock = threading.Lock()

        ###################################################################
        # We make the implicit assumption that the data will be displayed #
        # in a 1x1 ratio.                                                 #
        ###################################################################
        self._figure = plt.figure()
        self._axis = self._figure.add_subplot(1,1,1)
        if title: self._axis.set_title(title)
        if xlabel: self._axis.set_xlabel(xlabel)
        if ylabel: self._axis.set_ylabel(ylabel)

    def add_data(self, x, y):
        '''
        adds the arbitrary x and y data points to the data set used by the plot
        If adding the data would have the plot exceed max_points, the least
        recently added data point is removed
        '''
        with self._data_lock:
            self.data.append(x)
            self.y_data.append(y)

    def _snapshot(self):
        '''
        returns ([x, ...], [y, ...]) copies of the buffered points, skipping
        slots where either coordinate is still the None placeholder
        '''
        with self._data_lock:
            pairs = list(zip(self.data, self.y_data))
        xs = [x for x, y in pairs if x is not None and y is not None]
        ys = [y for x, y in pairs if x is not None and y is not None]
        return xs, ys

    def _add_data_thread(self, shutdown_event):
        '''
        body of the collection thread: feeds every item of the data source
        to add_data until the source is exhausted or shutdown_event is set
        '''
        try:
            for new_data in self._data_source:
                # The event is only checked once the source has produced an
                # item; that item is discarded when shutting down.
                if shutdown_event.is_set():
                    return
                self.add_data(*new_data)
                time.sleep(0.001)
        except (KeyboardInterrupt, SystemExit):
            return

    def plot_forever(self):
        '''
        Continuously plots data from the data source

        Does not return normally; any exception raised while drawing
        (including KeyboardInterrupt) stops the collection thread and
        propagates to the caller.
        '''
        #################################################################
        # Since the target model for a live plot is a continuous        #
        # data source, we start a new thread to do that data collection #
        #################################################################
        shutdown_event = threading.Event()
        # daemon=True: a data source that blocks forever must not be able
        # to keep the interpreter alive at exit.
        data_col_thread = threading.Thread(target=self._add_data_thread,
                                           args = (shutdown_event,),
                                           daemon=True)
        data_col_thread.start()

        def _kill_thread():
            '''
            kills the data collection thread
            '''
            # The thread only watches the event, so that is what must be set.
            shutdown_event.set()
            # Bounded wait: the thread may be stuck inside the data source.
            data_col_thread.join(timeout=1.0)
        atexit.register(_kill_thread)

        plt.ion()
        xs, ys = self._snapshot()
        line, = self._axis.plot(xs, ys, 'r-')
        try:
            while True:
                xs, ys = self._snapshot()
                # Update the existing line in place; clearing the axis each
                # frame would also erase the title and axis labels.
                line.set_data(xs, ys)
                self._axis.relim()
                self._axis.autoscale_view()
                plt.pause(0.00001)
        finally:
            shutdown_event.set()
import random | |
def a():
    '''
    Endless demo data source: every item is a pair of independent
    random floats drawn with random.random()
    '''
    draw = random.random
    while True:
        first = draw()
        second = draw()
        yield (first, second)
# Demo: live-plot the random data source. plot_forever() never returns, so
# the demo only runs when the file is executed as a script; a plain import
# of the module no longer blocks.
if __name__ == "__main__":
    v = LivePlot(a(), max_points = 100, title = "TestPlot", xlabel="a")
    v.plot_forever()
def establish_serial(baud_rate = None, serial_path = None):
    '''
    Builds a BeeConnection, prompting on stdin for the baud rate if needed

    baud_rate -- integer baud rate; when missing (or falsy) the user is
                 prompted until a valid integer is entered
    serial_path -- device path to open; when missing, a placeholder path
                   is used until interactive selection is implemented

    Raises EOFError if stdin is closed while prompting for the baud rate.
    '''
    while not baud_rate:
        try:
            baud_rate = int(input("What is the baud rate: "))
        except ValueError:
            baud_rate = None
            print("Entered baud rate was not a number, please try again")
        # EOFError is deliberately not caught: once stdin is closed every
        # further input() fails the same way, so retrying would never end.

    ports = list_ports.comports()
    for p in ports:
        print(p)

    if not serial_path:
        # TODO handle input from user to determine serial path
        serial_path = '/dev/something'
    return BeeConnection(serial_path, baud_rate)
class BeeConnection:
    '''
    Iterator that represents a view of the data being sent over serial
    by the exBee

    Bytes are read one at a time. Everything between STARTBYTE and ENDBYTE
    is treated as one frame whose fields are separated by DELIMITER, and
    each frame is returned as a tuple of floats.
    NOTE(review): numeric ASCII fields are an assumption made so that frames
    can be fed to an (x, y) consumer -- confirm against the sender's format.
    '''
    STARTBYTE = b"/"
    DELIMITER = b"^"
    ENDBYTE = b";"

    def __init__(self, serial_path, baud_rate, timeout = 1):
        '''
        initializes serial connection. If initial connect fails, waits half
        a second and tries again. If the connection still is not established
        after 10 such additional attempts, raises ConnectionError
        '''
        # Set first so close()/__del__ are safe even if construction fails.
        self._connection = None
        self.raw_data = collections.deque()

        # port=None defers opening, so open() can be retried below.
        # timeout must be passed by keyword: the third positional parameter
        # of serial.Serial is bytesize, not timeout.
        connection = serial.Serial(port=None, baudrate=baud_rate,
                                   timeout=timeout)
        connection.port = serial_path
        self._connection = connection

        last_error = None
        for attempt in range(11):  # the initial try plus 10 retries
            try:
                connection.open()
            except OSError as error:  # serial.SerialException is an IOError
                last_error = error
            if connection.isOpen():
                return
            if attempt < 10:
                time.sleep(0.5)
        raise ConnectionError("Failed to connect to serial device") from last_error

    def close(self):
        '''
        cleans up and closes the serial connection
        '''
        connection = getattr(self, "_connection", None)
        if connection is None:
            # Nothing was ever opened (e.g. __init__ failed early).
            return
        if connection.isOpen():
            connection.flush()
            connection.close()
        if not connection.isOpen():
            print("Serial port successfully closed")
        else:
            print("Something went wrong closing the connection")

    def __enter__(self):
        '''
        returns the connection itself so it can be used in a with block
        '''
        return self

    def __exit__(self, exc_type=None, exc_value=None, traceback=None):
        '''
        closes connection when the object is used in a with block
        Exceptions raised inside the block are not suppressed.
        '''
        self.close()

    def __del__(self):
        '''
        closes connection if the object is deleted
        '''
        try:
            self.close()
        except Exception:
            # Best effort only: errors cannot be reported usefully from a
            # finalizer, which may run during interpreter shutdown.
            pass

    @classmethod
    def _parse_frame(cls, payload):
        '''
        converts the bytes between STARTBYTE and ENDBYTE into a tuple of
        floats, one per DELIMITER-separated field
        Raises ValueError if any field is not an ASCII number.
        '''
        return tuple(float(field.decode("ascii"))
                     for field in payload.split(cls.DELIMITER))

    def __next__(self):
        '''
        blocks until a complete, well-formed frame has been received and
        returns its fields as a tuple of floats
        Raises StopIteration when there is no connection or reading fails.
        '''
        connection = getattr(self, "_connection", None)
        if connection is None:
            raise StopIteration
        while True:
            try:
                byte = connection.read(1)
            except OSError:
                # Includes serial.SerialException (port closed or lost).
                raise StopIteration from None
            if not byte:
                # Read timed out with no data; keep waiting for the frame.
                continue
            if byte == self.STARTBYTE:
                # A new frame begins: drop any partial or stray bytes.
                self.raw_data.clear()
                continue
            if byte != self.ENDBYTE:
                self.raw_data.append(byte)
                continue
            payload = b''.join(self.raw_data)
            self.raw_data.clear()
            try:
                return self._parse_frame(payload)
            except ValueError:
                # Malformed or empty frame: skip it and wait for the next.
                continue

    def __iter__(self):
        return self
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
matplotlib | |
pyserial |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment