Skip to content

Instantly share code, notes, and snippets.

@ultra-taco
Last active May 21, 2019 14:50
Show Gist options
  • Save ultra-taco/a9320c42008016b4947a7de342b1c67e to your computer and use it in GitHub Desktop.
Save ultra-taco/a9320c42008016b4947a7de342b1c67e to your computer and use it in GitHub Desktop.
Parallel coroutine extensions
import kotlin.coroutines.ContinuationInterceptor
import kotlin.coroutines.CoroutineContext
import kotlinx.coroutines.Dispatchers
import kotlinx.coroutines.GlobalScope
import kotlinx.coroutines.Job
import kotlinx.coroutines.async
import kotlinx.coroutines.awaitAll
import kotlinx.coroutines.coroutineScope
import kotlinx.coroutines.joinAll
import kotlinx.coroutines.sync.Semaphore
import kotlinx.coroutines.yield
/**
 * Transforms every element of this collection concurrently and returns the
 * results in the same order as the receiver's iteration order.
 *
 * One coroutine is started per element. The coroutines are children of the
 * calling coroutine (structured concurrency): cancelling the caller cancels
 * the outstanding work, and the first failing [block] cancels its siblings and
 * is rethrown from this function without waiting for earlier elements.
 *
 * @param context extra context elements for the worker coroutines. When it
 *   carries no dispatcher, [Dispatchers.Default] is used so the work does not
 *   silently inherit a single-threaded caller dispatcher. Note that passing a
 *   context containing its own `Job` detaches the workers from the caller.
 * @param block the suspending transformation applied to each element.
 * @return the transformed values, index-aligned with the receiver.
 */
suspend fun <A, B> Collection<A>.parallelMap(
    context: CoroutineContext = GlobalScope.coroutineContext,
    block: suspend (A) -> B
): List<B> {
    // GlobalScope.async fell back to Dispatchers.Default when no dispatcher was
    // supplied; keep that so the default call still runs in parallel.
    val workerContext =
        if (context[ContinuationInterceptor] == null) context + Dispatchers.Default else context

    return coroutineScope {
        map { element -> async(workerContext) { block(element) } }
            // awaitAll fails fast on the first exception instead of awaiting in order.
            .awaitAll()
    }
}
/**
 * Runs [block] for every element of this collection concurrently and suspends
 * until all invocations have finished. Results of [block] are discarded.
 *
 * One coroutine is started per element, with no upper bound on concurrency.
 * The coroutines are children of the calling coroutine (structured
 * concurrency): cancelling the caller cancels the outstanding work, and the
 * first failing [block] cancels its siblings and is rethrown from this
 * function.
 *
 * @param context extra context elements for the worker coroutines. When it
 *   carries no dispatcher, [Dispatchers.Default] is used so the work does not
 *   silently inherit a single-threaded caller dispatcher. Note that passing a
 *   context containing its own `Job` detaches the workers from the caller.
 * @param block the suspending action applied to each element.
 */
suspend fun <A, B> Collection<A>.parallelForEach(
    context: CoroutineContext = GlobalScope.coroutineContext,
    block: suspend (A) -> B
) {
    // GlobalScope.async fell back to Dispatchers.Default when no dispatcher was
    // supplied; keep that so the default call still runs in parallel.
    val workerContext =
        if (context[ContinuationInterceptor] == null) context + Dispatchers.Default else context

    coroutineScope {
        // async + awaitAll (rather than launch) so a failure is always rethrown
        // to the caller, even when `context` overrides the parent Job.
        map { element -> async(workerContext) { block(element) } }.awaitAll()
    }
}
/**
 * Runs [block] for every element of this collection with at most
 * [maxConcurrency] invocations in flight at any moment, and suspends until all
 * of them have finished. Results of [block] are discarded.
 *
 * Elements are started in iteration order; a new invocation is started only
 * once a slot is free. Waiting for a slot suspends (no busy-waiting). The
 * worker coroutines are children of the calling coroutine, so cancelling the
 * caller cancels outstanding work and the first failing [block] is rethrown
 * from this function.
 *
 * @param context extra context elements for the worker coroutines. When it
 *   carries no dispatcher, [Dispatchers.Default] is used so the work does not
 *   silently inherit a single-threaded caller dispatcher. Note that passing a
 *   context containing its own `Job` detaches the workers from the caller.
 * @param block the suspending action applied to each element.
 * @param maxConcurrency maximum number of simultaneously running invocations;
 *   must be positive.
 * @throws IllegalArgumentException if [maxConcurrency] is not positive.
 */
suspend fun <A, B> Collection<A>.parallelForEach(
    context: CoroutineContext = GlobalScope.coroutineContext,
    block: suspend (A) -> B,
    maxConcurrency: Int
) {
    // A non-positive limit could never admit a job and would hang forever.
    require(maxConcurrency > 0) { "maxConcurrency must be positive, was $maxConcurrency" }

    // GlobalScope.async fell back to Dispatchers.Default when no dispatcher was
    // supplied; keep that so the default call still runs in parallel.
    val workerContext =
        if (context[ContinuationInterceptor] == null) context + Dispatchers.Default else context

    // The semaphore replaces the shared, unsynchronised job list: it is safe to
    // use from any thread and suspends instead of spinning while the limit is hit.
    val slots = Semaphore(maxConcurrency)

    coroutineScope {
        val workers = map { element ->
            // Take the slot *before* starting the worker so that no more than
            // maxConcurrency coroutines exist at once.
            slots.acquire()
            async(workerContext) {
                try {
                    block(element)
                } finally {
                    // Always hand the slot back, on success, failure, or cancellation.
                    slots.release()
                }
            }
        }
        // Wait for the tail of still-running workers and surface any failure.
        workers.awaitAll()
    }
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment