Last active
September 12, 2019 07:00
-
-
Save joaocsousa/d93091551338dcf925b4bac78ac5211d to your computer and use it in GitHub Desktop.
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
// how to use
SafeZip.zip(single1, single2, ::merge).subscribe(...)
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
/**
 * Entry point for zipping two [Single]s through error-guarding wrappers.
 *
 * Usage: `SafeZip.zip(single1, single2, ::merge).subscribe(...)`
 */
class SafeZip {
    companion object {
        /**
         * Zips [single1] and [single2] with [Single.zip] after wrapping each one in an
         * [ErrorSafeSingle]; both wrappers share one freshly created [SingleErrorHandler].
         *
         * @param single1 source of the first value handed to [zipper]
         * @param single2 source of the second value handed to [zipper]
         * @param zipper combines the two success values into the result
         * @return the [Single] produced by [Single.zip] over the two wrapped sources
         */
        @CheckReturnValue
        @SchedulerSupport(SchedulerSupport.NONE)
        fun <T, U, R> zip(single1: Single<T>, single2: Single<U>, zipper: (T, U) -> R): Single<R> {
            // One handler instance per zip call, shared by both wrapped sources.
            val sharedHandler = SingleErrorHandler()
            val guardedFirst: Single<T> = single1.toErrorSafeSingle(sharedHandler)
            val guardedSecond: Single<U> = single2.toErrorSafeSingle(sharedHandler)
            val combiner = BiFunction<T, U, R> { first, second -> zipper(first, second) }
            return Single.zip(guardedFirst, guardedSecond, combiner)
        }

        /** Wraps the receiver in an [ErrorSafeSingle] bound to [errorHandler]. */
        private fun <T> Single<T>.toErrorSafeSingle(errorHandler: SingleErrorHandler) =
            ErrorSafeSingle(this, errorHandler)
    }
}
/**
 * Error-delivery state shared by all [ErrorSafeSingle] wrappers created for one zip call.
 *
 * Tracks whether an error has already been claimed for delivery so that only the
 * first failing source gets to notify downstream.
 */
private class SingleErrorHandler {
    /** `true` once an error has been claimed (or [setDisposed] has been called). */
    @Volatile
    var disposed = false

    /**
     * Atomically claims the right to deliver an error downstream.
     *
     * @return `true` for the first caller only; `false` for every later caller.
     */
    @Synchronized
    fun tryClaimError(): Boolean {
        if (disposed) return false
        disposed = true
        return true
    }

    /**
     * Marks this handler as disposed and returns a [Single] that fails with [error].
     *
     * Retained for backward compatibility; [ErrorSafeSingle] no longer calls it, because
     * setting the flag before the error reached the observer suppressed every error.
     */
    @Synchronized
    fun <V> setDisposed(error: Throwable): Single<V?> {
        disposed = true
        return Single.error(error)
    }
}

/**
 * A [Single] that subscribes to [delegate] through an [ErrorSafeSingleObserver], so that
 * among all wrappers sharing [errorHandler] only the first error is forwarded.
 */
private class ErrorSafeSingle<T>(
    private val delegate: Single<T>,
    private val errorHandler: SingleErrorHandler
) : Single<T>() {
    // Subscribe directly: the first-error decision is made in the observer, at the moment
    // the error is about to be forwarded, not earlier in the chain.
    override fun subscribeActual(observer: SingleObserver<in T>) =
        delegate.subscribe(ErrorSafeSingleObserver(observer, errorHandler))
}

/**
 * Forwards success and subscription events to [source] unchanged; forwards an error only
 * if this observer is the first one to claim it on the shared [errorHandler].
 */
private class ErrorSafeSingleObserver<T>(
    private val source: SingleObserver<in T>,
    private val errorHandler: SingleErrorHandler
) : SingleObserver<T> {
    override fun onSuccess(t: T) = source.onSuccess(t)

    override fun onSubscribe(d: Disposable) = source.onSubscribe(d)

    override fun onError(e: Throwable) {
        // The claim is atomic on the handler itself (not on a boxed Boolean), so exactly
        // one error is forwarded; later errors are dropped on purpose.
        if (errorHandler.tryClaimError()) source.onError(e)
    }
}
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment