Last active
November 1, 2021 18:16
-
-
Save nwalker/f98bdcc3e02b9e59fd4128b5e1c9f6be to your computer and use it in GitHub Desktop.
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
defmodule DbcEpgsql do
  @moduledoc """
  A deliberately minimal `DBConnection` implementation for PostgreSQL built on `:epgsql`.

  Limitations:

    * no status support
    * no savepoints and no nested transactions
    * minimal error handling: any unexpected `:epgsql` reply is reported as a
      `DBConnection.ConnectionError` and the connection is dropped
    * connection parameters default to a local test database (see `connect/1`)

  The nested `DbcEpgsql.Test` module runs sample queries with tracing enabled, which
  is a quick way to see what is going on. Adjust the defaults used by `connect/1`
  if needed, then:

      $ iex -S mix
      iex(1)> DbcEpgsql.Test.query
      .... tons of info!
  """

  defmodule State do
    @moduledoc false
    # `conn` holds the value returned by `:epgsql.connect/1`.
    defstruct conn: nil
  end

  defmodule Query do
    @moduledoc false
    # s:    SQL text handed to `:epgsql.parse/4`
    # ps:   prepared statement returned by `:epgsql.parse/4` (nil until prepared)
    # name: statement name; nil means the unnamed statement
    defstruct s: nil, ps: nil, name: nil

    # All protocol functions are pass-throughs: the query, the parameters and the
    # results are handed over unchanged.
    defimpl DBConnection.Query do
      def parse(q, _opts), do: q
      def describe(q, _), do: q
      def encode(_q, args, _), do: args
      def decode(_q, res, _), do: res
    end
  end

  use DBConnection

  # Connection parameters used when the caller does not supply them.
  @default_connect_opts %{
    host: ~c"localhost",
    port: 5432,
    username: ~c"test",
    password: ~c"test",
    database: ~c"static_test"
  }

  # Opens an epgsql connection. `opts` (a keyword list or map) is merged over the
  # defaults above, so caller-supplied values win; with `[]` the defaults are used.
  # Returns `{:ok, %State{}}` or `{:error, %DBConnection.ConnectionError{}}`.
  @impl true
  def connect(opts) do
    connect_opts = Map.merge(@default_connect_opts, Map.new(opts))

    case :epgsql.connect(connect_opts) do
      {:ok, e_conn} -> {:ok, %State{conn: e_conn}}
      {:error, reason} -> {:error, epgsql_error("connect", reason)}
    end
  end

  # Closes the epgsql connection; the `:ok` return is made explicit.
  @impl true
  def disconnect(_err, state) do
    :epgsql.close(state.conn)
    :ok
  end

  @impl true
  def checkout(state) do
    {:ok, state}
  end

  # Uses `:epgsql.sync/1` as the liveness check.
  @impl true
  def ping(state) do
    case :epgsql.sync(state.conn) do
      :ok -> {:ok, state}
      {:error, reason} -> {:disconnect, epgsql_error("sync", reason), state}
    end
  end

  # savepoints should be here
  @impl true
  def handle_begin(_opts, state), do: transaction_command(state, "BEGIN")

  @impl true
  def handle_commit(_opts, state), do: transaction_command(state, "COMMIT")

  @impl true
  def handle_rollback(_opts, state), do: transaction_command(state, "ROLLBACK")

  # Parses the statement on the server and stores the prepared statement in the
  # query. A nil name falls back to `[]`, i.e. the empty (unnamed) statement name.
  @impl true
  def handle_prepare(query, _opts, state) do
    name = query.name || []

    case :epgsql.parse(state.conn, name, query.s, []) do
      {:ok, ps} -> {:ok, %{query | ps: ps, name: name}, state}
      other -> {:disconnect, epgsql_error("parse", other), state}
    end
  end

  # Binds `params` to a freshly named portal. The cursor handed back is
  # `{:cursor, portal_name, max_rows}`; `max_rows` comes from `opts[:max_rows]`
  # and defaults to 0.
  @impl true
  def handle_declare(query, params, opts, state) do
    rows = opts[:max_rows] || 0

    # A Base64-encoded reference gives a unique portal name as a charlist.
    cursor_name =
      make_ref()
      |> :erlang.term_to_binary()
      |> Base.encode64()
      |> to_charlist()

    case :epgsql.bind(state.conn, query.ps, cursor_name, params) do
      :ok -> {:ok, query, {:cursor, cursor_name, rows}, state}
      other -> {:disconnect, epgsql_error("bind", other), state}
    end
  end

  # Runs the prepared statement; any reply tuple tagged `:ok` is returned as the
  # result unchanged.
  @impl true
  def handle_execute(query, params, _opts, state) do
    case :epgsql.prepared_query(state.conn, query.ps, params) do
      success when is_tuple(success) and elem(success, 0) == :ok ->
        {:ok, query, success, state}

      other ->
        {:disconnect, epgsql_error("prepared_query", other), state}
    end
  end

  # Fetches up to `size` rows from the portal: a `{:partial, rows}` reply continues
  # the stream, any `:ok`-tagged reply finishes it.
  @impl true
  def handle_fetch(query, {:cursor, name, size}, _opts, state) do
    case :epgsql.execute(state.conn, query.ps, name, size) do
      {:partial, rows} ->
        {:cont, rows, state}

      success when is_tuple(success) and elem(success, 0) == :ok ->
        {:halt, success, state}

      other ->
        {:disconnect, epgsql_error("execute", other), state}
    end
  end

  # Closes the portal that backs the cursor.
  @impl true
  def handle_deallocate(_query, {:cursor, name, _}, _opts, state) do
    case :epgsql.close(state.conn, :portal, name) do
      :ok -> {:ok, nil, state}
      {:error, reason} -> {:disconnect, epgsql_error("close portal", reason), state}
    end
  end

  # Closes the prepared statement stored in the query.
  @impl true
  def handle_close(query, _opts, state) do
    case :epgsql.close(state.conn, query.ps) do
      :ok -> {:ok, nil, state}
      {:error, reason} -> {:disconnect, epgsql_error("close statement", reason), state}
    end
  end

  # Runs a transaction control statement through the simple query protocol.
  defp transaction_command(state, sql) do
    case :epgsql.squery(state.conn, sql) do
      {:ok, _, _} -> {:ok, nil, state}
      other -> {:disconnect, epgsql_error(sql, other), state}
    end
  end

  # Wraps an unexpected epgsql reply in the exception type the callbacks return.
  # Failures are reported with `:disconnect` rather than `:error` because no
  # attempt is made to resynchronise the connection after a failed command.
  defp epgsql_error(operation, reason) do
    DBConnection.ConnectionError.exception("epgsql #{operation} failed: #{inspect(reason)}")
  end

  defmodule Test do
    @moduledoc false
    alias DbcEpgsql.Query, as: Q
    alias :dbg, as: Dbg

    # Prepares and executes two sample queries inside a transaction, with tracing on.
    def query() do
      run_traced(fn pool ->
        DBConnection.transaction(pool, fn conn ->
          q1 = %Q{name: "q1", s: "SELECT NULL as test, $1::integer as test2"}
          q2 = %Q{name: "q2", s: "SELECT tab.* FROM static.revision AS tab LIMIT 0"}

          {:ok, ps1, _} =
            DBConnection.prepare_execute(conn, q1, [1]) |> IO.inspect(label: "prepare_execute")

          {:ok, _ps2, _} =
            DBConnection.prepare_execute(conn, q2, []) |> IO.inspect(label: "prepare_execute")

          {:ok, _, _} = DBConnection.execute(conn, ps1, [2]) |> IO.inspect(label: "execute")
          # {:ok, _} = DBConnection.close(conn, ps)
        end)
      end)
    end

    # Streams a five-element array two rows at a time, with tracing on.
    def stream() do
      run_traced(fn pool ->
        DBConnection.transaction(pool, fn conn ->
          q1 = %Q{s: "SELECT unnest($1::int[])", name: "some"}
          # q2 = %Q{name: "q2", s: "SELECT tab.* FROM static.product_out AS tab LIMIT 0"}
          {:ok, ps1} = DBConnection.prepare(conn, q1) |> IO.inspect(label: "prepare")
          # DBConnection.execute(conn, ps1, [[1, 2, 3, 4, 5]]) |> IO.inspect(label: "execute")

          DBConnection.stream(conn, ps1, [[1, 2, 3, 4, 5]], max_rows: 2)
          |> Enum.to_list()
          |> IO.inspect(label: "stream")

          {:ok, _} = DBConnection.close(conn, ps1)
        end)
      end)
    end

    # Traces every call (with return values) into this adapter, its
    # `DBConnection.Query` implementation and `:epgsql`.
    def setup_trace() do
      Dbg.start()
      Dbg.tracer()
      Dbg.tpl(DbcEpgsql, :_, :_, [{:_, [], [{:return_trace}]}])
      Dbg.tpl(DBConnection.Query.DbcEpgsql.Query, :_, :_, [{:_, [], [{:return_trace}]}])
      Dbg.tp(:epgsql, :_, :_, [{:_, [], [{:return_trace}]}])
      Dbg.p(:all, :c)
    end

    def stop_trace(), do: Dbg.stop_clear()

    # Starts tracing and a pool, runs `fun.(pool)`, prints anything raised or
    # thrown, and always stops tracing and kills the pool afterwards.
    defp run_traced(fun) do
      setup_trace()
      {:ok, pool} = DBConnection.start_link(DbcEpgsql, [])

      try do
        fun.(pool)
      rescue
        e -> IO.inspect(e)
      catch
        c, e -> IO.inspect({c, e})
      after
        stop_trace()
        # The pool is linked to the caller; unlink first so that killing it does
        # not send a `:killed` exit signal to the calling (e.g. iex) process.
        Process.unlink(pool)
        Process.exit(pool, :kill)
      end
    end
  end
end
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
defmodule DbcEpgsql.MixProject do
  use Mix.Project

  @version "0.1.0"
  @elixir_requirement "~> 1.12"

  # epgsql is fetched straight from its git repository at a fixed tag.
  @epgsql_source [git: "https://github.com/epgsql/epgsql.git", tag: "4.6.0"]

  # Project definition read by Mix.
  def project do
    [
      app: :dbc_epgsql,
      version: @version,
      elixir: @elixir_requirement,
      start_permanent: Mix.env() == :prod,
      deps: deps()
    ]
  end

  # OTP application settings; see "mix help compile.app".
  # :runtime_tools is listed because it ships :dbg.
  def application do
    [extra_applications: [:logger, :runtime_tools]]
  end

  # Dependency list; see "mix help deps".
  defp deps do
    [
      {:db_connection, "~> 2.4.1"},
      {:epgsql, @epgsql_source}
    ]
  end
end
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
%{ | |
"connection": {:hex, :connection, "1.1.0", "ff2a49c4b75b6fb3e674bfc5536451607270aac754ffd1bdfe175abe4a6d7a68", [:mix], [], "hexpm", "722c1eb0a418fbe91ba7bd59a47e28008a189d47e37e0e7bb85585a016b2869c"}, | |
"db_connection": {:hex, :db_connection, "2.4.1", "6411f6e23f1a8b68a82fa3a36366d4881f21f47fc79a9efb8c615e62050219da", [:mix], [{:connection, "~> 1.0", [hex: :connection, repo: "hexpm", optional: false]}, {:telemetry, "~> 0.4 or ~> 1.0", [hex: :telemetry, repo: "hexpm", optional: false]}], "hexpm", "ea36d226ec5999781a9a8ad64e5d8c4454ecedc7a4d643e4832bf08efca01f00"}, | |
"epgsql": {:git, "https://github.com/epgsql/epgsql.git", "f7530f63ae40ea2b81bae7d4a33292212349b761", [tag: "4.6.0"]}, | |
"telemetry": {:hex, :telemetry, "1.0.0", "0f453a102cdf13d506b7c0ab158324c337c41f1cc7548f0bc0e130bbf0ae9452", [:rebar3], [], "hexpm", "73bc09fa59b4a0284efb4624335583c528e07ec9ae76aca96ea0673850aec57a"}, | |
} |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment