Skip to content

Instantly share code, notes, and snippets.

@iamthiago
Created December 7, 2016 13:29

Revisions

  1. Thiago Pereira created this gist Dec 7, 2016.
    140 changes: 140 additions & 0 deletions SQS.scala
    Original file line number Diff line number Diff line change
    @@ -0,0 +1,140 @@
    import java.util.UUID

    import akka.NotUsed
    import akka.actor.{ActorRef, ActorSystem}
    import akka.stream._
    import akka.stream.scaladsl.{Balance, Flow, GraphDSL, Sink, Source}
    import com.amazonaws.auth.DefaultAWSCredentialsProviderChain
    import com.amazonaws.services.sqs.AmazonSQSAsyncClient
    import com.amazonaws.services.sqs.model.Message
    import com.github.dwhjames.awswrap.sqs.AmazonSQSScalaClient
    import com.mfglabs.commons.aws.sqs.SQSStreamBuilder
    import com.typesafe.config.ConfigFactory
    import com.vivareal.columbus.raw.analytics.json.MyJsonProtocol._
    import com.vivareal.columbus.raw.analytics.log.GelfLogger
    import com.vivareal.columbus.raw.analytics.repository.{Raw, RawRepository}
    import org.json4s._
    import org.json4s.jackson.JsonMethods._

    import scala.concurrent.ExecutionContext.Implicits.{global => ec}

    /**
     * This program streams messages from an Amazon SQS queue,
     * parses each one into a known model and saves it into our database.
     *
     * This is the actual pipeline:
     *
     *           +-----------------------------+
     *           |                             |
     * source ~> | saveIntoDb ~> deleteFromSqs | ~> sink
     *           |                             |
     *           +-----------------------------+
     *
     * Inside the box, the `saveIntoDb` ~> `deleteFromSqs` steps are executed in parallel,
     * on as many lanes as configured by `app.parallelism`.
     */
    object Boot extends App {

      // Entry point: wires source ~> graph ~> sink, runs the stream, and shuts the
      // actor system down once the stream terminates.

      // Configuration is read before the actor system is created, so a missing or
      // invalid key fails before any actor-system threads have been started.
      val config = ConfigFactory.load()

      val url = config.getString("aws.sqs")
      val parallelism = config.getInt("app.parallelism")
      // Balance/Merge need at least one port; fail early with a message naming the config key.
      require(parallelism >= 1, s"app.parallelism must be >= 1, got $parallelism")

      implicit val system: ActorSystem = ActorSystem("raw-analytics")

      val log = new GelfLogger[this.type]()

      /**
        * Logs a failure through `log.error`, tagged with a fresh random UUID.
        *
        * Only the `log.error(UUID, String, Exception)` call shape is demonstrated in this
        * file, so a non-`Exception` throwable is wrapped in a [[RuntimeException]] first.
        *
        * @param cause the failure to report
        */
      private def logFailure(cause: Throwable): Unit = {
        val reported = cause match {
          case e: Exception => e
          case other => new RuntimeException(other)
        }
        log.error(UUID.randomUUID(), reported.getMessage, reported)
      }

      /**
        * Supervision strategy for the stream.
        *
        * - Any [[Exception]] is logged and answered with `Supervision.Resume`
        *   (per the Akka Streams docs: the failing element is dropped and processing continues).
        * - Any other [[Throwable]] is answered with `Supervision.Stop`. Without this
        *   catch-all case the decider itself would throw a [[MatchError]]. The failure is
        *   then reported when the stream completes (see the end of this object).
        */
      val decider: Supervision.Decider = {
        case e: Exception =>
          logFailure(e)
          Supervision.Resume
        case _ =>
          Supervision.Stop
      }

      implicit val materializer: ActorMaterializer = ActorMaterializer(
        ActorMaterializerSettings(system).withSupervisionStrategy(decider)
      )

      val awsCreds = new DefaultAWSCredentialsProviderChain()
      val sqs = new AmazonSQSScalaClient(new AmazonSQSAsyncClient(awsCreds), ec)
      val builder = SQSStreamBuilder(sqs)

      /**
        * The [[Source]] used by the stream. It:
        * - Streams messages from the SQS queue at `url` (with `autoAck = false`;
        *   messages are deleted explicitly by the graph below)
        * - Parses each message body to JSON
        * - Parses the string found under the `Message` field as JSON and converts it to a [[Raw]]
        *
        * Each element keeps the original [[Message]] next to the [[Raw]], so the receipt
        * handle is still available when the message has to be deleted.
        */
      val source: Source[(Message, Raw), ActorRef] = {

        val toJson: Flow[Message, (Message, JValue), NotUsed] =
          Flow[Message].map(msg => (msg, parse(msg.getBody)))

        val toRaw: Flow[(Message, JValue), (Message, Raw), NotUsed] =
          Flow[(Message, JValue)].map { case (msg, envelope) =>
            // The payload is itself a JSON document stored as a string in the "Message" field.
            // NOTE(review): `extract` needs an implicit json4s Formats, presumably supplied by
            // MyJsonProtocol._ -- confirm.
            val payload = parse((envelope \\ "Message").extract[String])
            // Looked up once and reused for every location field.
            val location = payload \\ "originalLocation"

            val raw = Raw(
              externalId = (payload \\ "externalId").extract[String],
              providerId = (payload \\ "feedProviderId").extract[String],
              state = (location \\ "state").extractOpt[String],
              city = (location \\ "city").extractOpt[String],
              neighborhood = (location \\ "neighborhood").extractOpt[String],
              street = (location \\ "address").extractOpt[String],
              streetNumber = (location \\ "streetNumber").extractOpt[String],
              zipCode = (location \\ "zipCode").extractOpt[String]
            )

            (msg, raw)
          }

        builder.receiveMessageAsStream(url, autoAck = false)
          .via(toJson)
          .via(toRaw)
      }

      /**
        * The [[Sink]] checks whether the final element is a
        * [[Right]] or a [[Left]] of [[Raw]] and logs it using the [[GelfLogger]].
        *
        * [[Right]] means an inserted value
        * [[Left]] means an updated value
        */
      val sink = Sink.foreach[Either[Raw, Raw]] {
        case Right(inserted) =>
          log.info(UUID.randomUUID(), "Inserting Raw - ExternalId: {}, ProviderId: {}", inserted.externalId, inserted.providerId)
        case Left(updated) =>
          log.info(UUID.randomUUID(), "Updating Raw - ExternalId: {}, ProviderId: {}", updated.externalId, updated.providerId)
      }

      /**
        * This [[GraphDSL]] graph represents our pipelining and parallelism process.
        *
        * - Receives elements from the [[Source]]
        * - Balances them over `parallelism` lanes; each lane saves the [[Raw]] and then
        *   deletes the originating message from SQS
        * - Merges the lanes and sends the results downstream to the [[Sink]]
        */
      val graph: Flow[(Message, Raw), Either[Raw, Raw], NotUsed] = Flow.fromGraph(GraphDSL.create() { implicit b =>
        // The DSL builder is named `b` so it does not shadow the outer SQS `builder`.
        import GraphDSL.Implicits._

        val dispatch = b.add(Balance[(Message, Raw)](parallelism))
        val merge = b.add(scaladsl.Merge[Either[Raw, Raw]](parallelism))

        // Persists the Raw and carries the SQS message along for the delete step.
        val saveIntoDb: Flow[(Message, Raw), (Message, Either[Raw, Raw]), NotUsed] =
          Flow[(Message, Raw)].mapAsyncUnordered(1) { case (msg, raw) =>
            RawRepository.save(raw).map(saved => (msg, saved))
          }

        // Deletes the message only after its save step has produced a result.
        val deleteFromSqs: Flow[(Message, Either[Raw, Raw]), Either[Raw, Raw], NotUsed] =
          Flow[(Message, Either[Raw, Raw])].mapAsyncUnordered(1) { case (msg, saved) =>
            sqs.deleteMessage(url, msg.getReceiptHandle).map(_ => saved)
          }

        // One lane per unit of parallelism, with an async boundary after each step.
        for (lane <- 0 until parallelism) {
          dispatch.out(lane) ~> saveIntoDb.async ~> deleteFromSqs.async ~> merge.in(lane)
        }

        FlowShape(dispatch.in, merge.out)
      })

      // Keep the sink's completion future instead of discarding it, so that a stream
      // that ends or fails is reported and does not leave an idle actor system behind.
      val (_, streamDone) = graph.runWith(source, sink)

      streamDone.onComplete { outcome =>
        outcome.failed.foreach(logFailure)
        system.terminate()
      }

    }