Created
April 17, 2012 06:26
-
-
Save wmorgan/2403893 to your computer and use it in GitHub Desktop.
safe scheduler in redis
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
## THIS VERSION HAS SOME BUGS | |
## SEE http://masanjin.net/blog/using-redis-for-scheduling for the fixes | |
## | |
## call schedule! to schedule an item at a specific time. | |
## call blocking_each to have each item yielded to you, on or after its scheduled time. | |
## items will be placed in ERROR_QUEUE during the course of processing. clean these up at will. | |
class Scheduler | |
QUEUE = "q" | |
ERROR_QUEUE = "eq" | |
RETRY_DELAY = 1 # seconds | |
CAS_DELAY = 0.5 # seconds | |
def initialize redis | |
@redis = redis | |
end | |
def schedule! thing, time | |
@redis.zadd QUEUE, time.to_f, thing | |
end | |
def blocking_each | |
while true | |
thing = blocking_get | |
begin | |
yield thing | |
ensure | |
cleanup thing | |
end | |
end | |
end | |
private | |
def blocking_get | |
while true | |
@redis.watch QUEUE | |
thing = @redis.zrange(QUEUE, 0, 0).first | |
at = @redis.zscore(QUEUE, thing).to_f if thing | |
if thing && (Time.at(at) < Time.now) | |
success = @redis.multi do | |
@redis.zrem QUEUE, thing | |
@redis.lpush ERROR_QUEUE, thing | |
end | |
if success | |
break thing | |
else | |
sleep CAS_DELAY | |
end | |
else | |
@redis.unwatch | |
sleep RETRY_DELAY | |
end | |
end | |
end | |
def cleanup thing | |
@redis.lrem ERROR_QUEUE, 1, thing | |
end | |
end |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment