Created
October 26, 2012 09:03
-
-
Save gnarg/3957752 to your computer and use it in GitHub Desktop.
socketpair protocol for druby
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
require 'socket' | |
require 'drb/drb' | |
module DRbSocketPair | |
def self.open(uri, config={}) | |
tag = parse_uri(uri) | |
parent_socket, child_socket = @parent.sockets[tag] | |
parent_socket.close unless parent_socket.closed? | |
Child.new(child_socket, config) | |
end | |
def self.open_server(uri, config={}) | |
parse_uri(uri) | |
@parent = Parent.new(config) | |
end | |
def self.uri_option(uri, config={}) | |
parse_uri(uri) | |
return uri, nil | |
end | |
def self.before_fork(tag) | |
@parent.sockets[tag.to_s] = Socket.pair(Socket::AF_UNIX, | |
Socket::SOCK_STREAM, 0) | |
@parent.wake | |
end | |
def self.fork(tag, &block) | |
before_fork(tag) | |
Process.fork(&block) | |
end | |
def self.parse_uri(uri) | |
if uri =~ /^drbsocketpair:(.*)/ | |
$1 | |
else | |
raise DRb::DRbBadScheme.new | |
end | |
end | |
class Parent | |
attr_reader :sockets | |
def initialize(config) | |
@config = config | |
@sockets = {} | |
@wake_out, @wake_in = IO.pipe | |
end | |
def accept | |
loop do | |
@sockets.delete_if {|_,(s,_)| s.closed? } | |
sockets_to_listen = @sockets.values.map{|s,_| s } | |
ready = IO.select(sockets_to_listen + [@wake_out])[0][0] | |
if ready == @wake_out | |
ready.read(1) | |
next | |
else | |
tag, (parent_socket, child_socket) = @sockets.find {|_,(s,_)| s == ready } | |
@sockets.delete(tag) | |
child_socket.close unless child_socket.closed? | |
return Server.new(parent_socket, tag, @config) | |
end | |
end | |
end | |
def close | |
@sockets.each {|s,_| s.close unless s.closed? } | |
wake | |
end | |
def wake | |
@wake_in.write('.') | |
end | |
def uri | |
'drubysocketpair:' | |
end | |
end | |
class Server | |
def initialize(socket, tag, config) | |
@socket = socket | |
@tag = tag | |
@msg = DRb::DRbMessage.new(config) | |
end | |
def recv_request | |
@msg.recv_request(@socket) | |
end | |
def send_reply(succ, result) | |
@msg.send_reply(@socket, succ, result) | |
end | |
def close | |
@socket.close | |
end | |
def uri | |
"drbsocketpair:#{@tag}" | |
end | |
end | |
class Child | |
def initialize(socket, config) | |
@socket = socket | |
@msg = DRb::DRbMessage.new(config) | |
end | |
def send_request(ref, msg_id, arg, b) | |
@msg.send_request(@socket, ref, msg_id, arg, b) | |
end | |
def recv_reply | |
@msg.recv_reply(@socket) | |
end | |
def alive? | |
!@socket.closed? | |
end | |
def close | |
@socket.close unless @socket.closed? | |
end | |
end | |
DRb::DRbProtocol.add_protocol(DRbSocketPair) | |
end | |
if $0 == __FILE__ | |
class TimeServer | |
def get_current_time | |
Time.now | |
end | |
end | |
DRb.start_service('drbsocketpair:', TimeServer.new) | |
pids = [] | |
DRbSocketPair.before_fork(0) | |
pids << Process.fork do | |
timeserver = DRbObject.new_with_uri('drbsocketpair:0') | |
puts "!! 0 !! #{Process.pid} #{timeserver.get_current_time}" | |
end | |
DRbSocketPair.before_fork(1) | |
pids << Process.fork do | |
timeserver = DRbObject.new_with_uri('drbsocketpair:1') | |
puts "!! 1 !! #{Process.pid} #{timeserver.get_current_time}" | |
end | |
pids << DRbSocketPair.fork(2) do | |
timeserver = DRbObject.new_with_uri('drbsocketpair:2') | |
puts "!! 2 !! #{Process.pid} #{timeserver.get_current_time}" | |
puts "!! 2 !! #{Process.pid} #{timeserver.get_current_time} again" | |
end | |
pids.each{|pid| Process.wait(pid) } | |
end |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment
dRuby needs open more connections.
I will explain tomorrow.
pids << DRbSocketPair.fork(3) do
timeserver = DRbObject.new_with_uri('drbsocketpair:3')
timeserver.queue.push(:a)
timeserver.queue.push(:b)
t1 = Thread.new do
p timeserver.queue.pop
end
t2 = Thread.new do
p timeserver.queue.pop
end
sleep 2
t1.join
t2.join
end