Last active
December 25, 2015 20:59
-
-
Save bawNg/7039438 to your computer and use it in GitHub Desktop.
Patch for the mongo-1.9.2 driver that adds fiber-based connection pool support
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
# Fiber-based replacement for the +Timeout.timeout+ call signature, driven by
# an EventMachine timer instead of a watchdog thread.
class EventMachine::Synchrony::MongoTimeoutHandler
  # Runs +block+ on the current fiber while an EventMachine timer is armed.
  # If the timer fires first, the current fiber is resumed with +nil+; no
  # exception is raised from here.
  #
  # @param op_timeout [Numeric, nil] seconds before the timer fires; +nil+ or
  #   zero runs the block with no timer at all (same convention as the
  #   standard library's +Timeout.timeout+)
  # @param ex_class [Class] accepted for signature compatibility with
  #   +Timeout.timeout+; it is not used by this implementation
  # @yield the operation to run under the timer
  # @return [Object] whatever the block returns
  def self.timeout(op_timeout, ex_class, &block)
    # Without this guard a nil/0 timeout would still arm a timer.
    return block.call if op_timeout.nil? || op_timeout.zero?

    f = Fiber.current
    timer = EM::Timer.new(op_timeout) { f.resume(nil) }
    begin
      block.call
    ensure
      # Always disarm: if the block raised and the timer stayed armed, it
      # would later resume this fiber at an unrelated point.
      timer.cancel
    end
  end
end
# Remember the current warning level; the ensure clause at the end of the
# file restores it.
old_verbose = $VERBOSE | |
begin | |
# Turn Ruby warnings off while the driver classes are reopened below
# (presumably to hide constant/method redefinition warnings -- confirm).
$VERBOSE = nil | |
module Mongo | |
# Binds class-local +Timeout+ and +ConditionVariable+ constants inside
# MongoClient to the EM-Synchrony implementations. Code lexically inside this
# class that refers to these names unqualified will resolve to these bindings
# (presumably this is how the driver's own references get redirected --
# confirm against the driver source).
class MongoClient
  Timeout = ::EventMachine::Synchrony::MongoTimeoutHandler
  ConditionVariable = ::EventMachine::Synchrony::Thread::ConditionVariable
end
# Connection pool patch: swaps the socket/locking primitives for their
# EM-Synchrony counterparts and tracks checked-out sockets per fiber
# (keyed by +Fiber.current.object_id+).
class Pool
  TCPSocket = ::EventMachine::Synchrony::TCPSocket
  Mutex = ::EventMachine::Synchrony::Thread::Mutex
  ConditionVariable = ::EventMachine::Synchrony::Thread::ConditionVariable

  # Opens a brand-new socket, registers it with the pool, marks it as
  # checked out and associates it with the current fiber.
  #
  # @return [Object] the newly created socket
  # @raise [ConnectionFailure] if the socket cannot be created or attached
  #   to this pool
  def checkout_new_socket
    begin
      socket = @client.socket_class.new(@host, @port, @client.op_timeout,
                                        @client.connect_timeout,
                                        @client.socket_opts)
      socket.pool = self
    rescue => ex
      # +socket+ is only set here when construction succeeded but attaching
      # it to the pool failed.
      socket.close if socket
      @node.close if @node
      raise ConnectionFailure, "Failed to connect to host #{@host} and port #{@port}: #{ex}"
    end

    @client.apply_saved_authentication(:socket => socket)
    @sockets << socket
    @checked_out << socket
    @thread_ids_to_sockets[Fiber.current.object_id] = socket
    socket
  end

  # Checks out +socket+, or a random idle socket when none is given.
  #
  # A socket whose recorded pid differs from the current process is dropped
  # from the pool, closed, and replaced with a fresh one.
  #
  # @param socket [Object, nil] a specific pooled socket to check out
  # @return [Object, nil] the checked-out socket, or +nil+ when no socket was
  #   given and none is idle
  def checkout_existing_socket(socket=nil)
    socket ||= (@sockets - @checked_out).sample

    # Guard before touching the socket: previously the nil check came after
    # +socket.pid+, so an empty idle set raised NoMethodError.
    return nil unless socket

    if socket.pid != Process.pid
      @sockets.delete(socket)
      socket.close unless socket.closed?
      checkout_new_socket
    else
      @checked_out << socket
      @thread_ids_to_sockets[Fiber.current.object_id] = socket
      socket
    end
  end

  # Checks a socket out of the pool for the current fiber, connecting the
  # client first if necessary.
  #
  # Prefers the socket already associated with this fiber; otherwise opens a
  # new one while the pool is below +@size+, or reuses an idle one. When
  # nothing is available it waits on +@queue+ and retries.
  #
  # @return [Object] the checked-out socket
  # @raise [ConnectionTimeoutError] if a loop iteration starts more than
  #   +@timeout+ seconds after the call began
  def checkout
    @client.connect if !@client.connected?
    start_time = Time.now
    # The whole method runs on one fiber, so the key can be computed once.
    fiber_id = Fiber.current.object_id

    loop do
      if (Time.now - start_time) > @timeout
        raise ConnectionTimeoutError, "could not obtain connection within " +
          "#{@timeout} seconds. The max pool size is currently #{@size}; " +
          "consider increasing the pool size or timeout."
      end

      @connection_mutex.synchronize do
        check_prune
        socket = nil
        socket_for_fiber = @thread_ids_to_sockets[fiber_id]

        if socket_for_fiber
          # If this fiber's socket is already checked out, +socket+ stays nil
          # and we fall through to the wait below.
          if !@checked_out.include?(socket_for_fiber)
            socket = checkout_existing_socket(socket_for_fiber)
          end
        elsif @sockets.size < @size
          socket = checkout_new_socket
        elsif @checked_out.size < @sockets.size
          socket = checkout_existing_socket
        end

        if socket
          # Run the operations queued for this socket, dropping each one
          # whose call returns a truthy value.
          @socket_ops[socket].reject! do |op|
            op.call
          end

          # A queued operation may have closed the socket; replace it.
          if socket.closed?
            @checked_out.delete(socket)
            @sockets.delete(socket)
            @thread_ids_to_sockets.delete(fiber_id)
            socket = checkout_new_socket
          end

          return socket
        else
          # Nothing available: wait on @queue (presumably signalled when a
          # socket is checked back in -- confirm), then loop and retry.
          @queue.wait(@connection_mutex)
        end
      end
    end
  end
end
# Binds class-local +Timeout+ and +Mutex+ constants inside Node to the
# EM-Synchrony implementations, so unqualified references made from within
# this class resolve to them.
class Node
  Timeout = ::EventMachine::Synchrony::MongoTimeoutHandler
  Mutex = ::EventMachine::Synchrony::Thread::Mutex
end
# TCP socket wrapper backed by EM::Synchrony::TCPSocket, with connect and
# read operations run under MongoTimeoutHandler.
class TCPSocket
  Timeout = ::EventMachine::Synchrony::MongoTimeoutHandler

  # Resolves +host+ to an IPv4 address and connects immediately.
  #
  # @param host [String] host name or address to resolve
  # @param port [Integer] TCP port
  # @param op_timeout [Numeric, nil] read timeout in seconds (defaults to 30)
  # @param connect_timeout [Numeric, nil] connect timeout in seconds
  #   (defaults to 30)
  # @param opts [Hash] accepted but not used by this implementation
  def initialize(host, port, op_timeout=nil, connect_timeout=nil, opts={})
    @op_timeout = op_timeout || 30
    @connect_timeout = connect_timeout || 30
    @pid = Process.pid
    @port = port

    # Element 3 of a getaddrinfo entry is the numeric address string.
    first_entry = Socket.getaddrinfo(host, nil, Socket::AF_INET).first
    @address = first_entry[3]

    @socket = nil
    connect
  end

  # Opens the underlying EM-Synchrony socket under the connect timeout.
  def connect
    Timeout.timeout(@connect_timeout, Mongo::ConnectionTimeoutError) do
      @socket = EM::Synchrony::TCPSocket.new(@address, @port)
    end
  end

  # Writes +data+ to the socket.
  #
  # @raise [SocketError] if the socket has not been connected
  def send(data)
    raise SocketError, 'Not connected yet' unless @socket

    @socket.write(data)
  end

  # Reads up to +maxlen+ bytes into +buffer+ under the operation timeout.
  #
  # @raise [SocketError] if the socket has not been connected
  def read(maxlen, buffer)
    raise SocketError, 'Not connected yet' unless @socket

    Timeout.timeout(@op_timeout, Mongo::OperationTimeout) do
      @socket.read(maxlen, buffer)
    end
  end
end
# Binds a class-local +Timeout+ constant inside SSLSocket to
# MongoTimeoutHandler, so unqualified +Timeout+ references made from within
# this class resolve to it.
class SSLSocket
  Timeout = ::EventMachine::Synchrony::MongoTimeoutHandler
end
# Binds a class-local +Mutex+ constant inside MongoReplicaSetClient to the
# EM-Synchrony mutex, so unqualified +Mutex+ references made from within this
# class resolve to it.
class MongoReplicaSetClient
  Mutex = ::EventMachine::Synchrony::Thread::Mutex
end
# Binds a class-local +Mutex+ constant inside MongoShardedClient to the
# EM-Synchrony mutex, so unqualified +Mutex+ references made from within this
# class resolve to it.
class MongoShardedClient
  Mutex = ::EventMachine::Synchrony::Thread::Mutex
end
# Binds a class-local +Mutex+ constant inside PoolManager to the EM-Synchrony
# mutex, so unqualified +Mutex+ references made from within this class
# resolve to it.
class PoolManager
  Mutex = ::EventMachine::Synchrony::Thread::Mutex
end
end | |
ensure | |
# Restore the warning level saved in old_verbose, even if the patch raised.
$VERBOSE = old_verbose | |
end |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment