-
-
Save renierdbruyn/0190908ee2b97e5c012ad383d9a7288a to your computer and use it in GitHub Desktop.
Dynamically add celery tasks
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
## THIS IS UNTESTED | |
from worker.models import TaskType | |
from website.celery import app | |
import importlib | |
# Dynamically add registered tasks | |
# Celery._tasks is a task registry object | |
# https://github.com/celery/celery/blob/34c43244b1681a59540936748800aaa504786a35/celery/app/base.py#L162 - _tasks | |
# https://github.com/celery/celery/blob/524421a36dcc838a5f51e5cf122902aab774bad1/celery/app/registry.py#L22 - task registry object | |
# Autodiscover calls AppLoader which is a thin shell around BaseLoader | |
# https://github.com/celery/celery/blob/34c43244b1681a59540936748800aaa504786a35/celery/app/base.py#L341 - autodiscover
# https://github.com/celery/celery/blob/34c43244b1681a59540936748800aaa504786a35/celery/loaders/base.py - base loader | |
# task function calls _task_from_fun, which adds tasks to registry | |
# https://github.com/celery/celery/blob/34c43244b1681a59540936748800aaa504786a35/celery/app/base.py#L227 - task; creates a new task | |
# https://github.com/celery/celery/blob/34c43244b1681a59540936748800aaa504786a35/celery/app/base.py#L269 - _task_from_fun | |
# task function is called as a decorator on a task function | |
# TL;DR | |
# - call app.task directly on functions you want to add | |
def add_tasks():
    """Grab task definitions out of the database and add them to Celery's registry.

    Every ``TaskType`` row carries a ``call_string``: a dotted path of the
    form ``"module1.module2.module3.task_class"``. The part before the last
    dot is imported as a module, the part after it is looked up on that
    module, and the resulting object's ``execute`` attribute is handed to
    ``app.task`` so that it is registered as a task.

    Raises:
        ValueError: if a ``call_string`` is not of the form
            ``"<module path>.<name>"`` (no dot, or an empty side).
        ImportError: if the module part cannot be imported.
        AttributeError: if the module lacks the named attribute, or that
            attribute has no ``execute``.
    """
    for task in TaskType.objects.all():
        # Split on the LAST dot only: everything before it is the module path,
        # everything after it is the attribute to fetch from that module.
        module_name, _, class_name = task.call_string.rpartition(".")
        if not module_name or not class_name:
            raise ValueError(
                "call_string must look like 'package.module.TaskClass', got %r"
                % (task.call_string,)
            )

        # Absolute import. A relative import (".module" + package=...) breaks
        # for two-component paths such as "tasks.MyTask", because the package
        # would be the empty string and importlib rejects that.
        module = importlib.import_module(module_name)
        task_class = getattr(module, class_name)

        # The execute attribute is the callable; calling app.task on it
        # directly is the non-decorator way of registering it.
        # NOTE(review): no explicit name= is passed, so Celery presumably
        # derives the task name from the callable's module and __name__
        # ("execute"); several task classes living in one module may then
        # map to the same name -- confirm and pass name= if that is the case.
        app.task(getattr(task_class, "execute"))
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment