@huasanyelao
Created April 15, 2015 03:31
package com.github.juanrh.spark_kafka

import org.apache.spark.SparkConf
import org.apache.spark.streaming.{Seconds, StreamingContext}
import org.apache.spark.streaming.kafka.KafkaUtils
import kafka.common.TopicAndPartition
import kafka.message.MessageAndMetadata
import kafka.serializer.StringDecoder

// Record type assumed by this example (not provided by Spark): a key/value pair
// together with the Kafka partition and the offset range it was read from
case class KeyValOffset[K, V](key: K, value: V, partition: Int, fromOffset: Long, untilOffset: Long)

// Minimal Spark Streaming example that reads a Kafka topic with the direct
// (receiver-less) API and keeps the precise offset of each record by using
// a custom message handler
object KafkaDirectGetOffsetPreciseExample extends App {
  // Configuration
  val topic = "test"
  val seedKafkaBroker = "localhost"

  // Create the Spark Streaming context
  val master = "local[3]"
  val batchDuration = Seconds(1)
  val conf = new SparkConf().setMaster(master).setAppName("KafkaDirectExample")
  val ssc: StreamingContext = new StreamingContext(conf, batchDuration)

  // Connect to a Kafka topic for reading
  val kafkaParams: Map[String, String] = Map("metadata.broker.list" -> (seedKafkaBroker + ":9092"))
  val kafkaStreamWithIndividualOffsets = {
    // Attach the partition and offset of each message to its key and value
    val messageHandler = (mmd: MessageAndMetadata[String, String]) =>
      KeyValOffset(mmd.key, mmd.message, mmd.partition, mmd.offset, mmd.offset + 1)
    // The message-handler overload of createDirectStream takes explicit starting offsets
    // per partition rather than a topic set; a single partition read from offset 0 is
    // assumed here for simplicity
    val fromOffsets = Map(TopicAndPartition(topic, 0) -> 0L)
    KafkaUtils.createDirectStream[String, String, StringDecoder, StringDecoder, KeyValOffset[String, String]](
      ssc, kafkaParams, fromOffsets, messageHandler
    )
  }
  // Check the connection by printing the first records of each batch
  kafkaStreamWithIndividualOffsets.print()

  ssc.start()
  ssc.awaitTermination()
}
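The example targets the direct Kafka integration introduced with Spark Streaming 1.3; the exact versions below are an assumption, since the gist does not state them. A minimal sbt dependency sketch for running it locally could look like this:

libraryDependencies ++= Seq(
  // Spark Streaming core and the Kafka 0.8 direct-stream integration (versions assumed)
  "org.apache.spark" %% "spark-streaming" % "1.3.1",
  "org.apache.spark" %% "spark-streaming-kafka" % "1.3.1"
)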