Skip to content

Instantly share code, notes, and snippets.

@pietrocolombo
Created August 29, 2024 05:12
Show Gist options
  • Save pietrocolombo/dfff0040c9952c232e076980a0c11718 to your computer and use it in GitHub Desktop.
Save pietrocolombo/dfff0040c9952c232e076980a0c11718 to your computer and use it in GitHub Desktop.
import threading
import queue
import time
import random
# Producer class
class Producer(threading.Thread):
    """Thread that keeps putting random integers onto a shared queue.

    The thread runs until stop() is called. All blocking operations are
    bounded so that a stop request is always honoured, even when the queue
    is full and nothing is draining it.

    Args:
        buffer: A queue.Queue-like object shared with the consumer side.
    """

    # Longest time, in seconds, a blocked put() waits before the stop flag
    # is checked again.
    _POLL_INTERVAL = 0.1

    def __init__(self, buffer):
        super().__init__()
        self.buffer = buffer
        self._stop_event = threading.Event()  # Event to stop the thread

    def run(self):
        """Produce items until stop() is called."""
        try:
            while not self._stop_event.is_set():
                item = random.randint(1, 100)  # Produce a random item
                if not self._offer(item):
                    break  # Stop requested while the queue was full
                print(f"Produced: {item}")
                # Simulate time taken to produce. Event.wait() returns as
                # soon as stop() is called instead of sleeping it out.
                self._stop_event.wait(random.uniform(0.1, 1))
        except KeyboardInterrupt:
            # NOTE: CPython raises KeyboardInterrupt in the main thread, so
            # this only fires when run() is invoked directly from there.
            print("Producer interrupted and stopping...")
        except Exception as e:
            print(f"Producer encountered an error: {e}")

    def _offer(self, item):
        """Put item on the buffer, giving up once a stop is requested.

        Returns:
            True if the item was enqueued, False if stop() was called
            before space became available.
        """
        while not self._stop_event.is_set():
            try:
                # A bounded wait keeps a full queue from pinning the thread
                # forever when no consumer is left to free a slot.
                self.buffer.put(item, timeout=self._POLL_INTERVAL)
            except queue.Full:
                continue
            return True
        return False

    def stop(self):
        """Ask the thread to finish; safe to call from any thread."""
        self._stop_event.set()
# Consumer class
class Consumer(threading.Thread):
    """Thread that keeps taking items off a shared queue and printing them.

    The thread runs until stop() is called. All blocking operations are
    bounded so that a stop request is always honoured, even when the queue
    is empty and nothing is filling it.

    Args:
        buffer: A queue.Queue-like object shared with the producer side.
    """

    # Longest time, in seconds, a blocked get() waits before the stop flag
    # is checked again.
    _POLL_INTERVAL = 0.1

    def __init__(self, buffer):
        super().__init__()
        self.buffer = buffer
        self._stop_event = threading.Event()  # Event to stop the thread

    def run(self):
        """Consume items until stop() is called."""
        try:
            while not self._stop_event.is_set():
                try:
                    # A bounded wait keeps an empty queue from pinning the
                    # thread forever once no producer is left to feed it.
                    item = self.buffer.get(timeout=self._POLL_INTERVAL)
                except queue.Empty:
                    continue  # Nothing yet; re-check the stop flag
                print(f"Consumed: {item}")
                # Simulate time taken to consume. Event.wait() returns as
                # soon as stop() is called instead of sleeping it out.
                self._stop_event.wait(random.uniform(0.1, 1))
        except KeyboardInterrupt:
            # NOTE: CPython raises KeyboardInterrupt in the main thread, so
            # this only fires when run() is invoked directly from there.
            print("Consumer interrupted and stopping...")
        except Exception as e:
            print(f"Consumer encountered an error: {e}")

    def stop(self):
        """Ask the thread to finish; safe to call from any thread."""
        self._stop_event.set()
# Main function to set up and run the producer and consumer
def main(run_seconds=10, buffer_size=5):
    """Run one producer and one consumer against a shared bounded queue.

    Args:
        run_seconds: How long to let the threads work before stopping them.
        buffer_size: Maximum number of items the shared queue may hold.

    Both threads are asked to stop and are joined on the way out, whether
    the run finished normally or was cut short with Ctrl+C.
    """
    buffer = queue.Queue(maxsize=buffer_size)  # Create a shared queue (buffer)
    # Create producer and consumer threads
    producer = Producer(buffer)
    consumer = Consumer(buffer)
    workers = (producer, consumer)
    try:
        # Start the threads
        for worker in workers:
            worker.start()
        # Let the threads run for a while
        time.sleep(run_seconds)
    except KeyboardInterrupt:
        print("Main thread interrupted, stopping threads...")
    finally:
        # Signal every thread first so they wind down in parallel.
        for worker in workers:
            worker.stop()
        for worker in workers:
            # join() raises RuntimeError on a thread that never started,
            # which can happen if Ctrl+C lands between the start() calls.
            if worker.ident is not None:
                worker.join()
        print("Threads stopped gracefully.")
# Run the demo only when executed as a script, not when imported.
if __name__ == "__main__":
    main()
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment