Skip to content

Instantly share code, notes, and snippets.

@iamthiago
Created July 19, 2017 21:12
Show Gist options
  • Save iamthiago/4fb8684bb2409995f781151c42e349ac to your computer and use it in GitHub Desktop.
Save iamthiago/4fb8684bb2409995f781151c42e349ac to your computer and use it in GitHub Desktop.
AWS Kinesis Flow to be used in Akka Streams
import akka.stream._
import akka.stream.stage._
import com.amazonaws.handlers.AsyncHandler
import com.amazonaws.services.kinesis.AmazonKinesisAsync
import com.amazonaws.services.kinesis.model.{PutRecordsRequest, PutRecordsRequestEntry, PutRecordsResult}
import scala.collection.JavaConverters._
/**
 * Akka Streams flow stage that sends every incoming batch of entries to a Kinesis stream with
 * one `PutRecords` call and emits the returned [[PutRecordsResult]] downstream.
 *
 * At most one request is in flight at any time: a new batch is pulled from upstream only when
 * downstream signals demand, and the result of the request is pushed before the next batch is
 * requested. This keeps the stage fully back-pressured and guarantees that exactly one result
 * is emitted per batch, in order.
 *
 * The stage fails if the client reports an error for a request. It does not inspect the
 * content of a successful [[PutRecordsResult]]; that is left to downstream stages.
 *
 * @param streamName    name of the Kinesis stream every request is addressed to
 * @param kinesisClient asynchronous Kinesis client used to issue the requests
 */
class AwsKinesisFlowStage(streamName: String, kinesisClient: AmazonKinesisAsync)
    extends GraphStage[FlowShape[Seq[PutRecordsRequestEntry], PutRecordsResult]] {

  val in: Inlet[Seq[PutRecordsRequestEntry]] = Inlet[Seq[PutRecordsRequestEntry]]("KinesisFlow.in")
  val out: Outlet[PutRecordsResult] = Outlet[PutRecordsResult]("KinesisFlow.out")

  override def shape: FlowShape[Seq[PutRecordsRequestEntry], PutRecordsResult] = FlowShape.of(in, out)

  override def createLogic(inheritedAttributes: Attributes): GraphStageLogic =
    new GraphStageLogic(shape) with InHandler with OutHandler with StageLogging {

      // True from the moment a request is handed to the client until its callback has run.
      private var isMessageInFlight = false

      // The client invokes its handler on its own threads; async callbacks marshal the outcome
      // back into the stage so that stage state is only touched from within the stage.
      private val failureCallback = getAsyncCallback[Throwable](handleFailure)
      private val successCallback = getAsyncCallback[PutRecordsResult](handleSuccess)

      private def handleFailure(ex: Throwable): Unit = failStage(ex)

      private def handleSuccess(result: PutRecordsResult): Unit = {
        log.debug(s"Put Result $result")
        isMessageInFlight = false
        // Upstream is only pulled on downstream demand, so `out` is expected to be available
        // here; the guard protects against pushing without demand or into a closed port.
        if (isAvailable(out)) push(out, result)
        // Upstream finished while this request was pending: nothing more can arrive.
        if (isClosed(in)) completeStage()
      }

      private val asyncHandler = new AsyncHandler[PutRecordsRequest, PutRecordsResult] {
        override def onError(exception: Exception): Unit =
          failureCallback.invoke(exception)

        override def onSuccess(request: PutRecordsRequest, result: PutRecordsResult): Unit =
          successCallback.invoke(result)
      }

      override def onPush(): Unit = {
        val request = new PutRecordsRequest()
          .withStreamName(streamName)
          .withRecords(grab(in).asJavaCollection)
        isMessageInFlight = true
        kinesisClient.putRecordsAsync(request, asyncHandler)
        // Deliberately no pull here: the next batch is requested from onPull, i.e. only after
        // the result of this request has been emitted and downstream asked for more.
      }

      override def onPull(): Unit =
        if (!hasBeenPulled(in)) tryPull(in)

      override def onUpstreamFinish(): Unit =
        // Keep the stage alive while a request is pending; handleSuccess completes it afterwards.
        if (!isMessageInFlight) completeStage()

      setHandlers(in, out, this)
    }
}
import akka.NotUsed
import akka.stream.scaladsl.{Flow, Keep, Sink}
import com.amazonaws.services.kinesis.AmazonKinesisAsync
import com.amazonaws.services.kinesis.model.{PutRecordsRequestEntry, PutRecordsResult}
/** Factory methods that expose [[AwsKinesisFlowStage]] as ready-made Akka Streams components. */
object KinesisPublisher {

  /**
   * Creates a flow that publishes each incoming batch of entries to the given Kinesis stream
   * and emits the corresponding [[PutRecordsResult]].
   *
   * @param streamName    name of the target Kinesis stream
   * @param kinesisClient asynchronous Kinesis client used to send the batches
   * @return a flow from batches of entries to the results returned by the client
   */
  def flow(streamName: String)(implicit kinesisClient: AmazonKinesisAsync): Flow[Seq[PutRecordsRequestEntry], PutRecordsResult, NotUsed] =
    Flow.fromGraph(new AwsKinesisFlowStage(streamName, kinesisClient))

  /**
   * Creates a sink that publishes each incoming batch of entries to the given Kinesis stream.
   *
   * The results produced by [[flow]] are routed into `Sink.ignore`, so every
   * [[PutRecordsResult]] is discarded; use [[flow]] directly when the results must be inspected.
   *
   * @param streamName    name of the target Kinesis stream
   * @param kinesisClient asynchronous Kinesis client used to send the batches
   * @return a sink whose materialized value is the one produced by `Sink.ignore`
   */
  def sink(streamName: String)(implicit kinesisClient: AmazonKinesisAsync): Sink[Seq[PutRecordsRequestEntry], scala.concurrent.Future[akka.Done]] =
    flow(streamName).toMat(Sink.ignore)(Keep.right)
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment