Created
May 18, 2011 15:42
-
-
Save rparker/978836 to your computer and use it in GitHub Desktop.
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
------------------------------------------------------------------------ | |
this 12-line file is "send.rb" | |
------------------------------------------------------------------------ | |
#!/usr/bin/env ruby | |
require 'rubygems' | |
require 'socket' | |
sock = TCPSocket.new 'localhost', 9814 | |
t0 = Time.now | |
sock.send 'start working', 0 | |
reply, sender_addrinfo = sock.recvfrom(16) | |
printf("%8.4fs later, got the stale reply: %s\n", (Time.now - t0), reply) | |
------------------------------------------------------------------------ | |
and this file is 'clairvoyant.rb' | |
------------------------------------------------------------------------ | |
#!/usr/bin/env ruby | |
require 'rubygems' | |
require 'eventmachine' | |
class Feeder < EventMachine::Connection⋅ | |
def initialize(q)⋅ | |
@q = q⋅ | |
end⋅ | |
def receive_data(cmd)⋅ | |
send_data cmd + '.'*(16 - cmd.size) | |
puts 'Reply has been sent, entangled with the work which has yet to begin' | |
@q.push cmd⋅ | |
end⋅ | |
end⋅ | |
# - - - - - - - - - - - - -⋅ | |
class Worker | |
# This busy loop takes >50ms on my mac. | |
# If you comment out the work, the reply is received in < 1ms. | |
def busy(str) | |
puts 'The future work period has yet to begin...' | |
10.times{system 'ls'} | |
puts 'work completed' | |
end | |
end | |
# - - - - - - - - - - - - -⋅ | |
class Clairvoyant | |
def initialize | |
puts 'Clair is listening' | |
end | |
def run | |
worker = Worker.new | |
EventMachine::run {⋅ | |
q = EM::Channel.new⋅ | |
EM::start_server '127.0.0.1', 9814, Feeder, q⋅ | |
unused = q.subscribe{|msg| worker.busy(msg)}⋅ | |
}⋅ | |
end | |
end | |
# - - - - - - - - - - - - -⋅ | |
clair = Clairvoyant.new | |
clair.run |
That's still going to block the reactor. It will execute all of the 10.times { system 'ls' } commands before the reactor loop runs again. The reactor loop has to turn in order for the messages to get sent out.
You either need to turn the blocking operations into non-blocking or use something like EM.defer to run them on a background thread.
On Wed, May 18, 2011 at 2:23 PM, dj2 < ***@***.***>wrote:
That's still going to block the reactor. It will execute all of the
10.times { system 'ls' } commands before the reactor loop runs again. The
reactor loop has to turn in order for the messages to get sent out.
You either need to turn the blocking operations into non-blocking or use
something like EM.defer to run them on a background thread.
Replacing
unused = q.subscribe{|msg| worker.busy(msg)}
with
unused = q.subscribe{|msg| EM.defer lambda {worker.busy(msg)} }
Solves the delayed-response problem.
But I suspect subsequent socket commands that arrive prior to the completion
of the first Worker's job will start additional worker instances, which will
cause disastrously interleaved work in my application.
I suppose I should change the Channel subscriber to a Queue popper, which
does not pop the next entry until the worker completes his job.... if I
defer the worker that pops the queue entry so that it does not also block
receive_data() and q.push().
Thank you,
- Randy
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment
Thanks - you're right. But the actual program actually does some work instead of sleeping. I just put 'sleep 2' in the isolated example for simplicity, and it was a very misleading simplification. The EM timers aren't a one-line replacement for sleep, so I've changed to: 10.times{system 'ls'} Which takes more than 50ms on my system. With no "work" being done, the reply is received in less than one millisecond, so the problem still remains.
Somehow, the socket data is not being received in the 'send.rb' program until AFTER the work gets done.