-
-
Save Overdrivr/ae1df2e08335f990f2c4 to your computer and use it in GitHub Desktop.
# -*- coding: utf-8 -*- | |
from pyqtgraph.Qt import QtGui, QtCore | |
import numpy as np | |
import pyqtgraph as pg | |
from multiprocessing import Process, Manager, Queue | |
import sched, time, threading | |
# This function is responsible for displaying the data | |
# it is run in its own process to liberate main process | |
def display(name,q): | |
app2 = QtGui.QApplication([]) | |
win2 = pg.GraphicsWindow(title="Basic plotting examples") | |
win2.resize(1000,600) | |
win2.setWindowTitle('pyqtgraph example: Plotting') | |
p2 = win2.addPlot(title="Updating plot") | |
curve = p2.plot(pen='y') | |
x_np = [] | |
y_np = [] | |
def updateInProc(curve,q,x,y): | |
item = q.get() | |
x.append(item[0]) | |
y.append(item[1]) | |
curve.setData(x,y) | |
timer = QtCore.QTimer() | |
timer.timeout.connect(lambda: updateInProc(curve,q,x_np,y_np)) | |
timer.start(50) | |
QtGui.QApplication.instance().exec_() | |
# This is function is responsible for reading some data (IO, serial port, etc) | |
# and forwarding it to the display | |
# it is run in a thread | |
def io(running,q): | |
t = 0 | |
while running.is_set(): | |
s = np.sin(2 * np.pi * t) | |
t += 0.01 | |
q.put([t,s]) | |
time.sleep(0.01) | |
print("Done") | |
## Start Qt event loop unless running in interactive mode or using pyside. | |
if __name__ == '__main__': | |
manager = Manager() | |
q = Queue() | |
run = threading.Event() | |
run.set() | |
# Run io function in a thread | |
t = threading.Thread(target=io, args=(run,q)) | |
t.start() | |
# Start display process | |
p = Process(target=display, args=('bob',q)) | |
p.start() | |
input("Type any key to quit.") | |
run.clear() | |
print("Waiting for scheduler thread to join...") | |
t.join() | |
print("Waiting for graph window process to join...") | |
p.join() | |
print("Process joined successfully. C YA !") |
It did at a time, but I never guaranteed it still does :) Can you be more specific ? Crash, stacktrace, etc.
on macos Mojave, with python 3.7.1, I get the following error:
python37 test.py
See ? Main process immediately free ! Type any key to quit.objc[62313]: +[NSValue initialize] may have been in progress in another thread when fork() was called.
objc[62313]: +[NSValue initialize] may have been in progress in another thread when fork() was called. We cannot safely call it or ignore it in the fork() child process. Crashing instead. Set a breakpoint on objc_initializeAfterForkError to debug.
after a keypress:
Waiting for scheduler thread to join...
Done
Waiting for graph window process to join...
Process joined successfully. C YA !
finally, after ctrl C:
Error in atexit._run_exitfuncs:
Traceback (most recent call last):
File "/usr/local/Cellar/python/3.7.1/Frameworks/Python.framework/Versions/3.7/lib/python3.7/multiprocessing/util.py", line 265, in _run_finalizers
finalizer()
File "/usr/local/Cellar/python/3.7.1/Frameworks/Python.framework/Versions/3.7/lib/python3.7/multiprocessing/util.py", line 189, in call
res = self._callback(*self._args, **self._kwargs)
File "/usr/local/Cellar/python/3.7.1/Frameworks/Python.framework/Versions/3.7/lib/python3.7/multiprocessing/queues.py", line 192, in _finalize_join
thread.join()
File "/usr/local/Cellar/python/3.7.1/Frameworks/Python.framework/Versions/3.7/lib/python3.7/threading.py", line 1032, in join
self._wait_for_tstate_lock()
File "/usr/local/Cellar/python/3.7.1/Frameworks/Python.framework/Versions/3.7/lib/python3.7/threading.py", line 1048, in _wait_for_tstate_lock
elif lock.acquire(block, timeout):
KeyboardInterrupt
I'd really like to see this in action. Thanks
I don't have MacOS, can't really debug this on my side.
This is pretty old code and I would recommend doing it from scratch or debugging the hell out using import pdb; pdb.set_trace()
+ checking python threading docs
If your goal is to have an interactive plot, I would recommend doing the following instead:
- Put your data-generating logic in a class deriving QThread. Have the class register a custom signal such as
on_generated_data
- In your QThread.run method, generate your data and regularly emit the signal with new data
- From main process, instanciate the plot, instanciate the Thread, connect the custom signal
on_generated_data
to a method that will update your plot
That should be it. The main advantage of QThreads is that emitting signals is thread-safe, so it's a pretty easy and clean way to relay info from a thread to the main process.
THANK YOU! I've been struggling trying to figure out how to do something just like this.
This is a wonderful code that works! It helps me out. Many thanks!
I have adapted it to visualize a dynamically refreshed random matrix, as posted below.
Probably helpful for a beginner like myself.
# -*- coding: utf-8 -*-
from pyqtgraph.Qt import QtGui, QtCore
import numpy as np
import pyqtgraph as pg
from multiprocessing import Process, Manager, Queue
import sched, time, threading
import sys
# This function is responsible for displaying the data
# it is run in its own process to liberate main process
def display(q):
app = QtGui.QApplication(sys.argv)
app2 = QtGui.QMainWindow(None, QtCore.Qt.WindowStaysOnTopHint)
mainbox = QtGui.QWidget()
app2.setCentralWidget(mainbox)
mainbox.setLayout(QtGui.QVBoxLayout())
canvas = pg.GraphicsLayoutWidget()
mainbox.layout().addWidget(canvas)
view = canvas.addViewBox()
view.setAspectLocked(True)
view.setRange(QtCore.QRectF(0, 0, 100, 100))
img = pg.ImageItem(border='w')
view.addItem(img)
def updateInProc(img,q):
item = q.get()
img.setImage(item.T)
timer = QtCore.QTimer()
timer.timeout.connect(lambda: updateInProc(img,q))
timer.start(50)
app2.show()
app.exec_()
# This is function is responsible for reading some data (IO, serial port, etc)
# and forwarding it to the display
# it is run in a thread
def io(running,q):
t = 0
while running.is_set():
data = np.random.random((100,100))
q.put(data)
time.sleep(0.01)
print("Done")
if __name__ == '__main__':
q = Queue()
# Event for stopping the IO thread
run = threading.Event()
run.set()
# Run io function in a thread
t = threading.Thread(target=io, args=(run,q))
t.start()
# Start display process
p = Process(target=display, args=(q))
p.start()
input("See ? Main process immediately free ! Type any key to quit.")
run.clear()
print("Waiting for scheduler thread to join...")
t.join()
print("Waiting for graph window process to join...")
p.join()
print("Process joined successfully. C YA !")
Hello @Overdrivr , I ran your code on Windows10 Pycharm and got the following error:
Traceback (most recent call last):
File "F:\Anaconda\envs\empty\lib\site-packages\IPython\core\interactiveshell.py", line 3441, in run_code
exec(code_obj, self.user_global_ns, self.user_ns)
File "<ipython-input-2-333432fa0c82>", line 1, in <module>
runfile('G:/0_mycode/empty_pc/git/Empty/core/test.py', wdir='G:/0_mycode/empty_pc/git/Empty/core')
File "F:\PyCharm 2021.2\plugins\python\helpers\pydev\_pydev_bundle\pydev_umd.py", line 198, in runfile
pydev_imports.execfile(filename, global_vars, local_vars) # execute the script
File "F:\PyCharm 2021.2\plugins\python\helpers\pydev\_pydev_imps\_pydev_execfile.py", line 18, in execfile
exec(compile(contents+"\n", file, 'exec'), glob, loc)
File "G:/0_mycode/empty_pc/git/Empty/core/test.py", line 57, in <module>
p.start()
File "F:\Anaconda\envs\empty\lib\multiprocessing\process.py", line 121, in start
self._popen = self._Popen(self)
File "F:\Anaconda\envs\empty\lib\multiprocessing\context.py", line 224, in _Popen
return _default_context.get_context().Process._Popen(process_obj)
File "F:\Anaconda\envs\empty\lib\multiprocessing\context.py", line 327, in _Popen
return Popen(process_obj)
File "F:\Anaconda\envs\empty\lib\multiprocessing\popen_spawn_win32.py", line 93, in __init__
reduction.dump(process_obj, to_child)
File "F:\Anaconda\envs\empty\lib\multiprocessing\reduction.py", line 60, in dump
ForkingPickler(file, protocol).dump(obj)
_pickle.PicklingError: Can't pickle <function display at 0x0000027AB237DF70>: attribute lookup display on __main__ failed
Traceback (most recent call last):
File "<string>", line 1, in <module>
File "F:\Anaconda\envs\empty\lib\multiprocessing\spawn.py", line 116, in spawn_main
exitcode = _main(fd, parent_sentinel)
File "F:\Anaconda\envs\empty\lib\multiprocessing\spawn.py", line 126, in _main
self = reduction.pickle.load(from_parent)
EOFError: Ran out of input
Could you help me please?
This works.!!! But I want to embed pyqtgraph in application and turn on and off the plotting. I wonder how will I be able to do this. Do you have any code to resemble the idea?
I was also having some issues getting this example to work, but managed to in the end by changing GraphicsWindow
to GraphicsLayoutWidget
, and app2 = QtGui.QApplication([])
to app2 = pg.mkQApp("Multiprocess plotter")
.
Nevertheless, when I tried adding a second process that was making data for a second curve, using the Queue
got quite laggy.
Using an example I found that made a numpy array with shared memory, I was able to make a plotter which plots two curves, but the data it plots gets updated by two external processes.
Hope someone finds this useful.
import time
from multiprocessing import Process
from multiprocessing.managers import SharedMemoryManager
from multiprocessing.shared_memory import SharedMemory
from typing import Tuple
import numpy as np
from pyqtgraph.Qt import QtCore
import pyqtgraph as pg
def create_np_array_from_shared_mem(
shared_mem: SharedMemory,
shared_data_dtype: np.dtype,
shared_data_shape: Tuple[int, ...],
) -> np.ndarray:
arr = np.frombuffer(shared_mem.buf, dtype=shared_data_dtype)
arr = arr.reshape(shared_data_shape)
return arr
# This function is responsible for displaying the data
# it is run in its own process to liberate main process
def display(
shared_mem: SharedMemory,
shared_data_dtype: np.dtype,
shared_data_shape: Tuple[int, ...],
):
app = pg.mkQApp("Multiprocess plotter")
arr = create_np_array_from_shared_mem(
shared_mem, shared_data_dtype, shared_data_shape
)
win2 = pg.GraphicsLayoutWidget(title="Basic plotting examples")
win2.resize(1000, 600)
win2.setWindowTitle('pyqtgraph example: Plotting')
p2 = win2.addPlot(title="Updating plot")
curve1 = p2.plot(pen='y')
curve2 = p2.plot(pen='b')
def updateInProc():
curve1.setData(arr[:, 0, 0], arr[:, 1, 0])
curve2.setData(arr[:, 0, 1], arr[:, 1, 1])
win2.show()
timer = QtCore.QTimer()
timer.timeout.connect(updateInProc)
timer.start(50)
pg.exec()
def make_data1(
shared_mem: SharedMemory,
shared_data_dtype: np.dtype,
shared_data_shape: Tuple[int, ...],
):
arr = create_np_array_from_shared_mem(
shared_mem, shared_data_dtype, shared_data_shape
)
for i in range(1000):
t = i / 100
s = np.sin(2 * np.pi * t)
arr[i, 0, 0] = t
arr[i, 1, 0] = s
time.sleep(0.01)
print("Done")
def make_data2(
shared_mem: SharedMemory,
shared_data_dtype: np.dtype,
shared_data_shape: Tuple[int, ...],
):
arr = create_np_array_from_shared_mem(
shared_mem, shared_data_dtype, shared_data_shape
)
for i in range(1000):
t = i / 100
s = np.sin(2 * np.pi * t + np.pi)
arr[i, 0, 1] = t
arr[i, 1, 1] = s
time.sleep(0.01)
print("Done")
if __name__ == '__main__':
data_to_share = np.zeros((1000, 2, 2))
SHARED_DATA_DTYPE = data_to_share.dtype
SHARED_DATA_SHAPE = data_to_share.shape
SHARED_DATA_NBYTES = data_to_share.nbytes
with SharedMemoryManager() as smm:
shared_mem = smm.SharedMemory(size=SHARED_DATA_NBYTES)
writer1 = Process(
target=make_data1, args=(shared_mem, SHARED_DATA_DTYPE, SHARED_DATA_SHAPE)
)
writer2 = Process(
target=make_data2, args=(shared_mem, SHARED_DATA_DTYPE, SHARED_DATA_SHAPE)
)
reader = Process(
target=display, args=(shared_mem, SHARED_DATA_DTYPE, SHARED_DATA_SHAPE)
)
writer1.start()
writer2.start()
reader.start()
writer1.join()
writer2.join()
reader.join()
This doesn-t work..