@compustar · Created November 18, 2024 12:36
A simple worker pool implementation using Python's threading and queue modules. This allows parallel execution of tasks by distributing them across a specified number of worker threads.
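A minimal sketch of the intended call pattern (the task function fn here is a stand-in; the full implementation and a runnable example follow):

    pool = QueueWorkerPool(num_worker_threads=4, fn=fn)
    pool.start()            # spin up the daemon worker threads
    pool.submit("a", "b")   # each submit() becomes one fn(*args, **kwargs) call
    pool.stop()             # sentinel-based shutdown; blocks until the queue drains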
import logging
import threading
from queue import Queue
from typing import Callable

logger = logging.getLogger(__name__)


class QueueWorkerPool:
    def __init__(self, num_worker_threads: int, fn: Callable[..., None]) -> None:
        """
        Initializes the worker pool with a specific number of worker threads
        and a function to execute.

        :param num_worker_threads: Number of worker threads to use for processing tasks.
        :param fn: The function to be executed by worker threads. It is called with
            the positional and keyword arguments passed to submit().
        """
        self.job_queue = Queue()
        self.num_worker_threads = num_worker_threads
        self.fn = fn
        self.workers = []
    def _do_work(self):
        """
        Worker loop: process jobs from the queue until a sentinel is received.
        """
        while True:
            job = self.job_queue.get()
            logger.debug(f'A job received: {job}')
            if job is None:  # Sentinel signal: this worker should shut down
                self.job_queue.task_done()
                break
            try:
                args, kwargs = job
                logger.debug('Start processing the task...')
                self.fn(*args, **kwargs)
            except Exception:
                logger.exception('Task raised an exception')
            finally:
                self.job_queue.task_done()
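    # Shutdown uses the None-sentinel pattern: stop() enqueues one None per
    # worker, and each worker exits its loop after dequeuing exactly one, so
    # every thread terminates once the queue is drained.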
    def submit(self, *args, **kwargs):
        """
        Enqueue a job; a worker thread will call fn(*args, **kwargs).
        """
        logger.debug(f'no of jobs before submit: {self.job_queue.qsize()}')
        self.job_queue.put((args, kwargs))
        logger.debug(f'no of jobs after submit: {self.job_queue.qsize()}')
    def start(self):
        """
        Starts the worker threads that will process the jobs in the queue.
        """
        for _ in range(self.num_worker_threads):
            t = threading.Thread(target=self._do_work)
            t.daemon = True  # allows the thread to exit when the main program exits
            t.start()
            self.workers.append(t)  # track the thread; without this the count below is always 0
        logger.debug(f'no of workers: {len(self.workers)}')
    def stop(self):
        """
        Stop the worker threads by sending a sentinel signal to each one,
        then wait for the queue to drain.
        """
        for _ in range(self.num_worker_threads):
            self.job_queue.put(None)
        self.job_queue.join()
# Example usage
if __name__ == "__main__":
    import time

    # Example function to be executed by the worker threads
    def example_analysis_function(param1, param2, filename, cb):
        time.sleep(1)  # Simulate processing time
        print(f"Processing {param1} for {param2}: {filename}")
        cb("success", {"message": "Task completed"})  # Example callback invocation

    # Example callback function
    def callback(status, data):
        print(f"Callback received - Status: {status}, Data: {data}")

    # Initialize the QueueWorkerPool with 4 worker threads
    analyzer = QueueWorkerPool(num_worker_threads=4, fn=example_analysis_function)

    # Start the worker threads
    analyzer.start()

    # Submit some jobs
    analyzer.submit(1, [101, 102, 103], 'myfile.pdf', callback)
    analyzer.submit(2, [201, 202, 203], 'myfile.pdf', callback)

    # Ensure the main thread waits for all jobs to be processed before exiting
    analyzer.job_queue.join()
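An equivalent, slightly tidier shutdown is to call stop() instead of reaching into job_queue directly; a minimal sketch using the same analyzer object as above:

    # stop() enqueues one sentinel per worker and blocks until the queue
    # drains, so it both waits for the submitted jobs and retires the threads.
    analyzer.stop()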