@iamthiago
Last active January 29, 2018 18:24
// KafkaProducerActor.scala
import java.util.Properties

import akka.actor.{Actor, ActorLogging, Props}
import akka.event.LoggingReceive
import com.typesafe.config.{Config, ConfigFactory}
import KafkaProducerActor.KafkaMessage
import kafka.common.FailedToSendMessageException
import org.apache.kafka.clients.producer.{KafkaProducer, ProducerRecord}

import scala.concurrent.ExecutionContextExecutor
import scala.concurrent.duration._
class KafkaProducerActor extends Actor with ActorLogging {

  implicit val dispatcher: ExecutionContextExecutor = context.system.dispatcher

  val config: Config = ConfigFactory.load()

  // The org.apache.kafka.clients producer uses different property names than the
  // old Scala producer; without bootstrap.servers and the serializers the
  // constructor below fails with a ConfigException.
  val props = new Properties()
  props.put("bootstrap.servers", config.getString("kafka.host"))
  props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer")
  props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer")
  props.put("acks", config.getString("kafka.ack"))
  props.put("compression.type", config.getString("kafka.codec"))
  // Note: the new producer is always asynchronous, so the old producer.type setting has no equivalent.

  private val producer = new KafkaProducer[String, String](props)
  // On restart, re-schedule the message that failed so it gets another chance.
  override def preRestart(reason: Throwable, message: Option[Any]): Unit = {
    log.info(s"Restarting actor due to: ${reason.getCause}")
    message foreach { msg =>
      context.system.scheduler.scheduleOnce(1.minute, self, msg)
    }
  }
  def receive = LoggingReceive {
    case KafkaMessage(topic, optId, msg) =>
      // Build a keyed record when an id is present, otherwise let Kafka pick the partition.
      val keyedMessage = optId match {
        case Some(id) => new ProducerRecord[String, String](topic, id, msg)
        case None     => new ProducerRecord[String, String](topic, msg)
      }
      try {
        producer.send(keyedMessage)
      } catch {
        // Rethrow so the supervisor strategy decides whether to restart this actor.
        case f: FailedToSendMessageException => throw new FailedToSendMessageException(f.getMessage, f)
        case e: Exception =>
          log.error("Something bad happened", e)
          throw e
      }

    case msg => log.warning("Unknown message: {}", msg)
  }
}
object KafkaProducerActor {
  val name = "kafka-producer-actor"
  val props: Props = Props[KafkaProducerActor]

  case class KafkaMessage(topic: String, id: Option[String], msg: String)
}
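The producer actor pulls its connection settings from Typesafe Config. The sketch below shows the kafka.* keys it reads; the broker address and values are illustrative placeholders, not part of the original gist, and parseString stands in for an application.conf only to keep the example self-contained.

import com.typesafe.config.{Config, ConfigFactory}

// Hypothetical configuration matching the keys read by KafkaProducerActor
// (kafka.host, kafka.ack, kafka.codec).
object KafkaConfigExample extends App {
  val config: Config = ConfigFactory.parseString(
    """
      |kafka {
      |  host  = "localhost:9092"   // becomes bootstrap.servers
      |  ack   = "all"              // becomes acks
      |  codec = "gzip"             // becomes compression.type
      |}
    """.stripMargin)

  println(config.getString("kafka.host"))
}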
// KafkaSupervisor.scala
import akka.actor.SupervisorStrategy.{Escalate, Restart, Stop}
import akka.actor.{Actor, ActorInitializationException, ActorLogging, OneForOneStrategy, Props, Terminated}
import akka.event.LoggingReceive
import akka.routing.{ActorRefRoutee, RoundRobinRoutingLogic, Router}
import KafkaProducerActor.KafkaMessage
import kafka.common.{FailedToSendMessageException, NoBrokersForPartitionException}
import org.apache.kafka.common.KafkaException
class KafkaSupervisor extends Actor with ActorLogging {

  // Decide per child what to do when a producer actor throws.
  override val supervisorStrategy: OneForOneStrategy =
    OneForOneStrategy(maxNrOfRetries = 3) {
      case _: ActorInitializationException => Stop
      case _: FailedToSendMessageException => Restart
      case _: NoBrokersForPartitionException => Escalate
      case _: KafkaException => Escalate
      case _ => Restart
    }
  // Round-robin pool of five producer actors, each watched for termination.
  var router: Router = {
    val routees = Vector.fill(5) {
      val r = context.actorOf(KafkaProducerActor.props)
      context watch r
      ActorRefRoutee(r)
    }
    Router(RoundRobinRoutingLogic(), routees)
  }
  def receive = LoggingReceive {
    case k: KafkaMessage =>
      router.route(k, sender())

    case Terminated(a) =>
      // A routee died permanently; replace it to keep the pool at full size.
      router = router.removeRoutee(a)
      val r = context.actorOf(KafkaProducerActor.props)
      context watch r
      router = router.addRoutee(r)
  }
}
object KafkaSupervisor {
  val name = "kafka-supervisor"
  val props: Props = Props[KafkaSupervisor]
}
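Putting the two pieces together: the sketch below shows one way to start the supervisor and route messages through it. The actor system name, the StreamApp object, and the topic and payload values are illustrative assumptions, not part of the original gist.

import akka.actor.ActorSystem
import KafkaProducerActor.KafkaMessage

// Hypothetical entry point wiring the supervisor and its pool of producer actors.
object StreamApp extends App {
  val system = ActorSystem("kafka-stream")

  // The supervisor creates and watches five KafkaProducerActor routees internally.
  val supervisor = system.actorOf(KafkaSupervisor.props, KafkaSupervisor.name)

  // Keyed message: records with the same id land on the same partition.
  supervisor ! KafkaMessage("listings", Some("listing-42"), """{"event":"created"}""")

  // Unkeyed message: the producer chooses the partition.
  supervisor ! KafkaMessage("listings", None, """{"event":"ping"}""")

  // The actor system keeps running; call system.terminate() when shutting down.
}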