Created
March 28, 2020 16:13
-
-
Save mguijarr/aaed379574c0fc51639a19d8c4c5ab39 to your computer and use it in GitHub Desktop.
Dummy Tango device + gevent thread + writer thread example using TgGevent
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
from tango.server import run | |
from tango.server import Device | |
from tango.server import attribute, command | |
import threading # assuming threading is **not** monkey-patched | |
from gevent._threading import Queue # the genuine, unpatched Queue class (thread-safe) | |
import gevent | |
import gevent.event | |
import TgGevent | |
class WriterThread(threading.Thread): | |
def __init__(self, job_queue): | |
super().__init__() | |
self.job_queue = job_queue | |
def run(self): | |
print("hello, I am the writing thread", threading.get_ident()) | |
while True: | |
job_desc, data = self.job_queue.get() | |
if job_desc == "CREATE": | |
print("writing: should create new file", data) | |
elif job_desc == "WRITE": | |
print("writing: should write", data) | |
elif job_desc == "CLOSE": | |
print("writing: should close") | |
break | |
class SessionListener: | |
def __init__(self, session_name): | |
self.writer_tasks = Queue() #this is the thread-safe queue that will be shared btw listening thread and writing thread | |
self.writer_thread = WriterThread(self.writer_tasks) #create the writing thread | |
self.task = gevent.spawn(self.listen, session_name) #start scan listening greenlet | |
self._listening_event = gevent.event.Event() | |
def wait_listening_started(self): | |
return self._listening_event.wait() | |
def listen(self, session_name): | |
self.writer_thread.start() | |
print("Ok, listening to", session_name, "in thread id=", threading.get_ident()) | |
self._listening_event.set() # synchronise with caller | |
i = 0 | |
self.writer_tasks.put(("CREATE","/data/blablabla/file.h5")) | |
while True: | |
gevent.sleep(0.5) #simulate communication with redis via BLISS API | |
i+=1 | |
self.writer_tasks.put(("WRITE", i)) | |
if i == 10: | |
# let's say scan is done | |
self.writer_tasks.put(("CLOSE", None)) | |
self.writer_thread.join() | |
break | |
def scan_done(self): | |
return self.task.ready() | |
class NexusWriterTangoDev(Device): | |
def __init__(self, *args, **kwargs): | |
Device.__init__(self, *args, **kwargs) | |
self.session_listener = None | |
@command(dtype_in=str) | |
def start_listening(self, session_name): | |
# make an instance of SessionListener in another thread | |
self.session_listener = TgGevent.get_proxy(SessionListener, session_name) | |
self.session_listener.wait_listening_started() | |
@command(dtype_out=bool) | |
def check_is_running(self): | |
return self.session_listener and not self.session_listener.scan_done() | |
if __name__ == "__main__": | |
run((NexusWriterTangoDev,)) | |
# corresponding YML config: | |
#- device: | |
# - class: NexusWriterTangoDev | |
# tango_name: id00/tango/nx_writer | |
# personal_name: test | |
# server: nexus_writer_test_tango_dev | |
# | |
# Start with: | |
# python nexus_writer_test_tango_dev.py test |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment