Created
November 15, 2016 10:06
-
-
Save muller/0fd39738b940712e65295d8a6ed8775a to your computer and use it in GitHub Desktop.
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
package consumer | |
import akka.actor._ | |
import akka.kafka.ConsumerMessage._ | |
import akka.kafka._ | |
import akka.kafka.scaladsl._ | |
import akka.stream._ | |
import akka.stream.scaladsl._ | |
import akka.testkit._ | |
import net.manub.embeddedkafka._ | |
import org.apache.kafka.common.serialization._ | |
import org.scalatest._ | |
import scala.concurrent.duration._ | |
/**
 * Integration spec for [[ZipperSpec#committable]].
 *
 * Publishes a handful of records to an embedded Kafka broker, consumes them through a
 * committable source, transforms every record value with a plain `Flow`, and commits each
 * record's offset before the transformed value is emitted downstream.
 */
class ZipperSpec
    extends TestKit(ActorSystem())
    with FlatSpecLike
    with Matchers
    with EmbeddedKafka
    with BeforeAndAfterAll {

  // Explicit type: implicit definitions without a type annotation are fragile to resolve.
  implicit val materializer: ActorMaterializer = ActorMaterializer()

  /** Topic the test publishes to and consumes from. */
  private val Topic = "orion"

  /** Record values published to the topic, in publication order. */
  val Orion: Array[String] =
    "Betelgeuse Rigel Bellatrix Mintaka Alnilam Alnitak Saiph".split(' ')

  /** Consumer configuration: reads from the beginning of the topic on a broker at localhost:6001. */
  val consumerSettings: ConsumerSettings[Array[Byte], String] =
    ConsumerSettings(system, new ByteArrayDeserializer, new StringDeserializer)
      .withBootstrapServers("localhost:6001")
      .withGroupId("test")
      .withProperty(org.apache.kafka.clients.consumer.ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest")

  /** Starts the embedded Kafka broker before any test runs. */
  override def beforeAll(): Unit = {
    super.beforeAll()
    EmbeddedKafka.start()
  }

  /**
   * Terminates the actor system created in the constructor (and with it the consumer stream the
   * test leaves running), then stops the embedded broker. The broker is stopped even if shutting
   * the actor system down throws.
   */
  override def afterAll(): Unit =
    try TestKit.shutdownActorSystem(system)
    finally {
      EmbeddedKafka.stop()
      super.afterAll()
    }

  it should "read and commit" in {
    Orion.foreach { star =>
      publishStringMessageToKafka(Topic, star)
    }

    // The materialized value of Sink.actorRef carries no information, so it is not kept.
    // `None` is the message the sink sends to `testActor` if the stream completes.
    Consumer
      .committableSource(consumerSettings, Subscriptions.topics(Topic))
      .via(committable(flow))
      .runWith(Sink.actorRef(testActor, None))

    receiveN(Orion.length) should be(
      Seq("BETELGEUSE", "RIGEL", "BELLATRIX", "MINTAKA", "ALNILAM", "ALNITAK", "SAIPH"))
    expectNoMsg()
  }

  /** Transformation under test: upper-cases each record value. */
  def flow: Flow[String, String, akka.NotUsed] =
    Flow[String].map(_.toUpperCase)

  /**
   * Lifts a flow over record values into a flow over committable Kafka messages.
   *
   * Each incoming message is split into the message itself and its record value. The value is
   * run through `flow`, the result is zipped back together with the message it came from, the
   * message's offset is committed, and only then is the transformed value emitted.
   *
   * Because messages and results are paired positionally by `Zip`, `flow` must emit exactly one
   * element per input element, in order; otherwise results would be paired with (and commit
   * the offsets of) the wrong messages.
   *
   * @param flow the value transformation to wrap
   * @tparam In  type of the Kafka record value
   * @tparam Out type of the transformed value emitted downstream
   * @return a flow that accepts committable messages and emits transformed values after commit
   */
  def committable[In, Out](flow: Flow[In, Out, _]): Flow[CommittableMessage[_, In], Out, akka.NotUsed] =
    Flow.fromGraph(GraphDSL.create() { implicit b =>
      import GraphDSL.Implicits._

      // Pair every message with its record value so the two can travel separate paths.
      val split = Flow[CommittableMessage[_, In]]
        .map(it => it -> it.record.value)

      // Commit the offset, then pass the transformed value on once the commit has completed.
      // NOTE(review): parallelism is Int.MaxValue, i.e. the number of in-flight commits is
      // effectively unbounded - confirm this is intended rather than a small fixed bound.
      val commit = Flow[(CommittableMessage[_, In], Out)]
        .mapAsync(Int.MaxValue) {
          case (message, out) =>
            // The execution context is passed explicitly so this does not depend on whichever
            // implicit ExecutionContext happens to be in scope of the spec.
            message.committableOffset.commitScaladsl().map(_ => out)(system.dispatcher)
        }

      val in = b.add(split)
      val unzip = b.add(Unzip[CommittableMessage[_, In], In])
      val zip = b.add(Zip[CommittableMessage[_, In], Out])
      val done = b.add(commit)

      // @formatter:off
      in.out ~> unzip.in
                unzip.out0 ~>         zip.in0
                unzip.out1 ~> flow ~> zip.in1
                                      zip.out ~> done.in
      // @formatter:on

      FlowShape(in.in, done.out)
    })
}
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment