Skip to content

Instantly share code, notes, and snippets.

@rianhunter
Created September 24, 2015 04:17
Show Gist options
  • Save rianhunter/1adac6b7b1aa1aa0b463 to your computer and use it in GitHub Desktop.
Save rianhunter/1adac6b7b1aa1aa0b463 to your computer and use it in GitHub Desktop.
Using pyudev asynchronous monitoring with urwid
import queue
import os
import pyudev
import urwid
text = urwid.Text("")
loop = urwid.MainLoop(urwid.Filler(text))
inter_thread_queue = queue.Queue()
def handle_udev_event(data):
for _ in data:
(action, device) = inter_thread_queue.get(block=False)
text.set_text(text.get_text()[0] + "\n" +
'{0} - {1}'.format(action, device.get('ID_FS_LABEL')))
wakeup_main_fd = loop.watch_pipe(handle_udev_event)
def send_event_to_main_thread(action, device):
inter_thread_queue.put((action, device))
os.write(wakeup_main_fd, b'a')
context = pyudev.Context()
monitor = pyudev.Monitor.from_netlink(context)
observer = pyudev.MonitorObserver(monitor, send_event_to_main_thread)
observer.start()
loop.run()
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment