Skip to content

Instantly share code, notes, and snippets.

@tlux
Created February 18, 2025 14:57
Show Gist options
  • Save tlux/dea69a169b8a345e7217b5661bf7918a to your computer and use it in GitHub Desktop.
Save tlux/dea69a169b8a345e7217b5661bf7918a to your computer and use it in GitHub Desktop.
import kotlinx.coroutines.coroutineScope
import kotlinx.coroutines.delay
import kotlinx.coroutines.flow.Flow
import kotlinx.coroutines.flow.channelFlow
import kotlinx.coroutines.isActive
import kotlinx.coroutines.launch
import kotlinx.coroutines.sync.Mutex
import kotlinx.coroutines.sync.withLock
import kotlin.time.Duration
/**
 * Groups upstream values into lists and emits them in batches.
 *
 * Values are appended to an internal buffer. A chunk (a snapshot copy of the buffer) is emitted
 * downstream, and the buffer is cleared, when either:
 *  - the buffer reaches [bufferSize] elements, or
 *  - the periodic flush, which runs once every [interval], finds the buffer non-empty.
 *
 * When the upstream flow completes normally, any values still buffered are emitted as a final
 * chunk and the returned flow completes. Empty chunks are never emitted.
 *
 * @param interval time to wait between two periodic flushes; must be positive.
 * @param bufferSize number of buffered values that triggers an immediate flush; must be positive.
 * @return a flow of non-empty chunks, each holding at most [bufferSize] values in upstream order.
 * @throws IllegalArgumentException if [interval] or [bufferSize] is not positive.
 */
fun <T> Flow<T>.chunkedThrottled(interval: Duration, bufferSize: Int = 1024): Flow<List<T>> {
    // A non-positive interval makes delay() return immediately, which would turn the periodic
    // flush loop into a busy spin that never suspends.
    require(interval.isPositive()) { "interval must be positive, was $interval" }
    // A size of 0 could never be "reached" after an add, silently disabling size-based flushing.
    require(bufferSize > 0) { "bufferSize must be positive, was $bufferSize" }

    return channelFlow {
        // Guards `buffer`, which is touched by both the collector and the periodic flush coroutine.
        val mutex = Mutex()
        val buffer = ArrayList<T>(bufferSize)

        // Emits the buffered values as one chunk and resets the buffer.
        // Must only be called while holding `mutex`.
        suspend fun flushLocked() {
            if (buffer.isNotEmpty()) {
                send(buffer.toList())
                buffer.clear()
            }
        }

        coroutineScope {
            // Time-based flush: runs until cancelled.
            val ticker = launch {
                while (isActive) {
                    mutex.withLock { flushLocked() }
                    delay(interval)
                }
            }

            // Size-based flush: runs for as long as upstream produces values.
            collect { value ->
                mutex.withLock {
                    buffer.add(value)
                    if (buffer.size >= bufferSize) flushLocked()
                }
            }

            // Upstream is done. Emit whatever is left and stop the ticker; otherwise
            // coroutineScope would wait for the endless loop and the flow would never complete.
            // Cancelling while holding the mutex guarantees the ticker is not in the middle of
            // a send, so no chunk can be lost or emitted twice.
            mutex.withLock {
                flushLocked()
                ticker.cancel()
            }
        }
    }
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment