-
-
Save jaeyson/3ae502c9f52a1706fcfa721bfa4d976b to your computer and use it in GitHub Desktop.
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
defmodule Task do | |
defp await_one(tasks, timeout \\ 5_000) when is_list(tasks) do | |
awaiting = | |
Map.new(tasks, fn %Task{ref: ref, owner: owner} = task -> | |
if owner != self() do | |
raise ArgumentError, invalid_owner_error(task) | |
end | |
{ref, true} | |
end) | |
timeout_ref = make_ref() | |
timer_ref = | |
if timeout != :infinity do | |
Process.send_after(self(), timeout_ref, timeout) | |
end | |
try do | |
await_one(tasks, timeout, awaiting, timeout_ref) | |
after | |
timer_ref && Process.cancel_timer(timer_ref) | |
receive do: (^timeout_ref -> :ok), after: (0 -> :ok) | |
end | |
end | |
defp await_one(_tasks, _timeout, awaiting, _timeout_ref) when map_size(awaiting) == 0 do | |
nil | |
end | |
defp await_one(tasks, timeout, awaiting, timeout_ref) do | |
receive do | |
^timeout_ref -> | |
demonitor_pending_tasks(awaiting) | |
exit({:timeout, {__MODULE__, :await_one, [tasks, timeout]}}) | |
{:DOWN, ref, _, proc, reason} when is_map_key(awaiting, ref) -> | |
demonitor_pending_tasks(awaiting) | |
exit({reason(reason, proc), {__MODULE__, :await_many, [tasks, timeout]}}) | |
{ref, nil} when is_map_key(awaiting, ref) -> | |
demonitor(ref) | |
await_one(tasks, timeout, Map.delete(awaiting, ref), timeout_ref) | |
{ref, reply} when is_map_key(awaiting, ref) -> | |
awaiting = Map.delete(awaiting, ref) | |
demonitor_pending_tasks(awaiting) | |
reply | |
end | |
end | |
defp demonitor_pending_tasks(awaiting) do | |
Enum.each(awaiting, fn {ref, _} -> | |
demonitor(ref) | |
end) | |
end | |
defp reason(:noconnection, proc), do: {:nodedown, monitor_node(proc)} | |
defp reason(reason, _), do: reason | |
defp monitor_node(pid) when is_pid(pid), do: node(pid) | |
defp monitor_node({_, node}), do: node | |
defp demonitor(ref) when is_reference(ref) do | |
Process.demonitor(ref, [:flush]) | |
:ok | |
end | |
defp invalid_owner_error(task) do | |
"task #{inspect(task)} must be queried from the owner but was queried from #{inspect(self())}" | |
end | |
end |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment