Skip to content

Instantly share code, notes, and snippets.

@zblanco
Created April 24, 2025 15:40
Show Gist options
  • Save zblanco/9b75c35fe350674d7583a2760b292e3b to your computer and use it in GitHub Desktop.
Save zblanco/9b75c35fe350674d7583a2760b292e3b to your computer and use it in GitHub Desktop.
Knowledge Graph Generation Workflow using Elixir & Runic

Building AI workflows with Runic

Mix.install([
  {:kino, "~> 0.15.3"},
  {:runic, github: "zblanco/runic", branch: "zw/map"},
  {:req, "~> 0.5.10"},
  {:server_sent_events, "~> 0.2.1"},
  {:ecto, "~> 3.12"}
])

Configure the LLM

# Notebook inputs describing which OpenAI-compatible endpoint to call. The values are
# read back into plain variables so later cells can close over them.
llm_api_url_input =
  Kino.Input.text("LLM API URL", default: "https://api.groq.com/openai/v1/chat/completions")

model_input = Kino.Input.text("Model", default: "llama-3.1-8b-instant")

# A numeric input, so the value read back is a number rather than the string "512".
max_tokens_input = Kino.Input.number("Max Tokens", default: 512)
llm_api_key_input = Kino.Input.password("LLM API Key", default: System.get_env("GROQ_API_KEY"))
llm_api_url = Kino.Input.read(llm_api_url_input)
model = Kino.Input.read(model_input)

max_tokens = Kino.Input.read(max_tokens_input)
llm_api_key = Kino.Input.read(llm_api_key_input)
:ok
defmodule LLMTools do
  @moduledoc """
  Small helpers around `Req` for calling an OpenAI-compatible chat completion endpoint.

  Every request function takes a keyword list. `:api_key`, `:url`, `:messages` and
  `:model` are required (an `ArgumentError` is raised when one is missing); only the
  keys listed in `@chat_completion_opts` are forwarded in the JSON body.
  """

  # Allow-list of body parameters forwarded to the API. `max_tokens` is accepted next to
  # `max_completion_tokens` so callers using the older name are not silently ignored.
  @chat_completion_opts ~w(
    model
    messages
    frequency_penalty
    max_completion_tokens
    max_tokens
    n
    parallel_tool_calls
    presence_penalty
    reasoning_format
    response_format
    seed
    service_tier
    stop
    stream
    temperature
    tool_choice
    tools
    top_logprobs
    top_p
    user
  )a

  @receive_timeout 60_000
  @pool_timeout 30_000
  # How long the stream consumer waits for the next batch of events before giving up.
  @stream_idle_timeout 15_000

  # @default_system_prompt """
  # You are a fiercely intelligent assistant and friend of the user.

  # Your purpose and drive is to assist the user with any request they have.

  # You provide helpful measured responses that take the user seriously.
  # You provide precise and clear descriptions using helpful analogies
  # and other communication techniques to convey the core models of a subject.
  # """

  @doc """
  Sends `message` as a chat completion request and returns the text of the first choice.

  `message` is either a binary (wrapped as a single `:user` message) or a non-empty list
  of `%{role: _, content: _}` maps. `opts` are passed to `generate_response/1`.

  Raises `FunctionClauseError` when the request fails or the status is not 200, because
  `extract_llm_response/1` only accepts successful responses.
  """
  def prompt_chat_completion(message, opts \\ [])

  def prompt_chat_completion(message, opts) when is_binary(message) do
    prompt_chat_completion([%{role: :user, content: message}], opts)
  end

  def prompt_chat_completion([%{role: _, content: _} | _] = messages, opts) do
    opts
    |> Keyword.put(:messages, messages)
    |> generate_response()
    |> extract_llm_response()
  end

  @doc """
  Posts a chat completion request and returns the `Req.Response`, raising on transport
  errors (see `Req.post!/2`).
  """
  def generate_response!(opts \\ []) do
    {url, api_key, body} = request_params!(opts)

    Req.post!(url,
      json: body,
      auth: {:bearer, api_key},
      receive_timeout: @receive_timeout,
      pool_timeout: @pool_timeout
    )
  end

  @doc """
  Posts a chat completion request and returns the result of `Req.post/2`
  (`{:ok, response}` or `{:error, exception}`).
  """
  def generate_response(opts \\ []) do
    {url, api_key, body} = request_params!(opts)

    Req.post(url,
      json: body,
      auth: {:bearer, api_key},
      receive_timeout: @receive_timeout,
      pool_timeout: @pool_timeout
    )
  end

  @doc """
  Returns the `"content"` of the first choice's message from a successful response.

  Accepts either `{:ok, response}` or a bare `%Req.Response{status: 200}`. Any other
  input (an error tuple, a non-200 status) has no matching clause and raises.
  """
  def extract_llm_response({:ok, response}) do
    extract_llm_response(response)
  end

  def extract_llm_response(%Req.Response{status: 200, body: body}) do
    body
    |> Map.get("choices")
    |> hd()
    |> Map.get("message")
    |> Map.get("content")
  end

  @doc """
  Returns a lazy stream of server-sent event batches for a streaming chat completion.

  The HTTP request runs in a `Task` started when the stream is first consumed; each
  element of the stream is the non-empty list of events parsed from one or more response
  chunks. The stream ends when the request finishes or when no events arrive for
  #{@stream_idle_timeout} ms.

  `opts[:timeout]` sets the Req `:receive_timeout` (default #{@receive_timeout} ms).

  The stream must be consumed by the process that called this function, because events
  are delivered to that process's mailbox.
  """
  def stream_response!(opts \\ []) do
    timeout = opts[:timeout] || @receive_timeout
    {url, api_key, body} = request_params!(opts)
    caller = self()
    # Tag our messages so unrelated mailbox traffic is never mistaken for stream data.
    tag = make_ref()

    Stream.resource(
      fn ->
        Task.async(fn ->
          into = fn {:data, data}, {req, resp} ->
            buffer = Req.Request.get_private(req, :sse_buffer, "")
            {events, buffer} = ServerSentEvents.parse(buffer <> data)
            # Keep the updated request: the unparsed tail must be carried over to the
            # next chunk, otherwise events split across chunks are lost.
            req = Req.Request.put_private(req, :sse_buffer, buffer)

            if events != [] do
              send(caller, {tag, {:events, events}})
            end

            {:cont, {req, resp}}
          end

          Req.post(url,
            json: Map.merge(%{stream: true}, body),
            receive_timeout: timeout,
            auth: {:bearer, api_key},
            into: into
          )

          send(caller, {tag, :done})
        end)
      end,
      fn task ->
        receive do
          {^tag, :done} ->
            {:halt, task}

          {^tag, {:events, events}} ->
            {[events], task}
        after
          @stream_idle_timeout ->
            {:halt, task}
        end
      end,
      # Shut the task down instead of awaiting it: when the consumer halts early or the
      # idle timeout fired, the request may still be running and awaiting would exit.
      fn task -> Task.shutdown(task) end
    )
  end

  # Validates the required options and returns `{url, api_key, json_body}`.
  defp request_params!(opts) do
    api_key = opts[:api_key] || raise(ArgumentError, "`api_key` is required")
    url = opts[:url] || raise(ArgumentError, "`url` is required")
    opts[:messages] || raise(ArgumentError, "`messages` is required")
    opts[:model] || raise(ArgumentError, "`model` is required")

    body =
      opts
      |> Keyword.take(@chat_completion_opts)
      |> Map.new()

    {url, api_key, body}
  end
end

Graph Visualization

Cytoscape JS docs

defmodule Kino.Cytoscape do
  @moduledoc """
  A `Kino.JS` widget that renders a graph with Cytoscape.js.

  `new/2` accepts either a `%Graph{}` or a list of Cytoscape edge elements
  (`%{data: %{source: _, target: _, ...}}`). `options` is a keyword list with the
  optional keys `:cytoscape`, `:node_style`, `:edge_style` and `:layout_options`, or the
  atom `:dag` for a breadth-first layout.
  """
  use Kino.JS

  @doc """
  Builds the widget for `g`.

    * a `%Graph{}` is converted into node and edge elements;
    * a list of edge elements gets its nodes derived from each edge's `source`/`target`;
    * a list of elements together with an already-built options map is rendered as is.
  """
  def new(g, options \\ [])

  def new(%Graph{} = graph, options) do
    graph
    |> to_edgelist_json()
    |> new(options(options))
  end

  # `:dag` is accepted here as well so an edge list can use the same preset as a graph.
  def new(edge_list, options) when is_list(options) or options == :dag do
    nodes =
      edge_list
      |> Enum.flat_map(fn %{data: edge} ->
        [
          %{data: %{id: "#{edge.source}", name: "#{edge.source}"}},
          %{data: %{id: "#{edge.target}", name: "#{edge.target}"}}
        ]
      end)
      |> Enum.uniq_by(& &1.data.id)

    new(nodes ++ edge_list, options(options))
  end

  def new(graph, options) when is_map(options) do
    Kino.JS.new(__MODULE__, Map.put(options, :elements, graph))
  end

  # Default options with the layout swapped for a breadth-first (DAG-like) one.
  # NOTE(review): `anime` is passed through to Cytoscape unchanged; the documented layout
  # option is spelled `animate` — confirm which one was intended.
  defp options(:dag) do
    options = options([])

    dag_layout = %{
      name: "breadthfirst",
      fit: true,
      directed: true,
      avoidOverlap: true,
      spacingFactor: 1.0,
      grid: false,
      padding: 0,
      anime: true,
      circle: false,
      userZoomingEnabled: true
    }

    Map.put(options, :layout_options, dag_layout)
  end

  # Fills in every option group the caller did not provide.
  defp options(options) do
    cytoscape =
      options[:cytoscape] ||
        %{
          "zoomingEnabled" => true,
          "userZoomingEnabled" => true,
          "panningEnabled" => true,
          "userPanningEnabled" => true,
          "boxSelectionEnabled" => false
        }

    node_style =
      options[:node_style] ||
        %{
          "background-color" => "#ffffff",
          "font-size" => 14,
          "width" => 120,
          "shape" => "ellipse",
          "height" => 80,
          "text-wrap" => "wrap",
          "text-max-width" => 56,
          "color" => "#475569",
          "label" => "data(name)",
          "text-halign" => "center",
          "text-valign" => "center",
          "border-width" => 2,
          "border-color" => "#94a3b8",
          "border-opacity" => 0.5
        }

    edge_style =
      options[:edge_style] ||
        %{
          "width" => 2,
          "font-size" => 12,
          "label" => "data(label)",
          "line-color" => "#94a3b8",
          "target-arrow-color" => "#94a3b8",
          "target-arrow-shape" => "triangle",
          "curve-style" => "bezier"
        }

    layout_options =
      options[:layout_options] ||
        %{
          name: "cose",
          fit: true,
          directed: true,
          avoidOverlap: true,
          spacingFactor: 1.0,
          grid: false,
          padding: 0,
          anime: true,
          circle: false,
          userZoomingEnabled: true
        }

    %{
      cytoscape: cytoscape,
      node_style: node_style,
      edge_style: edge_style,
      layout_options: layout_options
    }
  end

  # Converts a `%Graph{}` into a flat list of Cytoscape node and edge elements.
  defp to_edgelist_json(graph) do
    vertices =
      graph
      |> Graph.vertices()
      |> Enum.map(fn v ->
        vertex_label = vertex_label(v)
        %{data: %{id: "#{vertex_label}", name: "#{vertex_label}"}}
      end)

    edges =
      graph
      |> Graph.edges()
      |> Enum.map(fn edge ->
        edge_label = edge_label(edge.label)

        v1 = vertex_label(edge.v1)
        v2 = vertex_label(edge.v2)

        %{
          data: %{
            id: "#{edge_label}-#{v1}#{v2}" |> :erlang.phash2(),
            label: "#{edge_label}",
            # Stringified exactly like the node ids above so the references always match.
            source: "#{v1}",
            target: "#{v2}"
          }
        }
      end)

    vertices ++ edges
  end

  defp edge_label(label), do: label

  # Quoted expressions (3-tuples) are rendered as source code.
  defp vertex_label({_, _, _} = ast), do: Macro.to_string(ast)

  defp vertex_label(%Runic.Workflow.Root{}), do: :root
  defp vertex_label(%{name: name}) when not is_nil(name), do: name

  # Unnamed structs get a label built from the struct module and a hash of the value.
  defp vertex_label(%{__struct__: struct} = vertex),
    do: "#{to_string(struct)}#{:erlang.phash2(vertex)}"

  defp vertex_label(otherwise), do: otherwise

  # The CDN URL in the original was mangled ("[email protected]") and could not be loaded;
  # it now pins an explicit Cytoscape.js release.
  asset "main.js" do
    """
    import "https://cdn.jsdelivr.net/npm/cytoscape@3.30.2/dist/cytoscape.min.js";

    export function init(ctx, cyto_data) {

      ctx.root.innerHTML = `<div id='cyto' style='width: 896px; height: 400px;'></div>`;

      var cy = cytoscape({
          ...cyto_data.cytoscape,
          ...{
            container: document.getElementById('cyto'),
            elements: cyto_data.elements,
            style: [
              {
                selector: 'node',
                style: cyto_data.node_style
              },
              {
                selector: 'edge',
                style: cyto_data.edge_style
              }
            ]
          }
        });

        cy.layout(cyto_data.layout_options).run();
    }
    """
  end
end
require Runic
import Runic
require Logger
alias Runic.Workflow

Runic French/Spanish translation workflow

# Each translator step sends the input text to the LLM and returns the raw response.
# `max_completion_tokens` is used because `LLMTools` only forwards allow-listed keys and
# `max_tokens` was not one of them, so the limit was never sent.
french_translator =
  step(
    fn input ->
      LLMTools.generate_response!(
        model: model,
        api_key: llm_api_key,
        max_completion_tokens: 512,
        url: llm_api_url,
        messages: [
          %{role: "user", content: "Translate the following text to French: #{input}"}
        ]
      )
    end,
    name: :french_translator
  )

spanish_translator =
  step(
    fn input ->
      LLMTools.generate_response!(
        model: model,
        api_key: llm_api_key,
        max_completion_tokens: 512,
        url: llm_api_url,
        messages: [
          %{role: "user", content: "Translate the following text to Spanish: #{input}"}
        ]
      )
    end,
    name: :spanish_translator
  )

# Pulls the translated text out of the response produced by a translator step.
extract_translation =
  step(&LLMTools.extract_llm_response/1, name: :extract)

wrk =
  workflow(
    name: "language translator",
    steps: [
      french_translator,
      spanish_translator
    ]
  )
  |> Workflow.add(extract_translation, to: :spanish_translator)
  |> Workflow.add(extract_translation, to: :french_translator)
  # attach hooks for debugging intermediate results
  |> Workflow.attach_after_hook(:spanish_translator, fn _step, wrk, fact ->
    IO.inspect(fact.value, label: "## Component: spanish_translator has produced output")
    wrk
  end)
  |> Workflow.attach_after_hook(:french_translator, fn _step, wrk, fact ->
    IO.inspect(fact.value, label: "## Component: french_translator has produced output")
    wrk
  end)

Use Graph Visualization to show workflow

# Render the workflow graph with the breadth-first preset, which carries the same
# layout options that were previously spelled out inline.
Kino.Cytoscape.new(wrk.graph, :dag)
# wrk = 
#   wrk
#   |> Workflow.react_until_satisfied("The sentient potato plots the demise of humanity")
Workflow.raw_reactions(wrk)

Debug runtime memory state

# Force-directed ("cose") layout options used to inspect the workflow graph.
memory_layout = %{
  name: "cose",
  fit: false,
  directed: true,
  avoidOverlap: true,
  spacingFactor: 1.0,
  grid: true,
  padding: 0,
  anime: true,
  circle: true,
  userZoomingEnabled: true
}

Kino.Cytoscape.new(wrk.graph, layout_options: memory_layout)

Runic Structured Outputs

Instructor prompts LLMs to follow a JSON schema (modeled via Ecto). It retries a configurable number of times (3 by default) until the raw string returned from the LLM passes validation and can be turned into the defined contract.

Instructor's capabilities to extract structured outputs are immensely useful and support very common use cases with LLMs.

We can piece together similar capabilities in a Runic workflow but componentized in a few parts:

  • Step(s) for building a prompt to request data that meets the schema definition
    • Consider predefined prompts with templates and required inputs
  • Step(s) for hitting the LLM with an API request (e.g., wrap Req like above)
  • Intermediate steps for extracting the response payload content
  • Intermediate rules or looping for handling API request errors (e.g. token invalid, timeout, rate limit)
  • Rules + state loop for checking if the payload returned meets criteria
  • Returning the valid data

Prompt anatomy:

Prompt Anatomy

Data Extraction Prompts

  • Goal: Describe domain / problem and use cases surrounding what the data represents

    • List names of fields and describe what the fields are for
  • Define the JSON schema between <schema></schema> tags

  • Warn that we must only return JSON with valid fields based on the described types

  • Inject User / Content to extract data from

Structured Extraction Use Case: Build a knowledge graph

Our goal is to take documents which may contain useful knowledge about known entities (Named Entity Recognition) and collect connections between them (facts).

Triplet Fact Model:

(entity) -- relationship -- (other_entity)

  • source A known entity
  • target Another known entity
  • relationship Connection or edge connecting the source and target
    • Relationship might have "properties"
defmodule Fact do
  @moduledoc """
  A knowledge-graph triplet: `source` -- `relationship` -- `target`.

  All three fields are strings and all three are required.
  """
  use Ecto.Schema
  import Ecto.Changeset

  @fields [:source, :target, :relationship]

  @primary_key false
  embedded_schema do
    field(:source, :string)
    field(:target, :string)
    field(:relationship, :string)
  end

  @doc """
  Returns the JSON schema (as a string) describing a single fact.

  The `required` list mirrors the `validate_required/2` call in `new/1`, so the schema
  shown to the LLM states the same contract the changeset enforces.
  """
  def schema,
    do: """
    {
        "type": "object",
        "properties": {
          "source": {"type": "string"},
          "target": {"type": "string"},
          "relationship": {"type": "string"}
        },
        "required": ["source", "target", "relationship"]
    }
    """

  @doc """
  Casts and validates `params` into a `%Fact{}`.

  Returns `{:ok, %Fact{}}` when every field is present and castable to a string,
  otherwise `{:error, changeset}`.
  """
  def new(params) do
    %__MODULE__{}
    |> cast(params, @fields)
    |> validate_required(@fields)
    |> apply_action(:validate)
  end
end
# Example input carrying all three string fields.
valid_params = %{
  "relationship" => "is_a",
  "source" => "starwars",
  "target" => "media_franchise"
}

# Example input with no "target" and a non-string "relationship".
invalid_params = %{
  "relationship" => 42,
  "source" => "starwars"
}

Fact.new(invalid_params)
Fact.new(valid_params)
defmodule KnowledgeBase do
  @moduledoc """
  Prompt builders and response parsers for the knowledge-graph workflow.

  The `*_prompt` functions return a list of chat messages (`%{role: _, content: _}`).
  The `extract_*` functions turn raw LLM text or decoded JSON into validated data using
  a schema module that exposes `new/1` returning `{:ok, struct}` or `{:error, reason}`.
  """
  require Logger

  @doc """
  Builds the messages asking the LLM for a thorough markdown report about `query`.
  """
  def research_assistant_prompt(query) do
    [
      %{
        role: :system,
        content: """
        You are a research assistant with deep domain knowledge in every subject.

        Your job is to create thorough reports detailing the core concepts, ideas, and models
        of a subject. Fulfill the user's request and expound into ideas around the user query if it's important
        to understanding the full picture.

        These reports are being ingested into a knowledge base and your responses should show an understanding 
        of the reponsibility for factuality and nuance including potentially many perspectives and contexts.

        Thus your usual process for making these reports is to first ask:

          - "What are the core ideas, models, and concepts to understand this inquiry"
          - Then break down each idea to understand it effectively then describe how to apply that knowledge for the goal of the user's inquiry.

        Format your report in practical markdown and be extensive and thorough.
        """
      },
      %{role: :user, content: query}
    ]
  end

  @doc """
  Builds the messages asking the LLM to extract a JSON array of facts from `document`.

  The system prompt embeds `Fact.schema/0` as the item schema of the array.
  """
  def generate_edges_prompt(document) do
    [
      %{
        role: :system,
        content: """
          You are a knowledge graph extraction assistant that returns a list of JSON.

          You are doing your best to identify connections and relationships in the text that
          you're sure are a notable person, place, idea, or thing worth modeling into a helpful knowledge graph.

          All entities you find must be in a normalized lowercase snake_case format or else mayhem will ensue.

          You always return a valid JSON array where each array item is an object with the fields source, target, and relationship.

          You return nothing but valid JSON.

          This is the JSON schema you must adhere to:

          <schema>
            {
                "type": "array",
                "items": #{Fact.schema()}
            }
          </schema>
        """
      },
      %{role: :user, content: document}
    ]
  end

  @doc """
  Builds the messages asking the LLM for up to 10 follow-up questions about `facts`.

  Each fact must expose `source`, `relationship` and `target`; they are listed one per
  line as `* source relationship target` in the user message.
  """
  def ask_for_followup_questions_prompt(facts) when is_list(facts) do
    # One bullet per line. The original appended an escaped "\\n", which put a literal
    # backslash-n into the prompt and ran all facts together on a single line.
    fact_lines =
      Enum.map_join(facts, "\n", fn fact ->
        "* #{fact.source} #{fact.relationship} #{fact.target}"
      end)

    [
      %{
        role: :system,
        content: """
        You are a research assistant tasked with asking salient followup questions to further explore the facts given.

        You are given a list of facts in the form of a knowledge graph triplet containing two subjects and a relationship.

        You must return the top 10 salient questions that can further explore any subject or relationship with the purpose
        of expanding the knowledge in the graph. Never return any more than 10 questions.

        You return nothing but a JSON array containing a list of followup questions.

        The JSON schema you must adhere to:

        <schema>
        {
            "type": "array",
            "items": {
              "type": "string"
            }
        }
        </schema>
        """
      },
      %{
        role: :user,
        content: """
        ### Facts
        #{fact_lines}
        """
      }
    ]
  end

  @doc """
  Extracts a list of `%Fact{}` from LLM output; see `extract_json_array/2`.
  """
  def extract_kg_edges(maybe_json) do
    extract_json_array(maybe_json, Fact)
  end

  @doc """
  Builds one `schema_module` struct from a JSON string or from decoded params.

  Returns the struct on success, the `JSON.decode/1` error when the string is not valid
  JSON, or `{:error, reason}` when `schema_module.new/1` rejects the params.
  """
  def extract_json_object(maybe_json_string, schema_module) when is_binary(maybe_json_string) do
    case JSON.decode(maybe_json_string) do
      {:ok, json} ->
        extract_json_object(json, schema_module)

      error ->
        error
    end
  end

  def extract_json_object(params, schema_module) do
    case schema_module.new(params) do
      {:ok, cmd} ->
        cmd

      {:error, reason} = result ->
        # `reason` is typically a changeset struct, so inspect it before logging.
        Logger.error("invalid #{inspect(schema_module)} params: #{inspect(reason)}")
        result
    end
  end

  @doc """
  Finds the JSON array embedded in `text` and returns it decoded.

  The array is located with a greedy match from the first `[` to the last `]`. Returns
  the decoded list, the `JSON.decode/1` error, `{:error, :not_found}` when there is no
  bracketed span, or `{:error, :unexpected_match_format}` for any other scan result.
  """
  def extract_array_of_strings(text) do
    case Regex.scan(~r/\[.*\]/s, text) do
      [[match]] ->
        case JSON.decode(match) do
          {:ok, array} ->
            array

          error ->
            error
        end

      [] ->
        {:error, :not_found}

      _ ->
        {:error, :unexpected_match_format}
    end
  end

  @doc """
  Turns LLM output into a list of `schema_module` structs.

  Accepts a JSON string starting with `[`, free text containing a JSON array, or an
  already decoded list. Items that are not maps or that `schema_module.new/1` rejects
  are logged and dropped; the remaining structs are returned in input order.

  Returns `{:error, :not_found}` when the text holds no array, the `JSON.decode/1` error
  for malformed JSON, and `{:error, :invalid_json}` for any other input.
  """
  def extract_json_array("[" <> _rest = maybe_json_string, schema_module)
      when is_binary(maybe_json_string) do
    case JSON.decode(maybe_json_string) do
      {:ok, json} ->
        extract_json_array(json, schema_module)

      error ->
        error
    end
  end

  def extract_json_array(text, schema_module) when is_binary(text) do
    case Regex.scan(~r/\[.*\]/s, text) do
      [[match]] -> extract_json_array(match, schema_module)
      [] -> {:error, :not_found}
      _ -> {:error, :unexpected_match_format}
    end
  end

  def extract_json_array(json_array, schema_module) when is_list(json_array) do
    Enum.flat_map(json_array, fn
      params when is_map(params) ->
        case schema_module.new(params) do
          {:ok, cmd} ->
            [cmd]

          {:error, reason} ->
            Logger.error("discarding invalid #{inspect(schema_module)}: #{inspect(reason)}")
            []
        end

      other ->
        # The LLM may return scalars inside the array; they cannot be cast, so skip them.
        Logger.error("discarding non-object array item: #{inspect(other)}")
        []
    end)
  end

  def extract_json_array(_, _), do: {:error, :invalid_json}

  @doc false
  # Kept for backward compatibility; always reports invalid input.
  def extract_array(_, _), do: {:error, :invalid_input}
end

Knowledge Graph Workflow

  • Generate a report on the given subject
  • Generate an edge list of triplet facts as JSON structured output
  • Build %Fact{} structs from decoded JSON
# Rule that fires for a `{:query, binary}` input: it builds the research prompt and
# produces `{:report, result}` where `result` is whatever `LLMTools.generate_response/1`
# returned (`{:ok, response}` or `{:error, exception}`).
generate_research_report =
  rule(
    if: fn 
      {:query, user_query} when is_binary(user_query) -> 
        true
      _ -> false
    end,
    do: fn {:query, user_query} ->
      prompt = KnowledgeBase.research_assistant_prompt(user_query)

      {:report,
       LLMTools.generate_response(
         model: model,
         api_key: llm_api_key,
         max_completion_tokens: 2048,
         url: llm_api_url,
         messages: prompt
       )}
    end,
    name: :generate_research_report
  )

# Rule whose pattern only matches a report carrying a status-200 response; it produces
# the text content of that response.
extract_report =
  rule(
    fn {:report, {:ok, %{status: 200} = response}} ->
      LLMTools.extract_llm_response(response)
    end,
    name: :extract_report
  )

# Workflow made of the two rules above.
simple_kg =
  Runic.workflow(
    name: "kg_workflow",
    rules: [
      generate_research_report,
      extract_report
    ]
  )

# Step that asks the LLM to extract facts from a report.
#
# On a 200 response it produces whatever `KnowledgeBase.extract_json_array/2` returns.
# Every other outcome is printed with `dbg/2` and produced as-is: the response body for
# a non-200 status, the exception for a transport error. The original only handled 200
# and 400, so e.g. a rate-limit response or a timeout raised `CaseClauseError`.
generate_kg_facts =
  step(
    fn report ->
      kg_prompt = KnowledgeBase.generate_edges_prompt(report)

      LLMTools.generate_response(
        model: model,
        api_key: llm_api_key,
        max_completion_tokens: 4096,
        url: llm_api_url,
        messages: kg_prompt
      )
      |> case do
        {:ok, %{status: 200} = response} ->
          response
          |> LLMTools.extract_llm_response()
          |> KnowledgeBase.extract_json_array(Fact)

        {:ok, response} ->
          dbg(response.body, label: "extract_facts generation failed")

        {:error, exception} ->
          dbg(exception, label: "extract_facts generation failed")
      end
    end,
    name: :generate_kg_facts
  )

# Attach fact generation to the output of the `:extract_report` rule.
simple_kg =
  simple_kg
  |> Workflow.add(generate_kg_facts, to: {:extract_report, :reaction})
# Render the workflow graph with the breadth-first preset, which carries the same
# layout options that were previously spelled out inline.
Kino.Cytoscape.new(simple_kg.graph, :dag)
# Run the workflow for a query, then plan and react once more.
ran_wrk =
  simple_kg
  |> Workflow.plan_eagerly({:query, "Elixir"})
  |> Workflow.react_until_satisfied()

ran_wrk = ran_wrk |> Workflow.plan_eagerly() |> Workflow.react_until_satisfied()

# Turn the first produced list of facts into Cytoscape edge elements.
# Falling back to `[]` keeps the cell from crashing when no fact list was produced
# (for example when the LLM call failed), and matching on `%Fact{}` avoids picking up
# some other list by accident.
edges =
  ran_wrk
  |> Workflow.raw_productions()
  |> IO.inspect()
  |> Enum.find([], &match?([%Fact{} | _], &1))
  # Identical facts would produce duplicate element ids.
  |> Enum.uniq()
  |> Enum.map(fn %Fact{} = fact ->
    %{
      data: %{
        id: "#{fact.relationship}-#{fact.source}#{fact.target}" |> :erlang.phash2(),
        label: "#{fact.relationship}",
        source: fact.source,
        target: fact.target
      }
    }
  end)
  |> dbg()
# Force-directed ("cose") layout options for the extracted knowledge graph.
kg_layout = %{
  name: "cose",
  fit: false,
  directed: true,
  avoidOverlap: true,
  spacingFactor: 1.0,
  grid: true,
  padding: 0,
  anime: true,
  circle: true,
  userZoomingEnabled: true
}

Kino.Cytoscape.new(edges, layout_options: kg_layout)

Extending the workflow with followup questions

  • Once the initial research report is generated
  • Generate followup questions to explore more in the graph
  • Generate reports to answer each followup question
  • Generate knowledge graph edges for every subject
  • Aggregate all edges
# Step that turns a list of facts into follow-up questions: it builds the prompt, calls
# the LLM and parses the JSON array found in the answer.
# NOTE: the result of `generate_response/1` is piped straight into
# `extract_llm_response/1`, which only has clauses for a status-200 response, so a
# failed request raises inside this step.
followup_questions =
  step(
    fn facts ->
      prompt =
        KnowledgeBase.ask_for_followup_questions_prompt(facts)

      LLMTools.generate_response(
        model: model,
        api_key: llm_api_key,
        max_completion_tokens: 512,
        url: llm_api_url,
        messages: prompt
      )
      |> LLMTools.extract_llm_response()
      |> KnowledgeBase.extract_array_of_strings()
    end,
    name: :ask_followup_questions
  )

# Map component: for a single question it requests a research report and produces
# `{:follow_up_report, result}` with the untouched result of `generate_response/1`.
# Presumably Runic applies the function to each element of the upstream list — confirm
# against the Runic `map` documentation.
generate_followup_reports =
  map(
    fn question ->
      prompt = KnowledgeBase.research_assistant_prompt(question)

      {:follow_up_report,
       LLMTools.generate_response(
         model: model,
         api_key: llm_api_key,
         max_completion_tokens: 1024,
         url: llm_api_url,
         messages: prompt
       )}
    end,
    name: :generate_followup_reports
  )

# Rule that extracts facts from a follow-up report.
#
# The pattern only matches reports carrying a status-200 response (same shape as the
# `:extract_report` rule); previously any `{:ok, response}` matched and a non-200 report
# then raised inside `LLMTools.extract_llm_response/1`.
#
# For the fact-extraction request itself, a 200 response produces whatever
# `KnowledgeBase.extract_json_array/2` returns; any other status or a transport error is
# printed with `dbg/2` and produced as-is instead of raising `CaseClauseError`.
handle_follow_up_reports =
  rule(
    fn {:follow_up_report, {:ok, %{status: 200} = response}} ->
      kg_prompt =
        response
        |> LLMTools.extract_llm_response()
        |> KnowledgeBase.generate_edges_prompt()

      LLMTools.generate_response(
        model: model,
        api_key: llm_api_key,
        max_completion_tokens: 2048,
        url: llm_api_url,
        messages: kg_prompt
      )
      |> case do
        {:ok, %{status: 200} = response} ->
          response
          |> LLMTools.extract_llm_response()
          |> KnowledgeBase.extract_json_array(Fact)

        {:ok, response} ->
          dbg(response.body, label: "extract_facts generation failed")

        {:error, exception} ->
          dbg(exception, label: "extract_facts generation failed")
      end
    end,
    name: :handle_follow_up_reports
  )

# Extend the simple workflow: follow-up questions hang off `:generate_kg_facts`, and the
# report generation hangs off `:ask_followup_questions`.
# NOTE(review): `handle_follow_up_reports` is added without a `to:` option, unlike the
# two components above — confirm where Runic attaches it and that it receives the
# `{:follow_up_report, _}` facts.
extended_kg =
  simple_kg
  |> Workflow.add(followup_questions, to: :generate_kg_facts)
  |> Workflow.add(generate_followup_reports, to: :ask_followup_questions)
  |> Workflow.add(handle_follow_up_reports)
# Render the extended workflow with the breadth-first preset.
extended_kg.graph
|> Kino.Cytoscape.new(:dag)
# ran_wrk =
#   extended_kg
#   |> Workflow.plan_eagerly("elixir")
#   |> Workflow.react_until_satisfied()
#   |> Workflow.plan_eagerly()
#   |> Workflow.react_until_satisfied()
# ran_wrk
# |> Workflow.plan_eagerly()
# |> Workflow.next_runnables()
# edge_list =
#   ran_wrk
#   |> Workflow.raw_productions()
#   |> Enum.filter(fn
#     [%Fact{} | _] ->
#       true

#     _else ->
#       false
#   end)
#   |> Enum.flat_map(fn facts ->
#     Enum.map(facts, fn fact ->
#       %{
#         data: %{
#           id: "#{fact.relationship}-#{fact.source}#{fact.target}" |> :erlang.phash2(),
#           label: "#{fact.relationship}",
#           source: fact.source,
#           target: fact.target
#         }
#       }
#     end)
#   end)
#   |> dbg()

Sprinkle in concurrency

defmodule WorkflowSupervisor do
  @moduledoc """
  Dynamic supervisor, registered under its module name, that owns workflow runners.
  """
  use DynamicSupervisor

  @doc "Starts the supervisor; the argument is ignored."
  def start_link(_), do: DynamicSupervisor.start_link(__MODULE__, [], name: __MODULE__)

  @impl true
  def init(_), do: DynamicSupervisor.init(strategy: :one_for_one)

  @doc """
  Resolves the `WorkflowRunner.via/1` name for `id` with `GenServer.whereis/1`.
  """
  def find(id) do
    runner_name = WorkflowRunner.via(id)
    GenServer.whereis(runner_name)
  end
end

defmodule WorkflowRunner do
  @moduledoc """
  GenServer that holds one workflow and runs its runnables concurrently as tasks.

  The state ("session") is the map passed to `start_link/1` — it must contain `:id` and
  a `%Runic.Workflow{}` under `:workflow` — plus `:active_tasks`, the set of
  `{step.hash, fact.hash}` keys that have already been dispatched.

  Tasks are started under the `TaskRunner` task supervisor and the process is registered
  in `WorkflowRegistry`; both must be running.
  """
  use GenServer
  alias Runic.Workflow
  require Logger

  ## Client API

  @doc "Returns the `Registry` via-tuple for `workflow_id`."
  # id -> pid
  def via(workflow_id) when is_binary(workflow_id) do
    {:via, Registry, {WorkflowRegistry, {__MODULE__, workflow_id}}}
  end

  @doc "Plans `input` into the workflow without dispatching tasks; returns the workflow."
  def plan(workflow_id, input) do
    GenServer.call(via(workflow_id), {:plan, input})
  end

  @doc "Asynchronously plans `input` and dispatches every runnable as a task."
  def run(workflow_id, input) do
    GenServer.cast(via(workflow_id), {:run, input})
  end

  @doc "Returns the current workflow."
  def get(workflow_id) do
    GenServer.call(via(workflow_id), :get)
  end

  @doc "Returns the whole session map."
  def state(workflow_id) do
    GenServer.call(via(workflow_id), :state)
  end

  @doc "Stops the runner with reason `:normal` and returns `:ok`."
  def halt(workflow_id) do
    GenServer.call(via(workflow_id), :halt)
  end

  def child_spec(%{id: workflow_id} = session) do
    %{
      id: {__MODULE__, workflow_id},
      start: {__MODULE__, :start_link, [session]},
      restart: :transient
    }
  end

  def start_link(%{id: workflow_id} = session) do
    GenServer.start_link(
      __MODULE__,
      session,
      name: via(workflow_id)
    )
  end

  @doc "Starts a runner for `session` under `WorkflowSupervisor`."
  def start(session) do
    DynamicSupervisor.start_child(
      WorkflowSupervisor,
      {__MODULE__, session}
    )
  end

  ## Server callbacks

  @impl true
  def init(%{workflow: %Workflow{}} = session) do
    {:ok, Map.put(session, :active_tasks, MapSet.new())}
  end

  @impl true
  def handle_call(:get, _from, session) do
    {:reply, session.workflow, session}
  end

  def handle_call(:state, _from, session) do
    {:reply, session, session}
  end

  # `plan/2` and `halt/1` previously had no matching clause, so calling either one
  # crashed the server with a FunctionClauseError.
  def handle_call({:plan, input}, _from, session) do
    workflow = Workflow.plan_eagerly(session.workflow, input)
    {:reply, workflow, %{session | workflow: workflow}}
  end

  def handle_call(:halt, _from, session) do
    {:stop, :normal, :ok, session}
  end

  @impl true
  def handle_cast({:run, input}, session) do
    workflow = Workflow.plan_eagerly(session.workflow, input)

    # Keep the session returned by enqueue_tasks/1: it carries the updated
    # `:active_tasks`. Dropping it meant in-flight runnables were dispatched again
    # whenever another task finished.
    {:noreply, enqueue_tasks(%{session | workflow: workflow})}
  end

  @impl true
  def handle_info({ref, {_runnable_key, workflow}}, session) when is_reference(ref) do
    Logger.debug("### Task completed")

    Process.demonitor(ref, [:flush])

    wrk =
      session.workflow
      |> Workflow.merge(workflow)
      |> Workflow.plan_eagerly()

    # Completed keys stay in `:active_tasks` on purpose (the removal was already
    # disabled in the original), so a runnable is never dispatched twice.
    session = %{session | workflow: wrk}

    session =
      if Workflow.is_runnable?(wrk) do
        Logger.info("### Workflow is runnable")
        enqueue_tasks(session)
      else
        Logger.info("### Workflow is resolved")
        session
      end

    {:noreply, session}
  end

  # Tasks are started with `async_nolink`, so a crashing step arrives here as a monitor
  # message instead of taking the runner (and the workflow state) down with it.
  def handle_info({:DOWN, _ref, :process, _pid, reason}, session) do
    Logger.error("### Task failed: #{inspect(reason)}")
    {:noreply, session}
  end

  def handle_info(msg, session) do
    Logger.debug("## Workflow Runner unhandled_message #{inspect(msg)}")
    {:noreply, session}
  end

  # Starts one task per runnable that has not been dispatched yet and returns the
  # session with those runnables recorded in `:active_tasks`.
  defp enqueue_tasks(session) do
    workflow = session.workflow

    workflow
    |> Workflow.next_runnables()
    |> Enum.reduce(session, fn {step, fact}, acc ->
      key = {step.hash, fact.hash}

      if MapSet.member?(acc.active_tasks, key) do
        acc
      else
        Task.Supervisor.async_nolink(TaskRunner, fn ->
          {key, Workflow.invoke(workflow, step, fact)}
        end)

        %{acc | active_tasks: MapSet.put(acc.active_tasks, key)}
      end
    end)
  end
end

defmodule ManageWorkflows do
  @moduledoc """
  Convenience entry points for starting and feeding `WorkflowRunner` processes.
  """

  @doc """
  Starts a runner for `workflow` under a freshly generated UUID.

  Returns `{:ok, id}` when `WorkflowRunner.start/1` returns `{:ok, _}`; any other result
  is returned unchanged.
  """
  def start_runner(%Runic.Workflow{} = workflow) do
    workflow_id = Ecto.UUID.generate()

    case WorkflowRunner.start(%{id: workflow_id, workflow: workflow}) do
      {:ok, _} -> {:ok, workflow_id}
      other -> other
    end
  end

  @doc "Hands `input` to the runner registered under `id`."
  def run_workflow(id, input), do: WorkflowRunner.run(id, input)
end
# Start the processes the runner depends on: the registry used by
# `WorkflowRunner.via/1`, the dynamic supervisor that owns runners, and the task
# supervisor (`TaskRunner`) the runner starts its tasks under.
Kino.start_child!({Registry, name: WorkflowRegistry, keys: :unique})
Kino.start_child!({WorkflowSupervisor, []})
Kino.start_child!({Task.Supervisor, name: TaskRunner})
# Start a runner for the extended workflow and feed it a query.
# `run_workflow/2` ends in a `GenServer.cast`, so it returns before the work is done.
{:ok, workflow_id} = ManageWorkflows.start_runner(extended_kg)
ManageWorkflows.run_workflow(workflow_id, {:query, "elixir"})
# WorkflowRunner.get(workflow_id)
# |> Map.get(:graph)
# |> Kino.Cytoscape.new()
# Snapshot of the workflow as it is right now. The run is asynchronous, so evaluate this
# again later to pick up facts produced after this point.
workflow = WorkflowRunner.get(workflow_id)

# Collect every produced list of facts and turn the facts into Cytoscape edge elements.
# Edges are de-duplicated by id: the same fact can appear in several reports, and
# de-duplicating whole lists (as before) still left repeated element ids behind.
edge_list =
  workflow
  |> Workflow.raw_productions()
  |> dbg()
  |> Enum.filter(&match?([%Fact{} | _], &1))
  |> Enum.concat()
  |> Enum.map(fn fact ->
    %{
      data: %{
        id: "#{fact.relationship}-#{fact.source}#{fact.target}" |> :erlang.phash2(),
        label: "#{fact.relationship}",
        source: fact.source,
        target: fact.target
      }
    }
  end)
  |> Enum.uniq_by(& &1.data.id)

edge_list
|> Kino.Cytoscape.new()

That's it

charlie-graphs

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment