-
-
Save lenileiro/bdede47511134a0b06c112fbbfddc30c to your computer and use it in GitHub Desktop.
Example of a retryable wrapper around Task.Supervisor.async_stream, which does not retry failed tasks on its own.
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
require Integer
require Logger
defmodule Processor do
  @moduledoc """
  Demo workload used to exercise a retrying task stream.

  Each call sleeps, reads the wall clock and then, depending on the parity of
  the timestamp, either succeeds or simulates one of several failure modes.
  """

  require Integer
  require Logger

  @doc """
  Processes `state` after sleeping for `delay_ms` milliseconds (default 1000).

  Returns:

    * `{:ok, {state, timestamp_ms}}` when the timestamp is odd
    * `{:retry, state}` or `{:noretry, state}` when the timestamp is even

  On an even timestamp the calling process may instead exit with reason
  `:error`, which simulates a hard task failure.
  """
  def process(state, delay_ms \\ 1000) do
    :timer.sleep(delay_ms)
    result = :os.system_time(:millisecond)
    Logger.info("process(#{state}) -> #{result}")

    if Integer.is_even(result) do
      simulate_failure(state)
    else
      {:ok, {state, result}}
    end
  end

  # Picks a failure mode with a uniform roll over 1..10:
  #   10% informative {:noretry, state}
  #   20% hard failure (process exit)
  #   70% {:retry, state}
  defp simulate_failure(state) do
    case Enum.random(1..10) do
      1 -> {:noretry, state}
      roll when roll in 2..3 -> exit(:error)
      _ -> {:retry, state}
    end
  end
end
defmodule StreamHelper do
  @moduledoc """
  Retry layer on top of `Task.Supervisor.async_stream_nolink/4`, which never
  re-runs a task by itself.

  The task function receives one input ("state") and is expected to return:

    * `{:ok, result}` - finished; `result` is collected
    * `{:retry, state}` - run again with the returned `state` in the next round
    * `{:noretry, state}` - failed for good; `state` is reported as failed

  A task that exits, or that returns anything else, is reported as failed
  with its original input and is not retried.
  """

  require Logger

  @default_tries 3

  @doc """
  Runs `function` over `enumerable` under `supervisor`, re-running retryable
  items in successive rounds.

  `opts` accepts `:retries` (total number of rounds to attempt, default
  #{@default_tries}); every other option is forwarded to
  `Task.Supervisor.async_stream_nolink/4` on every round. `:ordered` is
  always forced to `true` so results can be paired with their inputs.

  Returns `{results, failed_states}`.
  """
  def async_stream(supervisor, enumerable, function, opts \\ []) do
    {tries, opts} = Keyword.pop(opts, :retries, @default_tries)
    async_stream(supervisor, enumerable, function, [], [], tries, opts)
  end

  @doc """
  Same as `async_stream/7` with no stream options.
  """
  def async_stream(supervisor, enumerable, function, result_acc, failed_acc, tries) do
    async_stream(supervisor, enumerable, function, result_acc, failed_acc, tries, [])
  end

  @doc """
  Runs one round and recurses on the retryable states until nothing is left
  to retry or `tries` is used up.

  `result_acc` and `failed_acc` carry the results and failed states gathered
  by earlier rounds. Returns `{results, failed_states}`.
  """
  def async_stream(_supervisor, [], _function, result_acc, failed_acc, _tries, _opts) do
    Logger.info("async_stream([]) -> done")
    {result_acc, failed_acc}
  end

  def async_stream(_supervisor, enumerable, _function, result_acc, failed_acc, tries, _opts)
      when tries <= 0 do
    # Materialise first: `list ++ range` would build an improper list.
    leftover = Enum.to_list(enumerable)

    Logger.info(
      "async_stream(#{inspect(leftover, charlists: :as_lists)}, tries=#{tries}) no more retries"
    )

    {result_acc, failed_acc ++ leftover}
  end

  def async_stream(supervisor, enumerable, function, result_acc, failed_acc, tries, opts) do
    inputs = Enum.to_list(enumerable)
    Logger.info("async_stream(#{inspect(inputs, charlists: :as_lists)}, tries=#{tries})")

    # Exited tasks do not report their input, so results must come back in
    # input order to be zipped with the inputs below.
    stream_opts = Keyword.put(opts, :ordered, true)

    events =
      supervisor
      |> Task.Supervisor.async_stream_nolink(inputs, function, stream_opts)
      |> Enum.to_list()

    Logger.info("async_stream -> #{inspect(events, charlists: :as_lists)}")

    groups =
      inputs
      |> Enum.zip(events)
      |> Enum.map(&classify/1)
      |> Enum.group_by(&elem(&1, 0), &elem(&1, 1))

    Logger.info("groups -> #{inspect(groups, charlists: :as_lists)}")

    completed = Map.get(groups, :completed, [])
    retryable = Map.get(groups, :retry, [])
    not_retryable = Map.get(groups, :noretry, [])
    failed_states = Map.get(groups, :failed, [])

    Logger.info("completed -> #{inspect(completed, charlists: :as_lists)}")
    Logger.info("retryable -> #{inspect(retryable, charlists: :as_lists)}")
    Logger.info("not retryable -> #{inspect(not_retryable, charlists: :as_lists)}")
    Logger.info("failed states -> #{inspect(failed_states, charlists: :as_lists)}")

    # `opts` is passed along so later rounds keep the caller's stream options.
    async_stream(
      supervisor,
      retryable,
      function,
      result_acc ++ completed,
      failed_acc ++ not_retryable ++ failed_states,
      tries - 1,
      opts
    )
  end

  # Maps an {input, stream_event} pair to {group, value_to_keep}.
  defp classify({_input, {:ok, {:ok, result}}}), do: {:completed, result}
  defp classify({_input, {:ok, {:retry, state}}}), do: {:retry, state}
  defp classify({_input, {:ok, {:noretry, state}}}), do: {:noretry, state}
  # Exits and unexpected return values carry no usable state: keep the input.
  defp classify({input, _exit_or_unexpected}), do: {:failed, input}
end
# Start a named Task.Supervisor for the demo. `restart:` is a child-spec
# option, not a supervisor option, so it is not passed to start_link/2.
# Matching on {:ok, _} makes a failed start crash here instead of later.
children = [{Task.Supervisor, name: MyTaskSupervisor}]
{:ok, _supervisor} = Supervisor.start_link(children, strategy: :one_for_one)

# Process the inputs 1..10, two at a time, with up to three rounds, and log
# the final {results, failed_states} tuple.
MyTaskSupervisor
|> StreamHelper.async_stream(1..10, &Processor.process/1, retries: 3, max_concurrency: 2)
|> inspect(charlists: :as_lists)
|> Logger.info()
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment