-
-
Save manoj2411/df9a0479cac9c500bac65fbd07d8805a to your computer and use it in GitHub Desktop.
A load balancer for Sidekiq jobs!
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
module Amura
  # Keeps a per-host pool of Sidekiq processes sized to each queue's backlog:
  # tops the pool up to a minimum, spawns extra processes while a queue stays
  # backed up, and quiets/stops the extras once it drains.
  class SidekiqManager
    # Per-host queue configuration. Structure:
    #   {host_name => {queue_name => {min:, max:, conc:, latency:, queue_size:, kill_idle:,
    #                                 tags:, total_checks:, current_check:}}}
    #
    # host_name     - hostname (as returned by Socket.gethostname) of the machine running the
    #                 workers; lets one config serve an app deployed on several instances.
    # queue_name    - Sidekiq queue name (as in `sidekiq_options queue: :mailer`).
    # min           - minimum number of processes serving this queue on this host.
    # max           - maximum number of processes allowed for this queue on this host; caps
    #                 memory use and runaway spawning.
    # conc          - worker threads per process; passed to sidekiq as `-c`.
    # latency       - intended acceptable latency. NOTE(review): not read by this class.
    # queue_size    - acceptable backlog. A backlog above 130% of it counts as overloaded;
    #                 at or below it, processes beyond `min` are retired.
    # kill_idle     - NOTE(review): not read by this class.
    # tags          - NOTE(review): not read by this class; spawned processes are tagged
    #                 "<queue_name><n>" via sidekiq's `-g` option instead.
    # total_checks  - number of overloaded runs (without the queue draining in between) needed
    #                 before one more process is spawned, so short spikes that drain quickly
    #                 do not trigger scaling.
    # current_check - in-memory mirror of the overload counter; the authoritative counter is
    #                 the field named after the queue in the Redis hash "queue_management".
    @@healthy_set = {
      "node-1" => {
        default: {min: 2, max: 4, conc: 15, latency: 30, queue_size: 1000, kill_idle: -1, tags: ["default"], total_checks: 2, current_check: 0}
      },
      "node-2" => {
        default: {min: 1, max: 2, conc: 10, latency: 10, queue_size: 1000, kill_idle: 1000, tags: ["default"], total_checks: 2, current_check: 0},
        mailer: {min: 1, max: 3, conc: 20, latency: 10, queue_size: 200, kill_idle: 1000, tags: ["mixpanel"], total_checks: 2, current_check: 0}
      }
    }
    @@redis = Redis.new

    # Redis hash holding the overload counters (one field per queue name).
    CHECKS_KEY = "queue_management".freeze
    private_constant :CHECKS_KEY

    # Quiets every process serving this host's configured queues and boots a fresh
    # set of `min` processes per queue. Useful in Capistrano scripts after a deploy.
    # Quieted processes are stopped by kill_quiet_ones (called at the start of run)
    # once they have no busy workers.
    def self.restart
      host_name = Socket.gethostname
      host_queues(host_name).each do |queue_name, queue_meta_data|
        processes_for(host_name, queue_name).each(&:quiet!)
        (1..queue_meta_data[:min]).each { |tag| spawn_process(queue_name, queue_meta_data, tag) }
      end
    end

    # Main entry point, meant to be called periodically (e.g. from crontab).
    # For every queue configured for this host:
    #   * fewer than `min` processes -> spawn the missing ones;
    #   * backlog above 130% of `queue_size` -> count the run, and once `total_checks`
    #     runs have been counted spawn one more process (up to `max`);
    #   * backlog at or below `queue_size` -> reset the counter and quiet/stop the
    #     processes tagged above `min`.
    # A backlog between 100% and 130% of `queue_size` leaves the pool unchanged.
    def self.run
      host_name = Socket.gethostname
      kill_quiet_ones
      host_queues(host_name).each do |queue_name, queue_meta_data|
        check = @@redis.hget(CHECKS_KEY, queue_name).to_i
        queue = Sidekiq::Queue.new(queue_name.to_s)
        processes = processes_for(host_name, queue_name)
        size = queue.size

        if processes.count < queue_meta_data[:min]
          # A process died (e.g. the kernel killed it); bring the pool back to `min`.
          replace_missing_processes(host_name, queue_name, queue_meta_data, processes)
        elsif size > queue_meta_data[:queue_size] * 1.3
          scale_up(host_name, queue_name, queue_meta_data, queue, processes, check)
        elsif size <= queue_meta_data[:queue_size]
          scale_down(host_name, queue_name, queue_meta_data, queue, processes, check)
        end
      end
    end

    # Stops every quiet process on this host that has no busy workers.
    def self.kill_quiet_ones
      host_name = Socket.gethostname
      Sidekiq::ProcessSet.new
        .select { |process| process["quiet"] == "true" && process["busy"] == 0 && process["hostname"] == host_name }
        .each(&:stop!)
    end

    # Queue configuration for the given host; empty (nothing to manage) when the
    # host has no entry, instead of raising on nil.
    def self.host_queues(host_name)
      @@healthy_set.fetch(host_name, {})
    end

    # Sidekiq processes on the given host whose queue list includes queue_name.
    def self.processes_for(host_name, queue_name)
      Sidekiq::ProcessSet.new.select do |process|
        process["hostname"] == host_name && process["queues"].include?(queue_name.to_s)
      end
    end

    # Numeric suffix of a process tag ("default3" -> 3); 0 for an untagged process.
    def self.process_tag(queue_name, process)
      process["tag"].to_s.gsub(queue_name.to_s, "").to_i
    end

    # Numeric tag suffixes of all given processes.
    def self.process_tags(queue_name, processes)
      processes.map { |process| process_tag(queue_name, process) }
    end

    # Boots one daemonized sidekiq process for queue_name tagged "<queue_name><tag>".
    def self.spawn_process(queue_name, queue_meta_data, tag)
      root = Rails.root.to_s
      name = "#{queue_name}#{tag}"
      pid = Process.spawn("cd #{root} && ( RVM_BIN_PATH=~/.rvm/bin rvm default do bundle exec sidekiq -c #{queue_meta_data[:conc]} -d -i 0 -P #{root}/tmp/pids/sidekiq_#{name}.pid -e #{Rails.env} -L #{root}/log/sidekiq.log -q #{queue_name} -g #{name})")
      # Detach so the spawned shell is reaped in the background and never left as a zombie.
      Process.detach(pid)
    end

    # Spawns a process for every tag in 1..min that no live process holds.
    def self.replace_missing_processes(host_name, queue_name, queue_meta_data, processes)
      Rails.logger.info("Queue Alert: Adding more on #{host_name}. #{queue_name} should have #{queue_meta_data[:min]} but has #{processes.count}. Starting new process.")
      missing = (1..queue_meta_data[:min]).to_a - process_tags(queue_name, processes)
      missing.each { |tag| spawn_process(queue_name, queue_meta_data, tag) }
    end

    # Overloaded queue: count this run, and once enough runs have been counted spawn
    # one more process unless the host already runs `max` of them.
    def self.scale_up(host_name, queue_name, queue_meta_data, queue, processes, check)
      if check < queue_meta_data[:total_checks]
        bump_check(queue_name, queue_meta_data)
      elsif processes.count >= queue_meta_data[:max]
        # `>=` (not `==`): a counter pushed past total_checks (the Redis hash is shared
        # by all hosts) must still lead here instead of wedging the queue forever.
        Rails.logger.info("Queue Alert: Adding more on #{host_name}. #{queue_name} has max processes(#{processes.count}). Current Stats - Latency : #{queue.latency}, Size : #{queue.size}")
      else
        # Next free tag above the highest in use; `|| 0` covers an empty pool.
        tag = (process_tags(queue_name, processes).max || 0) + 1
        spawn_process(queue_name, queue_meta_data, tag)
        reset_check(queue_name, queue_meta_data)
      end
    end

    # Healthy queue: forget earlier overloaded runs (so separate transient spikes do
    # not add up to a spawn) and quiet every process tagged above `min`.
    def self.scale_down(host_name, queue_name, queue_meta_data, queue, processes, check)
      reset_check(queue_name, queue_meta_data) if check > 0
      set_to_kill = process_tags(queue_name, processes) - (1..queue_meta_data[:min]).to_a
      return if set_to_kill.empty?

      Rails.logger.info("Queue Alert: Quieting few from #{host_name}:#{queue_name}. Killing off #{set_to_kill.count} processes. Current Stats - Existing Processes: #{processes.count}, Latency : #{queue.latency}, Size : #{queue.size}")
      processes.select { |process| set_to_kill.include?(process_tag(queue_name, process)) }.each(&:quiet!)
      kill_quiet_ones
    end

    # Increments the overload counter in Redis and its in-memory mirror.
    def self.bump_check(queue_name, queue_meta_data)
      queue_meta_data[:current_check] += 1
      @@redis.hincrby(CHECKS_KEY, queue_name, 1)
    end

    # Resets the overload counter in Redis and its in-memory mirror.
    def self.reset_check(queue_name, queue_meta_data)
      queue_meta_data[:current_check] = 0
      @@redis.hset(CHECKS_KEY, queue_name, 0)
    end

    private_class_method :host_queues, :processes_for, :process_tag, :process_tags,
                         :spawn_process, :replace_missing_processes, :scale_up,
                         :scale_down, :bump_check, :reset_check
  end
end
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment