Created
February 15, 2016 06:10
-
-
Save yuroyoro/8b060bfaf092395eca07 to your computer and use it in GitHub Desktop.
青空文庫からテキスト形式の作品を105本（21作品×5周）ダウンロードしてzipを解凍し、`OpenSSL::Cipher` を使ってAES-256で暗号化する処理を、逐次実行・Threadによる並行実行・Workerクラスによるマルチプロセス実行とで比較するサンプルプログラム
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
#!/usr/bin/env ruby | |
require 'openssl' | |
require 'base64' | |
require './worker.rb' | |
# Benchmark corpus: 21 Aozora Bunko zip archives, repeated five times over
# in the same order (105 downloads in total).
$urls = %w[
  http://www.aozora.gr.jp/cards/001779/files/56647_ruby_58166.zip
  http://www.aozora.gr.jp/cards/000148/files/752_ruby_2438.zip
  http://www.aozora.gr.jp/cards/001383/files/56866_ruby_58168.zip
  http://www.aozora.gr.jp/cards/000148/files/789_ruby_5639.zip
  http://www.aozora.gr.jp/cards/000035/files/301_ruby_5915.zip
  http://www.aozora.gr.jp/cards/000096/files/2093_ruby_28087.zip
  http://www.aozora.gr.jp/cards/000081/files/456_ruby_145.zip
  http://www.aozora.gr.jp/cards/000879/files/127_ruby_150.zip
  http://www.aozora.gr.jp/cards/001562/files/52409_ruby_51058.zip
  http://www.aozora.gr.jp/cards/001562/files/52410_ruby_51060.zip
  http://www.aozora.gr.jp/cards/000156/files/1465_ruby_16804.zip
  http://www.aozora.gr.jp/cards/000035/files/1565_ruby_8220.zip
  http://www.aozora.gr.jp/cards/000879/files/92_ruby_164.zip
  http://www.aozora.gr.jp/cards/000035/files/1567_ruby_4948.zip
  http://www.aozora.gr.jp/cards/000148/files/794_ruby_4237.zip
  http://www.aozora.gr.jp/cards/001566/files/52504_ruby_49666.zip
  http://www.aozora.gr.jp/cards/000296/files/47061_ruby_28378.zip
  http://www.aozora.gr.jp/cards/001235/files/49866_ruby_41853.zip
  http://www.aozora.gr.jp/cards/001562/files/56758_ruby_57843.zip
  http://www.aozora.gr.jp/cards/000074/files/424_ruby_19825.zip
  http://www.aozora.gr.jp/cards/000052/files/5016_ruby_9746.zip
] * 5

# Key-derivation inputs shared by every encryption: a fixed password and a
# random 8-byte salt generated once, when this script is loaded.
$password = 'password'
$salt = OpenSSL::Random.random_bytes(8)
# Downloads the zip archive at +url+ and returns its extracted text, decoded
# as Shift_JIS (funzip writes the archive's first member to stdout).
#
# The work is delegated to the shell pipeline `curl -s URL | funzip -c`, so
# both commands must be available on PATH.
# NOTE(review): +url+ is interpolated into a shell command without escaping;
# this is only acceptable because every URL comes from the hard-coded $urls.
def crawl(url)
  puts "pid #{Process.pid} : crawling #{url}"
  # Block form: the pipe is closed and the shell child reaped (setting $?)
  # as soon as the read finishes, instead of leaking an fd and a zombie
  # process until the IO object happens to be garbage-collected.
  IO.popen("curl -s #{url} | funzip -c", external_encoding: "SJIS", &:read)
end
# Encrypts +text+ with AES-256-CBC +times+ times and returns the raw
# (binary) ciphertext produced by the last round, or nil when +times+ is 0.
#
# Every round builds a fresh cipher and re-derives key/IV from $password and
# $salt, so all rounds do identical work; only the last result is kept. The
# repetition appears to exist purely to scale CPU load for the benchmark.
# +url+ is used only for the progress message.
def encrypt(url, text, times)
  puts "pid #{Process.pid} : encrypting #{url} #{times} times"
  result = nil
  times.times do
    # OpenSSL::Cipher::Cipher is a deprecated alias (dropped in newer openssl
    # gem releases); OpenSSL::Cipher.new is the supported constructor.
    cipher = OpenSSL::Cipher.new("AES-256-CBC")
    cipher.encrypt
    # NOTE(review): pkcs5_keyivgen is a legacy KDF; it is kept unchanged so
    # the per-round workload matches the original benchmark.
    cipher.pkcs5_keyivgen($password, $salt)
    result = cipher.update(text) + cipher.final
  end
  result
end
# Downloads each of +urls+ in turn, encrypts its text +times+ times, and
# returns the Base64-encoded ciphertexts in the same order as +urls+.
def crawl_and_encrypt(times, *urls)
  urls.map do |target|
    Base64.encode64(encrypt(target, crawl(target), times))
  end
end
# Runs the workload on +num+ forked Worker processes.
#
# $urls is dealt round-robin onto the workers, and each worker receives its
# whole share (prefixed with +times+) in a single execute call. Returns the
# Base64 ciphertexts from all workers, grouped by worker rather than in
# $urls order.
#
# Raises ArgumentError when +num+ < 1, since no worker could receive work.
def run_parallelly(num, times)
  raise ArgumentError, "number of processes must be >= 1 (got #{num})" if num < 1

  workers = Array.new(num) do
    Worker.new { |repeat, *urls| crawl_and_encrypt(repeat, *urls) }
  end
  puts "run parallelly #{num} processes"
  # Worker#run returns a thread that finishes once the child reports :ready.
  workers.map(&:run).each(&:join)

  threads = $urls.zip(workers.cycle).group_by(&:last).map do |worker, pairs|
    args = pairs.map(&:first).unshift(times)
    puts "start worker pid #{worker.pid}"
    # Worker#execute returns a thread whose value is the child's result.
    worker.execute(*args)
  end
  threads.each(&:join)
  workers.each(&:stop)
  threads.flat_map(&:value)
end
# Runs the workload on +num+ threads inside the current process.
#
# $urls is dealt round-robin into +num+ buckets, and each bucket is processed
# by one thread. Returns the Base64 ciphertexts grouped by bucket rather than
# in $urls order.
#
# Raises ArgumentError when +num+ < 1 (matching run_parallelly).
def run_concurrently(num, times)
  raise ArgumentError, "number of threads must be >= 1 (got #{num})" if num < 1

  puts "run concurrently #{num} threads"
  buckets = $urls.zip(num.times.cycle).group_by(&:last).values
  # map, not flat_map: each bucket produces exactly one Thread.
  threads = buckets.map do |pairs|
    urls = pairs.map(&:first)
    Thread.new { crawl_and_encrypt(times, *urls) }
  end
  threads.each(&:join)
  threads.flat_map(&:value)
end
# Baseline mode: processes every URL in $urls one after another in this
# process and returns the Base64 ciphertexts in $urls order.
def run_serially(times)
  puts("run serially")
  all_urls = $urls
  crawl_and_encrypt(times, *all_urls)
end
# Entry point:  ruby <script> MODE TIMES [NUM]
#   MODE  - "serial", "concurrent" (NUM threads) or "parallel" (NUM processes)
#   TIMES - how many times each downloaded text is encrypted
#   NUM   - number of threads/processes (unused in serial mode)
mode, times, num = ARGV
results =
  case mode
  when "parallel"   then run_parallelly(num.to_i, times.to_i)
  when "concurrent" then run_concurrently(num.to_i, times.to_i)
  when "serial"     then run_serially(times.to_i)
  else
    # Without this, an unknown/missing mode left results nil and crashed below.
    abort "usage: #{File.basename($PROGRAM_NAME)} serial|concurrent|parallel TIMES [NUM]"
  end
# Seed with 0 so an empty result set reports "0 bytes" instead of a blank.
puts "encrypted total size: #{results.map(&:bytesize).inject(0, :+)} bytes"
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
# Manages one forked child process that runs a fixed block on request.
#
# Parent and child talk over two pipes (parent -> child, child -> parent).
# Each message is a Marshal-dumped Ruby object preceded by a 4-byte
# big-endian byte length, so arbitrary binary payloads arrive intact.
#
# Lifecycle: Worker.new { |*args| ... } -> #run (forks; join the returned
# thread to wait for the child's :ready) -> #execute(*args), each call
# returning a Thread whose #value is the block's result -> #stop.
class Worker
  # Size in bytes of the length header written before every payload.
  HEADER_BYTES = 4

  attr_reader :pid

  # +block+ runs inside the child; it receives the arguments given to
  # #execute, and its return value is sent back to the parent.
  def initialize(&block)
    @child_read, @parent_write = create_pipe # parent -> child
    @parent_read, @child_write = create_pipe # child -> parent
    @block = block
  end

  # Returns a [read, write] pipe pair in binary mode: Marshal output is
  # ASCII-8BIT, so the pipes must not perform any encoding conversion.
  def create_pipe
    IO.pipe.each(&:binmode)
  end

  # Forks the child and returns the Thread from #wait_after_fork, which
  # completes once the child has announced :ready.
  def run
    @pid = fork do
      # The child only uses its own ends of the pipes.
      @parent_read.close
      @parent_write.close
      write_to_parent(:ready)
      loop do
        args = begin
          read_from_parent
        rescue EOFError
          :stop # the parent closed its end; exit instead of crashing
        end
        # :stop tells the child to leave the loop and exit.
        break if args == :stop
        write_to_parent(@block.call(*args))
      end
      @child_read.close
      @child_write.close
    end
    wait_after_fork if @pid
  end

  # Sends +msg+ to the child as the block's arguments and returns a Thread
  # whose value is the child's reply (the block's return value).
  def execute(*msg)
    write_to_child(msg)
    Thread.new { read_from_child }
  end

  # Asks the child to exit, reaps it, and closes the parent's pipe ends
  # (so the worker cannot be reused). A second call is a no-op.
  def stop
    return unless alive?
    begin
      write_to_child(:stop)
    rescue Errno::EPIPE
      # The child already exited; reaping it below is all that is left.
    end
    Process.wait(@pid)
  ensure
    close_parent_pipes
  end

  # True if a process with the child's PID exists (a not-yet-reaped zombie
  # also counts). False before #run has been called.
  def alive?
    return false unless @pid
    Process.kill(0, @pid)
    true
  rescue Errno::ESRCH
    false
  end

  # Marshals +obj+ and writes it to +write+ as one length-prefixed frame.
  # A length prefix (rather than a delimiter plus escaping) cannot be
  # confused by any byte sequence inside the payload.
  def write_object(obj, write)
    data = Marshal.dump(obj)
    write.write([data.bytesize].pack("N") + data)
  end

  # Reads one frame written by #write_object from +read+ and restores it.
  # Raises EOFError if the pipe closes before a complete frame arrives.
  # NOTE: Marshal.load is only safe here because both ends are our own code.
  def read_object(read)
    header = read.read(HEADER_BYTES)
    unless header && header.bytesize == HEADER_BYTES
      raise EOFError, "pipe closed while waiting for a message"
    end
    size = header.unpack("N").first
    data = read.read(size)
    unless data && data.bytesize == size
      raise EOFError, "pipe closed in the middle of a message"
    end
    Marshal.load(data)
  end

  def read_from_child
    read_object(@parent_read)
  end

  def write_to_child(obj)
    write_object(obj, @parent_write)
  end

  def read_from_parent
    read_object(@child_read)
  end

  def write_to_parent(obj)
    write_object(obj, @child_write)
  end

  # Parent-side setup right after fork: close the child's pipe ends,
  # register cleanup hooks, and return a Thread that waits for :ready
  # (raising inside that thread if the child sends anything else).
  def wait_after_fork
    @child_read.close
    @child_write.close
    install_exit_handler
    install_signal_handler
    Thread.new do
      result = read_from_child
      raise "Failed to start worker pid #{ @pid }" unless result == :ready
      result
    end
  end

  # Kills and reaps the child when the parent exits (via Kernel#at_exit).
  def install_exit_handler
    owner = Process.pid
    at_exit do
      # Children forked later inherit this block; only the process that
      # spawned this worker may kill it, or a sibling would do so on exit.
      next unless Process.pid == owner && alive?
      begin
        Process.kill("KILL", @pid)
        Process.wait(@pid)
      rescue Errno::ESRCH, Errno::ECHILD
        # already gone / already reaped
      rescue => e
        puts "error at_exit: #{ e }"
        raise e
      end
    end
  end

  # Forwards the parent's SIGINT/SIGQUIT to the child, waits for it, then
  # hands the signal on to whatever handler was installed before.
  def install_signal_handler
    owner = Process.pid
    [:INT, :QUIT].each do |signal|
      old_handler = Signal.trap(signal) do
        # Inherited copies of this trap in other children must not touch
        # a PID that is not their own child.
        if Process.pid == owner
          begin
            Process.kill(signal, @pid)
            Process.wait(@pid)
          rescue Errno::ESRCH, Errno::ECHILD
            # the child has already exited / been reaped
          end
        end
        forward_to_previous_handler(signal, old_handler)
      end
    end
  end

  private

  # Invokes the trap handler that was active before ours. Signal.trap may
  # return a Proc or a String/nil command such as "DEFAULT" or "IGNORE";
  # commands are restored and the signal re-raised so they take effect.
  def forward_to_previous_handler(signal, handler)
    if handler.respond_to?(:call)
      # Ruby passes the signal number to trap procs; do the same.
      handler.call(Signal.list.fetch(signal.to_s))
    else
      Signal.trap(signal, handler)
      Process.kill(signal, Process.pid)
    end
  end

  # Closes the parent's ends of both pipes, tolerating repeated calls.
  def close_parent_pipes
    [@parent_read, @parent_write].each { |io| io.close unless io.closed? }
  end
end
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment