Skip to content

Instantly share code, notes, and snippets.

@alexkiro
Last active April 29, 2020 12:48
Show Gist options
  • Save alexkiro/0b20afcbee82948a110c5d556b057a44 to your computer and use it in GitHub Desktop.
Save alexkiro/0b20afcbee82948a110c5d556b057a44 to your computer and use it in GitHub Desktop.
Django celery deploy check.
from django.core.checks import register, Warning, Error
@register(deploy=True)
def check_celery_worker(**kwargs):
from celery import current_app
errors = []
try:
assert current_app.control.inspect().ping(), "could not ping"
except Exception as e:
errors.append(
Error(
f"Celery worker do not appear to be running: {e}",
hint=f"Ensure that the celery worker are running",
obj=None,
id="EXXX",
)
)
return errors
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment