Created
March 25, 2020 14:00
-
-
Save fanf/f39872886ee5b24c0d3e7af0c8171722 to your computer and use it in GitHub Desktop.
Testing batch process with ZIO. Concurrency is hard.
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
import zio._ | |
import zio.clock._ | |
import zio.duration._ | |
import zio.test.environment._ | |
import java.util.concurrent.TimeUnit | |
object Batch { | |
/* | |
* A batch is a forever looping running process (`forever`) forked in background | |
* and properly detached from its parent fiber (`forkDaemon`) that execute an | |
* effect each time a trigger is yielded. | |
*/ | |
def createBatch(effect: ZIO[Clock, Nothing, Unit]): URIO[Clock, Fiber[Nothing, Nothing]] = { | |
// let's say that in our case, the trigger is just a "some time passed" | |
val trigger = UIO.unit.delay(5.minutes) | |
// the whole batch | |
(trigger *> effect).forever.forkDaemon | |
} | |
} | |
object MainTest1 { | |
/* | |
* Create a batch whose effect it to store current time in a `Ref` and write something out | |
*/ | |
def demoBatch(r: Ref[List[Long]]) = Batch.createBatch( | |
// access what is needed from environment | |
ZIO.accessM[Clock]{ c => c.get.currentTime(TimeUnit.MINUTES).flatMap(t => | |
// actual effect | |
UIO.effectTotal(println(s"done at $t")) *> r.update(t :: _) | |
) } | |
) | |
val prog1 = ZIO.accessM[Clock with TestClock] { testClock => | |
def adjust(d: Duration) = testClock.get[TestClock.Service].adjust(d) | |
for { | |
r <- Ref.make(List.empty[Long]) | |
_ <- demoBatch(r) | |
_ <- adjust(12.minutes) | |
l <- r.get | |
} yield l | |
}.provideLayer(testEnvironment) | |
def main(args: Array[String]): Unit = { | |
val l = Runtime.default.unsafeRun(prog1) | |
assert(l == 10 :: 5 :: Nil, s"Got unexpected execution list: ${l}") | |
} | |
} | |
object MainTest2 { | |
/* | |
* Create a batch whose effect it to store current time in a `Ref` and write something out | |
*/ | |
def demoBatch(r: Ref[List[Long]]) = Batch.createBatch( | |
// access what is needed from environment | |
ZIO.accessM[Clock]{ c => c.get.currentTime(TimeUnit.MINUTES).flatMap(t => | |
// actual effect | |
UIO.effectTotal(println(s"done at $t")) *> r.update(t :: _) | |
) } | |
) | |
val prog2 = ZIO.accessM[Clock with TestClock] { testClock => | |
def adjust(d: Duration) = testClock.get[TestClock.Service].adjust(d) | |
for { | |
r <- Ref.make(List.empty[Long]) | |
_ <- demoBatch(r) | |
_ <- adjust(12.minutes) | |
_ <- UIO.effectTotal(Thread.sleep(2000)) | |
l <- r.get | |
} yield l | |
}.provideLayer(testEnvironment) | |
def main(args: Array[String]): Unit = { | |
val l = Runtime.default.unsafeRun(prog2) | |
assert(l == 10 :: 5 :: Nil, s"Got unexpected execution list: ${l}") | |
} | |
} | |
object MainTest3 { | |
/* | |
* Use a `Queue` to force synchronization between front and back fiber. | |
*/ | |
def demoBatch(q: Queue[Long]) = Batch.createBatch( | |
// access what is needed from environment | |
ZIO.accessM[Clock]{ c => c.get.currentTime(TimeUnit.MINUTES).flatMap(t => | |
// actual effect | |
UIO.effectTotal(println(s"done at $t")) *> q.offer(t).unit | |
) } | |
) | |
val prog3 = ZIO.accessM[Clock with TestClock] { testClock => | |
def adjust(d: Duration) = testClock.get[TestClock.Service].adjust(d) | |
for { | |
q <- Queue.unbounded[Long] | |
_ <- demoBatch(q) | |
_ <- adjust(12.minutes) | |
l <- q.take.zip(q.take) // NOT `takeAll`, else you don't force synchronisation two times! | |
} yield l.productIterator.toList | |
}.provideLayer(testEnvironment) | |
def main(args: Array[String]): Unit = { | |
val l = Runtime.default.unsafeRun(prog3) | |
assert(l == 5 :: 10 :: Nil, s"Got unexpected execution list: ${l}") | |
} | |
} |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment