Created
August 22, 2018 09:53
-
-
Save amitripshtos/d4840406f7d6374ef8abcc2dca937583 to your computer and use it in GitHub Desktop.
Alternative to celery beat for people who got burned hard
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
from apscheduler.schedulers.background import BackgroundScheduler | |
from apscheduler.executors.pool import ThreadPoolExecutor | |
from apscheduler.jobstores.memory import MemoryJobStore | |
from apscheduler.job import Job | |
import json | |
import logging | |
from apscheduler.triggers.cron import CronTrigger | |
import time | |
from celery import Celery | |
from typing import List | |
# You must install APScheduler and celery in order to use this code.
# In addition, you must override the get_tasks, get_celery_application and update_periodic_tasks_from_database functions.
# Module-level logger, named after this module.
logger = logging.getLogger(__name__)
class PeriodicTask:
    """
    Plain record describing one periodic task.

    The five constructor arguments are stored unchanged on same-named attributes.
    """

    def __init__(self, name: str, task: str, args: list, kwargs: dict, cron: CronTrigger) -> None:
        """
        :param name: stored as ``self.name``
        :param task: stored as ``self.task``
        :param args: stored as ``self.args``
        :param kwargs: stored as ``self.kwargs``
        :param cron: stored as ``self.cron``
        """
        self.name, self.task = name, task
        self.args, self.kwargs = args, kwargs
        self.cron = cron
def get_tasks() -> List[PeriodicTask]:
    """
    Return the periodic tasks to schedule.

    Placeholder: adjust this function to load the list of periodic tasks from a
    file/database/whatever.
    :return: a list holding one sample task
    """
    # NOTE(review): with only ``minute=1`` set, this trigger presumably fires at
    # minute 1 of every hour -- confirm against the APScheduler cron docs.
    sample_task = PeriodicTask(
        name='test task',
        task='tasks.test_task',
        args=[],
        kwargs={},
        cron=CronTrigger(minute=1),
    )
    return [sample_task]
def get_celery_application() -> Celery:
    """
    Return the Celery application used to dispatch tasks.

    Placeholder: adjust this function to return the Celery object of your project.
    Until it is overridden it raises instead of silently returning ``None``, so a
    missing override is reported with a clear message rather than an
    ``AttributeError`` on ``None``.
    :return: the project's Celery object (once overridden)
    :raises NotImplementedError: always, while this placeholder is in place
    """
    raise NotImplementedError(
        'get_celery_application() must be overridden to return your Celery application'
    )
def update_periodic_tasks_from_database(scheduler: BackgroundScheduler) -> None:
    """
    Synchronise changed tasks into the scheduler; does nothing until overridden.

    Placeholder: adjust this function to update changed tasks from a
    file/database/whatever, using scheduler.get_job to fetch the job and then
    updating it as you want.
    :param scheduler: the scheduler whose jobs should be updated
    :return: None
    """
    return None
class CeleryPoolExecutor(ThreadPoolExecutor):
    """
    A thread pool executor that dispatches celery tasks instead of running the jobs itself.
    """

    def _do_submit_job(self, job: Job, run_times: list) -> None:
        """
        Send the celery task for ``job`` and report the outcome back to the executor.

        :param job: the due job; its name, args and kwargs are forwarded to celery
        :param run_times: the run times the job is due for (not used here)
        :return: None
        """
        try:
            logger.info('About to start task {} (Known next run time: {})'.format(job.name, job.next_run_time))
            try:
                # NOTE(review): the task name is always prefixed with 'api.tasks.';
                # confirm this matches the task names stored on your jobs.
                get_celery_application().send_task(
                    'api.tasks.{task_name}'.format(task_name=job.name),
                    args=job.args,
                    kwargs=job.kwargs,
                )
            except Exception as e:
                # Best effort: a failed dispatch is logged, and the job is still
                # reported as finished below.
                logger.warning('Could not send task through celery. Exception: {}'.format(repr(e)))
            self._run_job_success(job.id, [])
        except Exception as e:
            logger.error('Failed to start task {} by scheduler. Exception was: {}'.format(job.name, repr(e)))
            # Hand over the real traceback object; a list is not a valid traceback
            # and breaks exception formatting when the error is logged.
            self._run_job_error(job.id, e, e.__traceback__)
def dummy_func(*args, **kwargs) -> None:
    """
    No-op callable handed to APScheduler, which requires a function for every job.

    APScheduler is used here only to schedule celery tasks, not to run them,
    so this placeholder stands in for a real job function.
    :param args: ignored
    :param kwargs: ignored
    :return: None
    """
    return None
def _decode_if_json(value):
    """Return ``json.loads(value)`` for JSON text, or ``value`` unchanged if already decoded."""
    if isinstance(value, (str, bytes, bytearray)):
        return json.loads(value)
    return value


def run(poll_interval_seconds: float = 5):
    """
    Build the scheduler, register every task from ``get_tasks()`` and poll for updates.

    Task fields may be given either as ready-made Python objects (a ``CronTrigger``,
    a list, a dict) or as JSON text; JSON text is decoded first. Empty ``args`` /
    ``kwargs`` are passed to the scheduler as ``None``.

    The loop runs until ``update_periodic_tasks_from_database`` raises or the process
    is interrupted; the scheduler is shut down in both cases.

    :param poll_interval_seconds: pause between two update polls
    :return: None
    """
    scheduler = BackgroundScheduler()
    scheduler.add_jobstore(MemoryJobStore(), "default")
    scheduler.add_executor(CeleryPoolExecutor(), 'default')
    scheduler.remove_all_jobs()

    # Populate scheduler tasks by the database
    for periodic_task in get_tasks():
        cron = _decode_if_json(periodic_task.cron)
        # A decoded JSON mapping holds CronTrigger keyword arguments.
        trigger = cron if isinstance(cron, CronTrigger) else CronTrigger(**cron)
        scheduler.add_job(
            func=dummy_func,
            trigger=trigger,
            args=_decode_if_json(periodic_task.args) if periodic_task.args else None,
            kwargs=_decode_if_json(periodic_task.kwargs) if periodic_task.kwargs else None,
            name=periodic_task.task,
            coalesce=True,
            id=periodic_task.name,
        )

    # Start outside the try block: if start() fails there is nothing to shut down.
    scheduler.start()
    logger.info('Scheduler started')
    try:
        while True:
            update_periodic_tasks_from_database(scheduler)
            time.sleep(poll_interval_seconds)
    except KeyboardInterrupt:
        logger.info('Scheduler interrupted')
    except Exception:
        # Record why the loop stopped instead of discarding the error.
        logger.exception('Scheduler loop failed')
    finally:
        logger.info('Shutting down scheduler')
        scheduler.shutdown()
# Start the scheduler loop only when this file is executed as a script.
if __name__ == "__main__":
    run()
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment