Created
May 12, 2020 21:22
-
-
Save fkorotkov/38586fd3d42b15381fd9b7bb6c99dcfd to your computer and use it in GitHub Desktop.
Code samples for a blog post
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
class BatchLoader<ID, T>( | |
poolSize: Int = 8, | |
private val keyBatchSizeLimit: Int = 100, | |
private val delegateLoader: Loader<ID, T> | |
) : Loader<ID, T>, CoroutineScope { | |
override val coroutineContext: CoroutineContext = Executors.newFixedThreadPool(poolSize).asCoroutineDispatcher() | |
private val requests = Channel<LoadRequest<ID, T>>(Channel.UNLIMITED) // UNLIMITED allows to queue requests | |
private inner class RequestWorker { | |
/* ... */ | |
} | |
init { | |
repeat(poolSize) { | |
launch(coroutineContext) { | |
RequestWorker().start() | |
} | |
} | |
} | |
override suspend fun loadByIds(ids: Set<ID>): Map<ID, T> { | |
val request = LoadRequest<ID, T>(ids) | |
requests.send(request) | |
return request.result.await() | |
} | |
} |
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
class BatchLoader<ID, T>( | |
poolSize: Int = 8, | |
private val keyBatchSizeLimit: Int = 100, | |
private val delegateLoader: Loader<ID, T> | |
) : Loader<ID, T> { | |
// ... | |
} |
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
private data class LoadRequest<ID, T>( | |
val ids: Set<ID>, | |
val result: CompletableDeferred<Map<ID, T>> = CompletableDeferred<Map<ID, T>>() | |
) | |
class BatchLoader<ID, T>( | |
/* ... */ | |
) : Loader<ID, T> { | |
private val requests = Channel<LoadRequest<ID, T>>(Channel.UNLIMITED) // UNLIMITED allows to queue requests | |
override suspend fun loadByIds(ids: Set<ID>): Map<ID, T> { | |
val request = LoadRequest<ID, T>(ids) // create a request | |
requests.send(request) // queue the request | |
return request.result.await() // await for a request to be fulfilled | |
} | |
} |
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
interface Loader<ID, T> { | |
suspend fun loadByIds(ids: Set<ID>): Map<ID, T> | |
} |
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
private inner class RequestWorker { | |
suspend fun start() { | |
while (true) { | |
val requestsToProcess = LinkedList<LoadRequest<ID, T>>() | |
val idsToLoad = HashSet<ID>() | |
val request = requests.receive() // suspend until channel has a request | |
requestsToProcess.add(request) | |
idsToLoad.addAll(request.ids) | |
// get as many queued requests as possible | |
while (idsToLoad.size < keyBatchSizeLimit) { | |
// trick to see if there are more requests to process | |
val additionalRequest = select<LoadRequest<ID, T>?> { | |
requests.onReceive { it } | |
onTimeout(0) { null } | |
} ?: break | |
requestsToProcess.add(additionalRequest) | |
idsToLoad.addAll(additionalRequest.ids) | |
} | |
try { | |
val loadedObjects = delegateLoader.loadByIds(idsToLoad) | |
requestsToProcess.forEach { request -> | |
request.result.complete( | |
loadedObjects.filterKeys { request.ids.contains(it) } | |
) | |
} | |
} catch (e: Exception) { | |
requestsToProcess.forEach { it.result.completeExceptionally(e) } | |
} | |
} | |
} | |
} |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment