Last active
March 31, 2022 05:12
-
-
Save fchabouis/928d9d64230dd7e952e90ba3e8c8628d to your computer and use it in GitHub Desktop.
Elixir: Stream a paginated API to a Postgres database, all included
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Mix.install([ | |
{:ecto_sql, "~> 3.7.0"}, | |
{:postgrex, "~> 0.15.0"}, | |
{:httpoison, ">= 0.0.0"}, | |
{:jason, ">= 0.0.0"} | |
]) | |
require Logger | |
# DATABASE SETUP, adapted from https://github.com/wojtekmach/mix_install_examples
# Edit the connection URL below so that it matches your local Postgres credentials.
# The settings are stored under the :foo application, keyed by the Repo module.
Application.put_env(
  :foo,
  Repo,
  url: "ecto://postgres:postgres@localhost/datasets"
)
defmodule Repo do
  @moduledoc """
  Ecto repository using the Postgres adapter.

  Its configuration is looked up under the `:foo` OTP application.
  """

  use Ecto.Repo, otp_app: :foo, adapter: Ecto.Adapters.Postgres
end
defmodule Migration0 do
  @moduledoc """
  Migration creating the `datasets` table with two string columns,
  `title` and `external_id`.
  """

  use Ecto.Migration

  # Written as `change/0`, so the migration is described once for both directions.
  def change do
    create table("datasets") do
      add :title, :string
      add :external_id, :string
    end
  end
end
defmodule Dataset do
  @moduledoc """
  Ecto schema mapped to the `datasets` table.

  Fields: `title` and `external_id`, both strings.
  """

  use Ecto.Schema

  schema "datasets" do
    field :title, :string
    field :external_id, :string
  end
end
defmodule DB do
  @moduledoc """
  Bootstraps the database used by this script.
  """

  require Logger

  @doc """
  Recreates the database storage, starts `Repo` and runs `Migration0`.

  Steps, in order:

    1. the storage is dropped (the result of this step is ignored);
    2. the storage is created again;
    3. `Repo` is started under a `:one_for_one` supervisor, whether the storage
       was just created or was reported as already up, so the migration always
       has a running repo to talk to;
    4. `Migration0` is run upwards as version `0`.

  Returns the result of `Ecto.Migrator.run/4`.

  Raises if the storage cannot be created or if the supervisor cannot be started.
  """
  def setup do
    adapter = Repo.__adapter__()
    config = Repo.config()

    # The result is deliberately ignored: only the following storage_up matters.
    _ = adapter.storage_down(config)

    case adapter.storage_up(config) do
      :ok ->
        :ok

      {:error, :already_up} ->
        Logger.info("storage already up")

      {:error, reason} ->
        raise "could not create the database storage: #{inspect(reason)}"
    end

    # Start the repo on every successful path, not only when storage was created.
    {:ok, _pid} = Supervisor.start_link([Repo], strategy: :one_for_one)

    Ecto.Migrator.run(Repo, [{0, Migration0}], :up, all: true, log_sql: :debug)
  end
end

DB.setup()
# API STREAMING SETUP, adapted from https://francis.chabouis.fr/posts/stream-api-with-elixir/
# Given the first url to query, builds a zero-arity function returning the
# initial accumulator: that url wrapped in a one-element tuple.
start_fun = fn first_url ->
  fn -> {first_url} end
end
# Step function of the stream. The accumulator is `{url}`, the next page to
# fetch, or nil. Each call returns `{items, {next_url}}` to emit one page of
# items, or `{:halt, nil}` to end the stream.
next_fun = fn
  nil ->
    {:halt, nil}

  # The previous page had no "next_page": pagination is over. Without this
  # clause the nil url would be handed to HTTPoison.get/1.
  {nil} ->
    {:halt, nil}

  {url} ->
    # we make a GET request on the url
    case HTTPoison.get(url) do
      {:ok, %{status_code: 200, body: body}} ->
        # we decode the body from a string to an Elixir map
        {:ok, content} = Jason.decode(body)
        # we keep only what interests us; here, what's in the "data" key
        items = Map.get(content, "data", [])
        Logger.info("fetch #{Enum.count(items)} items from the API coming from #{url}")
        # we return the results, and the next url to query
        {items, {Map.get(content, "next_page")}}

      {:ok, %{status_code: status}} ->
        # the API answered with something other than 200: report it and stop
        Logger.error("unexpected HTTP status #{status} from #{url}, stopping the stream")
        {:halt, nil}

      other ->
        # the request itself failed: report it and stop the stream
        Logger.error("request to #{url} failed: #{inspect(other)}, stopping the stream")
        {:halt, nil}
    end
end
# Cleanup step of the stream: there is nothing to release, so it ignores the
# final accumulator and returns nil.
after_fun = fn _last_acc -> nil end
# Builds a lazy stream for the API starting at `first_url`, wiring together the
# start, next and after functions defined above.
stream_api = fn first_url ->
  initial = start_fun.(first_url)
  Stream.resource(initial, next_fun, after_fun)
end
# Lazy stream over the datasets exposed by the paginated API; nothing is
# fetched until the stream is consumed below.
datasets = stream_api.("https://www.data.gouv.fr/api/1/datasets/")

# WE ARE READY
# START STREAMING THE API CONTENT TO THE DB!
# Pages are fetched over HTTP while the transaction is open, so the default
# transaction timeout is lifted; otherwise a slow API could make the pool drop
# the connection halfway through the import.
Repo.transaction(
  fn ->
    datasets
    |> Stream.map(fn dataset ->
      %{
        external_id: dataset["id"],
        title: dataset["title"]
      }
    end)
    # only the first 200 datasets are imported
    |> Stream.take(200)
    # rows are inserted 100 at a time
    |> Stream.chunk_every(100)
    |> Enum.each(fn rows -> Repo.insert_all(Dataset, rows) end)
  end,
  timeout: :infinity
)
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment
This gist is part of a blog post explaining in detail what it does.
In a nutshell, if you run this Elixir script, it will create a Postgres database called datasets on your computer and start streaming data from an external paginated API into that database.
All the Ecto database setup is taken from https://github.com/wojtekmach/mix_install_examples, and I thank @wojtekmach for his nice work!