TRBaldim / KeysTracker.scala
Created April 12, 2018 03:24
Get all keys of a JSON document with the Play JSON API.
import play.api.libs.json.{JsArray, JsObject, JsValue}

object KeysTracker {
  // Recursively collect every key path in a JSON value, prefixed with `parent`.
  def allKeys(json: JsValue, parent: String): Seq[String] = json match {
    case JsObject(fields) =>
      fields.map(f => parent + "." + f._1).toSeq ++
        fields.flatMap(f => allKeys(f._2, parent + "." + f._1)).toSeq
    case JsArray(as) => as.flatMap(x => allKeys(x, parent)).toSeq
    case _ => Seq.empty[String]
  }
}
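A quick usage sketch (the sample JSON and the "root" prefix are illustrative, not from the gist):

import play.api.libs.json.Json

val json = Json.parse("""{"user": {"name": "ana", "tags": ["dev", "ops"]}}""")
KeysTracker.allKeys(json, "root")
// => Seq("root.user", "root.user.name", "root.user.tags")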
# After stopping all application instances, reset the application
$ bin/kafka-streams-application-reset --application-id my-streams-app \
      --input-topics my-input-topic \
      --intermediate-topics rekeyed-topic \
      --bootstrap-servers brokerHost:9092 \
      --zookeeper zookeeperHost:2181
TRBaldim / referseList.scala
Created April 21, 2017 16:27
Reverse a List in Scala
import scala.annotation.tailrec

// Tail-recursive list reversal: prepend each head onto the accumulator `rList`.
@tailrec
def reverseList[T](inputList: List[T], rList: List[T]): List[T] =
  inputList match {
    case Nil    => rList
    case h :: t => reverseList(t, h :: rList)
  }
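For reference, a small usage sketch (the sample lists are illustrative); pass Nil as the initial accumulator:

reverseList(List(1, 2, 3), Nil)        // => List(3, 2, 1)
reverseList(List("a", "b"), List())    // => List("b", "a")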
TRBaldim / SplitRDDbyKey.py
Created January 20, 2017 19:51
Split an RDD into two or more RDDs based on keys or another column of the RDD.
from pyspark import SparkContext

# Close to the same solution as randomSplit:
# https://github.com/apache/spark/blob/master/python/pyspark/rdd.py#L429
class SplitByKey:
    def __init__(self, key):
        self.key = key
        self.split = None

    def func(self, split, iterator):
        # The body is cut off in the gist preview; a plausible implementation
        # keeps only the rows of this partition whose key matches self.key.
        return (row for row in iterator if row[0] == self.key)
TRBaldim / updateMap.scala
Created January 17, 2017 21:50
Update Immutable Map in Scala
// Append `value` to the list stored under `key`, returning a new immutable Map.
def updateMap[A, B](map: Map[A, List[B]], key: A, value: B): Map[A, List[B]] =
  map + ((key, map.getOrElse(key, List()) ::: List(value)))

// Example: collect "partition:untilOffset" strings per topic from Kafka offset ranges.
var topicsMap = offsetsRanges.map { o => (o.topic, List[String]()) }.toMap
offsetsRanges.foreach { o =>
  topicsMap = updateMap[String, String](topicsMap, o.topic, o.partition + ":" + o.untilOffset)
}
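Outside the Kafka offset example, a minimal sanity check (values are illustrative):

val m = Map("topic-a" -> List("0:10"))
updateMap(m, "topic-a", "1:25")   // => Map("topic-a" -> List("0:10", "1:25"))
updateMap(m, "topic-b", "0:5")    // adds a new key with a single-element list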
TRBaldim / cofigure_hive_partition.py
Created December 8, 2016 11:18
Creating partitions in Spark.
from pyspark.sql.functions import col

# Use the Hive SerDe for ORC tables and enable Hive dynamic partitioning.
hive_context.setConf("spark.sql.hive.convertMetastoreOrc", "false")
hive_context.setConf("hive.exec.dynamic.partition", "true")
hive_context.setConf("hive.exec.dynamic.partition.mode", "nonstrict")

my_df.select(*[col(c).alias(c) for c in columns]) \
    .write.mode("append") \
    .partitionBy('year-month') \
    .insertInto("my_table_patitioned")
import com.ibm.mq.jms.MQQueueConnectionFactory
import com.ibm.mq.jms.JMSC
import scala.io.Source
import javax.jms.DeliveryMode
import javax.jms.Session

def toRun(): Unit = {
  val fileName = "/home/my_home/path/local_file"
  val cf = new MQQueueConnectionFactory()
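The gist preview cuts off here. Below is a minimal, self-contained sketch of how such a JMS producer is typically wired up against IBM MQ, assuming client-mode TCP transport; the host, port, queue manager, and queue name are placeholders rather than values from the gist:

import com.ibm.mq.jms.{JMSC, MQQueueConnectionFactory}
import javax.jms.{DeliveryMode, Session}
import scala.io.Source

object MQFileSender {
  def main(args: Array[String]): Unit = {
    val cf = new MQQueueConnectionFactory()
    cf.setHostName("mqHost")                            // placeholder host
    cf.setPort(1414)                                    // default MQ listener port
    cf.setQueueManager("QMGR")                          // placeholder queue manager
    cf.setTransportType(JMSC.MQJMS_TP_CLIENT_MQ_TCPIP)  // client (TCP) transport

    val connection = cf.createQueueConnection()
    val session = connection.createQueueSession(false, Session.AUTO_ACKNOWLEDGE)
    val queue = session.createQueue("MY.QUEUE")         // placeholder queue name
    val sender = session.createSender(queue)
    sender.setDeliveryMode(DeliveryMode.NON_PERSISTENT)

    connection.start()
    // Send each line of the local file as a separate text message.
    Source.fromFile("/home/my_home/path/local_file").getLines().foreach { line =>
      sender.send(session.createTextMessage(line))
    }
    sender.close()
    session.close()
    connection.close()
  }
}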
TRBaldim / my_python_kafka_producer.py
Last active February 26, 2019 02:53
Sending an RDD from pySpark to a Scala function, called from the Python side.
from pyspark.serializers import PickleSerializer, AutoBatchedSerializer

# Re-serialize the RDD with batched pickling so the JVM side can deserialize it,
# then convert the pickled Python RDD into a Java RDD of objects.
reserialized_rdd = rdd._reserialize(AutoBatchedSerializer(PickleSerializer()))
rdd_java = rdd.ctx._jvm.SerDe.pythonToJava(reserialized_rdd._jrdd, True)

_jvm = sc._jvm
_jvm.myclass.apps.mine \
    .pyKafka \
    .sendMessageByRDD("host:6667",