Register table from different threads

Log output from the repro (full source at the bottom). Note thread_459 at the top:

registered table thread_459 <-- registerTempTable gets called for thread_459 here, yet the analyzer later reports "no such table thread_459"
dropped table thread_259
dropped table thread_759
registered table thread_860
dropped table thread_860
registered table thread_560
registered table thread_060
registered table thread_960
registered table thread_160
dropped table thread_560
dropped table thread_960
dropped table thread_160
registered table thread_660
registered table thread_360
dropped table thread_360
dropped table thread_660
registered table thread_260
registered table thread_760
dropped table thread_760
registered table thread_861
dropped table thread_861
registered table thread_061
registered table thread_561
registered table thread_961
registered table thread_161
dropped table thread_961
dropped table thread_061
dropped table thread_561
dropped table thread_161
registered table thread_361
registered table thread_661
dropped table thread_361
registered table thread_261
registered table thread_761
dropped table thread_261
dropped table thread_761
registered table thread_862
registered table thread_962
registered table thread_062
registered table thread_562
registered table thread_162
dropped table thread_962
dropped table thread_162
dropped table thread_562
registered table thread_362
registered table thread_662
dropped table thread_362
dropped table thread_662
registered table thread_262
registered table thread_762
dropped table thread_762
registered table thread_863
dropped table thread_863
*Exception ** <-- on failure the job prints sqlContext.tableNames() between "Exception" and "**"; the list is empty, so the catalog has lost every registered table
Exception in thread "pool-6-thread-5" java.lang.Error: org.apache.spark.sql.AnalysisException: no such table thread_459; line 1 pos 21
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1151)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
at java.lang.Thread.run(Thread.java:745)
Caused by: org.apache.spark.sql.AnalysisException: no such table thread_459; line 1 pos 21
at org.apache.spark.sql.catalyst.analysis.package$AnalysisErrorAt.failAnalysis(package.scala:42)
at org.apache.spark.sql.catalyst.analysis.Analyzer$ResolveRelations$.getTable(Analyzer.scala:177)
at org.apache.spark.sql.catalyst.analysis.Analyzer$ResolveRelations$$anonfun$apply$6.applyOrElse(Analyzer.scala:186)
at org.apache.spark.sql.catalyst.analysis.Analyzer$ResolveRelations$$anonfun$apply$6.applyOrElse(Analyzer.scala:181)
at org.apache.spark.sql.catalyst.trees.TreeNode$$anonfun$3.apply(TreeNode.scala:188)
at org.apache.spark.sql.catalyst.trees.TreeNode$$anonfun$3.apply(TreeNode.scala:188)
at org.apache.spark.sql.catalyst.trees.CurrentOrigin$.withOrigin(TreeNode.scala:51)
at org.apache.spark.sql.catalyst.trees.TreeNode.transformDown(TreeNode.scala:187)
at org.apache.spark.sql.catalyst.trees.TreeNode$$anonfun$4.apply(TreeNode.scala:208)
at scala.collection.Iterator$$anon$11.next(Iterator.scala:328)
at scala.collection.Iterator$class.foreach(Iterator.scala:727)
at scala.collection.AbstractIterator.foreach(Iterator.scala:1157)
at scala.collection.generic.Growable$class.$plus$plus$eq(Growable.scala:48)
at scala.collection.mutable.ArrayBuffer.$plus$plus$eq(ArrayBuffer.scala:103)
at scala.collection.mutable.ArrayBuffer.$plus$plus$eq(ArrayBuffer.scala:47)
at scala.collection.TraversableOnce$class.to(TraversableOnce.scala:273)
at scala.collection.AbstractIterator.to(Iterator.scala:1157)
at scala.collection.TraversableOnce$class.toBuffer(TraversableOnce.scala:265)
at scala.collection.AbstractIterator.toBuffer(Iterator.scala:1157)
at scala.collection.TraversableOnce$class.toArray(TraversableOnce.scala:252)
at scala.collection.AbstractIterator.toArray(Iterator.scala:1157)
at org.apache.spark.sql.catalyst.trees.TreeNode.transformChildrenDown(TreeNode.scala:238)
at org.apache.spark.sql.catalyst.trees.TreeNode.transformDown(TreeNode.scala:193)
at org.apache.spark.sql.catalyst.trees.TreeNode$$anonfun$4.apply(TreeNode.scala:208)
at scala.collection.Iterator$$anon$11.next(Iterator.scala:328)
at scala.collection.Iterator$class.foreach(Iterator.scala:727)
at scala.collection.AbstractIterator.foreach(Iterator.scala:1157)
at scala.collection.generic.Growable$class.$plus$plus$eq(Growable.scala:48)
at scala.collection.mutable.ArrayBuffer.$plus$plus$eq(ArrayBuffer.scala:103)
at scala.collection.mutable.ArrayBuffer.$plus$plus$eq(ArrayBuffer.scala:47)
at scala.collection.TraversableOnce$class.to(TraversableOnce.scala:273)
at scala.collection.AbstractIterator.to(Iterator.scala:1157)
at scala.collection.TraversableOnce$class.toBuffer(TraversableOnce.scala:265)
at scala.collection.AbstractIterator.toBuffer(Iterator.scala:1157)
at scala.collection.TraversableOnce$class.toArray(TraversableOnce.scala:252)
at scala.collection.AbstractIterator.toArray(Iterator.scala:1157)
at org.apache.spark.sql.catalyst.trees.TreeNode.transformChildrenDown(TreeNode.scala:238)
at org.apache.spark.sql.catalyst.trees.TreeNode.transformDown(TreeNode.scala:193)
at org.apache.spark.sql.catalyst.trees.TreeNode.transform(TreeNode.scala:178)
at org.apache.spark.sql.catalyst.analysis.Analyzer$ResolveRelations$.apply(Analyzer.scala:181)
at org.apache.spark.sql.catalyst.analysis.Analyzer$ResolveRelations$.apply(Analyzer.scala:171)
at org.apache.spark.sql.catalyst.rules.RuleExecutor$$anonfun$apply$1$$anonfun$apply$2.apply(RuleExecutor.scala:61)
at org.apache.spark.sql.catalyst.rules.RuleExecutor$$anonfun$apply$1$$anonfun$apply$2.apply(RuleExecutor.scala:59)
at scala.collection.LinearSeqOptimized$class.foldLeft(LinearSeqOptimized.scala:111)
at scala.collection.immutable.List.foldLeft(List.scala:84)
at org.apache.spark.sql.catalyst.rules.RuleExecutor$$anonfun$apply$1.apply(RuleExecutor.scala:59)
at org.apache.spark.sql.catalyst.rules.RuleExecutor$$anonfun$apply$1.apply(RuleExecutor.scala:51)
at scala.collection.immutable.List.foreach(List.scala:318)
at org.apache.spark.sql.catalyst.rules.RuleExecutor.apply(RuleExecutor.scala:51)
at org.apache.spark.sql.SQLContext$QueryExecution.analyzed$lzycompute(SQLContext.scala:1082)
at org.apache.spark.sql.SQLContext$QueryExecution.analyzed(SQLContext.scala:1082)
at org.apache.spark.sql.SQLContext$QueryExecution.assertAnalyzed(SQLContext.scala:1080)
at org.apache.spark.sql.DataFrame.<init>(DataFrame.scala:133)
at org.apache.spark.sql.DataFrame$.apply(DataFrame.scala:51)
at org.apache.spark.sql.hive.HiveContext.sql(HiveContext.scala:101)
at com.akamai.rum.test.unit.QueryJob.run(ThreadRepro.java:79)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
... 2 more
registered table thread_963
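
The full repro follows: ThreadRepro.java (the file named in the stack trace) starts ten worker threads against a single shared HiveContext; each thread loops a hundred times, registering a uniquely named temp table over the same parquet path, running a count against it, and dropping it.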
import com.google.common.base.Joiner;
import org.apache.spark.SparkConf;
import org.apache.spark.SparkContext;
import org.apache.spark.sql.DataFrame;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.hive.HiveContext;

import java.util.List;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicLong;

/**
 * Created by ykadiysk on 5/15/15.
 */
public class ThreadRepro {

    public static void main(String[] args) throws Exception {
        new ThreadRepro().sparkPerfTest();
    }

    public void sparkPerfTest() {
        final AtomicLong counter = new AtomicLong();
        SparkConf conf = new SparkConf();
        conf.setAppName("My Application");
        conf.setMaster("local[7]");
        SparkContext sc = new SparkContext(conf);
        HiveContext hc = new HiveContext(sc);

        // Ten jobs share one HiveContext; "table" in the query is a placeholder
        // that each job replaces with its own temp-table name.
        int poolSize = 10;
        ExecutorService pool = Executors.newFixedThreadPool(poolSize);
        String query = "select count(*) from table where year='2015'";
        for (int i = 0; i < poolSize; i++) {
            pool.execute(new QueryJob(hc, query, i, counter));
        }

        pool.shutdown();
        try {
            pool.awaitTermination(60, TimeUnit.MINUTES);
        } catch (Exception e) {
            System.out.println("Thread interrupted");
        }
        System.out.println("All jobs complete");
        System.out.println(" Counter is " + counter.get());
    }
}

class QueryJob implements Runnable {
    String query;
    String threadId;
    HiveContext sqlContext;
    AtomicLong counter;
    final AtomicLong localCounter = new AtomicLong();

    public QueryJob(HiveContext sqlContext, String query, int id, AtomicLong ctr) {
        this.query = query;
        this.threadId = "thread_" + id;
        this.sqlContext = sqlContext;
        this.counter = ctr;
    }

    public void run() {
        String parquetPath = "../../large_partitioned/year=2015";
        for (int i = 0; i < 100; i++) {
            // Register a temp table whose name is unique to this thread and iteration.
            String tblName = threadId + i;
            DataFrame df = sqlContext.parquetFile(parquetPath);
            df.registerTempTable(tblName);
            String _query = query.replace("table", tblName);
            System.out.println(" registered table " + tblName);
            List<Row> res;
            try {
                res = sqlContext.sql(_query).collectAsList();
            } catch (Exception e) {
                // On failure, dump whatever tables the context still knows about.
                System.out.println("*Exception " + Joiner.on(',').join(sqlContext.tableNames()) + "**");
                throw e;
            }
            long cnt = res.get(0).getLong(0);
            sqlContext.dropTempTable(tblName);
            System.out.println(" dropped table " + tblName);
            counter.addAndGet(cnt);
            localCounter.addAndGet(cnt);
            try {
                Thread.sleep(3000); // let's make this a not-so-tight loop
            } catch (Exception e) {
                System.out.println("Thread interrupted");
            }
        }
        System.out.println(" Thread " + threadId + " read " + localCounter);
    }
}
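
Reading the stack trace, the failure surfaces inside Analyzer$ResolveRelations.getTable — the read path of the same temp-table catalog that the other nine threads are mutating through registerTempTable and dropTempTable. That pattern suggests the catalog in this Spark version is not safe for concurrent mutation. A heavy-handed workaround, offered only as a sketch under that assumption (LockedQueryJob and CATALOG_LOCK are hypothetical names, not part of the original gist), is to funnel every catalog-touching call through one shared lock:

import java.util.List;
import java.util.concurrent.atomic.AtomicLong;
import org.apache.spark.sql.DataFrame;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.hive.HiveContext;

// Hypothetical variant of QueryJob: all register/query/drop traffic on the
// shared HiveContext is funneled through CATALOG_LOCK. This gives up most of
// the concurrency, so it is a diagnostic workaround, not a fix.
class LockedQueryJob implements Runnable {
    private static final Object CATALOG_LOCK = new Object();

    private final HiveContext sqlContext;
    private final String query;
    private final String threadId;
    private final AtomicLong counter;

    LockedQueryJob(HiveContext sqlContext, String query, int id, AtomicLong counter) {
        this.sqlContext = sqlContext;
        this.query = query;
        this.threadId = "thread_" + id;
        this.counter = counter;
    }

    public void run() {
        String parquetPath = "../../large_partitioned/year=2015";
        for (int i = 0; i < 100; i++) {
            String tblName = threadId + i;
            long cnt;
            synchronized (CATALOG_LOCK) { // one thread in the catalog at a time
                DataFrame df = sqlContext.parquetFile(parquetPath);
                df.registerTempTable(tblName);
                List<Row> res = sqlContext.sql(query.replace("table", tblName)).collectAsList();
                cnt = res.get(0).getLong(0);
                sqlContext.dropTempTable(tblName);
            }
            counter.addAndGet(cnt);
        }
    }
}

If the error disappears under the full lock but returns when sql(...).collectAsList() is moved outside the synchronized block, that would support the theory that the analyzer's table lookup races with concurrent drops rather than with registration alone.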