Performance test code on Spark Kafka Consumer Pool
// run the code in spark-shell
// e.g.: ./bin/spark-shell --master "local[3]" --packages org.apache.spark:spark-sql-kafka-0-10_2.12:3.0.0-SNAPSHOT

val branch = "master" // change this when changing version of "spark-sql-kafka"
val attempt = "1"     // change this according to the attempt No.

// :paste
import java.io.{File, PrintWriter}

import org.apache.commons.logging.LogFactory
import org.apache.spark.sql.functions.{from_json, struct, to_json}
import org.apache.spark.sql.types._
import org.apache.spark.sql.DataFrame
import org.apache.spark.sql.streaming.StreamingQueryListener

import spark.implicits._
// Listener which appends the JSON progress of every micro-batch to a local file,
// so per-batch durations can be analyzed after the run.
class QueryListenerWriteProgressToFile(queryStatusFile: String) extends StreamingQueryListener {
  val logger = LogFactory.getLog(classOf[QueryListenerWriteProgressToFile].getName)
  val writer = new PrintWriter(new File(queryStatusFile))

  override def onQueryStarted(event: StreamingQueryListener.QueryStartedEvent): Unit = {
    logger.info(s"Query is started for ID ${event.id} and RUNID ${event.runId}")
  }

  override def onQueryProgress(event: StreamingQueryListener.QueryProgressEvent): Unit = {
    try {
      writer.write(event.progress.json + "\n")
      writer.flush()
    } catch {
      case e: Exception =>
        logger.error("Error writing event[" + event.progress.json + "] to file", e)
    }
  }

  override def onQueryTerminated(event: StreamingQueryListener.QueryTerminatedEvent): Unit = {
    logger.info(s"Query is terminated for ID ${event.id} and RUNID ${event.runId}")
    writer.close()
  }
}
val queryWritePath = s"/Users/jlim/experiment-SPARK-25151-${branch}-query-v${attempt}.log"

spark.streams.addListener(new QueryListenerWriteProgressToFile(queryWritePath))
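// Each line the listener writes is the StreamingQueryProgress JSON for one micro-batch.
// A trimmed, illustrative example (values are made up); the jq command at the bottom
// extracts durationMs.addBatch from these lines:
// {"id":"...","runId":"...","batchId":42,"durationMs":{"addBatch":120,"getBatch":2,"triggerExecution":135},
//  "numInputRows":5000,"processedRowsPerSecond":37000.0, ...}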
// Kafka source for the test topic; maxOffsetsPerTrigger caps the total number of
// offsets consumed per micro-batch at 5,000.
val speedEventDf = spark
  .readStream
  .format("kafka")
  .option("kafka.bootstrap.servers", "localhost:9092")
  .option("subscribe", "truck_speed_events_stream")
  .option("startingOffsets", "earliest")
  .option("failOnDataLoss", "true")
  .option("maxOffsetsPerTrigger", 5000)
  .load()
// Schema of the truck speed event JSON payload carried in the Kafka record value.
val speedSchema = StructType(Seq(
  StructField("eventTime", StringType, nullable = false),
  StructField("eventSource", StringType, nullable = false),
  StructField("truckId", IntegerType, nullable = false),
  StructField("driverId", IntegerType, nullable = false),
  StructField("driverName", StringType, nullable = false),
  StructField("routeId", IntegerType, nullable = false),
  StructField("route", StringType, nullable = false),
  StructField("speed", IntegerType, nullable = false)
))
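// For illustration only: a hypothetical record value in the shape the schema above expects,
// with eventTime formatted as parsed by to_timestamp below (all values invented):
// {"eventTime":"2019-03-01 14:37:00.123","eventSource":"truck_speed_event","truckId":23,"driverId":17,
//  "driverName":"Some Driver","routeId":5,"route":"Some Route","speed":63}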
// Parse the JSON payload, derive an event timestamp, re-serialize to JSON, and sink into
// a no-op foreachBatch so the experiment measures only the read/processing path.
val query = speedEventDf
  .selectExpr("CAST(value AS STRING) as value")
  .as[String]
  .select(from_json($"value", schema = speedSchema).as("data"))
  .selectExpr("data.*", "to_timestamp(data.eventTime, 'yyyy-MM-dd HH:mm:ss.SSS') AS eventTimestamp")
  .select(to_json(struct($"*")).as("value"))
  .writeStream
  .foreachBatch { (batchDF: DataFrame, batchId: Long) =>
    // no-op sink
    ()
  }
  .start()

query.awaitTermination()
// commands to read the query status log and compute simple statistics on addBatch duration
// brew install jq && brew install datamash
// cat experiment-SPARK-25151-master-query-v1.log | grep "addBatch" | jq '. | {addBatch: .durationMs.addBatch}' | grep "addBatch" | awk -F " " '{print $2}' | datamash max 1 min 1 mean 1 median 1 perc:90 1 perc:95 1 perc:99 1
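// Alternative to the jq/datamash pipeline: a minimal Scala sketch (not part of the original
// experiment) that pulls durationMs.addBatch out of each progress line with a regex and
// prints similar summary statistics. Run it in a separate scala/spark-shell session once the
// run is over (awaitTermination() above blocks), reusing queryWritePath or the actual log path.
import scala.io.Source

val addBatchRe = """"addBatch"\s*:\s*(\d+)""".r
val durations = Source.fromFile(queryWritePath).getLines()
  .flatMap(line => addBatchRe.findFirstMatchIn(line).map(_.group(1).toDouble))
  .toSeq.sorted // assumes the log already contains progress lines

// simple nearest-rank percentile; datamash may compute percentiles slightly differently
def pct(p: Double): Double = durations(((durations.size - 1) * p).round.toInt)

println(s"max=${durations.max} min=${durations.min} mean=${durations.sum / durations.size} " +
  s"median=${pct(0.5)} p90=${pct(0.9)} p95=${pct(0.95)} p99=${pct(0.99)}")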