-
-
Save joeyslalom/b084fc1febcca9fab440f6a9f8e64ce7 to your computer and use it in GitHub Desktop.
SQS Consumer using Kotlin coroutines and a pool of workers.
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
package com.jivimberg.sqs.published | |
import kotlinx.coroutines.CancellationException | |
import kotlinx.coroutines.CoroutineScope | |
import kotlinx.coroutines.isActive | |
import kotlinx.coroutines.yield | |
import java.lang.Thread.currentThread | |
/**
 * Invokes [block] repeatedly for as long as the receiving scope is active.
 *
 * After every successful invocation the coroutine calls [yield]. Any
 * [Exception] thrown by [block] is printed together with its stack trace and
 * the loop moves on to the next iteration, so a failing [block] is retried
 * rather than terminating the caller.
 *
 * A [CancellationException] is logged and deliberately not rethrown: the loop
 * condition is re-evaluated instead, so the function returns normally once the
 * scope is no longer active. Note that a [CancellationException] raised while
 * the scope is still active is therefore also treated as "try again".
 *
 * @param block the suspending action to run on every iteration.
 */
suspend fun CoroutineScope.repeatUntilCancelled(block: suspend () -> Unit) {
    while (isActive) {
        try {
            block()
            yield()
        } catch (ex: CancellationException) {
            // Not rethrown on purpose: the `isActive` check above decides whether to leave the loop.
            println("coroutine on ${currentThread().name} cancelled")
        } catch (ex: Exception) {
            // `$ex` (not `{$ex}`): the original template printed stray literal braces around the exception.
            println("${currentThread().name} failed with $ex. Retrying...")
            ex.printStackTrace()
        }
    }
    println("coroutine on ${currentThread().name} exiting")
}
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
package com.jivimberg.sqs.published | |
import com.jivimberg.sqs.SQS_URL | |
import kotlinx.coroutines.* | |
import kotlinx.coroutines.channels.Channel | |
import kotlinx.coroutines.channels.ReceiveChannel | |
import kotlinx.coroutines.channels.SendChannel | |
import kotlinx.coroutines.future.await | |
import software.amazon.awssdk.regions.Region | |
import software.amazon.awssdk.services.sqs.SqsAsyncClient | |
import software.amazon.awssdk.services.sqs.model.Message | |
import software.amazon.awssdk.services.sqs.model.ReceiveMessageRequest | |
import kotlin.coroutines.CoroutineContext | |
/**
 * Sample SQS consumer built from one receiver coroutine and [N_WORKERS] worker
 * coroutines that communicate through a [Channel].
 *
 * The class is its own [CoroutineScope]: every coroutine it launches runs on
 * [Dispatchers.IO] under a single [SupervisorJob], and [stop] cancels that job.
 *
 * @property sqs asynchronous SQS client used to receive, delete and re-time messages.
 */
class SqsSampleConsumerChannels(
    private val sqs: SqsAsyncClient
) : CoroutineScope {

    private val supervisorJob = SupervisorJob()

    override val coroutineContext: CoroutineContext
        get() = Dispatchers.IO + supervisorJob

    /**
     * Launches the worker pool and the message receiver, all sharing one channel.
     *
     * @return the [Job] of the coroutine that owns the workers and the receiver.
     */
    fun start() = launch {
        val messageChannel = Channel<Message>()
        repeat(N_WORKERS) { launchWorker(messageChannel) }
        launchMsgReceiver(messageChannel)
    }

    /** Cancels the supervisor job and, with it, every coroutine launched by [start]. */
    fun stop() {
        supervisorJob.cancel()
    }

    /**
     * Launches the coroutine that polls the queue at [SQS_URL] and forwards
     * every received message to [channel].
     */
    private fun CoroutineScope.launchMsgReceiver(channel: SendChannel<Message>) = launch {
        // The request never changes between polls, so it is built once up front.
        val receiveRequest = ReceiveMessageRequest.builder()
            .queueUrl(SQS_URL)
            .waitTimeSeconds(20)
            .maxNumberOfMessages(10)
            .build()

        repeatUntilCancelled {
            val messages = sqs.receiveMessage(receiveRequest).await().messages()
            println("${Thread.currentThread().name} Retrieved ${messages.size} messages")
            for (message in messages) {
                channel.send(message)
            }
        }
    }

    /**
     * Launches one worker that takes messages from [channel], processes them
     * and deletes them from the queue. If processing or deletion fails, the
     * failure is logged and the message's visibility timeout is changed via
     * [changeVisibility].
     */
    private fun CoroutineScope.launchWorker(channel: ReceiveChannel<Message>) = launch {
        repeatUntilCancelled {
            for (msg in channel) {
                try {
                    processMsg(msg)
                    deleteMessage(msg)
                } catch (ex: CancellationException) {
                    // Cancellation is not a processing failure: do not log it as one and do not
                    // start another SQS call from a coroutine that is already cancelled.
                    throw ex
                } catch (ex: Exception) {
                    println("${Thread.currentThread().name} exception trying to process message ${msg.body()}")
                    ex.printStackTrace()
                    changeVisibility(msg)
                }
            }
        }
    }

    /** Simulates work on [message] by suspending for a random 1000-2000 ms. */
    private suspend fun processMsg(message: Message) {
        println("${Thread.currentThread().name} Started processing message: ${message.body()}")
        delay((1000L..2000L).random())
        println("${Thread.currentThread().name} Finished processing of message: ${message.body()}")
    }

    /** Deletes [message] from the queue at [SQS_URL] using its receipt handle. */
    private suspend fun deleteMessage(message: Message) {
        sqs.deleteMessage { req ->
            req.queueUrl(SQS_URL)
            req.receiptHandle(message.receiptHandle())
        }.await()
        println("${Thread.currentThread().name} Message deleted: ${message.body()}")
    }

    /**
     * Sets the visibility timeout of [message] to 10 (presumably seconds, per
     * the SQS API; confirm against the SDK documentation).
     */
    private suspend fun changeVisibility(message: Message) {
        sqs.changeMessageVisibility { req ->
            req.queueUrl(SQS_URL)
            req.receiptHandle(message.receiptHandle())
            req.visibilityTimeout(10)
        }.await()
        println("${Thread.currentThread().name} Changed visibility of message: ${message.body()}")
    }
}
/**
 * Demo entry point: runs the consumer against the queue for 30 seconds, stops
 * it, waits for its coroutines to finish and releases the SQS client.
 */
// Explicit `Unit` keeps this a valid entry point whatever the last expression of the lambda is.
fun main(): Unit = runBlocking {
    println("${Thread.currentThread().name} Starting program")
    val sqs = SqsAsyncClient.builder()
        .region(Region.US_EAST_1)
        .build()
    try {
        val consumer = SqsSampleConsumerChannels(sqs)
        val consumerJob = consumer.start()
        delay(30000)
        consumer.stop()
        // Let the cancelled workers and receiver wind down before the client goes away.
        consumerJob.join()
    } finally {
        // The client was previously never closed; release it even if the run fails.
        sqs.close()
    }
}

/** Number of worker coroutines started by `SqsSampleConsumerChannels.start`. */
private const val N_WORKERS = 4
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
package com.jivimberg.sqs.published | |
import com.jivimberg.sqs.SQS_URL
import kotlinx.coroutines.delay
import kotlinx.coroutines.runBlocking
import software.amazon.awssdk.regions.Region
import software.amazon.awssdk.services.sqs.SqsClient
import software.amazon.awssdk.services.sqs.model.SendMessageRequest
/**
 * Demo producer: sends "hello world <id>" messages to the queue at `SQS_URL`
 * forever, pausing a random 1000-5000 ms between sends.
 *
 * The loop has no exit condition of its own; it only ends when a send fails
 * or the coroutine is cancelled, at which point the client is closed.
 */
// Explicit `Unit` keeps this a valid entry point whatever the last expression of the lambda is.
fun main(): Unit = runBlocking {
    val sqs = SqsClient.builder()
        .region(Region.US_EAST_1)
        .build()
    try {
        var id = 0
        while (true) {
            id++
            val sendMsgRequest = SendMessageRequest.builder()
                .queueUrl(SQS_URL)
                .messageBody("hello world $id")
                .build()
            sqs.sendMessage(sendMsgRequest)
            println("Message sent with id: $id")
            delay((1000L..5000L).random())
        }
    } finally {
        // The client was previously never closed; release it when the loop is left abnormally.
        sqs.close()
    }
}
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment
https://jivimberg.io/blog/2019/02/23/sqs-consumer-using-kotlin-coroutines/