Created
June 3, 2015 07:50
-
-
Save koen-dejonghe/39c10357607c698c0b04 to your computer and use it in GitHub Desktop.
Implementation of a connection pool for use with Spark Streaming. See http://stackoverflow.com/questions/30450763/spark-streaming-and-connection-pool-implementation
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
package net.atos.sparti.pub | |
import java.io.PrintStream | |
import java.net.Socket | |
import org.apache.commons.pool2.impl.{DefaultPooledObject, GenericObjectPool} | |
import org.apache.commons.pool2.{ObjectPool, PooledObject, BasePooledObjectFactory} | |
import org.apache.spark.streaming.dstream.DStream | |
/**
 * Writes the events of a `DStream` to a TCP socket at `host`:`port`, using
 * print streams obtained from `PrintStreamPool`.
 *
 * The class is `Serializable` and only holds `host` and `port`; the stream
 * itself is looked up inside the partition closure via `PrintStreamPool`.
 *
 * @param host host name of the socket to publish to
 * @param port port of the socket to publish to
 * @tparam T   type of the events carried by the stream
 */
class PooledSocketStreamPublisher[T](host: String, port: Int)
  extends Serializable {

  /**
   * Publish the stream to a socket.
   *
   * For every partition of every RDD one print stream is borrowed, each event
   * is rendered with `callback` and written as a line, and the stream is then
   * handed back to its pool.
   *
   * @param stream   the stream whose events are published
   * @param callback converts one event into the line written to the socket
   */
  def publish(stream: DStream[T], callback: (T) => String): Unit =
    stream.foreachRDD { rdd =>
      rdd.foreachPartition { partition =>
        val managed = PrintStreamPool(host, port)
        try {
          partition.foreach { event =>
            managed.printStream.println(callback(event))
          }
        } finally {
          // Always give the stream back: if `callback` (or the iterator) throws,
          // a stream that is never returned stays checked out and the pool can
          // eventually run dry, blocking later borrowObject() calls.
          managed.release()
        }
      }
    }
}
/**
 * A print stream together with the pool it has to be handed back to.
 *
 * @param pool        pool that `printStream` is returned to by `release()`
 * @param printStream the stream exposed to callers for writing
 */
class ManagedPrintStream(private val pool: ObjectPool[PrintStream], val printStream: PrintStream) {

  /** Returns `printStream` to `pool`. */
  def release() = {
    pool.returnObject(printStream)
  }
}
/**
 * Registry of print-stream pools, one pool per `(host, port)` pair.
 *
 * Pools are created on first use and closed by a JVM shutdown hook.
 */
object PrintStreamPool {

  /**
   * Pools created so far, keyed by `(host, port)`.
   *
   * The map is immutable and the reference is only replaced while holding this
   * object's lock (see `poolFor`); `@volatile` lets the shutdown hook, which
   * reads without the lock, see the most recently published map.
   */
  @volatile var hostPortPool: Map[(String, Int), ObjectPool[PrintStream]] = Map()

  sys.addShutdownHook {
    hostPortPool.values.foreach { pool => pool.close() }
  }

  /**
   * Factory method: borrows a print stream for `host`:`port`.
   *
   * @param host host name of the target socket
   * @param port port of the target socket
   * @return the borrowed stream paired with the pool it must be released to
   */
  def apply(host: String, port: Int): ManagedPrintStream = {
    val pool = poolFor(host, port)
    // Borrow outside the registry lock: borrowObject() may block, and must not
    // stall callers that only need to look up a pool for another endpoint.
    new ManagedPrintStream(pool, pool.borrowObject())
  }

  /**
   * Looks up the pool for `(host, port)`, creating and registering it if absent.
   *
   * Synchronized so that callers racing on the same key all get the same pool
   * and no registration is lost; an unregistered pool would never be closed by
   * the shutdown hook.
   */
  private def poolFor(host: String, port: Int): ObjectPool[PrintStream] = synchronized {
    val key = (host, port)
    hostPortPool.getOrElse(key, {
      val created = new GenericObjectPool[PrintStream](new SocketStreamFactory(host, port))
      hostPortPool = hostPortPool + (key -> created)
      created
    })
  }
}
/**
 * Pooled-object factory producing print streams that write to a socket
 * connected to `host`:`port`.
 *
 * @param host host name the sockets connect to
 * @param port port the sockets connect to
 */
class SocketStreamFactory(host: String, port: Int) extends BasePooledObjectFactory[PrintStream] {

  /** Opens a new socket to `host`:`port` and wraps its output stream. */
  override def create() = {
    val socket = new Socket(host, port)
    new PrintStream(socket.getOutputStream)
  }

  /** Wraps a stream in a `DefaultPooledObject`. */
  override def wrap(stream: PrintStream) = {
    new DefaultPooledObject[PrintStream](stream)
  }

  /** A pooled stream is valid as long as its `checkError()` reports false. */
  override def validateObject(po: PooledObject[PrintStream]) = {
    val pooledStream = po.getObject
    !pooledStream.checkError()
  }

  /** Closes the underlying stream when the pooled object is destroyed. */
  override def destroyObject(po: PooledObject[PrintStream]) = {
    val pooledStream = po.getObject
    pooledStream.close()
  }

  /** Flushes the stream when the pooled object is passivated. */
  override def passivateObject(po: PooledObject[PrintStream]) = {
    val pooledStream = po.getObject
    pooledStream.flush()
  }
}
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment
16/06/24 13:40:53 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
16/06/24 13:42:17 ERROR Executor: Exception in task 16.0 in stage 1.0 (TID 17)
redis.clients.jedis.exceptions.JedisException: Could not return the resource to the pool
at redis.clients.util.Pool.returnResourceObject(Pool.java:65)
at redis.clients.jedis.JedisPool.returnResource(JedisPool.java:113)
at util.RedisDB$.returnRedis(RedisDB.scala:58)
at scala.collection.Iterator$class.foreach(Iterator.scala:727)
at scala.collection.AbstractIterator.foreach(Iterator.scala:1157)
at org.apache.spark.rdd.RDD$$anonfun$foreachPartition$1$$anonfun$apply$33.apply(RDD.scala:920)
at org.apache.spark.rdd.RDD$$anonfun$foreachPartition$1$$anonfun$apply$33.apply(RDD.scala:920)
at org.apache.spark.SparkContext$$anonfun$runJob$5.apply(SparkContext.scala:1858)
at org.apache.spark.SparkContext$$anonfun$runJob$5.apply(SparkContext.scala:1858)
at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:66)
at org.apache.spark.scheduler.Task.run(Task.scala:89)
at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:214)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1110)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:603)
at java.lang.Thread.run(Thread.java:722)
Caused by: java.lang.IllegalStateException: Returned object not currently part of this pool
at org.apache.commons.pool2.impl.GenericObjectPool.returnObject(GenericObjectPool.java:537)
at redis.clients.util.Pool.returnResourceObject(Pool.java:63)
... 18 more