Created
April 19, 2018 21:13
-
-
Save lwoodson/53c5c0da6f7723efe33b7bef2e22291d to your computer and use it in GitHub Desktop.
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
require 'benchmark' | |
require 'ostruct' | |
module Platappus
  ##
  # Class capable of carrying out multi-threaded map/reduce
  # operations. Dev specifies 1-N map blocks to produce
  # interim values, and a reduce block to aggregate results.
  # For example:
  #
  #   mr = MapReduce.new
  #
  #   3.times do
  #     mr.map do
  #       sleep_time = (1..30).to_a.sample / 10.0
  #       sleep sleep_time
  #       sleep_time
  #     end
  #   end
  #
  #   mr.reduce do |results|
  #     results.sum
  #   end
  #
  #   mr_result = mr.exec!
  #
  #   mr_result.value
  #   # => e.g. 4.3 (the sum of every successful map block's value)
  #
  # Each map block is called with one argument: the name of the
  # worker thread running it ("thread-1", "thread-2", ...). The
  # reduce block is called with (interim_results, mapping_errors).
  # interim_results holds the values of the map blocks that
  # finished without raising, in the order the blocks were
  # registered. Without a reduce block, value is interim_results.
  #
  # The result of a MapReduce.exec! call has the following
  # attributes:
  #
  # - value        the final value of the reduce aggregation
  # - errors       an array of the StandardErrors raised by map
  #                blocks
  # - has_errors   true if there were any errors
  # - map_time     the wall-clock time to carry out all
  #                parallelized map operations
  # - reduce_time  the wall-clock time to carry out the reduction
  #                operation
  # - total_time   the wall-clock time to carry out the whole
  #                exec! operation
  #
  # The map operations are carried out in parallel by different
  # threads. Each thread carries out a number of map operations,
  # sequentially, defined by the batch_size constructor arg. This
  # value defaults to 1, so that each map operation is carried
  # out by a separate thread.
  #
  # If a StandardError is raised during reduction, a
  # MapReduce::ReductionError is raised containing the interim
  # results and any mapping errors prior to the reduction. The
  # reduction error's cause (and error) attribute will contain
  # the underlying exception.
  #
  # An exception that is not a StandardError raised by a map block
  # is not collected into errors; it propagates out of exec!.
  #
  class MapReduce
    ##
    # Create a new MapReduce operation.
    #
    # batch_size     - number of map blocks each worker thread runs
    #                  (default 1: one thread per map block).
    # check_interval - retained for backward compatibility only.
    #                  exec! now waits on its worker threads directly
    #                  instead of polling, so this value is unused.
    def initialize(batch_size: 1, check_interval: 0.05)
      @batch_size = batch_size
      @check_interval = check_interval
      @mappers = []
      # Pass-through reducer. It must accept both arguments exec!
      # supplies: lambdas enforce arity, so a one-parameter lambda
      # here made every exec! without a reduce block fail.
      @reducer = ->(interim_results, _mapping_errors) { interim_results }
    end

    ##
    # Register a map operation. The block receives the worker
    # thread's name and its return value becomes one interim result.
    def map(&block)
      @mappers << block
    end

    ##
    # Register the reduction operation, replacing any previous one.
    # The block is called with (interim_results, mapping_errors) and
    # its return value becomes the result's value.
    def reduce(&block)
      @reducer = block
    end

    ##
    # Execute the parallelized map/reduce op.
    #
    # Returns an OpenStruct with value, errors, has_errors, map_time,
    # reduce_time and total_time.
    #
    # Raises MapReduce::ReductionError if the reduce block raises a
    # StandardError.
    def exec!
      result = nil
      exec_benchmark = Benchmark.measure do
        interim_results = nil
        errors = nil
        map_benchmark = Benchmark.measure do
          interim_results, errors = run_mappers
        end

        reduction_result = nil
        begin
          reduce_benchmark = Benchmark.measure do
            reduction_result = @reducer.call(interim_results, errors)
          end
        rescue StandardError => e
          # Raised inside a rescue clause, so Ruby also sets the new
          # exception's #cause to e.
          raise ReductionError.new(e, interim_results, errors)
        end

        result = OpenStruct.new value: reduction_result,
                                errors: errors,
                                has_errors: errors.any?,
                                map_time: map_benchmark.real,
                                reduce_time: reduce_benchmark.real
      end
      result.total_time = exec_benchmark.real
      result
    end

    ##
    # Raised by exec! when the reduce block raises a StandardError.
    # Exposes the underlying exception (error), plus the interim
    # results and mapping errors that were handed to the reducer.
    class ReductionError < RuntimeError
      attr_reader :error, :interim_results, :mapping_errors

      def initialize(error, interim_results, mapping_errors)
        super("Reduction error")
        @error = error
        @interim_results = interim_results
        @mapping_errors = mapping_errors
      end
    end

    private

    # Starts one thread per batch of map blocks, waits for all of them,
    # and returns [interim_results, errors] in registration order.
    def run_mappers
      threads = @mappers.each_slice(@batch_size).with_index(1).map do |batch, index|
        Thread.new(batch, "thread-#{index}") do |mappers, thread_name|
          run_batch(mappers, thread_name)
        end
      end

      interim_results = []
      errors = []
      threads.each do |thread|
        # Thread#value joins the thread and re-raises anything it did
        # not rescue. Each thread builds its own arrays, so nothing is
        # mutated concurrently.
        batch_results, batch_errors = thread.value
        interim_results.concat(batch_results)
        errors.concat(batch_errors)
      end
      [interim_results, errors]
    end

    # Runs the given map blocks in order on the current thread, passing
    # each the thread name. Returns [results, errors]; a block that
    # raises a StandardError contributes to errors instead of results.
    def run_batch(mappers, thread_name)
      results = []
      errors = []
      mappers.each do |mapper|
        begin
          results << mapper.call(thread_name)
        rescue StandardError => e
          errors << e
        end
      end
      [results, errors]
    end
  end
end
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment