Created
March 28, 2025 07:38
-
-
Save mpraglowski/5779da7cd3881e3210800df6fe905a05 to your computer and use it in GitHub Desktop.
Sample code for "Handling Concurrency with Database Locks and SKIP LOCKED" blog post
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
require 'bundler/inline' | |
gemfile true do | |
source 'https://rubygems.org' | |
gem 'activerecord' | |
gem 'mysql2' | |
gem 'ruby-progressbar' | |
gem 'irb' | |
end | |
require 'thread' | |
require 'benchmark' | |
require "ruby-progressbar" | |
require 'active_record' | |
require 'active_support' | |
SLEEP_FOR = 0.1 | |
ActiveRecord::Base.establish_connection(adapter: 'mysql2', host: '127.0.0.1', database: 'inventory', pool: 100, init_command: 'SET SESSION innodb_lock_wait_timeout=1;') | |
puts ActiveRecord::Base.connection.execute("show variables like '%lock_wait%';").to_h.merge(isolation_level: ActiveSupport::IsolatedExecutionState.isolation_level).symbolize_keys | |
ActiveRecord::Base.logger = nil | |
ActiveRecord::Schema.define do | |
create_table :inventories, force: true do |t| | |
t.integer :product_id, null: false | |
t.integer :total, null: false | |
t.integer :available, null: false, default: 0 | |
end | |
add_index :inventories, :product_id | |
create_table :inventory_items, force: true do |t| | |
t.integer :inventory_id, null: false | |
t.string :status, null: false, default: 'free' | |
end | |
add_index :inventory_items, [:inventory_id, :status] | |
end | |
OutOfStock = Class.new(StandardError) | |
class Inventory < ActiveRecord::Base | |
self.table_name = 'inventories' | |
end | |
class LockingInventory < Inventory | |
def reserve!(quantity) | |
with_lock do | |
sleep(SLEEP_FOR) | |
raise OutOfStock if self.available < quantity | |
self.available -= quantity | |
self.save! | |
end | |
end | |
def inspect | |
"For #{product_id}, available: #{available}, reserved: #{total-available}" | |
end | |
end | |
class NonLockingInventory < Inventory | |
has_many :items, class_name: 'InventoryItem', foreign_key: :inventory_id | |
def reserve!(quantity) | |
sleep(SLEEP_FOR) | |
items_to_take = self.items.where(status: 'free').lock('FOR UPDATE SKIP LOCKED').limit(quantity) | |
raise OutOfStock if items_to_take.length < quantity | |
items_to_take.update_all(status: 'reserved') | |
end | |
def inspect | |
stats = self.items.group(:status).count | |
"For #{product_id}, available: #{stats["free"] || 0}, reserved: #{stats["reserved"] || 0}" | |
end | |
end | |
class InventoryItem < ActiveRecord::Base | |
validates :status, inclusion: {in: %w[free reserved]} | |
end | |
def setup_inventory(limit) | |
puts "Setup inventories..." | |
InventoryItem.delete_all | |
Inventory.delete_all | |
stock = 100*limit | |
progress = ProgressBar.create(total: stock) | |
(1..stock).each do |i| | |
inventory = Inventory.create!(product_id: i, available: limit, total: limit) | |
InventoryItem.insert_all(limit.times.map { {inventory_id: inventory.id} }) | |
progress.increment | |
end | |
puts "Done: Inventories created: #{Inventory.count} (and #{InventoryItem.count} items)" | |
end | |
def run(n_times, &block) | |
task_queue = Queue.new | |
n_times.times.each { |i| task_queue << i } | |
stats = Hash.new {|h,k| h[k] = 0} | |
workers = 100.times.map do | |
Thread.new do | |
until task_queue.empty? | |
task = task_queue.pop(true) rescue nil | |
block.call if task | |
stats[Thread.current.object_id] += 1 | |
end | |
end | |
end | |
workers.each(&:join) | |
puts "Done #{stats.map{|k,v| v}.sum} requests using #{workers.count} workers, with ~#{stats.map{|k,v| v}.sum / workers.count} requests per worker" | |
end | |
def test(bm, inventory_class, limit, product_id = nil, items = 1) | |
requests = limit * 10 | |
product_id ||= rand(1..100*limit) | |
inventory = inventory_class.find_by(product_id: product_id) | |
puts "Starting #{inventory_class}: #{requests} times trying to reserve product #{product_id}" | |
puts "Before #{inventory_class}: " + inventory.inspect | |
errors = Hash.new {|h,k| h[k] = 0} | |
bm.report("Using #{inventory_class}") do | |
run(requests) do | |
inventory.reserve!(items) | |
rescue => e | |
errors[e.class] += 1 | |
end | |
end | |
puts "After #{inventory_class}: " + inventory.reload.inspect | |
puts errors.inspect | |
end | |
limit = (ARGV[0] || 100).to_i | |
setup_inventory(limit) | |
Benchmark.bm do |x| | |
test(x, LockingInventory, limit) | |
test(x, NonLockingInventory, limit) | |
end |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment
The
sleep
is used here to "simulate" some work done in real systems and to make sure threads are switched.Run:
ruby inventory.rb 100