Created
January 27, 2016 17:39
-
-
Save andreionut/98ff613e7bb1b20135eb to your computer and use it in GitHub Desktop.
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
import _root_.kafka.serializer.{StringDecoder, StringEncoder} | |
import akka.actor.SupervisorStrategy.{Restart, Resume} | |
import akka.actor.{Props, OneForOneStrategy, SupervisorStrategy, ActorSystem} | |
import akka.stream.actor.ActorSubscriber | |
import akka.stream.{ActorMaterializerSettings, Supervision, ActorMaterializer} | |
import akka.stream.scaladsl.{Sink, Source} | |
import com.softwaremill.react.kafka.KafkaMessages.StringKafkaMessage | |
import com.softwaremill.react.kafka.{ConsumerProperties, ProducerProperties, ReactiveKafka} | |
import org.reactivestreams.{Publisher, Subscriber} | |
import scala.language.postfixOps | |
object Uppercase extends App { | |
implicit val actorSystem = ActorSystem("ReactiveKafka") | |
val decider: Supervision.Decider = { | |
case e: Throwable => | |
println("Stream Supervision Decider to the rescue!!! (case Throwable)") | |
Supervision.Restart | |
case _ => | |
println("Stream Supervision Decider to the rescue!!! (case _)") | |
Supervision.Restart | |
} | |
implicit val materializer = ActorMaterializer(ActorMaterializerSettings(actorSystem).withSupervisionStrategy(decider)) | |
val kafka = new ReactiveKafka() | |
val publisher: Publisher[StringKafkaMessage] = kafka.consume(ConsumerProperties( | |
brokerList = "192.168.42.45:9092", | |
zooKeeperHost = "192.168.42.45:2181/kafka", | |
topic = "lowercaseStrings", | |
groupId = "groupName", | |
decoder = new StringDecoder() | |
)) | |
// val subscriber: Subscriber[String] = kafka.publish(ProducerProperties( | |
// brokerList = "192.168.42.45:9092", | |
// topic = "uppercaseStrings", | |
// encoder = new StringEncoder() | |
// )) | |
val subscriber: Subscriber[String] = ??? | |
Source.fromPublisher(publisher).map{ m => | |
Thread.sleep(50) | |
if (m.message() == "b") throw new IllegalArgumentException("b encountered. We can't have that!") | |
println(m.message()) | |
m.message().toUpperCase | |
}.to(Sink.fromSubscriber(subscriber)).run() | |
} | |
import akka.actor.{Actor, ActorRef, ActorSystem, Props} | |
import akka.stream.ActorMaterializer | |
import com.softwaremill.react.kafka.{ConsumerProperties, ProducerProperties, ReactiveKafka} | |
class Handler extends Actor { | |
implicit val materializer = ActorMaterializer() | |
override val supervisorStrategy: SupervisorStrategy = OneForOneStrategy() { | |
case exception => | |
println("Actor Supervision Strategy to the rescue!!! (case exception)") | |
Restart // Your custom error handling | |
} | |
def createSupervisedSubscriberActor() = { | |
val kafka = new ReactiveKafka() | |
// subscriber | |
val subscriberProperties = ProducerProperties( | |
brokerList = "192.168.42.45:9092", | |
topic = "uppercaseStrings", | |
encoder = new StringEncoder() | |
) | |
val subscriberActorProps: Props = kafka.producerActorProps(subscriberProperties) | |
context.actorOf(subscriberActorProps) | |
} | |
override def receive: Receive = { | |
case actor => println("Stuff") | |
} | |
} |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment