@alkasm
Created February 6, 2021 10:37
Cooperative, event-driven, producer/consumer patterns
from collections import deque
import logging
import signal
from threading import Condition, Event, Thread
import time

logging.basicConfig(level=logging.INFO)


class Stop(Exception):
    pass


class Q:
    def __init__(self):
        self.q = deque()
        self.cv = Condition()

    def put(self, val):
        """Puts wake up threads that are blocked."""
        with self.cv:
            self.q.append(val)
            self.cv.notify()

    def get(self):
        """If woken up, check if the global stop event has been set and throw, or retrieve an item."""
        with self.cv:
            self.cv.wait_for(lambda: stop.is_set() or len(self.q) > 0)
            if stop.is_set():
                raise Stop
            return self.q.popleft()

    def stop(self):
        """The stop method can be called by the main thread to break the condition that other threads are blocking on."""
        with self.cv:
            self.cv.notify_all()


stop = Event()
q = Q()


def producer():
    for i in range(10):
        time.sleep(1)
        if stop.is_set():
            logging.info("producer exited due to stop flag")
            break
        logging.info("producer put %d", i)
        q.put(i)


def consumer():
    try:
        while True:
            item = q.get()
            logging.info("consumer got %r", item)
    except Stop as e:
        logging.info("consumer exited due to exception %r", e)


ct = Thread(target=consumer)
pt = Thread(target=producer)
ct.start()
pt.start()


def handler(signum, frame):
    logging.error("signal handler called with signal %d, stopping program", signum)
    stop.set()
    q.stop()


signal.signal(signal.SIGINT, handler)
ct.join()
pt.join()

alkasm commented Feb 6, 2021

A condition variable can be shared by a whole group of threads, so with the right design the main thread can always wake every thread that is blocked on it. Worker threads can then be written as blocking, event-driven code that waits on the CV for data or events to process, and the main thread can tell them to stop blocking through the same notification mechanism.
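To make the "group of threads" point concrete, here is a minimal sketch with several consumers blocked on one condition variable and a single `stop()` call releasing all of them. The names `StoppableQueue`, `Stopped`, and `run_demo` are made up for this illustration, and unlike the gist this variant keeps the stop flag inside the queue rather than in a global `Event`.

```python
from collections import deque
from threading import Condition, Thread


class Stopped(Exception):
    """Raised by get() once the queue has been stopped (hypothetical name)."""


class StoppableQueue:
    """Illustrative variant of Q that keeps its stop flag internal."""

    def __init__(self):
        self._items = deque()
        self._cv = Condition()
        self._stopped = False

    def put(self, val):
        with self._cv:
            self._items.append(val)
            self._cv.notify()  # wake one waiting consumer

    def get(self):
        with self._cv:
            # Block until there is data or a stop has been requested.
            self._cv.wait_for(lambda: self._stopped or len(self._items) > 0)
            if self._stopped:
                raise Stopped
            return self._items.popleft()

    def stop(self):
        with self._cv:
            self._stopped = True
            self._cv.notify_all()  # wake every waiting consumer


def run_demo(num_consumers=3):
    sq = StoppableQueue()
    exited = []  # records which consumers saw the stop

    def consumer(name):
        try:
            while True:
                sq.get()
        except Stopped:
            exited.append(name)

    threads = [Thread(target=consumer, args=(i,)) for i in range(num_consumers)]
    for t in threads:
        t.start()
    sq.put("a")
    sq.stop()  # one call releases all blocked consumers
    for t in threads:
        t.join(timeout=5)
    return sorted(exited)
```

Calling `run_demo(3)` should return `[0, 1, 2]`: every consumer exits through the same notification, whether it was already waiting or only reached `get()` after the stop.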

The pattern showcased here is that the blocking data structure itself raises an exception when the main thread wakes it up in order to stop. Exceptions aren't required; they just let the locking and signaling mechanisms be nicely encapsulated.
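For comparison, a rough sketch of the exception-free version: `get()` returns a sentinel object when the queue has been stopped, and the caller checks for it. `SentinelQueue`, `STOPPED`, and `consume` are hypothetical names for this example only, and the stop flag lives inside the queue here, which is an assumption of the sketch rather than how the gist does it.

```python
from collections import deque
from threading import Condition, Thread

STOPPED = object()  # hypothetical sentinel meaning "shut down, no item returned"


class SentinelQueue:
    def __init__(self):
        self._items = deque()
        self._cv = Condition()
        self._stopped = False

    def put(self, val):
        with self._cv:
            self._items.append(val)
            self._cv.notify()

    def get(self):
        with self._cv:
            self._cv.wait_for(lambda: self._stopped or len(self._items) > 0)
            if self._stopped:
                return STOPPED  # signal the stop through the return value
            return self._items.popleft()

    def stop(self):
        with self._cv:
            self._stopped = True
            self._cv.notify_all()


def consume(sq, out):
    while True:
        item = sq.get()
        if item is STOPPED:  # every caller has to remember this check
            return
        out.append(item)
```

The locking is still hidden inside the queue, but each call site now carries the stop check itself, which is the part the exception keeps out of the consumer loop.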

The pattern this would otherwise replace is a constant loop-and-check, polling both for the stop event and for new data, optionally sleeping between checks to waste fewer cycles. An example consumer in that regime might look like:

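from collections import deque
from threading import Event
import time

stop = Event()  # set by the main thread to request shutdown
q = deque()  # some other thread appends to this

def consumer():
    while not stop.is_set():
        while q:
            process(q.popleft())  # process() stands in for the real work
        time.sleep(0.1)  # poll interval: new data and stop requests can each wait up to 100 ms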
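If the polling loop has to stay, one small tweak is to sleep on the stop event instead of calling `time.sleep`, since `Event.wait(timeout)` returns as soon as the event is set. This is only a sketch of that half-measure: `polling_consumer` and its parameters are names invented here, and new data is still only noticed at the next poll.

```python
from collections import deque
from threading import Event, Thread
import time


def polling_consumer(q, stop, process, interval=0.1):
    """Poll q for items, but react to stop without waiting out the interval."""
    while not stop.is_set():
        while q:
            process(q.popleft())
        # Sleeps for up to `interval` seconds, but wakes early once stop is set.
        stop.wait(interval)
```

This shortens shutdown latency only. The condition-variable queue in the gist removes the polling for data as well.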
