Created
November 14, 2020 23:40
-
-
Save RoryKelly/695e5f61340ac37a461a8d6b905b187a to your computer and use it in GitHub Desktop.
FlowRecorder for recording emissions from flows.
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
/**
 * Starts collecting this [Flow] eagerly, records its emissions in a [SharedFlow] with a replay
 * cache of [Int.MAX_VALUE], and passes that recording to [verify] so the emissions can be checked.
 *
 * The collecting coroutine runs in a child job of [recordingScope] and is cancelled as soon as
 * [verify] returns or throws, so no collector is left running in [recordingScope] afterwards.
 * The scope passed in is never cancelled by this function.
 *
 * @param recordingScope scope whose context the upstream collection is launched in.
 * @param verify assertions to run against the recorded emissions.
 */
suspend fun <T> Flow<T>.recordEmissions(
    recordingScope: CoroutineScope = GlobalScope,
    verify: suspend SharedFlow<T>.() -> Unit
) {
    // Dedicated child job: lets us stop the recording without cancelling the caller's scope.
    // Fully qualified because this block cannot add imports to the file header.
    val recordingJob = kotlinx.coroutines.Job(recordingScope.coroutineContext[kotlinx.coroutines.Job])
    val recording = shareIn(
        CoroutineScope(recordingScope.coroutineContext + recordingJob),
        SharingStarted.Eagerly,
        Int.MAX_VALUE
    )
    try {
        verify(recording)
    } finally {
        // Without this the eager sharing coroutine would keep collecting upstream indefinitely.
        recordingJob.cancel()
    }
}
/**
 * Vararg convenience wrapper around [verifyAllList].
 *
 * @param emissions the expected emissions, in order.
 * @param emissionTimeoutMilliseconds timeout forwarded to [verifyAllList]; defaults to one second.
 */
suspend fun <T> SharedFlow<T>.verifyAll(vararg emissions: T, emissionTimeoutMilliseconds: Long = 1000L) {
    val expected: List<T> = emissions.toList()
    verifyAllList(
        nextEmissions = expected,
        emissionTimeoutMilliseconds = emissionTimeoutMilliseconds
    )
}
/**
 * Waits until at least `nextEmissions.size` items are in the replay cache, then asserts that the
 * replay cache equals [nextEmissions].
 *
 * @param nextEmissions the expected emissions, in order.
 * @param emissionTimeoutMilliseconds how long to wait for the expected number of emissions.
 * @throws IllegalStateException if the expected number of emissions did not arrive in time.
 */
suspend fun <T> SharedFlow<T>.verifyAllList(nextEmissions: List<T>, emissionTimeoutMilliseconds: Long) {
    waitForIndex(emissionTimeoutMilliseconds, nextEmissions.size)
        ?: error(
            "\nWaiting for : $nextEmissions \n" +
                "Emissions so far: $replayCache"
        )
    // Read the cache once so the assertion works on a single consistent snapshot.
    val recorded = replayCache
    // Actual value goes into assertThat(...), expected into isEqualTo(...), so that a failure
    // message labels the two sides correctly. List equality already implies equal size.
    assertThat(recorded).isEqualTo(nextEmissions)
}
/**
 * Waits until the replay cache holds an item at [index] and then passes that item to [verifyBlock].
 *
 * @param emissionTimeoutMilliseconds how long to wait for the item to be recorded.
 * @param index zero-based position of the emission to check.
 * @param verifyBlock check to run against the emission at [index].
 * @throws IllegalStateException if the wait timed out before the item was recorded.
 */
suspend fun <T> SharedFlow<T>.verityItemAt(emissionTimeoutMilliseconds: Long, index: Int, verifyBlock: (a: T) -> Unit) {
    // An item at `index` exists once the cache holds index + 1 elements.
    val requiredSize = index + 1
    waitForIndex(emissionTimeoutMilliseconds, requiredSize) ?: error(
        "Waiting for $index but no new emission within " +
            "${emissionTimeoutMilliseconds}ms. Emissions so far: $replayCache"
    )
    val item = replayCache[index]
    verifyBlock(item)
}
/**
 * Polls the replay cache every 100 ms until it contains at least [size] items.
 *
 * @param emissionTimeoutMilliseconds upper bound for the whole wait.
 * @param size number of recorded items to wait for.
 * @return [Unit] once the cache is large enough, or `null` if the timeout elapsed first.
 */
suspend fun <T> SharedFlow<T>.waitForIndex(
    emissionTimeoutMilliseconds: Long,
    size: Int
): Unit? = withTimeoutOrNull(emissionTimeoutMilliseconds) {
    while (true) {
        if (replayCache.size >= size) break
        delay(100)
    }
}
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment