Performance test code on Spark Kafka Consumer Pool (concurrent access on topic partition) - v2
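The script below self-joins a single Kafka source stream, so both sides of the join read the same topic partitions concurrently — which is what exercises concurrent access to the pooled Kafka consumers. A custom StreamingQueryListener logs query progress so throughput can be observed per batch.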
import java.io.{File, PrintWriter}

import org.apache.spark.internal.Logging
import org.apache.spark.sql.DataFrame
import org.apache.spark.sql.functions.{expr, from_json, struct, to_json} // expr is needed for the join condition below
import org.apache.spark.sql.streaming.StreamingQueryListener
import org.apache.spark.sql.types._

import spark.implicits._
// Listener that logs query lifecycle events and per-batch progress as JSON.
class QueryListenerWriteProgressToStdout() extends StreamingQueryListener with Logging {
  override def onQueryStarted(event: StreamingQueryListener.QueryStartedEvent): Unit = {
    logWarning(s"Query started for ID ${event.id} and RUNID ${event.runId}")
  }

  override def onQueryProgress(event: StreamingQueryListener.QueryProgressEvent): Unit = {
    try {
      logWarning(event.progress.json + "\n")
    } catch {
      case e: Exception =>
        logError("Error writing event[" + event.progress.json + "]", e)
    }
  }

  override def onQueryTerminated(event: StreamingQueryListener.QueryTerminatedEvent): Unit = {
    logWarning(s"Query terminated for ID ${event.id} and RUNID ${event.runId}")
  }
}
import org.apache.log4j.Logger
import org.apache.log4j.Level

spark.streams.addListener(new QueryListenerWriteProgressToStdout())
spark.sqlContext.setConf("spark.sql.shuffle.partitions", "5")
val speedSchema = StructType(Seq(
  StructField("eventTime", StringType, nullable = false),
  StructField("eventSource", StringType, nullable = false),
  StructField("truckId", IntegerType, nullable = false),
  StructField("driverId", IntegerType, nullable = false),
  StructField("driverName", StringType, nullable = false),
  StructField("routeId", IntegerType, nullable = false),
  StructField("route", StringType, nullable = false),
  StructField("speed", IntegerType, nullable = false)
))
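// For reference, a Kafka record value matching the schema above might look like this
// (field values here are made up for illustration):
// {"eventTime":"2019-03-13 09:05:00.000","eventSource":"truck_speed_event","truckId":23,
//  "driverId":17,"driverName":"Jane Doe","routeId":160405074,"route":"Des Moines to Chicago","speed":45}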
// Data distribution is skewed, so some partitions had to be set to 0 in startingOffsets.
// Only the 4 smallest partitions are used, to let the query finish earlier.
// One fetch request seems to return about 500 records per partition, so maxOffsetsPerTrigger
// is set to 200 per partition (so records can be continuously re-read from the next batch).
val speedEventDf = spark
  .readStream
  .format("kafka")
  .option("kafka.bootstrap.servers", "localhost:9092")
  .option("subscribe", "truck_speed_events_stream_spark_25151_v1")
  .option("startingOffsets", """{"truck_speed_events_stream_spark_25151_v1":{"0":5,"1":5,"2":-1,"3":-1,"4":5,"5":-1,"6":5,"7":-1,"8":-1,"9":0}}""")
  .option("failOnDataLoss", "true")
  .option("maxOffsetsPerTrigger", 800) // 200 * 4 partitions
  .load()
  .selectExpr("CAST(value AS STRING) as value")
  .as[String]
  .select(from_json($"value", schema = speedSchema).as("data"))
  .selectExpr("data.eventTime AS eventTime", "data.eventSource AS eventSource", "data.truckId AS truckId",
    "data.driverId AS driverId", "data.driverName AS driverName", "data.routeId AS routeId",
    "data.route AS route", "data.speed AS speed")
// Second leg of the self-join: same stream with renamed columns.
val speedEventDf2 = speedEventDf
  .selectExpr("eventTime AS eventTime2", "eventSource AS eventSource2", "truckId AS truckId2",
    "driverId AS driverId2", "driverName AS driverName2", "routeId AS routeId2",
    "route AS route2", "speed AS speed2")
val query = speedEventDf
  .join(speedEventDf2, expr("truckId = truckId2 AND driverId = driverId2 AND routeId = routeId2"))
  .select(to_json(struct($"*")).as("value"))
  .writeStream
  .foreachBatch { (batchDF: DataFrame, batchId: Long) =>
    println(s"batchId $batchId received for writing, content: ${batchDF.collect().mkString(", ")}")
  }
  .start()

query.awaitTermination()
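// How to run (a sketch — assumes a Spark 2.4.x / Scala 2.11 build and a local Kafka broker
// on localhost:9092 with the topic already populated; adjust versions to your environment):
//
//   spark-shell --packages org.apache.spark:spark-sql-kafka-0-10_2.11:2.4.0 -i <this script>
//
// Note: foreachBatch requires Spark 2.4.0 or later.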