Last active
April 29, 2021 07:53
-
-
Save nanox77/76ceac56da328cf31a02534a2158f12c to your computer and use it in GitHub Desktop.
Cache implementation for Single element using RxJava.
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
import io.reactivex.Single | |
import io.reactivex.SingleObserver | |
import io.reactivex.SingleSource | |
import io.reactivex.SingleTransformer | |
import io.reactivex.functions.Consumer | |
import org.joda.time.DateTime | |
import java.util.concurrent.TimeUnit | |
/**
 * [SingleTransformer] that adds a time-limited cache of the last successful value to a [Single].
 *
 * Every call to [apply] creates its own [LastElementSeen] holder, so the cache is scoped to the
 * [SingleSource] returned by that call.
 *
 * @param timeout how long a recorded value stays valid, expressed in [unit]
 * @param unit time unit of [timeout]
 */
class SingleRxCache<T>(private val timeout: Long, private val unit: TimeUnit) : SingleTransformer<T, T> {

    /**
     * Wraps [upstream] so that each successful value is recorded, and returns a source that
     * consults the recorded value before subscribing to [upstream].
     */
    override fun apply(upstream: Single<T>): SingleSource<T> {
        val recorder = LastElementSeen<T>(timeout, unit)
        // doOnSuccess feeds every successful upstream value into the recorder.
        val recordingUpstream = upstream.doOnSuccess(recorder)
        return LastElementSeenSingle(recordingUpstream, recorder)
    }
}
/**
 * [SingleSource] that answers from [lastElementSeen] while its recorded value is still valid and
 * otherwise subscribes the observer to [upstream].
 *
 * @param upstream the source to subscribe to when no valid cached value is available
 * @param lastElementSeen holder of the most recently recorded value and its validity check
 */
class LastElementSeenSingle<T>(
    private val upstream: Single<T>,
    private val lastElementSeen: LastElementSeen<T>
) : SingleSource<T> {

    /**
     * Delivers the cached value to [observer] if one is valid; otherwise subscribes [observer]
     * to [upstream].
     */
    override fun subscribe(observer: SingleObserver<in T>) {
        // Read the value once so the validity check and the delivered value cannot diverge.
        val cached = if (lastElementSeen.isValid()) lastElementSeen.value else null
        if (cached != null) {
            // Go through Single.just so the observer receives onSubscribe before onSuccess,
            // as the SingleObserver protocol requires; calling onSuccess directly skips it.
            Single.just<T>(cached).subscribe(observer)
        } else {
            upstream.subscribe(observer)
        }
    }
}
/**
 * Holds the most recently accepted value together with the wall-clock time it was accepted,
 * and reports whether that value is still within the configured timeout.
 *
 * NOTE(review): fields are plain (non-volatile) properties; if values are accepted and read on
 * different threads, visibility is not guaranteed by this class — confirm against callers.
 *
 * @param timeout how long an accepted value stays valid, expressed in [unit]
 * @param unit time unit of [timeout]
 */
class LastElementSeen<T>(private val timeout: Long, private val unit: TimeUnit) : Consumer<T> {
    /** `System.currentTimeMillis()` reading taken when [value] was last accepted; 0 before any emission. */
    private var lastEmissionTimestamp: Long = 0

    /** The last accepted value, or `null` if nothing has been accepted yet. */
    var value: T? = null

    /** Records [latest] as the current value and stamps it with the current time. */
    override fun accept(latest: T) {
        lastEmissionTimestamp = System.currentTimeMillis()
        value = latest
    }

    /**
     * Returns `true` when a value has been recorded and strictly less than the configured
     * timeout has elapsed since it was accepted.
     */
    fun isValid(): Boolean {
        if (value == null) return false
        // Use the same clock that accept() stamps with, so both sides of the subtraction agree.
        val elapsedMillis = System.currentTimeMillis() - lastEmissionTimestamp
        return elapsedMillis < unit.toMillis(timeout)
    }
}
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment