Created January 13, 2017 06:01
name := "kafkatest"

version := "1.0"

scalaVersion := "2.11.8"

libraryDependencies ++= Seq(
  "org.apache.spark" %% "spark-core" % "2.0.1" % "provided",
  "org.apache.spark" %% "spark-streaming" % "2.0.1" % "provided",
  "org.apache.spark" %% "spark-sql" % "2.0.1",
  "org.apache.spark" %% "spark-streaming-kafka-0-8" % "2.0.1"
)
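
// Note: spark-core and spark-streaming are "provided" because spark-submit supplies them
// at runtime; the Kafka 0-8 integration classes are instead handed to spark-submit via the
// --jars assembly in the run instructions further below.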
import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.streaming.kafka._
import org.apache.spark.streaming.{Seconds, StreamingContext}
import org.apache.log4j.Logger
import org.apache.log4j.Level

/** KafkaThree - Spark Streaming app that finds bigrams in log data coming through a Kafka broker */
object KafkaThree {

  def main(args: Array[String]): Unit = {

    // Suppress most of the log messages so the test results are easier to see
    Logger.getLogger("org").setLevel(Level.ERROR)
    Logger.getLogger("akka").setLevel(Level.ERROR)

    // Set up the Spark and streaming contexts
    val sparkConf = new SparkConf().setAppName("KafkaThree")
    sparkConf.set("spark.streaming.stopGracefullyOnShutdown", "true")
    val sc = new SparkContext(sparkConf)
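    // One-second micro-batches: each second's worth of Kafka messages is processed as one RDD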
    val ssc = new StreamingContext(sc, Seconds(1))

    // Broadcast the Solr data (hard-coded for this test; see the notes at the end of the file)
    val bcSolr = sc.broadcast(Array("hello there", "there goes", "howdy buddy"))

    // Set up the Kafka streaming receiver
    val topicMap = "test".split(",").map((_, 1)).toMap // hard-coding the Kafka topic for testing
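    // createStream is the receiver-based Kafka 0-8 API: the second argument is the ZooKeeper
    // quorum (localhost:2182 here), not the broker list; map(_._2) keeps only the message value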
    val lines = KafkaUtils.createStream(ssc, "localhost:2182", "group1", topicMap).map(_._2)

    // Process each micro-batch RDD, skipping empty batches
    lines.foreachRDD(rdd => if (!rdd.partitions.isEmpty) {
      val nrdd = rdd.map {
        // Split each line into substrings on periods
        _.split('.').map { substrings =>
          // Trim the substrings, then tokenize on spaces
          substrings.trim.split(' ').
            // Remove non-alphanumeric characters and convert to lowercase
            map { _.replaceAll("""\W""", "").toLowerCase() }.
            // Find the bigrams
            sliding(2)
        }.
          // Flatten, and map each bigram to a concatenated string
          flatMap { identity }.map { _.mkString(" ") }.
          // Group the bigrams and count their frequency within the line
          groupBy { identity }.mapValues { _.size }
      }.
        // Reduce to get a count across the whole batch
        flatMap { identity }.reduceByKey(_ + _)

      // Keep only the bigrams that do not match the Solr data and print them
      nrdd.filter(x => !(bcSolr.value contains x._1)).foreach(println)
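      // Note: foreach(println) runs on the executors; with the local master used by the
      // spark-submit command below (no --master flag), the output shows up in the same shell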
    })

    // Start the streaming context
    ssc.start()
    ssc.awaitTermination()
  }
}

/**
To run a test:
==============
1 - First, compile the program using sbt (the build.sbt is also given in this gist):
      sbt package
2 - From a shell in the project folder, run:
      spark-submit --class KafkaThree --jars library/spark-streaming-kafka-0-8-assembly_2.11-2.0.1.jar target/scala-2.11/kafkatest_2.11-1.0.jar
3 - In another shell, run the following (assuming Kafka is installed, $KAFKA_HOME is set and a topic 'test' has already been created):
      $KAFKA_HOME/bin/kafka-console-producer.sh --topic=test --broker-list=localhost:32770,localhost:32771
4 - At the Kafka producer prompt, type lines such as:
      hello there goes howdy buddy
      hello there goes howdy buddy goes howdy
5 - View the output in the first shell, where the Spark streaming application is running.
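
    For example, the first sample line should print only (goes howdy,1): the other three
    bigrams it contains (hello there, there goes, howdy buddy) appear in the broadcast
    list and are filtered out.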

Notes:
======
The original requirement was to load the bcSolr data by connecting to Solr, but this
program uses a hard-coded broadcast variable instead of fetching the data from Solr.
A rough sketch of the Solr lookup is given below, after this file.
*/
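
For reference, here is one way the broadcast phrases could be loaded from Solr instead of
being hard-coded, using the SolrJ client. This is only a sketch under assumptions that are
not part of the gist: a Solr collection named "phrases" with a string field "phrase", the
URL shown in the usage comment, the 6.x-style HttpSolrClient API, and an added
"org.apache.solr" % "solr-solrj" dependency in build.sbt.

// Sketch only: load the broadcast phrases from Solr instead of hard-coding them.
// Assumes a collection "phrases" with a string field "phrase" and SolrJ on the classpath.
import org.apache.solr.client.solrj.SolrQuery
import org.apache.solr.client.solrj.impl.HttpSolrClient
import scala.collection.JavaConverters._

def loadPhrasesFromSolr(solrUrl: String): Array[String] = {
  val client = new HttpSolrClient.Builder(solrUrl).build()
  try {
    val query = new SolrQuery("*:*") // fetch every document in the collection
    query.setRows(1000)              // cap how many phrases are pulled back
    val docs = client.query(query).getResults.asScala
    docs.map(_.getFieldValue("phrase").toString).toArray
  } finally {
    client.close()
  }
}

// In main(), the broadcast line would then become something like:
// val bcSolr = sc.broadcast(loadPhrasesFromSolr("http://localhost:8983/solr/phrases"))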