@shatestest · Created October 27, 2018 19:45
package oracle2cassandra.scala
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.DataFrameReader
import com.snp.helpers.Model._
import org.slf4j.Logger
import org.slf4j.LoggerFactory
import scala.collection.Seq
import java.io.IOException
import com.snp.utils.DbUtils
import java.sql.SQLException
import com.snp.utils.Utils
import org.apache.spark.storage.StorageLevel
import java.io.PrintWriter
import java.io.StringWriter

class BMValsProcessor {

  println("Entering BMValsProcessor")

  val PARTITION_COLUMN = "M_ID"
  val COLUMNS = Seq("mo_id", "type", "value", "data_date", "data_item_code",
    "create_date", "last_update_date", "create_user_txt", "update_user_txt")

  def process(oraOptionDfConfig: DataFrameReader, sparkSession: SparkSession, columnFamilyName: String): Unit = {
    try {
      /*
       * Load the data from Oracle for the given schema and query.
       */
      val ora_m_vals_df = DbUtils.readOracleData(oraOptionDfConfig, "OracleSchemaTest",
        PARTITION_COLUMN, "(SELECT * FROM BM_VALS WHERE ROWNUM <= 10) T")
      println(" Oracle table recordCount ::: " + ora_m_vals_df.count())

      val o_m_vals_df = DbUtils.covertColumns(ora_m_vals_df, COLUMNS)

      // Persist the current DataFrame on disk.
      o_m_vals_df.persist(StorageLevel.DISK_ONLY)
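      // Note: the count() above already triggered one full JDBC scan, and
      // persist() only materializes on the next action, so the Cassandra
      // write below reads Oracle a second time. Persisting first and then
      // counting would keep the job to a single Oracle scan.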

      /*
       * Write to Cassandra in the given keyspace and column family.
       */
      try {
        // Save the converted DataFrame to Cassandra.
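        // A minimal sketch of the save step, assuming the DataStax
        // spark-cassandra-connector is on the classpath; the keyspace name
        // "test_keyspace" below is a hypothetical placeholder, not from the
        // original gist.
        o_m_vals_df.write
          .format("org.apache.spark.sql.cassandra")
          .options(Map("keyspace" -> "test_keyspace", "table" -> columnFamilyName))
          .mode(org.apache.spark.sql.SaveMode.Append)
          .save()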
println(" Saved table in cassandra ")
}catch{
case e:IOException => {
println(" Error saving in cassandra ")
}
case e:SQLException => {
println(" Error saving in cassandra ")
}
}//end of
    } catch {
      case e: SQLException =>
        println("Error while reading Oracle table: " + printExceptionStackTrace(e))
      case e: Exception =>
        println(Utils.printExceptionStackTrace(e))
    }
    // Note: the shared SparkSession is deliberately not closed here; Driver
    // closes it once after all processors have run, so processors after the
    // first still have a live session.
    println("processing completed")
  }

  def printExceptionStackTrace(e: Exception): String = {
    val sw = new StringWriter
    e.printStackTrace(new PrintWriter(sw))
    sw.toString
  }
}
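
// DbUtils and Utils are the author's helpers and are not part of this gist.
// A minimal sketch of what DbUtils.readOracleData presumably wraps, assuming
// it delegates to Spark's standard partitioned JDBC read; the signature and
// the handling of the schema argument are guesses, not the real API:
//
//   def readOracleData(reader: DataFrameReader, schema: String,
//                      partitionColumn: String, dbTable: String): DataFrame =
//     reader
//       .option("dbtable", dbTable)                 // e.g. a subquery aliased as T
//       .option("partitionColumn", partitionColumn) // enables parallel range reads
//       .load()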

package oracle2cassandra.scala

// BMValsProcessor2 (referenced by Driver below) had a body identical to
// BMValsProcessor, so it simply reuses that class.
class BMValsProcessor2 extends BMValsProcessor
package oracle2cassandra.scala
import scala.collection.mutable.LinkedHashMap
import org.apache.spark.sql.DataFrameReader
import org.apache.spark.sql.SparkSession
object Driver {

  val ORACLE_LOWER_BOUND = "1"
  val ORACLE_UPPER_BOUND = "100000000"
  val ORACLE_NUM_PARTITIONS = "2"
  val ORACLE_FETCHSIZE = "10000"
  val ORACLE_DRIVER = "oracle.jdbc.OracleDriver"

  val o_url = "jdbc:oracle:thin:@(DESCRIPTION=(ADDRESS=(PROTOCOL=TCP)(HOST=1)(PORT=1529))(ADDRESS=(PROTOCOL=TCP)(HOST=2)(PORT=1529))(ADDRESS=(PROTOCOL=TCP)(HOST=2)(PORT=1529))(LOAD_BALANCE=yes)(CONNECT_DATA=(SERVER=DEDICATED)(SERVICE_NAME=dev)))"
  val o_userName = "oracle"
  val o_passwd = "oracle"

  def main(args: Array[String]): Unit = {
    val spark = SparkSession
      .builder()
      .appName("DatabaseMigrationUtility")
      .config("spark.master", "local[8]")
      .config("spark.dynamicAllocation.enabled", true)
      .config("spark.executor.memory", "4g")
      .config("spark.executor.cores", 8)
      .config("spark.executor.instances", 4)
      .getOrCreate()
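    // Note: the executor-level settings above (memory, cores, instances) only
    // take effect under a cluster manager; with spark.master set to local[8]
    // everything runs in a single JVM and they are effectively ignored.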

    /*
     * Oracle/JDBC options
     */
    val ora_df_options_conf = spark.read.format("jdbc")
      .option("url", o_url)
      .option("driver", ORACLE_DRIVER)
      .option("user", o_userName)
      .option("password", o_passwd)
      .option("lowerBound", ORACLE_LOWER_BOUND)
      .option("upperBound", ORACLE_UPPER_BOUND)
      .option("numPartitions", ORACLE_NUM_PARTITIONS)
      .option("fetchsize", ORACLE_FETCHSIZE)

    val procs: LinkedHashMap[String, (DataFrameReader, SparkSession, String) => Unit] = getAllDefinedProcessors()

    for ((key, fun) <- procs) {
      println("process started for loading column family : " + key)
      // The map key names the target column family, so pass it through.
      fun(ora_df_options_conf, spark, key)
    }

    // Close the shared session once, after all processors have run; closing
    // it inside each processor would break every processor after the first.
    spark.close()
  }

  def getAllDefinedProcessors(): LinkedHashMap[String, (DataFrameReader, SparkSession, String) => Unit] = {
    LinkedHashMap(
      "bm_vals" -> new BMValsProcessor().process,
      "bm_vals2" -> new BMValsProcessor2().process
    )
  }
}
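
// A hypothetical launch command, assuming the project is packaged as an
// assembly jar with the Oracle JDBC driver and spark-cassandra-connector
// bundled (the jar name below is a placeholder, not from the original gist):
//
//   spark-submit --class oracle2cassandra.scala.Driver \
//     --master local[8] oracle2cassandra-assembly.jar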