Created
December 8, 2016 14:29
-
-
Save heliocentrist/dd204493cb9abe97c0fa9db1b491ba0d to your computer and use it in GitHub Desktop.
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
/** Consumer that adds up every `Int` it receives and reports the total as a `Long`.
  *
  * After each element the subscriber blocks its calling thread for 20 seconds,
  * which makes this consumer a deliberately slow downstream.
  */
val sumConsumer: Consumer[Int, Long] =
  new Consumer[Int, Long] {
    def createSubscriber(cb: Callback[Long], s: Scheduler) = {
      val subscriber = new Subscriber.Sync[Int] {
        implicit val scheduler: Scheduler = s

        // Running total of all elements seen so far.
        private var total = 0L

        def onNext(elem: Int): Continue = {
          total = total + elem
          // Block for 20 seconds before signalling that more elements may be sent.
          Thread.sleep(20000L)
          Continue
        }

        // Upstream failed: forward the error to the callback.
        def onError(ex: Throwable): Unit = {
          cb.onError(ex)
        }

        // Stream finished: hand the accumulated total to the callback.
        def onComplete(): Unit = {
          cb.onSuccess(total)
        }
      }

      // The subscriber is paired with a dummy AssignableCancelable
      // because we don't intend to use it.
      (subscriber, AssignableCancelable.dummy)
    }
  }
// Interval source with a 1-second period; only the first five elements are
// taken and each is replaced by the constant 1 before reaching `sumConsumer`.
val source = Observable.interval(1.second)
val task = source.take(5)
  .map(_ => 1)
  .timeoutOnSlowDownstream(1.second)
  .runWith(sumConsumer)

// Wait up to 10 seconds for the task to finish, then inspect how it ended.
// `Await.ready` itself throws a TimeoutException if the future is still
// pending after that, so `value` should be defined here; the `None` case is
// kept only to make the match total instead of calling `.get`.
Await.ready(task.runAsync, 10.seconds).value match {
  // Expected outcome: the slow downstream triggered the timeout.
  case Some(Failure(_: DownstreamTimeoutException)) => println("Yay, got timeout!")
  case Some(Failure(e)) => fail(s"Wrong exception type: $e")
  case Some(Success(v)) => fail(s"Expected a timeout exception, got: $v")
  case None => fail("Future was not completed after Await.ready returned")
}
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment