-
-
Save Dadoufi/139296fcce6a59e7d77bb30bdd01cb0e to your computer and use it in GitHub Desktop.
RxJava2 operator which adjusts items of an LCE (loading-content-error) stream so that L (loading) and C/E (content/error) items are not emitted close to each other. This is a reactive state alternative to Android's ContentLoadingProgressBar: https://developer.android.com/reference/android/support/v4/widget/ContentLoadingProgressBar.html
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
package com.dimsuz.lcefilterdelay | |
import io.reactivex.Observable | |
import io.reactivex.ObservableSource | |
import io.reactivex.Observer | |
import io.reactivex.Scheduler | |
import io.reactivex.disposables.Disposable | |
import io.reactivex.exceptions.Exceptions | |
import io.reactivex.internal.disposables.DisposableHelper | |
import io.reactivex.internal.fuseable.SimpleQueue | |
import io.reactivex.internal.queue.SpscLinkedArrayQueue | |
import io.reactivex.plugins.RxJavaPlugins | |
import io.reactivex.schedulers.Schedulers | |
import java.util.concurrent.TimeUnit | |
import java.util.concurrent.atomic.AtomicBoolean | |
import java.util.concurrent.atomic.AtomicLong | |
import java.util.concurrent.atomic.AtomicReference | |
/**
 * Reshapes an LCE (loading-content-error) stream so that an `L-item` (loading) and the `C/E-item`
 * (content/error) which follows it are never delivered too close to each other.
 *
 * An item is an `L-item` when [loadingItemPredicate] returns `true` for it; any other item is a `C/E-item`.
 * Every `L-item` coming from the source is first held back for [acceptableContentItemDelay] milliseconds,
 * after which one of two things happens:
 *
 * * a `C/E-item` arrived while the `L-item` was held back: the `L-item` is swallowed and only the
 *   `C/E-item`(s) are emitted;
 * * no `C/E-item` arrived: the `L-item` is emitted, and the `C/E-item` that follows it is emitted no sooner
 *   than [minDelayFromLoadingItem] milliseconds after the `L-item`. This avoids a flicker between the
 *   `L => C/E` states, i.e. a loading state stays visible for at least [minDelayFromLoadingItem].
 *
 * Additional rules:
 * * a `C/E-item` which arrives while nothing is held back and no `L-item` has just been emitted
 *   is passed through immediately;
 * * `L-item`s which arrive while other items are being held back are ignored;
 * * `C/E-item`s which arrive while other items are being held back are queued behind them and emitted
 *   in order once the pending delay elapses;
 * * `onComplete`/`onError` of the source are postponed until all held back items have been emitted.
 *
 * @param loadingItemPredicate returns `true` for items which represent the loading state
 * @param acceptableContentItemDelay how long (in milliseconds) an `L-item` is held back while waiting
 * for a `C/E-item` that would make it unnecessary
 * @param minDelayFromLoadingItem minimal time (in milliseconds) between an emitted `L-item`
 * and the `C/E-item` emitted after it
 * @param scheduler scheduler whose worker runs the delayed emissions, computation scheduler by default
 * @return an observable which emits the adjusted stream
 */
fun <T> Observable<T>.lceFilterDelay(
    loadingItemPredicate: (T) -> Boolean,
    acceptableContentItemDelay: Long,
    minDelayFromLoadingItem: Long,
    scheduler: Scheduler = Schedulers.computation()
): Observable<T> {
    val filtered: Observable<T> = ObservableLceFilterDelay(
        source = this,
        loadingPredicate = loadingItemPredicate,
        acceptableContentDelay = acceptableContentItemDelay,
        minDelayFromLoadingState = minDelayFromLoadingItem,
        scheduler = scheduler
    )
    // give assembly hooks registered in RxJavaPlugins a chance to wrap the operator
    return RxJavaPlugins.onAssembly(filtered)
}
/**
 * Observable which subscribes a [FilterDelayObserver] to [source], giving every subscription
 * its own worker created from [scheduler].
 *
 * Both [acceptableContentDelay] and [minDelayFromLoadingState] are in milliseconds.
 */
private class ObservableLceFilterDelay<T>(
    private val source: ObservableSource<T>,
    private val loadingPredicate: (T) -> Boolean,
    private val acceptableContentDelay: Long,
    private val minDelayFromLoadingState: Long,
    private val scheduler: Scheduler
) : Observable<T>() {

    override fun subscribeActual(s: Observer<in T>) {
        source.subscribe(
            FilterDelayObserver(
                s,
                loadingPredicate,
                acceptableContentDelay,
                minDelayFromLoadingState,
                scheduler.createWorker()
            )
        )
    }

    /**
     * Observer which holds back loading items (L-items, as decided by [loadingPredicate]) and
     * content/error items (C-items) in a queue and emits them to [actual] from the worker [w].
     *
     * @param actual downstream observer
     * @param loadingPredicate returns `true` for L-items
     * @param acceptableContentDelay time in milliseconds an L-item waits in the queue for a C-item
     * @param minDelayFromLoadingState minimal time in milliseconds between an emitted L-item and the next C-item
     * @param w worker used to run [QueueDrain] and to read the current time
     */
    class FilterDelayObserver<T>(
        private val actual: Observer<in T>,
        private val loadingPredicate: (T) -> Boolean,
        private val acceptableContentDelay: Long,
        private val minDelayFromLoadingState: Long,
        private val w: Scheduler.Worker
    ) : Observer<T>, Disposable {

        private var d: Disposable? = null
        private val queue = SpscLinkedArrayQueue<T>(12)

        // terminal events of the source which arrived while the queue was not empty
        private val error = AtomicReference<Throwable?>(null)
        private val completed = AtomicBoolean(false)

        private val disposed = AtomicBoolean(false)

        // Worker time (ms) at which the last L-item was emitted downstream.
        // Holds NO_LOAD_EMITTED unless an L-item has been emitted and no C-item
        // has been received after it yet.
        private val loadEmitTimestamp = AtomicLong(NO_LOAD_EMITTED)

        // NOTE: any stream item delays should follow this pattern:
        // put `item` in queue + schedule `QueueDrain()` runnable.
        // This ensures that onComplete are called after queue processing
        // and that stream won't complete until queue is drained

        override fun onSubscribe(d: Disposable) {
            if (DisposableHelper.validate(this.d, d)) {
                this.d = d
                actual.onSubscribe(this)
            }
        }

        override fun onNext(t: T) {
            if (loadingPredicate(t)) {
                if (queue.isEmpty) {
                    // enqueue L-item to later see if C-item has arrived in acceptable time interval
                    queue.offer(t)
                    w.schedule(QueueDrain(), acceptableContentDelay, TimeUnit.MILLISECONDS)
                }
                // else: all L-items received while the queue is not empty are ignored,
                // there is no need to have multiple load events in the UI anyway
            } else { // received C-item
                if (queue.isEmpty) {
                    if (loadEmitTimestamp.get() == NO_LOAD_EMITTED) {
                        // no queuing is going on, no L-item was emitted recently,
                        // continue as usual
                        actual.onNext(t)
                    } else {
                        // L-item has been emitted just before this C-item,
                        // schedule it for processing later with minimal delay required:
                        // must come not sooner than minDelayFromLoadingState after L-item
                        val ts = loadEmitTimestamp.getAndSet(NO_LOAD_EMITTED)
                        queue.offer(t)
                        // elapsed time is measured with the worker's clock, the same clock
                        // which is used to schedule the delayed drain below
                        val elapsed = w.now(TimeUnit.MILLISECONDS) - ts
                        val delay = minDelayFromLoadingState - elapsed
                        w.schedule(QueueDrain(), maxOf(delay, 0), TimeUnit.MILLISECONDS)
                    }
                } else {
                    // queue draining is scheduled or is in progress, enqueue this item too
                    queue.offer(t)
                }
            }
        }

        override fun onError(e: Throwable) {
            // source observable's onComplete() or onError() might
            // be called while we are busy processing enqueued and delayed items,
            // if so - delay the error until queue is drained
            if (queue.isEmpty) {
                actualError(e)
            } else {
                error.set(e)
            }
        }

        override fun onComplete() {
            // source observable's onComplete() or onError() might
            // be called while we are busy processing enqueued and delayed items,
            // if so - delay completion until queue is drained
            if (queue.isEmpty) {
                actualComplete()
            } else {
                completed.set(true)
            }
        }

        /** Completes the downstream and releases the upstream and the worker. */
        private fun actualComplete() {
            try {
                actual.onComplete()
            } finally {
                dispose()
            }
        }

        /** Signals [e] to the downstream and releases the upstream and the worker. */
        private fun actualError(e: Throwable) {
            try {
                actual.onError(e)
            } finally {
                dispose()
            }
        }

        override fun dispose() {
            disposed.set(true)
            d?.dispose()
            w.dispose()
        }

        // reports the same flag which QueueDrain checks before emitting
        override fun isDisposed(): Boolean = disposed.get()

        /**
         * Emits everything which is currently in the queue, then delivers a postponed
         * completion or error of the source, if there is one.
         */
        private inner class QueueDrain : Runnable {
            override fun run() {
                while (true) {
                    if (disposed.get()) {
                        queue.clear()
                        return
                    }
                    val t: T? = queue.safePoll()
                    if (t == null) {
                        // safePoll() disposes and signals onError itself when polling fails,
                        // in which case no further terminal event must be delivered
                        if (disposed.get()) {
                            return
                        }
                        // if source observable's onComplete/onError were received during
                        // queue processing, now is a good time to act on them - after it finished
                        if (completed.get()) {
                            actualComplete()
                        } else {
                            error.get()?.also { actualError(it) }
                        }
                        return
                    }
                    if (loadingPredicate(t)) {
                        // check if L-item and next C-item are close enough: if yes, skip L
                        val next: T? = queue.peek()
                        // NOTE(review): onNext() offers an L-item whenever it sees an empty queue; if the
                        // source emits on another thread between poll() above and peek(), this check
                        // may trip - confirm whether that can happen for the intended sources
                        check(next == null || !loadingPredicate(next)) {
                            "loading events must not be enqueued while waiting for content event!"
                        }
                        if (next == null) {
                            // no C-item, so this L-item goes down the stream
                            // timestamp will be used to determine by how much to delay next C-item
                            loadEmitTimestamp.set(w.now(TimeUnit.MILLISECONDS))
                            actual.onNext(t)
                        } // else there's a C-item and it's close, will emit it on next loop iteration
                    } else {
                        actual.onNext(t)
                    }
                }
            }
        }

        /**
         * Polls the queue; if polling throws a non-fatal error, disposes this observer,
         * clears the queue, signals the error downstream and returns `null`.
         */
        private fun SimpleQueue<T>.safePoll(): T? {
            return try {
                this.poll()
            } catch (e: Throwable) {
                Exceptions.throwIfFatal(e)
                dispose()
                queue.clear()
                actual.onError(e)
                null
            }
        }

        private companion object {
            // marker for "no L-item is awaiting its C-item"; 0 can not be used for this
            // because it is a valid reading of a virtual-time scheduler clock
            private const val NO_LOAD_EMITTED = Long.MIN_VALUE
        }
    }
}
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment