@alexandru
Forked from heliocentrist/timeout.scala
Last active December 21, 2016 14:26

There are two problems with this piece of code:

  1. It blocks the current thread. Monix defaults to processing things synchronously, and you might argue that this timeoutOnSlowDownstream operator violates the principle of least surprise, but if you want the timeout to work, you must not block the thread that executes onNext. Consider that on top of JavaScript, blocking is not even possible.

So it is better to do this instead, and the timeout will get triggered:

def onNext(elem: Int) = Future {
  sum += elem
  Thread.sleep(20000)
  Continue
}
  2. This operator is about protecting the source, not the downstream consumer. If you take a look at the DownstreamTimeoutObservable implementation, it does send the onError after 1 second.

You can use the dump operator to debug this:

val task = source.take(5)
  .map(x => 1)
  .dump("Upstream")
  .timeoutOnSlowDownstream(1.second)
  .dump("Downstream")
  .consumeWith(sumConsumer)

After running it, you first get this output (immediately):

0: Upstream-->1
0: Downstream-->1

After 1 second the error is sent downstream and the upstream gets canceled:

1: Downstream-->monix.reactive.exceptions.DownstreamTimeoutException: Downstream timeout after 1 second
2: Upstream canceled

After another 9 seconds:

Yay, got timeout!

In other words, the error may have been emitted immediately, but that future only completed after 10 seconds, because the final onNext had to finish before onError could finally be delivered.

You see, even though the Observer protocol allows us to send an onError without back-pressuring the last onNext, the subscribers given by users (in this instance your Subscriber) are wrapped in a SafeSubscriber instance by the user-facing Observable.subscribe. Otherwise, the logic users put in onNext could run concurrently with the logic in onComplete or onError, and that's error-prone. I'd hate to explain to users how concurrency works with regard to onComplete / onError not back-pressuring the final onNext, hence that behavior is reserved for people who know what they are doing and can use Observable.unsafeSubscribeFn.
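The ordering guarantee described above can be illustrated with a minimal sketch that uses only the Scala standard library. It is not Monix's SafeSubscriber implementation: OrderedObserver, OrderedObserverDemo and their parameters are hypothetical names made up for this illustration, and the 200 ms sleep stands in for a slow onNext.

```scala
import scala.concurrent.{Await, Future, Promise}
import scala.concurrent.duration._
import scala.concurrent.ExecutionContext.Implicits.global
import java.util.concurrent.ConcurrentLinkedQueue

// Hypothetical wrapper (an assumption for illustration, NOT Monix's SafeSubscriber):
// it remembers the acknowledgement of the most recent onNext and delays
// onError until that acknowledgement has completed.
final class OrderedObserver[A](
  onNextFn: A => Future[Unit],
  onErrorFn: Throwable => Unit
) {
  @volatile private var lastAck: Future[Unit] = Future.successful(())

  def onNext(elem: A): Future[Unit] = {
    lastAck = onNextFn(elem)
    lastAck
  }

  // Forward the error only after the last onNext has finished, so the
  // user's onNext and onError logic never run concurrently.
  def onError(ex: Throwable): Unit =
    lastAck.onComplete(_ => onErrorFn(ex))
}

object OrderedObserverDemo {
  def run(): List[String] = {
    val log = new ConcurrentLinkedQueue[String]()
    val errorSeen = Promise[Unit]()

    val observer = new OrderedObserver[Int](
      elem => Future {
        Thread.sleep(200) // slow consumer, running off the caller's thread
        log.add(s"onNext($elem) done")
        ()
      },
      ex => {
        log.add(s"onError(${ex.getMessage})")
        errorSeen.success(())
        ()
      }
    )

    observer.onNext(1)
    // Signalled right away, but delivered only after onNext(1) completes.
    observer.onError(new RuntimeException("timeout"))

    Await.result(errorSeen.future, 5.seconds)
    log.toArray(new Array[String](0)).toList
  }
}
```

Calling OrderedObserverDemo.run() returns the log entries in delivery order, with the onNext entry first even though onError was signalled while onNext was still running. A subscriber passed to Observable.unsafeSubscribeFn gets no such wrapper, so it has to handle that overlap itself.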

Summary

  1. onError can be called without back-pressuring onNext; however, SafeSubscriber wraps user-facing subscribers because otherwise this would lead to concurrency issues, which means that your subscriber will receive that message after 20 seconds (when the last onNext is complete)
  2. timeoutOnSlowDownstream doesn't insert an asynchronous boundary. I'm not sure if this was a good decision, but it's consistent with Monix's design, because async boundaries are expensive and should be explicit

I would love to get your thoughts on it.

import monix.reactive._
import monix.reactive.exceptions._
import monix.reactive.observers._
import monix.execution.Scheduler.Implicits.{global => scheduler}
import monix.eval._
import monix.execution._
import scala.concurrent.duration._
import scala.concurrent.Await
import monix.execution.Ack
import monix.execution.Ack.Continue
import monix.execution.cancelables._
import scala.util.{Try, Success, Failure}
import scala.concurrent.Future

def fail(s: String) = System.err.println(s)

val sumConsumer: Consumer[Int,Long] =
  new Consumer[Int,Long] {
    def createSubscriber(cb: Callback[Long], s: Scheduler) = {
      val out = new Subscriber[Int] {
        implicit val scheduler = s
        private var sum = 0L

        def onNext(elem: Int) = {
          sum += elem
          // Blocks the thread that calls onNext for 20 seconds
          Thread.sleep(20000)
          Continue
        }

        def onComplete(): Unit = {
          // We are done so we can signal the final result
          cb.onSuccess(sum)
        }

        def onError(ex: Throwable): Unit = {
          // Error happened, so we signal the error
          cb.onError(ex)
        }
      }

      // Returning a tuple of our subscriber and a dummy
      // AssignableCancelable because we don't intend to use it
      (out, AssignableCancelable.dummy)
    }
  }

val source = Observable.interval(1.second)

val task = source.take(5)
  .map(x => 1)
  .timeoutOnSlowDownstream(1.second)
  .consumeWith(sumConsumer)

Await.ready(task.runAsync, 10.seconds).value.get match {
  case Failure(ex: DownstreamTimeoutException) => fail("Yay, got timeout!")
  case Failure(e) => fail("Wrong exception type: " + e)
  case Success(v) => fail("Expected a timeout exception, got: " + v)
}