Skip to content

Instantly share code, notes, and snippets.

@hrom512
Created July 2, 2020 17:24
Show Gist options
  • Save hrom512/a0e8b6fc4c7d1b6439c31ac6c6782067 to your computer and use it in GitHub Desktop.
Save hrom512/a0e8b6fc4c7d1b6439c31ac6c6782067 to your computer and use it in GitHub Desktop.
Multi-thread processing
# Fixed-size pool of worker threads fed by a bounded task queue.
#
# Each call to #execute enqueues one task; a worker thread pops it and calls
# the block given to the constructor with the task's argument array (a plain
# block with several parameters receives them destructured).
#
# An exception raised by the block aborts the process, because every worker
# sets +abort_on_exception+ on itself.
class ThreadPool
  attr_reader :pool_size, :queue_size, :block

  # Starts +pool_size+ worker threads immediately.
  #
  # @param pool_size [Integer] number of worker threads
  # @param queue_size [Integer] maximum number of tasks waiting in the queue;
  #   must be positive (Thread::SizedQueue rejects anything else)
  # @yield [args] the work to perform for every task passed to #execute
  # @raise [ArgumentError] if no block is given or +queue_size+ is not positive
  def initialize(pool_size:, queue_size:, &block)
    raise ArgumentError, "a block is required" unless block

    @pool_size = pool_size
    @queue_size = queue_size
    @block = block
    # Everything the workers share is created before the first thread starts,
    # so no worker can observe a half-initialized pool.
    @queue = Thread::SizedQueue.new(queue_size)
    @pending = 0
    @lock = Mutex.new
    @idle = ConditionVariable.new
    pool
  end

  # Enqueues a task. Blocks while the queue already holds +queue_size+ tasks,
  # so a task is never dropped.
  #
  # @param args [Array] arguments handed to the worker block
  # @raise [ClosedQueueError] if called after #wait_and_exit
  def execute(*args)
    # Count the task before it becomes visible to the workers; otherwise a
    # fast worker could finish it before it was ever counted.
    task_started
    queue.push(args)
  rescue StandardError
    task_finished
    raise
  end

  # Blocks until every task passed to #execute has finished running, not
  # merely until the queue is empty.
  #
  # @return [nil]
  def wait
    @lock.synchronize { @idle.wait(@lock) until @pending.zero? }
  end

  # Waits for all tasks to finish, then shuts the workers down.
  #
  # @return [Array<Thread>] the (now finished) worker threads
  def wait_and_exit
    wait
    # Closing the queue makes the blocked `pop` in each worker return nil,
    # which ends its loop.
    queue.close
    pool.each(&:join)
  end

  private

  def queue
    @queue
  end

  def pool
    @pool ||= pool_size.times.map { Thread.new { work_loop } }
  end

  # Body of every worker thread: run tasks until the queue is closed and empty.
  def work_loop
    Thread.current.abort_on_exception = true
    # Tasks are always arrays (truthy); nil only appears once the queue is closed.
    while (params = queue.pop)
      begin
        block.call(params)
      ensure
        task_finished
      end
    end
  end

  # Records that one more task is outstanding.
  def task_started
    @lock.synchronize { @pending += 1 }
  end

  # Records that one task is done and wakes #wait when none are left.
  def task_finished
    @lock.synchronize do
      @pending -= 1
      @idle.broadcast if @pending.zero?
    end
  end
end
# CSV lives in the standard library but is not loaded by default.
require "csv"

# Create 10 worker threads fed by a task queue limited to 100 entries.
pool = ThreadPool.new(pool_size: 10, queue_size: 100) do |column1, column2|
  # Do the useful work here. This block runs inside a worker thread and
  # receives the arguments that were passed to pool.execute.
  sleep 1
  puts "column1=#{column1} column2=#{column2}"
end

CSV.foreach("big_data.csv", headers: true, header_converters: :symbol) do |row|
  # Hand each row to the pool so it is processed in a worker thread.
  pool.execute(row[:column1], row[:column2])
end

# Let the pool drain, then shut the worker threads down.
pool.wait_and_exit
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment