Performance test code on Spark Kafka Consumer Pool (concurrent access on topic partition)
import java.io.{File, PrintWriter}

import org.apache.commons.logging.LogFactory

import org.apache.spark.sql.DataFrame
import org.apache.spark.sql.functions.{expr, from_json, struct, to_json}
import org.apache.spark.sql.streaming.StreamingQueryListener
import org.apache.spark.sql.types._

// spark-shell / notebook session: `spark` is the pre-built SparkSession
import spark.implicits._
// Listener which appends each StreamingQueryProgress as a JSON line to the given file,
// so query metrics can be analyzed after the run.
class QueryListenerWriteProgressToFile(queryStatusFile: String) extends StreamingQueryListener {
  val logger = LogFactory.getLog(classOf[QueryListenerWriteProgressToFile].getName)
  val writer = new PrintWriter(new File(queryStatusFile))

  override def onQueryStarted(event: StreamingQueryListener.QueryStartedEvent): Unit = {
    logger.info(s"Query is started for ID ${event.id} and RUNID ${event.runId}")
  }

  override def onQueryProgress(event: StreamingQueryListener.QueryProgressEvent): Unit = {
    try {
      writer.write(event.progress.json + "\n")
      writer.flush()
    } catch {
      case e: Exception =>
        logger.error("Error writing event [" + event.progress.json + "] to file", e)
    }
  }

  override def onQueryTerminated(event: StreamingQueryListener.QueryTerminatedEvent): Unit = {
    logger.info(s"Query is terminated for ID ${event.id} and RUNID ${event.runId}")
    writer.close()
  }
}
val queryWritePath = s"/Users/jlim/experiment-SPARK-25151/experiment-SPARK-25151-${branch}-query-selfjoin-concurrent-access-v${attempt}.log" | |
spark.streams.addListener(new QueryListenerWriteProgressToFile(queryWritePath)) | |
val speedSchema = StructType(Seq(
  StructField("eventTime", StringType, nullable = false),
  StructField("eventSource", StringType, nullable = false),
  StructField("truckId", IntegerType, nullable = false),
  StructField("driverId", IntegerType, nullable = false),
  StructField("driverName", StringType, nullable = false),
  StructField("routeId", IntegerType, nullable = false),
  StructField("route", StringType, nullable = false),
  StructField("speed", IntegerType, nullable = false)
))
// data distribution is skewed, so starting offsets for some partitions had to be set to 0
val speedEventDf = spark
  .readStream
  .format("kafka")
  .option("kafka.bootstrap.servers", "localhost:9092")
  .option("subscribe", "truck_speed_events_stream")
  .option("startingOffsets", """{"truck_speed_events_stream":{"0":5,"1":5,"2":5,"3":5,"4":5,"5":5,"6":5,"7":5,"8":5,"9":0,"10":0,"11":0,"12":5,"13":5,"14":0,"15":5,"16":0,"17":5,"18":5,"19":0}}""")
  .option("failOnDataLoss", "true")
  .option("maxOffsetsPerTrigger", 5000)
  .load()
  .selectExpr("CAST(value AS STRING) as value")
  .as[String]
  .select(from_json($"value", schema = speedSchema).as("data"))
  .selectExpr("data.eventTime AS eventTime", "data.eventSource AS eventSource", "data.truckId AS truckId",
    "data.driverId AS driverId", "data.driverName AS driverName", "data.routeId AS routeId",
    "data.route AS route", "data.speed AS speed")
// same stream with renamed columns, used as the right side of the self-join
val speedEventDf2 = speedEventDf
  .selectExpr("eventTime AS eventTime2", "eventSource AS eventSource2", "truckId AS truckId2",
    "driverId AS driverId2", "driverName AS driverName2", "routeId AS routeId2",
    "route AS route2", "speed AS speed2")
// stream-stream self-join: both sides read the same topic, so pooled Kafka consumers
// end up accessing the same topic partitions concurrently
val query = speedEventDf
  .join(speedEventDf2, expr("truckId = truckId2 AND driverId = driverId2 AND routeId = routeId2"))
  .select(to_json(struct($"*")).as("value"))
  .writeStream
  .foreachBatch { (batchDF: DataFrame, batchId: Long) =>
    // no-op sink: only read/processing throughput matters for this test
  }
  .start()

query.awaitTermination()
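After the run, the per-trigger progress written by the listener can be summarized to compare branches. A minimal sketch, assuming the query has finished and the log file contains standard StreamingQueryProgress JSON (field names such as processedRowsPerSecond and durationMs.triggerExecution come from that format):

// Sketch: read the progress log back and summarize per-trigger throughput.
// Assumes `queryWritePath` from above; field names follow the StreamingQueryProgress JSON.
val progressDf = spark.read.json(queryWritePath)

progressDf
  .selectExpr("batchId", "inputRowsPerSecond", "processedRowsPerSecond",
    "durationMs.triggerExecution AS triggerExecutionMs")
  .summary("min", "mean", "max")
  .show(truncate = false)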