@iamthiago
Created December 7, 2016 13:29
import java.util.UUID
import akka.NotUsed
import akka.actor.{ActorRef, ActorSystem}
import akka.stream._
import akka.stream.scaladsl.{Balance, Flow, GraphDSL, Sink, Source}
import com.amazonaws.auth.DefaultAWSCredentialsProviderChain
import com.amazonaws.services.sqs.AmazonSQSAsyncClient
import com.amazonaws.services.sqs.model.Message
import com.github.dwhjames.awswrap.sqs.AmazonSQSScalaClient
import com.mfglabs.commons.aws.sqs.SQSStreamBuilder
import com.typesafe.config.ConfigFactory
import com.vivareal.columbus.raw.analytics.json.MyJsonProtocol._
import com.vivareal.columbus.raw.analytics.log.GelfLogger
import com.vivareal.columbus.raw.analytics.repository.{Raw, RawRepository}
import org.json4s._
import org.json4s.jackson.JsonMethods._
import scala.concurrent.ExecutionContext.Implicits.{global => ec}

/**
 * This program streams messages from an Amazon SQS queue,
 * parses each one into a known model and saves it into our database.
 *
 * This is the actual pipeline:
 *
 *            +-----------------------------+
 *            |                             |
 *  source ~> | saveIntoDb ~> deleteFromSqs | ~> sink
 *            |                             |
 *            +-----------------------------+
 *
 * Inside the box, the `saveIntoDb` and `deleteFromSqs` stages are executed in
 * parallel across `parallelism` branches.
 */
object Boot extends App {

  val config = ConfigFactory.load()

  implicit val system = ActorSystem("raw-analytics")

  val log = new GelfLogger[this.type]()

  // Resume the stream on any exception, logging it with a fresh correlation id.
  val decider: Supervision.Decider = {
    case e: Exception =>
      log.error(UUID.randomUUID(), e.getMessage, e)
      Supervision.Resume
  }

  implicit val materializer = ActorMaterializer(
    ActorMaterializerSettings(system).withSupervisionStrategy(decider)
  )

  val url = config.getString("aws.sqs")
  val parallelism = config.getInt("app.parallelism")

  val awsCreds = new DefaultAWSCredentialsProviderChain()
  val sqs = new AmazonSQSScalaClient(new AmazonSQSAsyncClient(awsCreds), ec)
  val builder = SQSStreamBuilder(sqs)

  /**
   * The [[Source]] used in the stream. It:
   *
   * - streams messages from the SQS queue
   * - parses each message body from String into JSON
   * - converts it into a [[Raw]] object
   */
  val source: Source[(Message, Raw), ActorRef] = {
    val toJson: Flow[Message, (Message, JValue), NotUsed] =
      Flow[Message].map(msg => (msg, parse(msg.getBody)))

    val toRaw: Flow[(Message, JValue), (Message, Raw), NotUsed] = Flow[(Message, JValue)].map { tuple =>
      val (msg, originalJson) = tuple

      // The body wraps the actual payload as a JSON string under the "Message" key,
      // so it has to be parsed twice.
      val messagePath: String = (originalJson \\ "Message").extract[String]
      val json = parse(messagePath)

      val raw = Raw(
        externalId = (json \\ "externalId").extract[String],
        providerId = (json \\ "feedProviderId").extract[String],
        state = (json \\ "originalLocation" \\ "state").extractOpt[String],
        city = (json \\ "originalLocation" \\ "city").extractOpt[String],
        neighborhood = (json \\ "originalLocation" \\ "neighborhood").extractOpt[String],
        street = (json \\ "originalLocation" \\ "address").extractOpt[String],
        streetNumber = (json \\ "originalLocation" \\ "streetNumber").extractOpt[String],
        zipCode = (json \\ "originalLocation" \\ "zipCode").extractOpt[String]
      )

      (msg, raw)
    }

    builder.receiveMessageAsStream(url, autoAck = false)
      .via(toJson)
      .via(toRaw)
  }
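
  // Illustrative only (assumed envelope shape, made-up values): the outer SQS body
  // carries the real payload as a JSON string under the "Message" key, and that
  // inner payload is what `toRaw` extracts from:
  //
  //   outer: { "Message": "<inner payload serialized as a JSON string>" }
  //   inner: {
  //     "externalId": "abc-123",
  //     "feedProviderId": "provider-1",
  //     "originalLocation": {
  //       "state": "SP", "city": "Sao Paulo", "neighborhood": "Pinheiros",
  //       "address": "Rua dos Pinheiros", "streetNumber": "1000", "zipCode": "05422-001"
  //     }
  //   }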

  /**
   * The [[Sink]] checks whether the final element is a
   * [[Right]] or a [[Left]] of [[Raw]] and logs it using the [[GelfLogger]].
   *
   * [[Right]] means an inserted value.
   * [[Left]] means an updated value.
   */
  val sink = Sink.foreach[Either[Raw, Raw]] {
    case Right(inserted) => log.info(UUID.randomUUID(), "Inserting Raw - ExternalId: {}, ProviderId: {}", inserted.externalId, inserted.providerId)
    case Left(updated)   => log.info(UUID.randomUUID(), "Updating Raw - ExternalId: {}, ProviderId: {}", updated.externalId, updated.providerId)
  }
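
  // Assumed shape of the imported Raw model (its real definition lives in
  // com.vivareal.columbus.raw.analytics.repository and is not part of this gist),
  // inferred from the fields built by `source` and logged by `sink`:
  //
  //   case class Raw(externalId: String, providerId: String,
  //                  state: Option[String], city: Option[String],
  //                  neighborhood: Option[String], street: Option[String],
  //                  streetNumber: Option[String], zipCode: Option[String])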

  /**
   * This [[GraphDSL]] graph implements our pipelining and parallelism:
   *
   * - receives elements from the [[Source]]
   * - processes them in parallel across `parallelism` branches
   * - sends the results downstream to the [[Sink]]
   *
   * A stripped-down, runnable sketch of the same Balance/Merge fan-out pattern
   * is included at the bottom of this file as `BalanceMergeSketch`.
   */
  val graph: Flow[(Message, Raw), Either[Raw, Raw], NotUsed] = Flow.fromGraph(GraphDSL.create() { implicit b =>
    import GraphDSL.Implicits._

    val dispatch = b.add(Balance[(Message, Raw)](parallelism))
    val merge = b.add(scaladsl.Merge[Either[Raw, Raw]](parallelism))

    val saveIntoDb: Flow[(Message, Raw), (Message, Either[Raw, Raw]), NotUsed] =
      Flow[(Message, Raw)].mapAsyncUnordered(1) { tuple =>
        val (msg, raw) = tuple
        RawRepository.save(raw).map(either => (msg, either))
      }
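
    // Not shown in this gist: RawRepository.save is expected to return a
    // Future[Either[Raw, Raw]] (Left = updated row, Right = fresh insert, as
    // logged by the Sink above), which is why mapAsyncUnordered is used here.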

    val deleteFromSqs: Flow[(Message, Either[Raw, Raw]), Either[Raw, Raw], NotUsed] =
      Flow[(Message, Either[Raw, Raw])].mapAsyncUnordered(1) { tuple =>
        val (msg, either) = tuple
        // Only delete the SQS message once it has been persisted (autoAck = false above).
        sqs.deleteMessage(url, msg.getReceiptHandle).map(_ => either)
      }

    // Fan out over `parallelism` branches; each branch saves to the database and
    // then deletes the message from SQS, behind its own asynchronous boundary.
    for (i <- 0 until parallelism) {
      dispatch.out(i) ~> saveIntoDb.async ~> deleteFromSqs.async ~> merge.in(i)
    }

    FlowShape(dispatch.in, merge.out)
  })

  graph.runWith(source, sink)
}
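
// ---------------------------------------------------------------------------
// A minimal, self-contained sketch (not part of the original pipeline) of the
// same Balance ~> worker ~> Merge fan-out pattern that `Boot.graph` builds,
// using plain Ints instead of SQS messages so it can be run on its own.
// The object and value names below are illustrative only.
// ---------------------------------------------------------------------------
object BalanceMergeSketch extends App {

  implicit val system = ActorSystem("balance-merge-sketch")
  implicit val materializer = ActorMaterializer()

  val workers = 4

  val parallel: Flow[Int, Int, NotUsed] = Flow.fromGraph(GraphDSL.create() { implicit b =>
    import GraphDSL.Implicits._

    val dispatch = b.add(Balance[Int](workers))
    val merge = b.add(scaladsl.Merge[Int](workers))

    // Each branch gets its own asynchronous boundary, mirroring
    // `saveIntoDb.async ~> deleteFromSqs.async` in the real graph.
    for (i <- 0 until workers) {
      dispatch.out(i) ~> Flow[Int].map(_ * 2).async ~> merge.in(i)
    }

    FlowShape(dispatch.in, merge.out)
  })

  Source(1 to 10)
    .via(parallel)
    .runWith(Sink.foreach[Int](println))
    .onComplete(_ => system.terminate())(ec)
}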