Last active
May 4, 2023 13:33
-
-
Save L-Briand/d6795aa6960aa825a648076b7c08a413 to your computer and use it in GitHub Desktop.
Observable pattern with kotlinx coroutine
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
import kotlinx.coroutines.CoroutineScope | |
import kotlinx.coroutines.Dispatchers | |
import kotlinx.coroutines.channels.Channel | |
import kotlinx.coroutines.channels.ClosedReceiveChannelException | |
import kotlinx.coroutines.isActive | |
import kotlinx.coroutines.launch | |
import kotlinx.coroutines.runBlocking | |
import kotlinx.coroutines.sync.Mutex | |
import kotlinx.coroutines.sync.withLock | |
import java.lang.ref.WeakReference | |
import java.util.concurrent.atomic.AtomicBoolean | |
import kotlin.coroutines.CoroutineContext | |
typealias Observer<T> = suspend (old: T, new: T) -> Unit | |
fun <T> observe(block: suspend (T) -> Unit): Observer<T> = { _, new -> block(new) } | |
fun <T> observeUpdate(block: suspend (T, T) -> Unit): Observer<T> = block | |
/** | |
* A coroutine safe observable. | |
* | |
* To manipulate the underlying value uses [update], [get] or [getUnsafe]. | |
* You can also [add], [remove] or [clear] [Observer]s. | |
* | |
*/ | |
class Observable<T> private constructor(defaultValue: T) { | |
companion object { | |
fun <T> make( | |
defaultValue: T, | |
notificationDispatcher: CoroutineContext = Dispatchers.IO, | |
): Observable<T> { | |
val observable = Observable(defaultValue) | |
val weak = WeakReference(observable) | |
notificationTask(weak, notificationDispatcher) | |
return observable | |
} | |
/** | |
* Launch a coroutine calling observers when a new value is set. | |
* It do not reference the observable. This way, when the observable is garbage collected, | |
* the notification tasks ends. | |
*/ | |
private fun <T> notificationTask( | |
weak: WeakReference<Observable<T>>, | |
dispatcher: CoroutineContext, | |
) { | |
CoroutineScope(dispatcher).launch { | |
while (true) { | |
// Try to get new value from channel | |
val new = try { | |
weak.get()?.valueChannel?.receive() ?: break | |
} catch (e: ClosedReceiveChannelException) { | |
break | |
} | |
if (!isActive) break | |
// If a value is found, notify attached observers | |
weak.get()?.also { obs -> | |
val old = obs._value | |
val observers = obs.observersMutex.withLock { obs.observers.toTypedArray() } | |
for (observer in observers) | |
runCatching { observer(old, new) }.onFailure { it.printStackTrace() } | |
obs._value = new | |
} ?: break | |
} | |
} | |
} | |
} | |
/** Hold values needed to be notified. */ | |
private var valueChannel: Channel<T> = Channel(Channel.BUFFERED) | |
private var _value: T = defaultValue | |
/** Hold the latest set value. */ | |
var value: T | |
get() = _value | |
set(value) = update(value) ?: Unit | |
fun update(value: T) = valueChannel.trySend(value).getOrNull() | |
/** List of all listeners that need to be notified when a new value is set. */ | |
private val observers = mutableListOf<Observer<T>>() | |
private val observersMutex = Mutex() | |
/** Blocks until the the observer is added */ | |
fun addBlocking(observer: Observer<T>, notifyCurrentValue: Boolean = false) = | |
runBlocking { add(observer, notifyCurrentValue) } | |
/** Add an observer */ | |
suspend fun add(observer: Observer<T>, notifyCurrentValue: Boolean = false) { | |
observersMutex.withLock { observers.add(observer) } | |
if (notifyCurrentValue) observer.invoke(value, value) | |
} | |
/** Blocks until the the observer is removed */ | |
fun removeBlocking(observer: Observer<T>) = runBlocking { remove(observer) } | |
/** Remove an observer */ | |
suspend fun remove(observer: Observer<T>) = observersMutex.withLock { observers.remove(observer) } | |
/** Remove all observers */ | |
suspend fun clear() = observersMutex.withLock { observers.clear() } | |
private val isClose = AtomicBoolean(false) | |
fun close() { | |
isClose.set(true) | |
valueChannel.close() | |
} | |
} |
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
import kotlinx.coroutines.channels.Channel | |
import kotlinx.coroutines.runBlocking | |
/** | |
* Simplify the process of listening serially with a listener. | |
* | |
* Generally when you want to listen to events in coroutines you have to : | |
* - Add a listener to an object. | |
* - This listener publish events to a channel. | |
* - You launch a coroutine and listen to the channel events. | |
* | |
*/ | |
interface ChannelListener<T, L> { | |
/** The channel where the [listener] publish events */ | |
val channel: Channel<T> | |
/** The listener that will receive events. */ | |
val listener: L | |
/** | |
* Some kind of way to close the events feed. | |
* | |
* By default it only close the channel but implementations | |
* can use it to remove listener for event producer as well. | |
*/ | |
fun close(): Boolean = channel.close() | |
companion object { | |
/** | |
* Generic way to create a ChannelListener. | |
* | |
* Usage : | |
* ```kotlin | |
* interface MyListener { | |
* fun callback1(value: String) | |
* fun end() | |
* } | |
* | |
* val cl = ChannelListener.makeGeneric<String?, MyListener> { channel -> | |
* object : MyListener { | |
* override fun callback1(value: String) { | |
* channel.trySend(value) | |
* } | |
* override fun end() { | |
* channel.trySend(null) | |
* } | |
* } | |
* } | |
* | |
* addListener(cl.listener) | |
* | |
* runBlocking { | |
* while (true) { | |
* when(val newEvent = cl.channel.receive()) { | |
* null -> break | |
* else -> println(newEvent) | |
* } | |
* } | |
* } | |
* | |
* removeListener(cl.listener) | |
* ``` | |
*/ | |
inline fun <reified T, reified L> makeGeneric( | |
capacity: Int = Channel.BUFFERED, | |
crossinline block: (Channel<T>) -> L | |
): ChannelListener<T, L> = object : ChannelListener<T, L> { | |
override val channel: Channel<T> = Channel(capacity) | |
override val listener: L = block(channel) | |
} | |
} | |
} | |
/** | |
* Create a channel listener from an Observable. | |
* Each time a notification occurs, the channel is feed with the value. | |
*/ | |
suspend inline fun <reified T> Observable<T>.makeChannelListener( | |
notifyCurrentValue: Boolean = false, | |
capacity: Int = Channel.BUFFERED, | |
): ChannelListener<T, Observer<T>> { | |
val result = object : ChannelListener<T, Observer<T>> { | |
override val channel: Channel<T> = Channel(capacity) | |
override val listener: Observer<T> = observe { channel.trySend(it) } | |
override fun close(): Boolean { | |
val r1 = runBlocking { remove(listener) } | |
val r2 = channel.close() | |
return r1 && r2 | |
} | |
} | |
add(result.listener, notifyCurrentValue) | |
return result | |
} |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment