Created
September 2, 2014 20:42
-
-
Save sujee/f21828d6762b0bbb1c61 to your computer and use it in GitHub Desktop.
named RDD
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
/**
 * Returns the named RDD cached under the file name from config key "input.file",
 * loading it with sc.textFile and registering it in the job server's named-RDD
 * cache on a miss. Returns the RDD's element count.
 *
 * NOTE(review): original code called rdd.count() on the Option returned by
 * namedRdds.get — the RDD must be unwrapped first. Also removed the redundant
 * persist() right after cache() (cache() is persist() with the default level).
 */
override def runJob(sc: SparkContext, config: Config): Any = {
  val fileName = config.getString("input.file")
  logger.info("### fileName : " + fileName)

  val cached = this.namedRdds.get[String](fileName)
  logger.info("### rdd load 1 : " + cached)

  val rdd = cached match {
    case Some(existing) =>
      logger.info("### rdd %s isDefined".format(fileName))
      existing
    case None =>
      logger.info("### rdd %s doesn't exist... loading".format(fileName))
      val newRDD = sc.textFile(fileName)
      newRDD.cache() // equivalent to persist(MEMORY_ONLY); calling both was redundant
      this.namedRdds.update(fileName, newRDD)
      logger.info("### rdd %s updated".format(fileName))
      newRDD
  }

  val count = rdd.count()
  logger.info("### count : " + count)
  count
}
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment
Hi Sujee,
I am running the same code, but it throws the following error:
Exception in thread "pool-1-thread-1" java.lang.NoSuchMethodError: spark.jobserver.NamedRdds.update(Ljava/lang/String;Lscala/Function0;)Lorg/apache/spark/rdd/RDD
Below is my code:
/**
*/
package com.dhruv
import org.apache.spark._
import com.typesafe.config.{Config,ConfigFactory}
import org.apache.spark.SparkContext._
import spark.jobserver.{NamedRdds, NamedRddSupport, SparkJob, SparkJobInvalid, SparkJobValid, SparkJobValidation}
import scala.util.Try
/**
 * Spark job-server job that builds a small RDD (1 to 100), caches it, registers
 * it in the named-RDD cache under "savedrdd", and returns its materialized
 * contents. Pair with retrieveRDDJob, which reads the same named RDD back.
 */
object saveRDDJob extends SparkJob with NamedRddSupport {

  /** Standalone entry point for local testing outside the job server. */
  def main(args: Array[String]) {
    val sc = new SparkContext("local[4]", "WordCountExample")
    val config = ConfigFactory.parseString("")
    val results = runJob(sc, config)
    println("Result is " + results)
  }

  override def runJob(sc: SparkContext, config: Config): Any = {
    val dd = sc.parallelize(1 to 100)
    // cache() is persist(StorageLevel.MEMORY_ONLY); the extra persist() call was redundant.
    dd.cache()
    this.namedRdds.update("savedrdd", dd)
    dd.collect()
  }

  /** Called by the job server before runJob; requires the "input.string" config key. */
  override def validate(sc: SparkContext, config: Config): SparkJobValidation = {
    Try(config.getString("input.string"))
      .map(x => SparkJobValid)
      .getOrElse(SparkJobInvalid("No input.string config param"))
  }
}
/**
 * Spark job-server job that fetches the named RDD registered by saveRDDJob
 * and returns its contents.
 */
object retrieveRDDJob extends SparkJob with NamedRddSupport {

  /** Standalone entry point for local testing outside the job server. */
  def main(args: Array[String]): Unit = {
    val sc = new SparkContext("local[4]", "WordCountExample")
    val config = ConfigFactory.parseString("")
    val results = runJob(sc, config)
    println("Result is " + results)
  }

  override def runJob(sc: SparkContext, config: Config): Any = {
    // BUG FIX: saveRDDJob registers the RDD under "savedrdd", not "sampleRDD".
    // Pattern match instead of Option.get so a cache miss fails with a clear message
    // rather than an uninformative NoSuchElementException; also return the data
    // instead of discarding it.
    this.namedRdds.get[Int]("savedrdd") match {
      case Some(dd) => dd.collect()
      case None =>
        throw new IllegalStateException("named RDD 'savedrdd' not found; run saveRDDJob first")
    }
  }

  /** Called by the job server before runJob; requires the "input.string" config key. */
  override def validate(sc: SparkContext, config: Config): SparkJobValidation = {
    Try(config.getString("input.string"))
      .map(x => SparkJobValid)
      .getOrElse(SparkJobInvalid("No input.string config param"))
  }
}
SBT:
name := "MergingDatasets"

version := "1.0"

scalaVersion := "2.10.4"

// NOTE(review): spark-jobserver 0.6.0 is built against Spark 1.4.1. Compiling
// against spark-core 1.5.1 while the runtime provides 0.6.0 artifacts produces
// binary-incompatibility failures such as
//   java.lang.NoSuchMethodError: spark.jobserver.NamedRdds.update(...)
// Keep the Spark version aligned with the job-server release (or move to
// job-server 0.6.1+ for Spark 1.5.x).
libraryDependencies += "org.apache.spark" %% "spark-core" % "1.4.1"

resolvers += "Job Server Bintray" at "https://dl.bintray.com/spark-jobserver/maven"

// "provided": the job server supplies these jars at runtime; keep them out of the assembly.
libraryDependencies += "spark.jobserver" %% "job-server-api" % "0.6.0" % "provided"

libraryDependencies += "spark.jobserver" %% "job-server-extras" % "0.6.0" % "provided"

val buildSettings = Defaults.defaultSettings ++ Seq(
  javaOptions += "-Xmx8G"
)

// Assembly dedup: drop META-INF duplicates, first-wins for everything else.
assemblyMergeStrategy in assembly := {
  case PathList("META-INF", xs @ _*) => MergeStrategy.discard
  case x => MergeStrategy.first
}