Skip to content

Instantly share code, notes, and snippets.

@bawNg
Created March 5, 2014 15:54
Show Gist options
  • Save bawNg/9369941 to your computer and use it in GitHub Desktop.
EventMachine based process spawning module
# Mixin that gives a class named callbacks with any number of listeners.
#
# A class that includes this module declares callback names with +def_callbacks+. Each name becomes
# an instance method that, when called with a block, registers that block, and when called without
# a block, invokes every registered block with the given arguments. Both forms return +self+.
module Callbacks
  def self.included(base)
    # Install the reader on base's singleton class only; installing it on base.class would add
    # +_defined_callbacks+ to every Class (or every Module) in the process.
    base.singleton_class.send :attr_reader, :_defined_callbacks

    # Declares one or more callback names on base and defines an instance method for each one.
    def base.def_callbacks(*names)
      @_defined_callbacks ||= []
      @_defined_callbacks |= names
      names.each do |name|
        define_method(name) { |*args, &block| _handle_callback(name, *args, &block) }
      end
    end
  end

  # True when +name+ was declared with def_callbacks on this object's class or on any ancestor
  # (e.g. a subclass of the class that declared it).
  def callback_defined?(name)
    name = name.to_sym
    self.class.ancestors.any? do |ancestor|
      next false unless ancestor.respond_to?(:_defined_callbacks)
      declared = ancestor._defined_callbacks
      declared ? declared.include?(name) : false
    end
  end

  # True when at least one block is registered for +name+.
  def callback_registered?(name)
    callbacks = @_callbacks && @_callbacks[name]
    !!(callbacks && !callbacks.empty?)
  end

  # Largest arity among the blocks registered for +name+, or 0 when none are registered.
  def callback_max_arity(name)
    return 0 unless callback_registered?(name)
    @_callbacks[name].map(&:arity).max
  end

  # Registers +block+ for +name+. When +arity+ is given and differs from the block's own arity, the
  # block is wrapped in a proc with exactly +arity+ positional parameters (so callback_max_arity
  # reports +arity+); the wrapper forwards all of them to the block.
  def register_callback(name, arity = nil, &block)
    callback = block
    if arity && arity != block.arity
      # Parameter names are a0, a1, ... so any arity works (single letters ran out after 26).
      params = arity.times.map { |i| "a#{i}" }.join(', ')
      # eval is required to build a proc with a specific fixed arity; only the generated parameter
      # names above are interpolated, never caller-supplied text.
      callback = eval("proc { |#{params}| block.call(#{params}) }")
    end
    _handle_callback(name, &callback)
  end

  private

  # Registers +block+ under +name+ when one is given; otherwise calls every block registered under
  # +name+ with +args+. Returns self so calls can be chained.
  def _handle_callback(name, *args, &block)
    @_callbacks ||= {}
    if block
      (@_callbacks[name] ||= []) << block
    elsif @_callbacks[name]
      @_callbacks[name].each { |callback| callback.call(*args) }
    end
    self
  end
end
# Namespace for the process-spawning helpers.
module System
  # Builds a SpawnedProcess from +args+ after removing every nil argument, so callers may pass
  # optional positional arguments as nil.
  def self.spawn(*args)
    present_args = args.compact
    SpawnedProcess.new(*present_args)
  end
end
module System
  # A child process started with fork/exec whose pipes, timers and exit are driven by EventMachine.
  #
  # Callbacks (declared with def_callbacks):
  # * started(pid)          - after the child has been forked.
  # * each_line(line)       - for every complete line read from the child's STDOUT or STDERR.
  # * ended(status, output) - on the next tick after the child has been reaped.
  # * timed_out             - when the :timeout option elapses first.
  class SpawnedProcess
    include Callbacks

    attr_accessor :status
    attr_reader :pid, :argv, :start_time, :pipes, :output

    def_callbacks :started, :each_line, :ended, :timed_out

    # NOTE(review): `on`, `signal`, `scheduler` and `signal_queue` are defined outside this file.
    # This polls signal_queue every 0.2s and reaps children whenever a :CLD entry is seen; presumably
    # the empty signal(:CLD) handler below is what causes :CLD to be queued - confirm.
    on :loaded do
      scheduler.every(0.2) do
        next unless signal_queue.size > 0
        signal_count = signal_queue.size
        signals = signal_count.times.map { signal_queue.pop }.uniq
        signals.each do |signal|
          ::System::SpawnedProcess.child_ended if signal == :CLD
        end
      end
    end

    signal(:CLD) do
    end

    class << self
      # Returns the tracked (not yet ended) process with the given pid, or nil.
      def [](pid)
        processes.find { |process| process.pid == pid }
      end

      # Processes that have been created and have not ended yet.
      def processes
        @processes ||= []
      end

      # Reaps every child that has exited and schedules process_ended (0.1s later) for each one
      # that belongs to a tracked SpawnedProcess.
      #
      # The SIGCHLD handler may not be called exactly once for every child: several children exiting
      # concurrently may trigger only one SIGCHLD in the parent, so reap everything that can be reaped.
      def child_ended
        ended_processes = []
        loop do
          pid, status = reap_child
          break unless pid
          process = SpawnedProcess[pid]
          next unless process
          process.status = status
          ended_processes << process
        end
        ended_processes.each do |process|
          scheduler.in(0.1) { process.process_ended }
        end
      end

      private

      # Non-blocking wait for any child. Returns [pid, Process::Status] for a reaped child, or nil
      # when no child has exited yet or no children remain.
      #
      # ECHILD is rescued per call: Process.wait raises it once the last child has been reaped, and
      # rescuing it around the whole loop skipped process_ended for the children just reaped.
      def reap_child
        Process.wait2(-1, Process::WNOHANG)
      rescue Errno::ECHILD
        nil
      end
    end

    # Prepares the child's pipes and the optional timeout, then forks and execs the child 0.25s
    # later on the class scheduler.
    #
    # Arguments follow Kernel#spawn: ([env,] command... [,options]). Besides regular spawn options,
    # these keys are consumed here and never passed to exec:
    # * :input        - String written to the child's STDIN.
    # * :output       - capture STDOUT/STDERR into #output and each_line (default true); when
    #                   false both go to /dev/null.
    # * :timeout      - seconds after which the streams are closed, the child is killed and
    #                   timed_out fires.
    # * :user/:group  - account switched to in the child before exec.
    # * :unlimit_core - set the child's core file size limit to RLIM_INFINITY.
    # * :affinity     - CPU list applied with `taskset` in the child.
    # * :env          - extra environment variables, merged into env.
    def initialize(*args)
      env, argv, options = extract_spawn_arguments(*args)
      @argv = argv
      options[:output] = true unless options.key?(:output)
      # :timeout is handled by this class; exec raises ArgumentError for option keys it does not know.
      spawn_options = options.except(:input, :output, :timeout, :user, :group, :unlimit_core, :affinity)

      @pipes, @connections = {}, {}
      watch_input(options[:input]) if options[:input]
      # Output callbacks are deliberately not checked for here: in the case of a ProcessInstance,
      # the relevant callbacks are only registered from the started callback.
      if options[:output]
        [:out, :err].each { |name| watch_output(name) }
      else
        $dev_null ||= open('/dev/null', 'w')
        @pipes[:out] = @pipes[:err] = $dev_null
      end
      spawn_options.merge!(@pipes)
      @connections.each do |name, connection|
        connection.force_encoding
        # Forget the connection once its stream reports success (EOF or input fully written).
        connection.callback { @connections.delete(name) }
      end

      start_timeout_timer(options[:timeout]) if options[:timeout] && options[:timeout] > 0

      @start_time = Time.now
      self.class.scheduler.in(0.25) { fork_child(env, argv, options, spawn_options) }
      self.class.processes << self
    end

    # Cancels the timeout and sends TERM to the child, escalating to KILL if process_ended has not
    # cancelled the kill timer within 6 seconds.
    def kill
      @timeout_timer.cancel if @timeout_timer
      @kill_timer = EM::Timer.new(6) do
        log :yellow, "Timed out waiting for process to terminate, sending KILL signal to process: #{pid}"
        Process.kill('KILL', pid) rescue nil
      end
      log :yellow, "Sending TERM signal to process: #{pid}"
      # Best effort: the child may already be gone, or not forked yet (pid still nil).
      Process.kill('TERM', pid) rescue nil
    end

    # Finishes up after the child has been reaped: appends any unterminated output still buffered,
    # releases streams and timers, stops tracking this process and fires ended(status, output) on
    # the next tick.
    def process_ended
      [:out, :err].each do |name| #TODO: last lines in buffer should be in the correct order
        connection = @connections[name]
        next unless connection && connection.buffer.size > 0
        # @output is still nil when no complete line was ever read.
        (@output ||= '') << connection.buffer
      end
      close_streams
      @timeout_timer.cancel if @timeout_timer
      @kill_timer.cancel if @kill_timer
      @runtime = Time.now - @start_time
      self.class.processes.delete(self)
      next_tick do
        ended(status, output)
      end
    end

    # Seconds the process ran, or has been running so far if it has not ended.
    def runtime
      @runtime || Time.now - @start_time
    end

    def inspect
      "(SpawnedProcess: #@pid, runtime: #{runtime})"
    end

    private

    # Creates the STDIN pipe and writes +input+ to its write end through a WritableStream.
    def watch_input(input)
      @pipes[:in], stdin_write = IO.pipe.each { |io| io.close_on_exec = true }
      @connections[:in] = EM.watch(stdin_write, ::System::WritableStream, input.dup)
      @connections[:in].notify_writable = true
    end

    # Creates the pipe for the child's STD<name> (:out or :err) and watches its read end. Every
    # complete line is appended to @output and passed to each_line; an unterminated tail stays in
    # the stream's buffer until more data arrives (or process_ended flushes it).
    def watch_output(name)
      read_pipe, @pipes[name] = IO.pipe.each { |io| io.close_on_exec = true }
      connection = EM.watch(read_pipe, ::System::ReadableStream, '')
      connection.notify_readable = true
      connection.after_read do |buffer| #TODO: implement max output size limit
        # The -1 limit keeps trailing empty fields: when the buffer ends with a newline the remainder
        # is '' and the last complete line is emitted now instead of being held back.
        *lines, remaining_buffer = buffer.split(/\r?\n/, -1) #TODO: limit number of lines
        buffer.replace(remaining_buffer || '')
        next if lines.empty?
        (@output ||= '') << lines.join("\n") << "\n"
        lines.each { |line| each_line(line) }
      end
      @connections[name] = connection
    end

    # Arms the timeout: when it fires, streams are closed, the child is killed and timed_out is
    # fired via EM.schedule.
    def start_timeout_timer(seconds)
      @timeout_timer = EM::Timer.new(seconds) do
        close_streams
        kill
        EM.schedule { timed_out }
      end
    end

    # Forks the child. In the child: applies CPU affinity, user/group and core limit, then execs
    # argv with env and spawn_options (which carry the pipe redirections). In the parent: records
    # the pid and fires started(pid).
    def fork_child(env, argv, options, spawn_options)
      log :yellow, "Spawning new process: #{argv.inspect} #{spawn_options.inspect if spawn_options.present?}"
      @pid = fork do
        `taskset -apc #{options[:affinity]} #{Process.pid}` if options[:affinity]
        if options[:user]
          user = Etc.getpwnam(options[:user])
          Process::initgroups(options[:user], user.gid)
          Process::Sys::setgid(options[:group] ? Etc.getgrnam(options[:group]).try(:gid) || user.gid : user.gid)
          Process::Sys::setuid(user.uid)
          ENV['HOME'] = user.dir
        end
        Process.setrlimit(:CORE, Process::RLIM_INFINITY) if options[:unlimit_core]
        $stdin = pipes[:in] if pipes[:in]
        $stdout = pipes[:out] if pipes[:out]
        $stderr = pipes[:err] if pipes[:err]
        exec(env, *(argv + [spawn_options]))
      end
      if $FD_DEBUG
        log :red, "FDs after spawn:"
        puts `lsof -p #{Process.pid} | grep -v "/usr/" | grep -v "/lib/" | grep -v " TCP " | grep -v "inspector.log" | grep -v "root rtd" | grep -v "root cwd"`
        puts
      end
      started(@pid)
    end

    # Closes the watched connections named in +names+ (every connection when none are given), and
    # closes the parent's copies of the child's pipe ends. This includes the STDIN read end, which
    # nothing else closes. The shared $dev_null handle is left open.
    def close_streams(*names)
      @connections.reject! do |name, connection|
        next false unless names.empty? || names.include?(name)
        connection.close
        true
      end
      [:in, :out, :err].each do |name|
        pipe = @pipes[name]
        next if pipe.nil? || pipe.closed?
        next if $dev_null && pipe == $dev_null
        pipe.close
      end
    end

    # Splits spawn-style arguments into [env, argv, options]:
    # * a trailing hash-like argument is the options (String keys symbolized);
    # * a leading hash-like argument is the environment;
    # * options[:env], if present, is merged into the environment.
    # Both hashes are copied so the caller's hashes are never modified.
    def extract_spawn_arguments(*args)
      options = args.last.respond_to?(:to_hash) ? args.pop.to_hash.dup : {}
      options.keys.each { |key| options[key.to_sym] = options.delete(key) if key.is_a?(String) }
      env = args.first.respond_to?(:to_hash) ? args.shift.to_hash.dup : {}
      env.merge!(options.delete(:env)) if options.key?(:env)
      [env, adjust_spawn_argv(args), options]
    end

    # Normalizes the command arguments into exec form: [[cmdname, argv0], argv1, ...].
    # A single String containing a space, '|' or '>' is run through /bin/sh -c.
    def adjust_spawn_argv(args)
      command = args.first
      # Only String-like commands are matched; a [cmdname, argv0] Array relied on Object#=~
      # returning nil, and Object#=~ no longer exists in Ruby 3.2.
      if args.size == 1 && command.respond_to?(:to_str) && command.to_str =~ /[ |>]/
        [%w(/bin/sh /bin/sh), '-c', command]
      else
        first = args.shift
        args.map!(&:to_s)
        if first.respond_to?(:to_ary)
          [first, *args] # [[cmdname, argv0], argv1, ...]
        else
          [[first, first], *args] # [argv0, argv1, ...]
        end
      end
    end
  end
end
module System
  # Base class for EventMachine watchers over one pipe end. +buffer+ holds the pending data:
  # - for a ReadableStream, the data read so far;
  # - for a WritableStream, the data still to be written.
  # The deferrable succeeds once the stream is finished (EOF reached or input fully written/failed).
  class Stream < EM::Connection
    include EM::Deferrable
    include ::Callbacks

    attr_reader :buffer

    def_callbacks :after_read, :after_write

    def initialize(buffer)
      @buffer = buffer
    end

    # Switches the watched IO and the buffer to BINARY so reads and writes deal in raw bytes.
    def force_encoding
      return unless @buffer.respond_to?(:force_encoding)
      @io.set_encoding('BINARY', 'BINARY')
      @buffer.force_encoding('BINARY')
    end

    def close
      # NB: The ordering here is important. If we're using epoll, detach() attempts to deregister the associated fd via
      # EPOLL_CTL_DEL and marks the EventableDescriptor for deletion upon completion of the iteration of the event
      # loop. However, if the fd was closed before calling detach(), epoll_ctl() will sometimes return EBADFD and
      # fail to remove the fd. This can lead to epoll_wait() returning an event whose data pointer is invalid
      # (since it was deleted in a prior iteration of the event loop).
      detach
      @io.close rescue nil
    end
  end

  # Reads the watched pipe end in chunks, appending to +buffer+ and firing after_read(buffer).
  # On EOF the stream closes itself and succeeds.
  class ReadableStream < Stream
    CHUNK_SIZE = 32 * 1024

    def notify_readable
      @buffer << @io.readpartial(CHUNK_SIZE)
      after_read(@buffer)
    rescue Errno::EAGAIN, Errno::EINTR
      # Nothing readable right now, or interrupted: wait for the next readiness notification.
    rescue EOFError
      close
      set_deferred_success
    end
  end

  # Writes +buffer+ to the watched pipe end without blocking, firing after_write with whatever is
  # still pending. Closes itself and succeeds once the buffer is empty or a write error occurs.
  class WritableStream < Stream
    def notify_writable
      error = nil
      begin
        written = @io.write_nonblock(@buffer)
        # write_nonblock returns a byte count, so drop exactly that many bytes; indexing by
        # characters would skip or repeat data when the buffer holds multibyte characters.
        @buffer = @buffer.byteslice(written, @buffer.bytesize)
        after_write(@buffer)
      rescue Errno::EPIPE => error
      rescue Errno::EAGAIN, Errno::EINTR
        # Pipe full or interrupted: retry on the next writable notification.
      rescue IOError => error
        puts "Exception raised while writing to STDIN: #{error.message}"
      end
      return unless error || @buffer.empty?
      log :yellow, 'An error occurred while writing to stream, closing connection...' if error
      close
      set_deferred_success
    end
  end
end
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment