What do Tensorflow, Apache Airflow, Rule Engines, and Excel have in common?
Under the hood they all use DAGs to model the data-flow dependencies of the program. Using graphs to model programs is great because you can modify the program at runtime. Let's talk about doing this in Elixir for great good.
Graph data structures have many use cases in computing. We'll focus on using Directed Acyclic Graphs (DAGs) for forward chaining dataflow models.
This notebook illustrates how to use dataflow graphs in Elixir to tackle tricky problems like expert systems / rule engines, dynamic data pipelines, and more.
A graph is
A graph data structure consists of a finite (and possibly mutable) set of vertices (also called nodes or points), together with a set of unordered pairs of these vertices for an undirected graph or a set of ordered pairs for a directed graph. These pairs are known as edges (also called links or lines), and for a directed graph are also known as arrows or arcs.
A graph is circles and lines. Nodes and connections. Vertices and edges.
DAG stands for Directed Acyclic Graph
i.e. circles with lines that have arrows between them.
The "acyclic" means no "cycles", i.e. there cannot be a "loop back" where following the arrows takes you back to a node you've already visited.
- Apache Airflow and similar ETL / Pipeline builder tools are DAGs
- Apache Beam and most related high-throughput data processing frameworks are DAGs
  - Flink is a leading data processing framework that implements the Apache Beam model
  - Apache Spark also models its jobs as dataflow DAGs
  - GCP Dataflow
  - AWS Data Pipeline
- TensorFlow models computations as graphs
- Serverless / Workflow systems are DAGs
  - https://blog.acolyer.org/2020/02/07/cloudburst/
  - AWS Lambda
  - Google Cloud Functions
  - Azure Functions
- Rule & Workflow engines are DAGs
- CPU scheduling is DAGs!
The common use case for DAGs in dataflow models is to represent parts of your program as data so it can be transformed and composed at runtime.
There are other ways we can represent a program at runtime like with a list of functions.
The advantage of modeling the program as data is it allows for deferred execution. The caller can choose how and when to execute the function(s).
"Using a datastructure to hold a program/instructions for later use"
funs = [
fn num -> num + 1 end,
fn num -> num * 2 end,
fn num -> num - 1 end
]
"Using that program-as-data to do stuff at runtime"
Enum.map(funs, & &1.(2))
funs
|> Enum.map(& &1.(2))
|> Enum.sum()
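Because the program is just data, we can also compose it into a single function at runtime before ever calling it. Here's a minimal sketch of that idea (the `composed` binding is purely for illustration):

```elixir
# A minimal sketch of composing the same program-as-data at runtime:
# fold the list of functions into one pipeline function, then call it later.
composed = fn input ->
  Enum.reduce(funs, input, fn fun, acc -> fun.(acc) end)
end

composed.(2)
# ((2 + 1) * 2) - 1 => 5
```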
Mix.install([
{:kino, "~> 0.5.2"},
{:libgraph, github: "bitwalker/libgraph", branch: "main"},
{:req, "~> 0.2.0"},
{:jason, "~> 1.1.0"},
{:floki, "~> 0.32.0"}
])
defmodule Kino.Mermaid do
use Kino.JS
def new(%Graph{} = graph) do
graph
|> to_mermaid()
|> new()
end
def new(graph) do
Kino.JS.new(__MODULE__, graph)
end
def to_mermaid(%Graph{} = graph) do
Enum.reduce(
Graph.edges(graph),
"graph TD;",
fn
%{v1: v1, v2: v2}, acc ->
acc <>
"#{vertex_id(v1)}([#{vertex_name(v1)}])-->#{vertex_id(v2)}([#{vertex_name(v2)}]);"
end
)
end
defp vertex_name(:root), do: :root
defp vertex_name(%{work: fun}), do: "step_#{fun_name(fun)}"
defp vertex_name(%{check: fun}), do: "cond_#{fun_name(fun)}"
defp vertex_name(fun) when is_function(fun), do: "fun_#{fun_name(fun)}-#{:erlang.phash2(fun)}"
defp vertex_name(vertex) when is_atom(vertex), do: to_string(vertex)
defp vertex_name(otherwise) do
try do
to_string(otherwise)
rescue
_any -> :erlang.phash2(otherwise)
end
end
defp vertex_id(thing) when is_atom(thing), do: to_string(thing)
defp vertex_id(thing), do: :erlang.phash2(thing)
defp fun_name(fun), do: Function.info(fun, :name) |> elem(1)
asset "main.js" do
"""
import "https://cdn.jsdelivr.net/npm/[email protected]/dist/mermaid.min.js";
mermaid.initialize({ startOnLoad: false });
export function init(ctx, graph) {
mermaid.render("graph1", graph, (svgSource, bindListeners) => {
ctx.root.innerHTML = svgSource;
bindListeners && bindListeners(ctx.root);
});
}
"""
end
end
g =
Graph.new(type: :directed)
|> Graph.add_edges([
{:a, :b},
{:a, :c},
{:b, :d}
])
g
|> Kino.Mermaid.new()
If functions are just data and graphs are data structures - what if we put functions in our graphs?
fun_1 = fn num -> num * 2 end
fun_2 = fn num -> num + 2 end
fun_3 = fn num -> num * 42 end
fun_4 = fn num -> num + 42 end
fun_dag =
Graph.new(type: :directed)
|> Graph.add_edges([
{:root, fun_1},
{fun_1, fun_2},
{fun_1, fun_3},
{fun_2, fun_4}
])
fun_dag
|> Kino.Mermaid.new()
Now that we've got lambda functions in our DAG - how do we evaluate it?
- start at the top and run that function with the input to get a new output
- find the next step(s) by traveling down the arrows (i.e. the `out_neighbors`) - feed the output from the previous step into the next steps
- Profit
TLDR; "Travel down the arrows"
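Before stepping through it by hand below, here's a minimal recursive sketch of "traveling down the arrows". The `run_dag` helper is just for illustration and isn't used in the rest of the notebook:

```elixir
# Run each out_neighbor of a vertex on the current value and recurse until we
# hit the leaves, collecting their results. `value` is whatever the current
# vertex produced (for :root it's just the raw input).
run_dag = fn run_dag, graph, vertex, value ->
  case Graph.out_neighbors(graph, vertex) do
    [] ->
      [value]

    next_steps ->
      Enum.flat_map(next_steps, fn fun ->
        run_dag.(run_dag, graph, fun, fun.(value))
      end)
  end
end

run_dag.(run_dag, fun_dag, :root, 2)
```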
fun_dag_input = Kino.Input.number("Num")
input = Kino.Input.read(fun_dag_input)
neighbors_of_root = Graph.out_neighbors(fun_dag, :root)
first_runnables = Enum.map(neighbors_of_root, fn fun -> {fun, input} end)
first_result =
first_runnables
|> Enum.map(fn {fun, input} -> fun.(input) end)
|> List.first()
fun_from_first_runnable = first_runnables |> List.first() |> elem(0)
neighbors_of_first_runnable =
fun_dag
|> Graph.out_neighbors(fun_from_first_runnable)
|> Enum.map(fn fun -> {fun, first_result} end)
|> Enum.map(fn {fun, input} ->
fun.(input)
end)
neighbors_of_first_runnable =
fun_dag
|> Graph.out_neighbors(fun_from_first_runnable)
|> Task.async_stream(fn fun ->
fun.(first_result)
end)
|> Enum.map(fn {:ok, result} -> result end)
fun_dag
|> Kino.Mermaid.new()
new_fun_dag = Graph.add_edge(fun_dag, fun_from_first_runnable, fn _num -> 42 end)
new_fun_dag
|> Kino.Mermaid.new()
What other features would a DAG / Pipeline builder tool like this want?
- Steps with more than 1 parent dependency.
- Steps with constraints controlling when to execute.
- Accumulating data to act on or control behavior.
join_example =
Graph.new()
|> Graph.add_edges([
{:root, :step_a},
{:root, :step_b},
{:step_a, :step_c},
{:step_b, :step_c}
])
join_example
|> Kino.Mermaid.new()
Here we can only execute `step_c` when both `step_a` and `step_b` have resolved.
This means we have to keep intermediate results somewhere in memory so we can hold on to the result of `step_a` or `step_b` (whichever finishes first) and use both to invoke `step_c`.
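A minimal sketch of what that bookkeeping could look like, assuming we stash each intermediate result in a map keyed by the step that produced it (the `maybe_run_join` helper is hypothetical):

```elixir
# Only fire the joining step once every one of its in_neighbors has a result
# in memory; otherwise keep waiting.
results_so_far = %{step_a: 1, step_b: 2}

maybe_run_join = fn graph, results, join_step ->
  parents = Graph.in_neighbors(graph, join_step)

  if Enum.all?(parents, &Map.has_key?(results, &1)) do
    {:run, join_step, Enum.map(parents, &Map.fetch!(results, &1))}
  else
    {:wait, join_step}
  end
end

maybe_run_join.(join_example, results_so_far, :step_c)
```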
A rule is a step with a constraint on when to execute it.
A function is kind of like a rule where `fn :potato -> 42 end` will only return `42` when given `:potato`.
The usual verbiage is the `lhs` and `rhs`, or `condition` and `reaction`.
Rule engines are the OG AI - they've been solving business problems since the '60s.
Rules are common abstractions for the representation of expert knowledge.
They're handy because they're easily composed, evaluated, stored, and organized.
Say we want to make a graph with our example rule from before:
fn :potato -> 42 end
We know we need to evaluate the `lhs` before the `rhs` and only evaluate the `rhs` if the `lhs` returns `true`.
We can do this by breaking out our one rule into the two pieces:
lhs = fn :potato -> true end
rhs = fn _any -> 42 end
Now we need to get these into our DAG and since the `lhs` has to run first - we'll make sure the arrow goes `lhs -> rhs`.
potato_rule =
Graph.new()
|> Graph.add_edges([
{:is_potato?, 42}
])
potato_rule
|> Kino.Mermaid.new()
composed_rule =
Graph.new()
|> Graph.add_edges([
{:root, :is_potato?},
{:is_potato?, :boil_em_mash_em},
{:root, :is_ham?},
{:is_ham?, :it_is_delicious},
{:root, :is_beet?},
{:is_beet?, :good_in_salad}
])
composed_rule
|> Kino.Mermaid.new()
Now we could give our Graph of Rules some inputs and get results like:
> :ham
:it_is_delicious
> :potato
:boil_em_mash_em
> :beet
:good_in_salad
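A minimal sketch of how that evaluation could work. The `conditions` map binding each condition vertex to a predicate is an assumption for illustration - the graph itself only stores atoms:

```elixir
# Walk from :root, keep only the conditions that match the input, and return
# the reactions (out_neighbors) of the matching conditions.
conditions = %{
  is_potato?: fn input -> input == :potato end,
  is_ham?: fn input -> input == :ham end,
  is_beet?: fn input -> input == :beet end
}

evaluate = fn graph, input ->
  graph
  |> Graph.out_neighbors(:root)
  |> Enum.filter(fn condition -> conditions[condition].(input) end)
  |> Enum.flat_map(&Graph.out_neighbors(graph, &1))
end

evaluate.(composed_rule, :ham)
# => [:it_is_delicious]
```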
For Elixir and Erlang function calls - we're scoped to that module, function, and arity when evaluating. This has advantages because it means we can evaluate patterns top to bottom executing the RHS of whichever pattern matches first.
In these cases - outside of optimizing your pattern matches you don't really need or want conditional expansion.
However, in the case of evaluating many rules where the conditions might overlap - it's often useful to expand the expressions within the LHS into smaller conditions bound together in the graph.
Here's an example of the initial approach of expanding just the LHS and RHS.
lhs = fn input when (input == :potato and input != :ham) or input == "potato" -> true end
rhs = fn _any -> 42 end
But if we also bring in another rule such as
other_rule = fn :potato -> 42 end
We might be matching against `:potato` twice for the same input and doing unnecessary work.
So if we're composing many rules with overlapping conditions for the same execution context, we can expand each individual expression into its own condition.
fn input
when (input == :potato and input != :ham) or
input == "potato" ->
42
end
# full expansion
lhs_1 = fn input when input == :potato -> true end
lhs_2 = fn input when input != :ham -> true end
lhs_or = fn input when input == "potato" -> true end
expanded_rule =
Graph.new()
|> Graph.add_edges([
{:root, :lhs_1},
{:root, :lhs_2},
{:root, :other_path},
{:lhs_1, :conjunction_2_and_1},
{:lhs_2, :conjunction_2_and_1},
{:conjunction_2_and_1, :rhs},
{:other_path, :rhs}
])
expanded_rule
|> Kino.Mermaid.new()
Joining ongoing expressions is stateful - sort of.
To ensure a step's dependencies are resolved we have to hold their intermediate results in memory and wait until the joining step can be executed.
A DAG of lambda functions is easy to evaluate because it's stateless, meaning our scheduler can blindly parallel map over steps, keeping only the one step in context.
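For contrast, here's a minimal sketch of the stateless case: each `{fun, input}` runnable carries everything it needs, so we can parallel map a whole "layer" of the DAG without holding any other state:

```elixir
# Each runnable is self-contained, so Task.async_stream can run them all
# concurrently with no shared memory between steps.
runnables = [
  {fn num -> num + 2 end, 4},
  {fn num -> num * 42 end, 4}
]

runnables
|> Task.async_stream(fn {fun, input} -> fun.(input) end)
|> Enum.map(fn {:ok, result} -> result end)
# => [6, 168]
```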
potato_lock =
Dagger.state_machine(
init: %{code: "potato", state: :locked, contents: "ham"},
reducer: fn
:lock, state ->
%{state | state: :locked}
{:unlock, input_code}, %{code: code, state: :locked} = state when input_code == code ->
%{state | state: :unlocked}
_input_code, %{state: :unlocked} ->
state
end,
reactors: [
fn %{state: :unlocked, contents: contents} -> contents end,
fn %{state: :locked} -> {:error, :locked} end
]
)
lock_state_machine =
Graph.new()
|> Graph.add_edges([
{:root, :arity_check_1},
{:arity_check_1, :is_lock_command?},
{:arity_check_1, :is_unlock_command?},
{:arity_check_1, :is_code_correct?},
{:is_lock_command?, :lock_conjunction},
{:is_unlock_command?, :lock_conjunction},
{:is_code_correct?, :lock_conjunction},
{:lock_conjunction, :lock_reducer},
{:root, :reactor_lhs_clause_1_matched?},
{:root, :reactor_lhs_clause_2_matched?},
{:reactor_lhs_clause_1_matched?, :reactor_rhs_1},
{:reactor_lhs_clause_2_matched?, :reactor_rhs_2}
])
Kino.Mermaid.new(lock_state_machine)
defmodule TextProcessing do
def tokenize(text) do
text
|> String.downcase()
|> String.split(~R/[^[:alnum:]\-]/u, trim: true)
end
def count_words(list_of_words) do
list_of_words
|> Enum.reduce(Map.new(), fn word, map ->
Map.update(map, word, 1, &(&1 + 1))
end)
end
def count_uniques(word_count) do
Enum.count(word_count)
end
def first_word(list_of_words) do
List.first(list_of_words)
end
def last_word(list_of_words) do
List.last(list_of_words)
end
end
text = "boil em mash em stick em in a stew. Po Tay Toes."
text
|> TextProcessing.tokenize()
|> TextProcessing.count_words()
|> TextProcessing.count_uniques()
text
|> TextProcessing.tokenize()
|> TextProcessing.count_words()
text
|> TextProcessing.tokenize()
|> TextProcessing.first_word()
text
|> TextProcessing.tokenize()
|> TextProcessing.last_word()
We tokenized 3 times and counted words twice!
What if we made a `Pipeline` module so we can pipeline while we pipeline?
defmodule Fact do
defstruct ~w(
value
runnable
)a
end
defmodule Step do
defstruct ~w(
work
)a
def new(work), do: %__MODULE__{work: work}
def run(%__MODULE__{work: work} = step, %Fact{value: input} = input_fact) do
%Fact{value: apply(work, [input]), runnable: {step, input_fact}}
end
def run(%__MODULE__{work: work} = step, input) do
%Fact{value: apply(work, [input]), runnable: {step, %Fact{value: input}}}
end
end
defmodule Rule do
defstruct ~w(
lhs
rhs
)a
def new(lhs, rhs) do
%__MODULE__{lhs: lhs, rhs: rhs}
end
def check(%__MODULE__{} = rule, input) do
apply(rule.lhs, [input])
end
def run(%__MODULE__{} = rule, input) do
if check(rule, input) do
apply(rule.rhs, [input])
end
end
def to_pipeline(%__MODULE__{} = rule) do
Pipeline.new()
|> Pipeline.add_step(Condition.new(rule.lhs), Step.new(rule.rhs))
end
end
defmodule Condition do
defstruct ~w(
check
)a
def new(check), do: %__MODULE__{check: check}
def run(%Condition{} = condition, %Fact{} = fact) do
%Fact{value: run(condition, fact.value)}
end
def run(condition, input) do
with true <- apply(condition.check, [input]) do
true
else
_otherwise ->
false
end
end
end
defmodule Pipeline do
defstruct ~w(
flow
facts
)a
def new() do
%__MODULE__{
flow: Graph.new(type: :directed) |> Graph.add_vertex(:root),
facts: []
}
end
def run({%Step{} = step, %Fact{} = fact} = _runnable) do
Step.run(step, fact)
end
def add_step(%__MODULE__{flow: flow} = pipeline, step) do
%__MODULE__{pipeline | flow: Graph.add_edge(flow, :root, step)}
end
def add_step(%__MODULE__{} = pipeline, fun) when is_function(fun) do
add_step(pipeline, Step.new(fun))
end
def add_step(%__MODULE__{flow: flow} = pipeline, parent_step, child_step) do
if Graph.has_vertex?(flow, parent_step) do
%__MODULE__{pipeline | flow: Graph.add_edge(flow, parent_step, child_step)}
else
pipeline
|> add_step(parent_step)
|> add_step(parent_step, child_step)
end
end
@doc """
Merges the second pipeline into the first.
"""
def merge(%__MODULE__{} = pipeline_1, %__MODULE__{} = pipeline_2) do
new_flow =
Enum.reduce(Graph.edges(pipeline_2.flow), pipeline_1.flow, fn edge, flow ->
Graph.add_edge(flow, edge)
end)
%__MODULE__{pipeline_1 | flow: new_flow}
end
def next_steps(%__MODULE__{flow: flow}, step) do
Graph.out_neighbors(flow, step)
end
def next_runnables(
%__MODULE__{} = pipeline,
%Fact{runnable: {previous_step, _previous_input}} = fact
) do
pipeline
|> next_steps(previous_step)
|> Enum.map(fn step ->
{step, fact}
end)
end
@doc """
Returns a list of runnables (`work, input` pairs).
"""
def next_runnables(%__MODULE__{} = pipeline, some_input) do
pipeline
|> next_steps(:root)
|> Enum.map(fn step ->
{step, %Fact{value: some_input}}
end)
end
end
tokenize_step = Step.new(&TextProcessing.tokenize/1)
count_words_step = Step.new(&TextProcessing.count_words/1)
count_uniques_step = Step.new(&TextProcessing.count_uniques/1)
first_word_step = Step.new(&TextProcessing.first_word/1)
last_word_step = Step.new(&TextProcessing.last_word/1)
text_processing_pipeline =
Pipeline.new()
|> Pipeline.add_step(tokenize_step)
|> Pipeline.add_step(tokenize_step, count_words_step)
|> Pipeline.add_step(tokenize_step, first_word_step)
|> Pipeline.add_step(tokenize_step, last_word_step)
|> Pipeline.add_step(count_words_step, count_uniques_step)
text_processing_pipeline.flow
|> Kino.Mermaid.new()
next_runnables = Pipeline.next_runnables(text_processing_pipeline, text)
results_1 =
next_runnables
|> Enum.map(fn runnable ->
Pipeline.run(runnable)
end)
next_runnables =
results_1
|> Enum.flat_map(fn %Fact{} = fact ->
Pipeline.next_runnables(text_processing_pipeline, fact)
end)
results_2 =
next_runnables
|> Enum.map(&Pipeline.run/1)
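We could keep going by hand, but the same loop works until no runnables are left. Here's a minimal sketch of driving the pipeline to completion - the `run_to_completion` helper is hypothetical, not part of the `Pipeline` module above:

```elixir
# Run every runnable, feed each resulting Fact back into next_runnables/2,
# and repeat until nothing is left to run, accumulating all facts produced.
run_to_completion = fn run_to_completion, pipeline, runnables, facts ->
  if runnables == [] do
    facts
  else
    new_facts = Enum.map(runnables, &Pipeline.run/1)
    next = Enum.flat_map(new_facts, &Pipeline.next_runnables(pipeline, &1))
    run_to_completion.(run_to_completion, pipeline, next, facts ++ new_facts)
  end
end

run_to_completion.(
  run_to_completion,
  text_processing_pipeline,
  Pipeline.next_runnables(text_processing_pipeline, text),
  []
)
```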
- For AI, rule based systems, knowledge representation and more: The late, great Patrick Winston's MIT AI lectures
- For understanding programming paradigms, concurrency, dataflow, backwards chaining, and more: Peter Van Roy's YouTube channel