Skip to content

Instantly share code, notes, and snippets.

@MaxWinterstein
Created December 5, 2024 10:27
Show Gist options
  • Save MaxWinterstein/47616fb1c8d5ef4e7ad657e83dd589c6 to your computer and use it in GitHub Desktop.
Save MaxWinterstein/47616fb1c8d5ef4e7ad657e83dd589c6 to your computer and use it in GitHub Desktop.
Middleware to shutdown worker if idle for more than idle_timeout seconds
class AutoShutdownMiddleware(dramatiq.Middleware):
def __init__(self, idle_timeout=120):
"""Middleware to shutdown worker if idle for more than idle_timeout seconds.
:param idle_timeout: Time in seconds to wait before shutting down the worker.
"""
self.idle_timeout = idle_timeout
self.last_message_time = time.time()
self.monitor_thread = threading.Thread(target=self._monitor_idle)
self.monitor_thread.start()
def _monitor_idle(self):
"""Monitor idle time and terminate the worker process if it exceeds the idle timeout."""
while True:
time.sleep(1)
if time.time() - self.last_message_time > self.idle_timeout:
print(
f"Worker has been idle for {self.idle_timeout} seconds. Shutting down..."
)
# os._exit(0) # Hard exit
os.kill(os.getpid(), signal.SIGTERM) # better exit
break
def before_process_message(self, broker, message):
"""Hook called before processing a message. Resets the idle timer."""
self.last_message_time = time.time()
def after_process_message(self, broker, message, *, result=None, exception=None):
"""Hook called after processing a message. Resets the idle timer."""
self.last_message_time = time.time()
def after_declare_queue(self, broker, queue_name):
"""Hook called after declaring a queue. Resets the idle timer."""
self.last_message_time = time.time()
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment