Skip to content

Instantly share code, notes, and snippets.

@oeo
Created January 18, 2025 01:33
Show Gist options
  • Save oeo/ebe6a76145e777fb5a66f3cc9c4b9fbe to your computer and use it in GitHub Desktop.
Save oeo/ebe6a76145e777fb5a66f3cc9c4b9fbe to your computer and use it in GitHub Desktop.
require 'logger'
require 'securerandom'
require 'timeout'

require 'redis'
# Runs a callable repeatedly on a fixed interval from a background thread and
# uses a Redis sorted set to cap how many executions may hold a slot at once.
#
# NOTE(review): every #run call derives its Redis key from a fresh
# SecureRandom.uuid, so the sorted set is private to that run; confirm whether
# a caller-supplied, shared key was intended for cross-process limiting.
class EveryTime
  # Atomic slot acquisition. ARGV = [now_ms, ttl_ms, max_count, execution_id].
  # Drops members whose score is <= now - ttl, then adds execution_id (scored
  # with now) only if fewer than max_count members remain. The key's expiry is
  # refreshed to ceil(ttl / 1000) seconds. Returns 1 when the slot was taken,
  # 0 otherwise.
  LUA_ACQUIRE_LOCK = <<~LUA.freeze
    local key = KEYS[1]
    local now = tonumber(ARGV[1])
    local ttl = tonumber(ARGV[2])
    local max_count = tonumber(ARGV[3])
    local execution_id = ARGV[4]
    redis.call('zremrangebyscore', key, 0, now - ttl)
    local count = redis.call('zcard', key)
    if count < max_count then
      redis.call('zadd', key, now, execution_id)
      redis.call('expire', key, math.ceil(ttl/1000))
      return 1
    end
    return 0
  LUA

  # Whole-string match for "<number>ms" or "<number>s"; the number may have a
  # decimal part. Anchored with \A/\z so "1.5s" is not read as "5s".
  INTERVAL_PATTERN = /\A(?<amount>\d+(?:\.\d+)?)(?<unit>ms|s)\z/.freeze
  private_constant :INTERVAL_PATTERN

  # @param redis_client [#eval, #zrem, #del] Redis connection used for locking
  # @param logger [Logger] destination for error and statistics messages
  def initialize(redis_client = Redis.new, logger: Logger.new($stdout))
    @redis = redis_client
    @intervals = {}
    @stats = {}
    @logger = logger
  end

  # Starts invoking +fn+ every +seconds+ on a background thread.
  #
  # @param seconds [Numeric, String] seconds as a number, or "<n>s" / "<n>ms"
  # @param fn [#call] invoked with the execution id (a UUID string)
  # @param concurrency [Integer] maximum held slots; a non-positive value
  #   disables locking and statistics
  # @param timeout_ms [Numeric] per-execution timeout, also used as the slot TTL
  # @return [StopControl] call #stop on it to end the schedule
  # @raise [ArgumentError] on an unparseable or negative interval, or when
  #   +fn+ does not respond to #call
  def run(seconds, fn, concurrency = 1, timeout_ms = 60_000)
    raise ArgumentError, 'fn must respond to #call' unless fn.respond_to?(:call)

    interval_ms = parse_interval(seconds)
    base_key = SecureRandom.uuid
    setup_monitoring(base_key) if concurrency.positive?
    setup_interval(base_key, interval_ms, fn, concurrency, timeout_ms)
    StopControl.new(-> { stop_execution(base_key) })
  end

  private

  # Converts an interval specification to milliseconds.
  # Numerics are taken as seconds; strings must match INTERVAL_PATTERN.
  def parse_interval(interval)
    interval_ms =
      case interval
      when Numeric then interval * 1000
      when String, Symbol then parse_interval_string(interval.to_s)
      else raise ArgumentError, 'Invalid interval format'
      end
    # Fail here instead of letting sleep raise inside the worker thread.
    raise ArgumentError, 'Interval must not be negative' if interval_ms.negative?

    interval_ms
  end

  # Parses "<n>ms" / "<n>s" into milliseconds (Integer unless n is fractional).
  def parse_interval_string(text)
    match = INTERVAL_PATTERN.match(text.strip)
    raise ArgumentError, 'Invalid interval format' unless match

    digits = match[:amount]
    # Base 10 is explicit so a leading zero is not read as octal.
    amount = digits.include?('.') ? Float(digits) : Integer(digits, 10)
    match[:unit] == 'ms' ? amount : amount * 1000
  end

  # Creates the attempt/success counters for one schedule.
  def setup_monitoring(base_key)
    @stats[base_key] = {
      attempts: 0,
      successes: 0,
      last_report: Time.now
    }
  end

  # Spawns the worker thread: run the task, sleep for the interval, repeat.
  def setup_interval(base_key, interval_ms, fn, concurrency, timeout_ms)
    pause_seconds = interval_ms / 1000.0
    @intervals[base_key] = Thread.new do
      loop do
        execute_task(base_key, fn, concurrency, timeout_ms)
        sleep(pause_seconds)
      end
    end
  end

  # Runs +fn+ once if a slot can be acquired. Timeouts and StandardErrors are
  # logged, never raised, so one failing run does not end the schedule.
  def execute_task(base_key, fn, concurrency, timeout_ms)
    execution_id = SecureRandom.uuid
    return unless can_execute?(base_key, concurrency, timeout_ms, execution_id)

    begin
      Timeout.timeout(timeout_ms / 1000.0) { fn.call(execution_id) }
      record_success(base_key) if concurrency.positive?
    rescue Timeout::Error
      @logger.error "Task #{execution_id} timed out after #{timeout_ms}ms"
    rescue StandardError => e
      @logger.error "Task #{execution_id} failed: #{e.message}"
    ensure
      release_lock(base_key, execution_id) if concurrency.positive?
    end
  end

  # Bumps the success counter, tolerating stats already removed by a stop.
  def record_success(base_key)
    stats = @stats[base_key]
    stats[:successes] += 1 if stats
  end

  # True when locking is disabled or a slot was acquired.
  def can_execute?(base_key, concurrency, timeout_ms, execution_id)
    return true unless concurrency.positive?

    @stats[base_key][:attempts] += 1
    acquire_lock(base_key, timeout_ms, concurrency, execution_id)
  end

  # Tries to take a slot in the sorted set; returns false on any Redis error.
  def acquire_lock(base_key, timeout_ms, concurrency, execution_id)
    key = "executions:#{base_key}"
    now = (Time.now.to_f * 1000).to_i
    result = @redis.eval(
      LUA_ACQUIRE_LOCK,
      keys: [key],
      argv: [now, timeout_ms, concurrency, execution_id]
    )
    result == 1
  rescue Redis::BaseError => e
    # BaseError also covers connection failures; rescuing only CommandError
    # let those escape and terminate the worker thread for good.
    @logger.error "Lock acquisition failed: #{e.message}"
    false
  end

  # Gives the slot back; Redis errors are logged and swallowed.
  def release_lock(base_key, execution_id)
    @redis.zrem("executions:#{base_key}", execution_id)
  rescue Redis::BaseError => e
    @logger.error "Lock release failed: #{e.message}"
  end

  # Stops the worker thread and cleans up. Safe to call more than once.
  def stop_execution(base_key)
    worker = @intervals.delete(base_key)
    return unless worker

    if worker == Thread.current
      # Stop requested from inside the task: killing the current thread does
      # not return, so cleanup has to happen first.
      cleanup_execution(base_key)
      worker.kill
    else
      worker.kill
      # Let the worker's ensure (lock release) finish before deleting the key.
      worker.join
      cleanup_execution(base_key)
    end
  end

  # Deletes the Redis key, then reports and drops the stats even if the
  # delete raised (the error still propagates to the caller).
  def cleanup_execution(base_key)
    @redis.del("executions:#{base_key}")
  ensure
    report_final_stats(base_key)
    @stats.delete(base_key)
  end

  # Logs the success rate for a schedule; a no-op when no stats were kept.
  def report_final_stats(base_key)
    stats = @stats[base_key]
    return unless stats

    attempts = stats[:attempts]
    successes = stats[:successes]
    # No attempts means 0.0%, not the NaN that 0.0 / 0 would produce.
    success_rate = attempts.zero? ? 0.0 : (successes.to_f / attempts * 100).round(1)
    @logger.info "Final stats: #{success_rate}% success rate (#{successes}/#{attempts})"
  end
end
# Handle wrapping a callable; #stop invokes it and returns its result.
# EveryTime#run hands one of these back to the caller.
class StopControl
  # @param stop_proc [#call] invoked each time #stop is called
  def initialize(stop_proc)
    @on_stop = stop_proc
  end

  # Invokes the wrapped callable with no arguments.
  # @return [Object] whatever the callable returns
  def stop
    @on_stop.()
  end
end
# Usage example:
# timer = EveryTime.new
#
# # Run a task every 500ms with a concurrency of 2 and a 5000ms timeout
# control = timer.run('500ms', ->(execution_id) {
#   puts "Task #{execution_id} executed at #{Time.now}"
# }, 2, 5000)
#
# sleep 10
# control.stop
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment