Skip to content

Instantly share code, notes, and snippets.

View MaxWinterstein's full-sized avatar

Max Winterstein MaxWinterstein

View GitHub Profile
@MaxWinterstein
MaxWinterstein / AutoShutdownMiddleware.py
Created December 5, 2024 10:27
Middleware to shut down the worker if it is idle for more than idle_timeout seconds
class AutoShutdownMiddleware(dramatiq.Middleware):
    """Dramatiq middleware that tracks worker inactivity.

    Constructing the middleware records the current time as the last
    activity and immediately starts a background thread that runs
    ``self._monitor_idle``.
    """

    def __init__(self, idle_timeout=120):
        """Middleware to shut down the worker if idle for more than idle_timeout seconds.

        :param idle_timeout: Time in seconds to wait before shutting down the worker.
        """
        self.idle_timeout = idle_timeout
        # Construction counts as the most recent activity, so the idle clock
        # starts from "now" (wall-clock seconds from time.time()).
        self.last_message_time = time.time()
        # NOTE(review): _monitor_idle is not visible in this excerpt; it is
        # expected to be defined on this class and to compare
        # last_message_time against idle_timeout -- confirm.
        # The thread is a daemon so that it can never keep the process alive
        # by itself: a non-daemon thread is joined at interpreter shutdown and
        # would block the worker from exiting when it is stopped by other means.
        self.monitor_thread = threading.Thread(
            target=self._monitor_idle,
            daemon=True,
        )
        self.monitor_thread.start()