Last active
February 12, 2018 08:35
-
-
Save jancajthaml/35fed49420117172eeba244dc660fe1c to your computer and use it in GitHub Desktop.
Parallel support for ruby collections
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
# GENERAL INFORMATION | |
# as in any language parallelism is not free and should be used mainly for costly, non-atomic, blocking operations | |
# on primitive operations it is expected to be slower than native ruby implementation because of additional resources | |
# and allocations spent on parallelism | |
# | |
# the implementation is lock-free; for this reason the following design decisions were made
# - map does not replace values in place but instead returns a new enumerable
# - reject and select do 2 passes over the collection (using compact) to guarantee a collision-free bucket
# - all methods require a block and do not work with &block and/or &filter passed as an argument
--- | |
# parallel map: doubles every number; element 4 is artificially slow
doubled = (1..10).par_map do |x|
  sleep 2 if x == 4
  x * 2
end
puts doubled

# parallel each: prints every number (output order is not guaranteed)
(1..10).par_each { |x| puts x }

# parallel reject: drops the even numbers; multiples of 10 are artificially slow
odds = (1..20).par_reject do |x|
  sleep 2 if (x % 10).zero?
  x.even?
end
puts odds

# parallel select: keeps the even numbers; multiples of 10 are artificially slow
evens = (1..20).par_select do |x|
  sleep 2 if (x % 10).zero?
  x.even?
end
puts evens

# example of asynchronizing synchronized workflow

# Every step runs one after another on the calling thread.
synchronous_workflow = lambda do
  a = do_thing_a
  b = do_thing_b
  c = do_thing_c
  do_resolve(a, b, c)
  do_log(a, b, c)
end

# The three independent producers run in parallel first; once their results
# are available, the two consumers run in parallel as well.
asynchronous_workflow = lambda do
  producers = [
    -> { do_thing_a },
    -> { do_thing_b },
    -> { do_thing_c }
  ]
  a, b, c = producers.par_map(&:call)

  consumers = [
    -> { do_resolve(a, b, c) },
    -> { do_log(a, b, c) }
  ]
  consumers.par_each(&:call)
end
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
require 'etc'
require 'thread'
# Thread-based parallel variants of each / map / reject / select for every
# Enumerable.
#
# Each call snapshots the collection, hands the elements to a pool of worker
# threads through a Queue and waits for all workers to finish. Results are
# always reported in the order of the original collection, and nil / false
# elements are processed like any other value.
#
# An exception raised by the block inside a worker is re-raised in the calling
# thread when that worker is joined.
module Enumerable
  class << self
    # Upper bound on the number of worker threads started per call.
    # Values below 1 (or nil) are treated as 1.
    attr_accessor :parallelism
  end

  # Default: eight workers per available processor, since the intended
  # workload is blocking work rather than CPU-bound work.
  self.parallelism = Etc.nprocessors << 3

  # Calls the block once for every element, spread across worker threads.
  # The order in which elements are visited is not guaranteed.
  #
  # @yield [item] each element of the collection
  # @return [self, nil] the receiver (like #each), or nil when no block is given
  def par_each
    return unless block_given?

    par_apply { |item| yield item }
    self
  end

  # Parallel counterpart of #map.
  #
  # @yield [item] each element of the collection
  # @return [Array, nil] block results in the order of the original
  #   collection, or nil when no block is given
  def par_map
    return unless block_given?

    par_apply { |item| yield item }.last
  end

  # Parallel counterpart of #reject.
  #
  # @yield [item] each element of the collection
  # @return [Array, nil] the elements for which the block returned a falsy
  #   value, in original order, or nil when no block is given
  def par_reject
    return unless block_given?

    items, verdicts = par_apply { |item| yield item }
    items.reject.with_index { |_, index| verdicts[index] }
  end

  # Parallel counterpart of #select.
  #
  # @yield [item] each element of the collection
  # @return [Array, nil] the elements for which the block returned a truthy
  #   value, in original order, or nil when no block is given
  def par_select
    return unless block_given?

    items, verdicts = par_apply { |item| yield item }
    items.select.with_index { |_, index| verdicts[index] }
  end

  private

  # Runs the block for every element on a pool of worker threads.
  #
  # @yield [item] each element of the collection
  # @return [Array(Array, Array)] the snapshot of the elements and the block
  #   results, index-aligned with each other
  def par_apply
    items = []
    each { |item| items << item }

    # Pre-sized so that workers only ever write to their own, already
    # existing slot and never grow the array concurrently.
    results = Array.new(items.size)

    # Jobs are [item, index] pairs: a pair is always truthy, so nil can be
    # used as the stop marker without being confused with a nil/false element,
    # and the index travels with the item instead of through a shared counter.
    backlog = Queue.new
    items.each_with_index { |item, index| backlog << [item, index] }

    # Never start more workers than there are jobs.
    limit = [Enumerable.parallelism.to_i, 1].max
    worker_count = [limit, items.size].min

    workers = Array.new(worker_count) do
      backlog << nil # one stop marker per worker, queued behind all jobs
      Thread.new do
        while (job = backlog.deq)
          item, index = job
          results[index] = yield(item)
        end
      end
    end
    workers.each(&:join)

    [items, results]
  end
end
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment