Skip to content

Instantly share code, notes, and snippets.

@TRBaldim
Last active February 26, 2019 02:53
Show Gist options
  • Save TRBaldim/2bf67f21bd31ceca3c44f12208a86be6 to your computer and use it in GitHub Desktop.
Save TRBaldim/2bf67f21bd31ceca3c44f12208a86be6 to your computer and use it in GitHub Desktop.
Sending an RDD from PySpark to a Scala function, called from the Python side.
# Hand a PySpark RDD across the Py4J gateway to a Scala method.
#
# Steps:
#   1. Re-serialize the RDD so each partition is a batch of pickled objects
#      (the format SerDe.pythonToJava expects when `batched=True`).
#   2. Convert the pickled Python RDD into a JavaRDD on the JVM side.
#   3. Invoke the Scala entry point with the JavaRDD plus Kafka settings.
from pyspark.serializers import PickleSerializer, AutoBatchedSerializer

reserialized_rdd = rdd._reserialize(AutoBatchedSerializer(PickleSerializer()))
# BUG FIX: the original passed `rdd._jrdd` here, silently discarding the
# re-serialized RDD built on the previous line. The JVM-side conversion must
# operate on the re-serialized RDD's Java handle.
rdd_java = rdd.ctx._jvm.SerDe.pythonToJava(reserialized_rdd._jrdd, True)

_jvm = sc._jvm
# NOTE(review): "host:6667" and 'my_topic' are hard-coded examples — replace
# with real broker/topic values in production use.
_jvm.myclass.apps.mine\
    .pyKafka\
    .sendMessageByRDD("host:6667",
                      "SASL_PLAINTEXT",
                      rdd_java,
                      'my_topic')
import java.util.Properties

import org.apache.spark.api.java.JavaRDD
/** Sends the contents of an RDD (handed over from PySpark via Py4J) to a
  * Kafka topic.
  *
  * Builds the Kafka producer configuration from the given connection
  * settings; the actual produce loop is not yet implemented (see TODO).
  *
  * @param broker   Kafka bootstrap servers, e.g. "host:6667"
  * @param protocol Kafka security protocol, e.g. "SASL_PLAINTEXT"
  * @param rdd      JavaRDD of pickled elements produced by SerDe.pythonToJava
  * @param topic    destination Kafka topic
  */
def sendMessageByRDD(broker: String, protocol: String, rdd: JavaRDD[Any], topic: String): Unit = {
  val props = new Properties()
  props.put("security.protocol", protocol)
  // Values arrive as raw bytes from the Python pickling layer; keys are plain strings.
  props.put("value.serializer", "org.apache.kafka.common.serialization.ByteArraySerializer")
  props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer")
  props.put("bootstrap.servers", broker)
  // TODO(review): unimplemented — iterate over `rdd` partitions, create a
  // KafkaProducer[String, Array[Byte]] per partition, and send each element
  // to `topic`. Left as ??? because kafka-clients is not available here.
  ???
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment