Created
November 9, 2015 00:38
-
-
Save jatinganhotra/0ba2b98c2c8cf3a4255b to your computer and use it in GitHub Desktop.
SparkListener - Checkpointing jobs
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
import scala.collection.JavaConversions._ // for propertiesAsScalaMap function | |
// Registers a listener on the SparkContext `sc` that logs the start and end of
// every job and prints extra detail for jobs started by an RDD checkpoint.
sc.addSparkListener(new SparkListener() {

  // Job property that holds the JSON-serialised operation scope of the RDD
  // action that triggered the job, e.g. {"id":"3","name":"checkpoint"}.
  private val ScopeKey = "spark.rdd.scope"

  // Matches one flat `"key":"value"` pair whose key and value are plain
  // (escape-free) JSON strings.
  private val StringField = "\"([^\"\\\\]+)\"\\s*:\\s*\"([^\"\\\\]*)\"".r

  /** Extracts the string-valued fields of a scope JSON document.
    *
    * This is a deliberately small extractor rather than a full JSON parser.
    * When a key occurs more than once (e.g. inside a nested "parent" scope)
    * the first occurrence wins.
    * NOTE(review): this assumes the top-level "id"/"name" fields are serialised
    * before any nested "parent" object - confirm against the Spark version in use.
    *
    * @param json raw value of the `spark.rdd.scope` job property
    * @return map from field name to field value for every string field found
    */
  private def parseScope(json: String): Map[String, String] =
    StringField.findAllMatchIn(json).foldLeft(Map.empty[String, String]) { (fields, m) =>
      val key = m.group(1)
      if (fields.contains(key)) fields else fields + (key -> m.group(2))
    }

  /** Logs every job start; for checkpointing jobs also prints the job id,
    * the id recorded in the scope, and the job start time.
    */
  override def onJobStart(jobStart: SparkListenerJobStart): Unit = {
    println("ADAPT: INSIDE Job Start Listener ")
    // Both the Properties object and the scope entry may be absent, so each
    // lookup is wrapped in Option instead of being dereferenced directly.
    for {
      props     <- Option(jobStart.properties)
      scopeJson <- Option(props.getProperty(ScopeKey))
      scope = parseScope(scopeJson)
      if scope.get("name").exists(_ == "checkpoint")
    } {
      println("JobID " + jobStart.jobId)
      println("This is a checkpointing job for RDD - " + scope.getOrElse("id", "<unknown>"))
      println("StartTime - " + jobStart.time)
    }
  }

  /** Logs the id and end time of every finished job. */
  override def onJobEnd(jobEnd: SparkListenerJobEnd): Unit = {
    println("ADAPT: Inside Job end Listener ")
    println("JobID " + jobEnd.jobId)
    println("EndTime " + jobEnd.time)
  }

  // There are listeners for other events too, but they are not required for now.
  // Below are some examples.
  /*
  override def onApplicationStart(applicationStart: SparkListenerApplicationStart) {
    println("ADAPT: Spark ApplicationStart: " + applicationStart.appName + " at time - " + applicationStart.time);
  }
  override def onApplicationEnd(applicationEnd: SparkListenerApplicationEnd) {
    println("ADAPT: Spark ApplicationEnd: " + applicationEnd.time);
  }
  override def onStageCompleted(stageCompleted: SparkListenerStageCompleted) {
    println("ADAPT: Inside Stage Completed Listener ");
    val stageInfo = stageCompleted.stageInfo;
    val rddInfos = stageInfo.rddInfos;
    println(" The rddInfos for this stage are - ");
    rddInfos.foreach {println}
    for (rddInfo <- rddInfos) println(rddInfo.id)
    rddInfos.foreach(row => {
      println("ADAPT: Listener - rdd memSize " + row.memSize)
      println("ADAPT: Listener - rdd diskSize " + row.diskSize)
    })
    println("\n");
  }
  */
})
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment