Spark Streaming with Kafka, Solr and Indexed RDD.
name := "kafkatest"

version := "1.0"

scalaVersion := "2.11.8"

libraryDependencies ++= Seq(
  "org.apache.spark" %% "spark-core" % "2.0.1" % "provided",
  "org.apache.spark" %% "spark-streaming" % "2.0.1" % "provided",
  "org.apache.spark" %% "spark-sql" % "2.0.1",
  "org.apache.spark" %% "spark-mllib" % "2.0.1",
  "org.apache.spark" %% "spark-streaming-kafka-0-8" % "2.0.1"
)

// META-INF discarding when building a fat jar
mergeStrategy in assembly <<= (mergeStrategy in assembly) { (old) =>
  {
    case PathList("META-INF", xs @ _*) => MergeStrategy.discard
    case x => MergeStrategy.first
  }
}
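
Note: the mergeStrategy block above comes from the sbt-assembly plugin, which this build.sbt alone does not load. A minimal project/assembly.sbt sketch that would enable it (the plugin version is an assumption; newer releases prefer assemblyMergeStrategy := over the older mergeStrategy <<= form used above):

// project/assembly.sbt - enables the `assembly` task and merge-strategy keys
// (plugin version is an assumption; pick one compatible with your sbt release)
addSbtPlugin("com.eed3si9n" % "sbt-assembly" % "0.14.3")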
import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.sql.SQLContext
import org.apache.spark.streaming.kafka._
import org.apache.spark.streaming.{Seconds, StreamingContext}
import edu.berkeley.cs.amplab.spark.indexedrdd.IndexedRDD
import edu.berkeley.cs.amplab.spark.indexedrdd.IndexedRDD._
import org.apache.log4j.Logger
import org.apache.log4j.Level

/** KafkaIndexed - Find the bigrams in log data coming through a Kafka broker */
object KafkaIndexed {
  def main(args: Array[String]): Unit = {

    // Suppress most log messages so the test results are easy to see
    Logger.getLogger("org").setLevel(Level.ERROR)
    Logger.getLogger("akka").setLevel(Level.ERROR)

    // Set up the Spark streaming context
    val sparkConf = new SparkConf().setAppName("KafkaIndexed").setMaster("local[*]")
    sparkConf.set("spark.streaming.stopGracefullyOnShutdown", "true")
    val sc = new SparkContext(sparkConf)
    val sqlContext = new SQLContext(sc)
    import sqlContext.implicits._
    val ssc = new StreamingContext(sc, Seconds(1))

    // Read the Solr data and cache it as an IndexedRDD (https://github.com/amplab/spark-indexedrdd)
    val options = Map("zkHost" -> "localhost:2181", "collection" -> "biwords", "fields" -> "biwords,id")
    val rdd_biwords = sqlContext.read.format("solr").options(options).load().rdd.map(r => (r.getString(0), r.getString(1)))
    val indexed = IndexedRDD(rdd_biwords).cache()
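    // Keying the pairs by the bigram string lets IndexedRDD serve the join against the stream below efficiently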

    // Set up the Kafka streaming receiver
    val topicMap = "test".split(",").map((_, 1)).toMap // hardcoding the Kafka topic for testing
    val lines = KafkaUtils.createStream(ssc, "localhost:2182", "group1", topicMap).map(_._2)

    // Process each batch of data received from the input stream
    lines.foreachRDD(rdd => if (!rdd.partitions.isEmpty) {
      val nrdd = rdd.map{
        // Split each line into substrings on periods
        _.split('.').map{ substrings =>
          // Trim substrings and then tokenize on spaces
          substrings.trim.split(' ').
            // Remove non-alphanumeric characters and convert to lowercase
            map{_.replaceAll("""\W""", "").toLowerCase()}.
            // Find bigrams
            sliding(2)
        }.
        // Flatten, and map the bigrams to concatenated strings
        flatMap{identity}.map{_.mkString(" ")}.
        // Group the bigrams and count their frequency within the line
        groupBy{identity}.mapValues{_.size}
      }.
      // Reduce to get a global count per bigram across the batch
      flatMap{identity}.reduceByKey(_ + _)
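      // Example (hypothetical input): the line "Hello there. How are you" yields
      // ("hello there", 1), ("how are", 1), ("are you", 1)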
      // Keep only the bigrams that also exist in the Solr collection, and print them with their counts
      nrdd.join(indexed).map{ case (a, (b, c)) => (a, b) }.foreach(println)
    })

    // Start the streaming context
    ssc.start()
    ssc.awaitTermination()
  }
}

/**
To Run a test:
==============
1 - Download and install the dependencies:
    a) Download the following jar
       https://github.com/ankurdave/maven-repo/blob/master/com/ankurdave/part_2.10/0.2/part_2.10-0.2.jar
    b) Download the IndexedRDD jar
       http://dl.bintray.com/spark-packages/maven/amplab/spark-indexedrdd/0.4.0/spark-indexedrdd-0.4.0.jar
    c) Download and build spark-solr to create the jar (due to dependency issues)
       https://github.com/lucidworks/spark-solr
2 - Build the program using the build.sbt provided
    sbt package
3 - In a shell, run:
    spark-submit --class KafkaIndexed \
      --jars library/spark-solr-3.0.1-SNAPSHOT-shaded.jar, \
      library/amplab_spark-indexedrdd-0.4.0.jar, \
      library/spark-streaming-kafka-0-8-assembly_2.11-2.0.1.jar, \
      library/part_2.10-0.2.jar \
      target/scala-2.11/kafkatest_2.11-1.0.jar
4 - In another shell, run the following (assuming Kafka is installed, $KAFKA_HOME is set, and the topic 'test' has already been created):
    $KAFKA_HOME/bin/kafka-console-producer.sh --topic=test --broker-list=172.17.7.7:32771,172.17.7.7:32772
5 - At the Kafka producer prompt, type text like the following:
    hello there how are you doing today
    write something that does not match any biwords in the solr db
6 - View the output in the first shell, where the Spark streaming application is running

Assumptions:
============
1 - SolrCloud is set up and running, connected to ZooKeeper on port 2181
2 - Solr has a collection called 'biwords' with 2 fields (id and biwords)
3 - In the Solr collection, 'multiValued' should be set to false for the biwords field
4 - The Kafka nodes are running, connected to a second ZooKeeper on port 2182
5 - The Kafka topic 'test' has been created (see the setup sketch after this comment)
*/
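
For reference, a minimal sketch of the Solr and Kafka prerequisites listed above. The paths, the single-node replication/partition settings, and the Solr 5+ command syntax are assumptions; adjust them to your installation:

# Create the 'biwords' collection on SolrCloud
# (then set multiValued=false for the biwords field in its schema)
$SOLR_HOME/bin/solr create -c biwords

# Create the 'test' topic against the Kafka ZooKeeper on port 2182
$KAFKA_HOME/bin/kafka-topics.sh --create --zookeeper localhost:2182 \
  --replication-factor 1 --partitions 1 --topic test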