Dagger is a tool for modeling your workflows as data that can be composed together at runtime.
Dagger constructs can be integrated into a Dagger.Workflow and evaluated lazily in concurrent contexts.
Dagger Workflows are a decorated dataflow graph (a DAG - "directed acyclic graph") of your code that can model your rules, pipelines, and state machines.
Basic data flow dependencies such as in a pipeline are modeled as %Step{}
structs (nodes/vertices) in the graph with directed edges (arrows) between steps.
Steps can be thought of as a basic input -> output
lambda function.
As Facts are fed through a workflow, various steps are traversed to as needed and activated producing more Facts.
Beyond steps, Dagger has support for Rules, Joins, and State Machines for conditional, fan in/out flow, and stateful evaluation.
Dagger is targeting use cases for rule based expert systems, state machines, dynamic data pipelines, and more.
These APIs are not stable and are likely to change. Use at your own risk.
Make it work
(You are here) -> Make it Right
-> Make it beautiful
Mix.install([
{:kino, "~> 0.5.2"},
{:dagger, github: "zblanco/dagger", branch: "zw/stateful-eval"},
{:expreso, "~> 0.1.1"}
])
defmodule Kino.Mermaid do
use Kino.JS
alias Dagger.Workflow
def new(%Graph{} = graph) do
graph
|> to_mermaid()
|> new()
end
def new(%Workflow{} = wrk) do
wrk
|> workflow_to_mermaid()
|> new()
end
def new(graph) do
Kino.JS.new(__MODULE__, graph)
end
def new(graph, encoder) do
graph
|> encoder.()
|> new()
end
def workflow_to_mermaid(%Workflow{} = workflow) do
workflow.flow
|> Graph.edges()
|> Enum.reduce("graph TD;", fn edge, acc ->
v1_name = vertex_name(edge.v1)
v2_name = vertex_name(edge.v2)
v1_id = vertex_id(edge.v1)
v2_id = vertex_id(edge.v2)
acc <> "#{v1_id}([#{v1_name}])-->#{v2_id}([#{v2_name}]);"
end)
end
defp vertex_id(%Workflow.Root{}), do: :root
defp vertex_id(step), do: step.hash
defp vertex_name(vertex) do
case vertex do
%Workflow.Root{} -> "root"
%Workflow.Condition{} = c -> "cond-#{c.hash}"
%Workflow.Conjunction{} = con -> "con-#{con.hash}"
%Workflow.Step{} = s -> "step-#{s.hash}#{s.name |> String.replace("-", "")}"
step -> step.hash
end
end
defp edge_label(%{label: nil, v1: v1, v2: v2}) when is_atom(v1) and is_atom(v2),
do: "\"#{v1}-#{v2}\""
defp edge_label(%{label: {%Workflow.Root{}, hash}}), do: "root-#{hash}"
defp edge_label(%{label: {step, hash}}), do: "\"#{step}-#{hash}\""
defp edge_label(%{label: %Workflow.Root{}}), do: "root"
defp edge_label(%{label: label}), do: "\"#{label}\""
defp edge_label(%{label: {:root, hash}}), do: "\"#{:root}-#{hash}\""
defp edge_label(%{label: {step_name, hash}}), do: "\"#{step_name}-#{hash}\""
def to_mermaid(%Graph{type: :directed} = graph) do
Enum.reduce(Graph.edges(graph), "graph TD;", fn %{v1: v1, v2: v2} = edge, acc ->
v1_vertex_label = vertex_label(v1)
v2_vertex_label = vertex_label(v2)
acc <>
"#{v1_vertex_label}([#{v1_vertex_label}])--\"#{edge_label(edge)}\"-->#{v2_vertex_label}([#{v2_vertex_label}]);"
end)
end
def to_mermaid(%Graph{type: :undirected} = graph) do
Enum.reduce(Graph.edges(graph), "graph TD;", fn %{v1: v1, v2: v2} = edge, acc ->
acc <> "#{v1}([#{v1}])-- \"#{edge_label(edge)}\" ---#{v2}([#{v2}]);"
end)
end
defp vertex_label(%Workflow.Fact{hash: hash}), do: "fact#{hash}"
defp vertex_label(%Workflow.Root{}), do: :root
defp vertex_label(%{hash: hash}), do: hash
defp vertex_label(otherwise), do: otherwise
asset "main.js" do
"""
import "https://cdn.jsdelivr.net/npm/[email protected]/dist/mermaid.min.js";
mermaid.initialize({ startOnLoad: false });
export function init(ctx, graph) {
mermaid.render("graph1", graph, (svgSource, bindListeners) => {
ctx.root.innerHTML = svgSource;
bindListeners && bindListeners(ctx.root);
});
}
"""
end
end
require Dagger
import Dagger
alias Dagger.Workflow
nested_step_workflow =
workflow(
name: "a test workflow with dependent steps",
steps: [
{step(fn x -> x * x end, name: "squarifier"),
[
step(fn x -> x * -1 end, name: "negator"),
step(fn x -> x * 2 end, name: "doubler")
]},
{step(fn x -> x * 2 end, name: "doubler"),
[
{step(fn x -> x * 2 end, name: "doubler"),
[
step(fn x -> x * 2 end, name: "doubler"),
step(fn x -> x * -1 end, name: "negator")
]}
]}
]
)
nested_step_workflow
|> Kino.Mermaid.new()
defmodule TextProcessing do
def tokenize(text) do
text
|> String.downcase()
|> String.split(~R/[^[:alnum:]\-]/u, trim: true)
end
def count_words(list_of_words) do
list_of_words
|> Enum.reduce(Map.new(), fn word, map ->
Map.update(map, word, 1, &(&1 + 1))
end)
end
def count_uniques(word_count) do
Enum.count(word_count)
end
def first_word(list_of_words) do
List.first(list_of_words)
end
def last_word(list_of_words) do
List.last(list_of_words)
end
end
text_processing_pipeline =
workflow(
name: "basic text processing example",
steps: [
{step(name: "tokenize", work: &TextProcessing.tokenize/1),
[
{step(name: "count words", work: &TextProcessing.count_words/1),
[
step(name: "count unique words", work: &TextProcessing.count_uniques/1)
]},
step(name: "first word", work: &TextProcessing.first_word/1),
step(name: "last word", work: &TextProcessing.last_word/1)
]}
]
)
text_processing_pipeline
|> Kino.Mermaid.new()
text_processing_pipeline
|> Workflow.plan_eagerly("anybody want a peanut?")
|> Workflow.react_until_satisfied()
|> Workflow.raw_reactions()
text_processing_pipeline
|> Workflow.plan("anybody want a peanut?")
|> Workflow.next_runnables()
alias Dagger.Workflow.Rule
rule_from_guard_logic =
Dagger.rule(
fn
term
when (term in [:potato, "potato"] and
term != "tomato") or
(binary_part(term, 0, 4) == "pota" or
(is_atom(term) and term == :potato)) ->
"boil em mash em stick em in a stew. Po Tay Toes."
end,
name: "rule from guard"
)
rule_from_guard_logic.workflow
|> Kino.Mermaid.new()
Rule.check(rule_from_guard_logic, "potato")
Rule.run(rule_from_guard_logic, "potato")
defmodule Potatoes do
def is_potato?(:potato), do: true
def is_potato?("potato"), do: true
end
potato_rule =
rule(
condition: &Potatoes.is_potato?/1,
reaction: "it is a valid potato"
)
potato_rule.workflow
|> Kino.Mermaid.new()
rule_from_list_of_conditions =
Dagger.rule(
condition: [
# when this is true the rest are also always true (how to identify generic cases like this to reduce conditional evaluations when possible?)
fn term -> term == "potato" end,
# generic guard - this check should end up child to the root in most cases
&is_binary/1,
# stronger check than is_binary or not is_integer but still unecessary if is_potato? or anonymous variation passes
fn term -> String.length(term) == 6 end,
# weaker check that just filters out integers - lower priority - can be avoided in most cases
fn term when not is_integer(term) -> true end,
# captured function with 2 patterns
&Examples.is_potato?/1
],
reaction: "potato",
name: "rule from list of conditions"
)
rule_from_list_of_conditions.workflow
|> Kino.Mermaid.new()
composed_workflow =
Dagger.workflow(
name: "composition of rules example",
rules: [
Dagger.rule(
fn
:potato -> "potato!"
end,
name: "is it a potato?"
),
Dagger.rule(
fn item when is_integer(item) and item > 41 and item < 43 ->
Enum.random(1..10)
end,
name: "when 42 random"
)
]
)
Kino.Mermaid.new(composed_workflow)
rule_composition_workflow =
Enum.reduce(
[
rule_from_guard_logic,
potato_rule,
rule_from_list_of_conditions
],
Workflow.new("rule composition workflow"),
fn rule, wrk ->
Workflow.merge(wrk, Dagger.Flowable.to_workflow(rule))
end
)
rule_composition_workflow
|> Kino.Mermaid.new()
wrk =
Dagger.workflow(
name: "test",
steps: [
Dagger.step(fn num -> num * 2 end),
Dagger.step(fn num -> num * 3 end),
Dagger.step(fn num -> num * 4 end)
]
)
wrk =
wrk
|> Workflow.plan_eagerly(10)
|> Workflow.react()
wrk.memory
|> Kino.Mermaid.new()
some_rule =
Dagger.rule(fn item when is_integer(item) and item > 41 and item < 43 -> "fourty two" end,
name: "rule1"
)
some_rule.workflow
|> Kino.Mermaid.new()
some_rule.workflow
|> Workflow.plan_eagerly(45)
|> Map.get(:memory)
|> Kino.Mermaid.new()
some_rule.workflow
|> Workflow.plan_eagerly(42)
|> Workflow.react()
|> Map.get(:memory)
|> Kino.Mermaid.new()
Our memory structure is a graph centering on %Fact{}
vertices.
From our fact vertices we draw labeled edges to steps (in this case just hashes of the step).
Different kinds of vertices within a flow such as %Condition{}
or %Conjunction{}
have
different implementations of the Dagger.Workflow.Activation
protocol.
The activation protocol enforces a state monad / token approach where implementations always returns a new %Workflow{}
.
In the case of concurrently activated steps for a given workflow the trick is ensuring that workflows, even those who've gone through a lifecycle of reactions, can be merged.
These labeled edges represent causal relationships of interactions which have occurred so far in the workflow's execution.
Merging these edges means ensuring that if one workflow executed a step another hasn't that the labels are set to the most advanced workflow's progress.
See Forward Chaining With State Monad or Retex for more.
import Dagger
wrk =
workflow(
name: "merge test",
steps: [
{step(fn num -> num + 1 end),
[
step(fn num -> num + 2 end),
step(fn num -> num + 4 end)
]}
]
)
|> Workflow.react(2)
[{step_1, fact_1}, {step_2, fact_2} | _] = Workflow.next_runnables(wrk)
new_wrk_1 = Workflow.Activation.activate(step_1, wrk, fact_1)
new_wrk_2 = Workflow.Activation.activate(step_2, wrk, fact_2)
new_wrk_1.memory
|> Kino.Mermaid.new()
new_wrk_2.memory
|> Kino.Mermaid.new()
merged_wrk = Workflow.merge(new_wrk_1, new_wrk_2)
merged_wrk.memory
|> Kino.Mermaid.new()
- State machines (Soon
tm
) -
0.1.0
Hex release (eventually) - Sagas
- CQRS / ES / DDD models:
- Command / Event Handlers, Aggregates, Process Managers
- API for describing pure vs impure steps
- Consistent hashing, identity of functions:
- Merkles and Trees for aggregate identities
- Identity by contract where any step that conforms is ☑
- Property based tests to find the many bugs I haven't thought to test yet
- Contracts
- Pluggable evaluation / compilation?
- The "it depends" runtime bits i.e. "the fun parts"
- Dynamic Process Topologies
- Single node / Dynamic Supervisor + Registry
- Distributed workflow execution
- Oban, Broadway, Flow adapters/convenience
- UI / Builders