Created
August 22, 2014 02:34
-
-
Save jgautsch/cf1d82bd8f348c121640 to your computer and use it in GitHub Desktop.
the Map script for the NPI dataset MapReduce
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
# ###########################################################
# Configuration
# ###########################################################
# Load all Gem and ENV dependencies.
# Dotenv.load is invoked right after dotenv is required, before the
# remaining libraries are loaded.
require 'rubygems'
require 'bundler/setup'
require 'dotenv'
Dotenv.load
require 'require_all'
require 'mongoid'
require 'geocoder'
require 'hashie'
require 'csv'
require 'thread'
require 'active_support/core_ext/integer/time'
require 'active_support/core_ext/numeric/time'
# Load Mongoid config (mongoid.yml next to this script, :production environment)
Mongoid.load!("#{File.dirname(__FILE__)}/mongoid.yml", :production)
# Load all classes (classes.rb next to this script)
require "#{File.dirname(__FILE__)}/classes.rb"
# Configure Geocoder to use the :dstk lookup with a 2 second timeout.
# NOTE(review): the host value already contains a scheme ("http://") and a
# trailing slash; confirm that the Geocoder version in use expects a full URL
# here rather than a bare host name.
Geocoder.configure(
  lookup: :dstk,
  host: "http://dstk-lb-1122339968.us-east-1.elb.amazonaws.com/",
  timeout: 2
)
# ###########################################################
# Map Script
# ###########################################################
# One producer thread reads lines from STDIN into a bounded queue; a pool of
# consumer threads pops each line, wraps it in a RowObjectBuilder and calls
# #objectify on it. The script finishes once STDIN is exhausted and every
# queued line has been handled.
taxonomies = DataProcessing::TaxonomyCache.new
num_threads = 10
# Bounded buffer between producer and consumers. SizedQueue#push blocks while
# the queue is full and #pop blocks while it is empty, so neither side has to
# poll the queue size in a busy loop.
input = SizedQueue.new(100)
# Unique end-of-input marker. It can never be equal to a line read from
# STDIN, and one copy is queued per consumer so that every consumer blocked
# in #pop is woken up and terminates.
done = Object.new
threads = []

# Producer Thread
# Read from STDIN and add each line to the bounded input queue.
threads << Thread.new(input) do |ip|
  puts "Starting producer loop"
  begin
    while (line = STDIN.gets)
      ip.push(line)
    end
  ensure
    # Always release the consumers, even if reading STDIN raised; the error
    # itself still surfaces when this thread is joined.
    num_threads.times { ip.push(done) }
  end
end

# Consumer Threads
num_threads.times do |i|
  # Handle the input passed by the reader thread
  threads << Thread.new(input) do |ip|
    puts "Starting consumer loop #{i}"
    loop do
      # Blocking pop: there is no separate empty?-check, so two consumers can
      # never race for the last item and leave one of them blocked forever.
      row = ip.pop
      break if row.equal?(done)

      row_object = DataProcessing::Stream::RowObjectBuilder.new(row, taxonomies)
      row_object.objectify
    end
  end
end

# Wait for the producer and all consumers; join re-raises any exception that
# terminated a thread.
threads.each(&:join)
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment