Skip to content

Instantly share code, notes, and snippets.

@bergundy
Created February 8, 2012 15:33

Revisions

  1. bergundy revised this gist Feb 8, 2012. 1 changed file with 2 additions and 1 deletion.
    3 changes: 2 additions & 1 deletion gistfile1.py
    Original file line number Diff line number Diff line change
    @@ -66,7 +66,8 @@ def _schedule_next(self):

    @property
    def _next_timeout(self):
    return self._trigger.get_next_fire_time(datetime.now()) - datetime.now()
    d = datetime.now()
    return self._trigger.get_next_fire_time(d) - d

    if __name__ == "__main__":
    import doctest
  2. bergundy created this gist Feb 8, 2012.
    73 changes: 73 additions & 0 deletions gistfile1.py
    Original file line number Diff line number Diff line change
    @@ -0,0 +1,73 @@
    import re
    import itertools
    import logging

    from apscheduler.triggers.cron import CronTrigger
    from tornado.ioloop import IOLoop
    from datetime import datetime

    class CronCallback(object):
    """Schedules the given callback to be called periodically.
    The callback is called according to the schedule argument.
    `start` must be called after the CronCallback is created.
    If schedule is a string it should contain 7 cron fields: ('second', 'minute', 'hour', 'day', 'month', 'year', 'day_of_week').
    If schedule is a dict it must contain at least one of the fields above.
    >>> cron1 = CronCallback(lambda: logging.error('x'), dict(seconds = 1)) # logs 'x' every second
    >>> cron2 = CronCallback(lambda: IOLoop.instance().stop(), '*/5 * * * * * *') # stops ioloop every 5 seconds
    >>> cron1.start()
    >>> cron2.start()
    >>> IOLoop.instance().start()
    """

    _split_re = re.compile("\s+")
    _sched_seq = ('second', 'minute', 'hour', 'day', 'month', 'year', 'day_of_week')

    def __init__(self, callback, schedule, io_loop=None):
    if isinstance(schedule, basestring):
    splitted = self._split_re.split(schedule)
    if len(splitted) < 7:
    raise TypeError("'schedule' argument pattern mismatch")

    schedule = dict(itertools.izip(self._sched_seq, splitted))

    self.callback = callback
    self._trigger = CronTrigger(**schedule)
    self.io_loop = io_loop or IOLoop.instance()
    self._running = False
    self._timeout = None

    def start(self):
    """Starts the timer."""
    self._running = True
    self._schedule_next()

    def stop(self):
    """Stops the timer."""
    self._running = False
    if self._timeout is not None:
    self.io_loop.remove_timeout(self._timeout)
    self._timeout = None

    def _run(self):
    if not self._running: return
    try:
    self.callback()
    except Exception:
    logging.error("Error in cron callback", exc_info=True)
    self._schedule_next()

    def _schedule_next(self):
    if self._running:
    self._timeout = self.io_loop.add_timeout(self._next_timeout, self._run)

    @property
    def _next_timeout(self):
    return self._trigger.get_next_fire_time(datetime.now()) - datetime.now()

    if __name__ == "__main__":
    import doctest
    doctest.testmod()