Skip to content

Instantly share code, notes, and snippets.

@yanakad
Last active August 29, 2015 14:19
Show Gist options
  • Save yanakad/c9d8a613fe39a8da4a44 to your computer and use it in GitHub Desktop.
Save yanakad/c9d8a613fe39a8da4a44 to your computer and use it in GitHub Desktop.
Spark parallelism
import java.util.concurrent.{TimeUnit, ConcurrentLinkedQueue, Executors, ExecutorService}
import java.util.Date
import org.apache.spark.storage.StorageLevel
import org.apache.spark.sql._
import org.apache.spark.{HashPartitioner, Logging, SparkConf, SparkContext}
import scala.compat.Platform
import org.apache.log4j.{Logger,Level}
// Spark-shell driver: registers a large in-memory table, then submits
// `poolSize` SQL jobs to a fixed-size thread pool so that several queries run
// concurrently against the same SparkContext.
// Relies on the spark-shell provided `sc`. Paste with :paste so that the
// ReportJob class defined further down in this script is visible here.
val sqlContext = new org.apache.spark.sql.hive.HiveContext(sc)
val poolSize = 10
val pool: ExecutorService = Executors.newFixedThreadPool(poolSize)
// Only passed because ReportJob's constructor takes a query; with "" each job
// builds its own query from its id.
val query = "" //this is not used, the job will create a unique query based on its ID
val rootLogger = Logger.getRootLogger()
rootLogger.setLevel(Level.WARN)

// Implicit RDD[case class] => SchemaRDD conversion; this is what makes
// `large.registerTempTable` below compile.
import sqlContext.createSchemaRDD
case class KeyValue(key: Int, value: String)
// 16384 keys, each repeated 16384 times.
val large = sc.
  parallelize(1 to 16384).
  flatMap(i => Seq.fill(16384)(KeyValue(i, i.toString)))
// Register the table BEFORE submitting any job: pool threads start running
// immediately, and a job that reached sqlContext.sql first would fail to
// resolve "large".
large.registerTempTable("large")

for (i <- 1 to poolSize)
  pool.execute(new ReportJob(sqlContext, query, i))
// Accept no further tasks; the already-submitted jobs still run to completion,
// after which the pool's (non-daemon) threads exit instead of lingering.
pool.shutdown()
/** One concurrent benchmark query, intended to be run on a thread pool.
  *
  * Runs a count query against the temp table `large`, then logs at WARN level
  * the thread name, the job id, the elapsed milliseconds and the resulting count.
  *
  * @param sqlContext context in which the `large` temp table is registered
  * @param query      SQL to run; when null or blank (as the driver passes),
  *                   `select count(*) from large where key<$id` is generated.
  *                   A custom query must return a long in its first column of
  *                   its first row.
  * @param id         job number; used in log lines and in the generated query
  */
class ReportJob(sqlContext: org.apache.spark.sql.hive.HiveContext, query: String, id: Int)
    extends Runnable with Logging {

  def threadId = (Thread.currentThread.getName() + "\t")

  def run(): Unit = {
    logWarning(s"********************* Running ${threadId} ${id}")
    // nanoTime is monotonic, so the measured duration is not skewed by
    // wall-clock adjustments while the query runs.
    val startNanos = System.nanoTime()
    // Previously the constructor's `query` was shadowed and silently ignored;
    // honour it when given, otherwise build the per-id query as before.
    val sqlText =
      if (query == null || query.trim.isEmpty) s"select count(*) from large where key<$id"
      else query
    try {
      val cnt = sqlContext.sql(sqlText).first.getLong(0)
      val elapsedMs = (System.nanoTime() - startNanos) / 1000000L
      logWarning(s"********************* DONE ${threadId} ${id} time: " + elapsedMs)
      logWarning(s" Result $cnt")
    } catch {
      // Without this, a failure (e.g. table not yet registered) would escape to
      // the pool thread's uncaught-exception handler instead of the Spark log.
      case scala.util.control.NonFatal(e) =>
        logError(s"********************* FAILED ${threadId} ${id} query: $sqlText", e)
    }
  }
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment