Skip to content

Instantly share code, notes, and snippets.

@johnhaley81
Last active December 19, 2019 20:08
Show Gist options
  • Save johnhaley81/3472d9f5feba79aab352926fe8897197 to your computer and use it in GitHub Desktop.
Save johnhaley81/3472d9f5feba79aab352926fe8897197 to your computer and use it in GitHub Desktop.
Some reasonml bindings for graphile worker
open Relude.Globals;
[@bs.module "dotenv"] external getEnvVars: unit => unit = "config";
let getEnvVars = () =>
getEnvVars()
|> (() => Node.Process.process##env)
|> (Js.Dict.map((. x) => x |> Json.Encode.string) >> Json.Encode.dict);
// This file needs to live in ./tasks and all of the reason source needs to live in `./src`
module.exports = require("../src/ExampleWorkable.bs").runner;
open Relude.Globals;
module Workable = {
type envVars = unit;
let decodeEnvVars = const(Result.ok());
type payload = unit;
let taskName = __MODULE__;
let decode = decodeEnvVars;
let encode = const(Json.Encode.(object_([])));
let runner =
PgClient.TransactionedConnection.queryUnit(
~query="SELECT do_the_thing()",
);
let runner = ((), ()) =>
IO.pure
>> IO.flatMap(
GraphileWorker.Task.Helpers.withPgClientGet(
PgClient.TransactionedConnection.runInTransaction(
runner
)
>> IO.mapError(
PgClient.mapPgErrorToException(
"An error occured while running doing the thing",
),
),
),
);
};
include GraphileWorker.MakeWorkable(Workable);
open Relude.Globals;
module JobInstanceInfo = {
[@bs.deriving abstract]
type t = {
id: int,
[@bs.as "queue_name"]
queueName: string,
[@bs.as "task_identifier"]
taskIdentifier: string,
payload: Js.Json.t,
priority: int,
[@bs.as "run_at"]
runAt: Js.Date.t,
attempts: int,
[@bs.optional] [@bs.as "last_error"]
lastError: string,
[@bs.as "created_at"]
createdAt: Js.Date.t,
[@bs.as "updated_at"]
updatedAt: Js.Date.t,
};
let make = t;
};
module Task = {
module Helpers = {
type withPgClientResult;
external unsafeToWithPgClientResult: 'a => withPgClientResult =
"%identity";
external unsafeFromWithPgClientResult: withPgClientResult => 'a =
"%identity";
[@bs.deriving abstract]
type t = {
[@bs.optional]
debug: (. string) => unit,
job: JobInstanceInfo.t,
withPgClient:
(PgClient.Connection.t => Js.Promise.t(withPgClientResult)) =>
Js.Promise.t(withPgClientResult),
};
let make = t;
let debugGet = debugGet >> Belt.Option.getWithDefault(_, (. _) => ());
let debugGet = (~prefix=?, t, message) =>
t->debugGet(.
prefix->Belt.Option.mapWithDefault(message, prefix =>
prefix ++ ": " ++ message
),
);
let withPgClientGet = Relude.Function.flip(withPgClientGet);
let withPgClientGet = (fn, t) =>
PromiseUtils.lazyPromiseToReludeIO(() =>
t
|> withPgClientGet(
fn
>> IO.map(unsafeToWithPgClientResult)
>> Relude.Js.Promise.fromIOJsExn,
)
)
|> IO.map(unsafeFromWithPgClientResult);
};
let make = (taskFn, payload: Js.Json.t, helpers: Helpers.t) =>
taskFn(payload, helpers) |> Relude.Js.Promise.fromIOJsExn;
};
module type Workable = {
type envVars;
let decodeEnvVars: Js.Json.t => Json.Decode.ResultUtil.r(envVars);
type payload;
let decode: Js.Json.t => Json.Decode.ResultUtil.r(payload);
let encode: payload => Js.Json.t;
let taskName: string;
let runner: (envVars, payload, Task.Helpers.t) => IO.t(unit, Js.Exn.t);
};
module MakeWorkable = (W: Workable) => {
type t = W.payload;
let decode = W.decode;
let encode = W.encode;
let enqueueJob = payload =>
PgClient.Connection.queryUnit(
~values=
[|W.taskName, payload |> W.encode |> Js.Json.stringify|]
|> Array.map(Option.pure),
~query="SELECT graphile_worker.add_job($1, $2);",
)
>> IO.mapError(
PgClient.mapPgErrorToException("Error enqueuing job " ++ W.taskName),
);
let enqueueJobInTransaction = payload =>
PgClient.TransactionedConnection.queryUnit(
~values=
[|W.taskName, payload |> W.encode |> Js.Json.stringify|]
|> Array.map(Option.pure),
~query="SELECT graphile_worker.add_job($1, $2);",
);
let getDecodedEnvVars = decode =>
EnvVars.getEnvVars()
|> decode
|> Json.Decode.mapErrorToDebugString
|> Relude.Result.mapError(Relude.Js.Exn.make)
|> IO.fromResult;
let runner =
Task.make((payloadJson, helpers) =>
payloadJson
|> W.decode
|> Relude.Result.mapError(
Json.ParseError.failureToDebugString >> Relude.Js.Exn.make,
)
|> IO.fromResult
|> IO.flatMap(payload =>
W.decodeEnvVars
|> getDecodedEnvVars
|> IO.flatMap(envVars => W.runner(envVars, payload, helpers))
)
);
};
module JobInstanceInfo: {
type t;
let idGet: t => int;
let queueNameGet: t => string;
let taskIdentifierGet: t => string;
let payloadGet: t => Js.Json.t;
let priorityGet: t => int;
let runAtGet: t => Js.Date.t;
let attemptsGet: t => int;
let lastErrorGet: t => option(string);
let createdAtGet: t => Js.Date.t;
let updatedAtGet: t => Js.Date.t;
let make:
(
~id: int,
~queueName: string,
~taskIdentifier: string,
~payload: Js.Json.t,
~priority: int,
~runAt: Js.Date.t,
~attempts: int,
~lastError: string=?,
~createdAt: Js.Date.t,
~updatedAt: Js.Date.t,
unit
) =>
t;
};
module Task: {
module Helpers: {
type withPgClientResult;
type t;
let make:
(
~debug: (. string) => unit=?,
~job: JobInstanceInfo.t,
~withPgClient: (
PgClient.Connection.t =>
Js.Promise.t(withPgClientResult)
) =>
Js.Promise.t(withPgClientResult),
unit
) =>
t;
let jobGet: t => JobInstanceInfo.t;
let debugGet: (~prefix: string=?, t, string) => unit;
let withPgClientGet:
(PgClient.Connection.t => Relude.IO.t('a, Js.Exn.t), t) =>
Relude.IO.t('a, Js.Exn.t);
};
let make:
(
(Js.Json.t, Helpers.t) => Relude.IO.t('a, Js.Exn.t),
Js.Json.t,
Helpers.t
) =>
Js.Promise.t('a);
};
module type Workable = {
type envVars;
let decodeEnvVars: Js.Json.t => Json.Decode.ResultUtil.r(envVars);
type payload;
let decode: Js.Json.t => Json.Decode.ResultUtil.r(payload);
let encode: payload => Js.Json.t;
let taskName: string;
let runner:
(envVars, payload, Task.Helpers.t) => Relude.IO.t(unit, Js.Exn.t);
};
module MakeWorkable:
(W: Workable) =>
{
type t = W.payload;
let decode: Js.Json.t => Json.Decode.ResultUtil.r(W.payload);
let encode: W.payload => Js.Json.t;
let enqueueJob:
(W.payload, PgClient.Connection.t) => Relude.IO.t(unit, Js.Exn.t);
let enqueueJobInTransaction:
(W.payload, PgClient.TransactionedConnection.t) =>
Relude.IO.t(unit, PgClient.pgError);
let runner: (Js.Json.t, Task.Helpers.t) => Js.Promise.t(unit);
};
open Relude.Globals;
[@bs.deriving accessors]
type pgError =
| QueryError(Js.Exn.t)
| DecodeError(Json.ParseError.failure)
| NoResults;
let mapPgErrorToException = noResultsMessage =>
fun
| QueryError(queryError) => queryError
| NoResults => noResultsMessage |> RJs.Exn.make
| DecodeError(decodeError) =>
decodeError |> Json.ParseError.failureToDebugString |> RJs.Exn.make;
module Result = {
[@bs.deriving abstract]
type t = {rows: array(Js.Json.t)};
let make = t;
};
module QueryPayload = {
[@bs.deriving abstract]
type t = {
text: string,
[@bs.optional]
values: array(Js.Nullable.t(string)),
};
let valuesGet = valuesGet >> Option.map(Array.map(Js.Nullable.toOption));
let make = (~values=?) =>
t(
~values=?{
values |> Option.map(Array.map(Js.Nullable.fromOption));
},
);
};
module type ConnectionInterface = {
type t;
let queryMany:
(
~values: array(option(string))=?,
~query: string,
~decoder: Js.Json.t => Json.Decode.ResultUtil.r('a),
t
) =>
Relude.IO.t(array('a), pgError);
let queryOne:
(
~values: array(option(string))=?,
~query: string,
~decoder: Js.Json.t => Json.Decode.ResultUtil.r('a),
t
) =>
Relude.IO.t('a, pgError);
let queryUnit:
(~values: array(option(string))=?, ~query: string, t) =>
Relude.IO.t(unit, pgError);
};
module Connection = {
type t;
[@bs.send] external connect: t => Js.Promise.t(unit) = "connect";
let connect = client =>
PromiseUtils.lazyPromiseToReludeIO(() => client |> connect)
|> IO.map(_ => client);
[@bs.send] external disconnect: t => Js.Promise.t('a) = "end";
let disconnect = client =>
PromiseUtils.lazyPromiseToReludeIO(() => client |> disconnect);
[@bs.module "pg"] [@bs.new]
external fromConnectionString: CommonTypes.DBConnectionString.t => t =
"Client";
let runWithConnectionString = fn =>
fromConnectionString
>> connect
>> IO.flatMap(client =>
client
|> fn
|> IO.flatMap(result => client |> disconnect |> IO.map(() => result))
|> IO.catchError(e =>
client |> disconnect |> IO.flatMap(() => IO.throw(e))
)
);
[@bs.send]
external query: (t, QueryPayload.t) => Js.Promise.t(Result.t) = "query";
let query = (~values=?, ~query as text, t) =>
t->query(QueryPayload.make(~text, ~values?, ()));
let query = (~values=?, ~query as sqlQueryText, t) =>
RJs.Promise.toIOLazy(() => query(~values?, ~query=sqlQueryText, t))
|> IO.map(Result.rowsGet)
|> IO.mapError(Relude.Unsafe.coerce >> queryError);
let queryDecoder = decoder =>
IO.flatMap(
decoder >> Relude.Result.fold(decodeError >> IO.throw, IO.pure),
);
let queryMany = (~values=?, ~query as sqlQueryText, ~decoder) =>
query(~values?, ~query=sqlQueryText)
>> IO.map(Json.Encode.jsonArray)
>> queryDecoder(Json.Decode.array(decoder));
let queryOne = (~values=?, ~query as sqlQueryText, ~decoder) =>
query(~values?, ~query=sqlQueryText)
>> IO.flatMap(
Array.head >> Option.foldLazy(() => NoResults |> IO.throw, IO.pure),
)
>> queryDecoder(decoder);
let queryUnit = (~values=?, ~query as sqlQueryText) =>
query(~values?, ~query=sqlQueryText) >> IO.map(_ => ());
};
module TransactionedConnection = {
// Outside of this file these are considered not the same
// so you can't mix transactions with non transactions
type t = Connection.t;
let queryMany = Connection.queryMany;
let queryOne = Connection.queryOne;
let queryUnit = Connection.queryUnit;
let startTransaction = t =>
t |> Connection.query(~query="BEGIN") |> IO.map(_ => t);
let endTransaction = t =>
t
|> Connection.query(~query="COMMIT")
|> IO.map(_ => t)
|> IO.catchError(err =>
t
|> Connection.query(~query="ROLLBACK")
|> IO.flatMap(_ => err |> IO.throw)
);
let runInTransaction = (thingsToDoInTransaction, t) =>
t
|> startTransaction
|> IO.flatMap(thingsToDoInTransaction)
|> IO.catchError(err =>
t |> endTransaction |> IO.flatMap(_ => err |> IO.throw)
)
|> IO.flatMap(result => t |> endTransaction |> IO.map(_ => result));
};
type pgError =
| QueryError(Js.Exn.t)
| DecodeError(Json.ParseError.failure)
| NoResults;
let mapPgErrorToException: (string, pgError) => Js.Exn.t;
module Result: {
type t;
let rowsGet: t => array(Js.Json.t);
let make: (~rows: array(Js.Json.t)) => t;
};
module QueryPayload: {
type t;
let textGet: t => string;
let valuesGet: t => option(array(option(string)));
let make: (~values: array(option(string))=?, ~text: string, unit) => t;
};
module type ConnectionInterface = {
type t;
let queryMany:
(
~values: array(option(string))=?,
~query: string,
~decoder: Js.Json.t => Json.Decode.ResultUtil.r('a),
t
) =>
Relude.IO.t(array('a), pgError);
let queryOne:
(
~values: array(option(string))=?,
~query: string,
~decoder: Js.Json.t => Json.Decode.ResultUtil.r('a),
t
) =>
Relude.IO.t('a, pgError);
let queryUnit:
(~values: array(option(string))=?, ~query: string, t) =>
Relude.IO.t(unit, pgError);
};
module Connection: {
type t;
let runWithConnectionString:
(t => Relude.IO.t('a, Js.Exn.t), CommonTypes.DBConnectionString.t) =>
Relude.IO.t('a, Js.Exn.t);
let queryMany:
(
~values: array(option(string))=?,
~query: string,
~decoder: Js.Json.t => Json.Decode.ResultUtil.r('a),
t
) =>
Relude.IO.t(array('a), pgError);
let queryOne:
(
~values: array(option(string))=?,
~query: string,
~decoder: Js.Json.t => Json.Decode.ResultUtil.r('a),
t
) =>
Relude.IO.t('a, pgError);
let queryUnit:
(~values: array(option(string))=?, ~query: string, t) =>
Relude.IO.t(unit, pgError);
};
module TransactionedConnection: {
type t;
let startTransaction: Connection.t => Relude.IO.t(t, pgError);
let endTransaction: t => Relude.IO.t(Connection.t, pgError);
let runInTransaction:
(t => Relude.IO.t('a, pgError), Connection.t) => Relude.IO.t('a, pgError);
let queryMany:
(
~values: array(option(string))=?,
~query: string,
~decoder: Js.Json.t => Json.Decode.ResultUtil.r('a),
t
) =>
Relude.IO.t(array('a), pgError);
let queryOne:
(
~values: array(option(string))=?,
~query: string,
~decoder: Js.Json.t => Json.Decode.ResultUtil.r('a),
t
) =>
Relude.IO.t('a, pgError);
let queryUnit:
(~values: array(option(string))=?, ~query: string, t) =>
Relude.IO.t(unit, pgError);
};
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment