Last active
December 25, 2018 12:01
-
-
Save seratch/823dfc0f718a9b0deaa88b0020a455a6 to your computer and use it in GitHub Desktop.
r2dbc sample in Scala
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
// Version shared by all r2dbc artifacts declared below.
val r2dbcVersion = "1.0.0.M6"

// Single-project build rooted at the current directory.
lazy val root = project
  .in(file("."))
  .settings(
    scalaVersion := "2.12.8",
    scalacOptions ++= Seq(
      "-deprecation",
      "-unchecked",
      "-feature",
      "-Xfuture"
    ),
    libraryDependencies ++= Seq(
      // Scala <-> Java 8 interop and the reactive streams consumer used by the sample
      "org.scala-lang.modules" %% "scala-java8-compat" % "0.9.0"      % Compile,
      "io.monix"               %% "monix-reactive"     % "2.3.3"      % Compile,
      // r2dbc SPI and client API
      "io.r2dbc"               %  "r2dbc-spi"          % r2dbcVersion % Compile,
      "io.r2dbc"               %  "r2dbc-client"       % r2dbcVersion % Compile,
      // database drivers and test framework, only needed by the specs
      "io.r2dbc"               %  "r2dbc-h2"           % r2dbcVersion % Test,
      "io.r2dbc"               %  "r2dbc-postgresql"   % r2dbcVersion % Test,
      "org.scalatest"          %% "scalatest"          % "3.0.5"      % Test
    ),
    // The r2dbc milestone artifacts are resolved from the Spring milestone repository.
    resolvers += ("spring-milestone" at "https://repo.spring.io/milestone"),
    scalafmtOnCompile := true
  )
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
import org.scalatest._ | |
/**
 * Sample spec showing how to drive r2dbc-client against an in-memory H2 database from Scala
 * and how to consume the resulting Reactor `Flux` with monix-reactive.
 */
class H2Spec extends FlatSpec with Matchers {

  /** One row of the `sample` table; `name` is `None` when the column is NULL. */
  case class Sample(id: Long, name: Option[String])

  "H2 connections" should "work" in {
    import io.r2dbc.h2.{ H2ConnectionConfiguration, H2ConnectionFactory }
    import io.r2dbc.client.R2dbc

    val config = H2ConnectionConfiguration.builder().url("mem:sample").build()
    val r2dbc  = new R2dbc(new H2ConnectionFactory(config))

    import reactor.core.publisher.Flux

    // Create the table up front and block until the DDL statement has been executed.
    val tableCreation: Flux[Integer] = r2dbc.inTransaction { handle =>
      handle.execute("create table sample (id bigint primary key, name varchar(100))")
    }
    tableCreation.blockFirst()

    // Nothing below runs until `result` is subscribed to: the inserts are executed first,
    // then the select is executed and its rows are emitted.
    val result: Flux[Sample] = {
      val insertions: Flux[Integer] = r2dbc.inTransaction { handle =>
        // Placeholders must be positional ($1, $2, ...). A named one such as '$id' is rejected with:
        // Identifier '$id' is not a valid identifier. Should be of the pattern '.*\$([\d]+).*'
        val updates = handle.createUpdate("insert into sample (id, name) values ($1, $2)")
        updates.bind("$1", 1).bind("$2", "Alice").add()
        updates.bind("$1", 2).bind("$2", "Bob").add()
        updates.bind("$1", 3).bindNull("$2", classOf[String]).add()
        updates.execute()
        // Alternative: the same inserts expressed as a batch of plain SQL statements.
        /*
        val batch = handle.createBatch()
        batch.add("insert into sample (id, name) values (1, 'Alice')")
        batch.add("insert into sample (id, name) values (2, 'Bob')")
        batch.add("insert into sample (id, name) values (3, null)")
        batch.mapResult(_.getRowsUpdated)
         */
      }
      val fetchingAll: Flux[Sample] = r2dbc.inTransaction { handle =>
        handle.select("select id, name from sample order by id desc").mapRow { row =>
          Sample(
            id = Long.unbox(row.get("id", classOf[java.lang.Long])),
            // Option(...) turns a NULL column (Java null) into None
            name = Option(row.get("name", classOf[String]))
          )
        }
      }
      insertions.thenMany(fetchingAll)
    }

    // simple example to run with monix-reactive
    import monix.reactive.Observable
    val observable: Observable[Sample] =
      Observable.fromReactivePublisher(result)

    import monix.execution.Scheduler.Implicits.global
    import scala.concurrent.Await
    import scala.concurrent.duration._

    // runSyncMaybe yields Right(value) only when the task has already completed by the time
    // the call returns; otherwise it hands back the still-running future as Left. A Left is
    // therefore not an error: wait for it (with a bound) instead of failing the test, so the
    // outcome does not depend on whether the pipeline happened to run synchronously.
    val samples: List[Sample] = observable.toListL.runSyncMaybe match {
      case Right(completed) => completed
      case Left(pending)    => Await.result(pending, 10.seconds)
    }
    samples.size should equal(3)
    samples.map(_.id) should equal(Seq(3, 2, 1))

    // simple example to convert the Publisher to a Future object
    /*
    import java.util.concurrent.CompletableFuture
    val jFuture: CompletableFuture[java.util.List[Sample]] = result.collectList().toFuture
    import scala.collection.JavaConverters._
    import scala.compat.java8.FutureConverters._
    import scala.concurrent.duration.Duration
    import scala.concurrent.{Await, Future}
    import scala.concurrent.ExecutionContext.Implicits.global
    val f: Future[Seq[Sample]] = jFuture.toScala.map(_.asScala)
    val allRows: Seq[Sample] = Await.result(f, Duration.Inf)
    allRows.size should equal(3)
    allRows.map(_.id) should equal(Seq(3, 2, 1))
     */
  }
}
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment