Skip to content

Instantly share code, notes, and snippets.

@fchabouis
Last active October 14, 2021 13:53
Show Gist options
  • Save fchabouis/58a297d263d52678ddd0da026adea214 to your computer and use it in GitHub Desktop.
Save fchabouis/58a297d263d52678ddd0da026adea214 to your computer and use it in GitHub Desktop.
Elixir Livebook : GTFS to GeoJson conversion using Oban

Oban

Objectif

Lancer via Oban un job de conversion d'un GTFS en GeoJson.

On passe l'url du GTFS au worker. Celui ci le télécharge et le convertit en GeoJson, le sauvegarde dans un fichier et notifie le process principal lorsqu'il a terminé en lui indiquant l'emplacement du résultat.

Pré-requis

  • Avoir Postgres qui tourne en local
  • Si on a autre chose que postgres/postgres en user/pwd de Postgres, le changer au début du script
  • Avoir le convertisseur gtfs -> geojson compilé en local et faire pointer @binary_path vers celui-ci

Setup

Faire tourner ce script pour :

  • installation des dépendances
  • création de la base postgres qui sert à Oban
  • Lancement de Oban (supervision, etc)
Mix.install([
  {:ecto_sql, "~> 3.6.2"},
  {:postgrex, "~> 0.15.0"},
  {:oban, "~> 2.8"},
  {:rambo, "~> 0.3.4"}
])

Application.put_env(:myapp, Repo, url: "ecto://postgres:postgres@localhost/mix_install_oban")

defmodule Repo do
  use Ecto.Repo,
    adapter: Ecto.Adapters.Postgres,
    otp_app: :myapp
end

defmodule Migration0 do
  use Ecto.Migration

  def change do
    Oban.Migrations.up()
  end
end

defmodule Main do
  def main do
    children = [
      Repo,
      {Oban, repo: Repo, plugins: [Oban.Plugins.Pruner], queues: [default: 10]}
    ]

    Repo.__adapter__().storage_down(Repo.config())
    Repo.__adapter__().storage_up(Repo.config())
    {:ok, _} = Supervisor.start_link(children, strategy: :one_for_one)

    Ecto.Migrator.run(Repo, [{0, Migration0}], :up, all: true)
  end
end

Main.main()

Conversion !

Le module Converter est le worker Oban qui va faire le travail de conversion. Puis on lance un job et on tend l'oreil pour voir si une notification revient via le canal :gossip.

defmodule Converter do
  use Oban.Worker

  @binary_path "/home/francis/projects/transport/transport-site/transport-tools/gtfs-geojson"

  def download_file(url) do
    :inets.start()
    :ssl.start()

    {:ok, resp} = :httpc.request(:get, {url, []}, [], body_format: :binary)
    {{_, 200, 'OK'}, _headers, body} = resp
    body
  end

  @impl true
  def perform(%{args: %{"url" => url}} = job) do
    body = download_file(url)

    filename = :crypto.hash(:md5, url) |> Base.encode16()
    file_path = "/tmp/#{filename}"
    result_path = "/tmp/#{filename}.geojson"

    File.write!(file_path, body)

    case Rambo.run(@binary_path, ["--input", file_path]) do
      {:ok, %Rambo{out: res}} ->
        # write the result
        File.write!(result_path, res)

        # notify the job is done and where is the result !
        Oban.Notifier.notify(:gossip, %{job_id: job.id, result_path: result_path})
        :ok

      {:error, %Rambo{err: err_msg}} ->
        {:error, err_msg}

      {:error, _} = r ->
        r
    end
  end
end

# On lance le job :

gtfs_url = "https://static.data.gouv.fr/resources/chamonix-bus/20210916-115155/gtfs-1-.zip"

{:ok, job} = Oban.insert(Converter.new(%{url: gtfs_url}))
id = job.id

Oban.Notifier.listen([:gossip])

receive do
  {:notification, :gossip, %{"job_id" => ^id, "result_path" => result_path}} ->
    IO.puts("job #{id} is complete!")
    IO.inspect("geojson has been stored here: #{result_path}")
after
  10_000 ->
    IO.puts("Ooops, something went wrong")
end
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment