defmodule DistributedImageProcessing do
  @moduledoc """
  Distributes image processing across Erlang nodes.

  The calling process reads every image, ships it to a worker node as a
  `{path, type, shape, binary}` tuple, lets the worker run
  `process_image/1`, and finally writes the returned image to disk on the
  calling node.

  Tensors are always transferred as their raw binary plus type and shape and
  rebuilt on the receiving side.
  """

  @doc """
  Processes `images` on `workers`, assigning workers in round-robin order.

  ## Parameters

    * `workers` - non-empty enumerable of node names. `Node.connect/1` is
      called on each of them; its result is not checked.
    * `images` - enumerable of source image paths readable from the calling
      node.

  ## Returns

  A list with one `Evision.imwrite!/2` result per image. Results are written
  in the order in which the workers reply, which is not necessarily the order
  of `images`.

  ## Raises

  `ArgumentError` when `workers` is empty (there is nothing to schedule the
  images on).

  This function blocks until a reply has arrived for every image.
  """
  @spec distribute(Enumerable.t(), Enumerable.t()) :: list()
  def distribute(workers, images) do
    # Without this check an empty worker list would yield an endless stream
    # that never produces an element, and the zip below would never return.
    if Enum.empty?(workers) do
      raise ArgumentError, "workers must contain at least one node"
    end

    Enum.each(workers, &Node.connect/1)

    caller = self()

    # Unique tag for this invocation, so that only replies belonging to it are
    # taken out of the caller's mailbox.
    ref = make_ref()

    workers
    |> Stream.cycle()
    |> Stream.zip(images)
    |> Flow.from_enumerable()
    |> Flow.map(fn {worker, src_file} ->
      IO.puts("enter spawn_link")
      {spawn_worker(worker, ref), src_file}
    end)
    |> Flow.map(fn {pid, src_file} ->
      IO.puts("enter reader")
      send(pid, {:img, caller, encode_image(src_file)})

      # Only the number of dispatched images matters downstream, so do not
      # carry the image payload any further.
      src_file
    end)
    |> Enum.to_list()
    |> Enum.map(fn _src_file ->
      IO.puts("enter receiver")
      await_result(ref)
    end)
  end

  @doc """
  Processes one image that was shipped as `{src_file, type, shape, binary}`.

  The tensor is rebuilt from `binary`, `type` and `shape`, converted to an
  Evision matrix and passed to `Evision.threshold!/4` with the arguments
  `127`, `255` and `Evision.cv_THRESH_BINARY()`. The second element of the
  tuple returned by the threshold call is converted back into a tensor.

  Returns `{dst_file, dst_img}`, where `dst_file` is the base name of
  `src_file` with `_d` inserted before the extension (any directory part of
  `src_file` is dropped) and `dst_img` is the resulting tensor.
  """
  @spec process_image({Path.t(), term(), tuple(), binary()}) :: {String.t(), term()}
  def process_image({src_file, type, shape, binary}) do
    IO.puts("enter processor")

    ext = Path.extname(src_file)
    dst_file = "#{Path.basename(src_file, ext)}_d#{ext}"

    dst_img =
      binary
      |> decode_mat(type, shape)
      |> Evision.threshold!(127, 255, Evision.cv_THRESH_BINARY())
      |> elem(1)
      |> Evision.Nx.to_nx!()

    {dst_file, dst_img}
  end

  # Spawns a one-shot process on `worker` that waits for a single image,
  # processes it and replies to the sender with a message tagged with `ref`.
  #
  # NOTE(review): the spawned process is linked to whichever process executes
  # this function (a process run by Flow, not the caller of `distribute/2`) -
  # confirm that this is the intended link.
  defp spawn_worker(worker, ref) do
    Node.spawn_link(worker, fn ->
      receive do
        {:img, reply_to, payload} ->
          {dst_file, img} = process_image(payload)
          {type, shape, binary} = encode_tensor(img)
          send(reply_to, {ref, dst_file, type, shape, binary})
          IO.puts("respond")
      end
    end)
  end

  # Reads `src_file` and returns the `{src_file, type, shape, binary}` tuple
  # expected by `process_image/1`.
  defp encode_image(src_file) do
    {type, shape, binary} =
      src_file
      |> Evision.imread!()
      |> Evision.Nx.to_nx!()
      |> encode_tensor()

    {src_file, type, shape, binary}
  end

  # Waits for one reply tagged with `ref`, rebuilds the image and writes it to
  # the destination path named in the reply.
  defp await_result(ref) do
    receive do
      {^ref, dst_file, type, shape, binary} ->
        Evision.imwrite!(dst_file, decode_mat(binary, type, shape))
    end
  end

  # Splits a tensor into the parts that are sent between nodes.
  defp encode_tensor(img), do: {Nx.type(img), Nx.shape(img), Nx.to_binary(img)}

  # Rebuilds an Evision matrix from the parts produced by `encode_tensor/1`.
  defp decode_mat(binary, type, shape) do
    binary
    |> Nx.from_binary(type)
    |> Nx.reshape(shape)
    |> Evision.Nx.to_mat!()
  end
end