spark-submit --class com.khanolkar.bda.util.CompactRawLogs \
............ \
MyJar-1.0.jar \
"/user/akhanolk/data/raw/streaming/to-be-compacted/" \
"/user/akhanolk/data/raw/compacted/" \
"2" "128" "oozie-124"
package com.khanolkar.bda.util

/**
 * @author Anagha Khanolkar
 * TODO: add robust exception handling & logging (integrate into OMS)
 */

import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.fs.{FileSystem, Path}
import org.apache.spark.sql.SparkSession

object CompactRawLogs {

  // Main entry point
  def main(args: Array[String]): Unit = {

    // Command-line arguments check
    if (args.length != 5) {
      println("Please provide 5 parameters: <inputDirectory> <outputDirectory> <minFilesToCompact> <minCompactedFileSizeInMB> <oozieWorkflowID>")
      System.exit(1)
    }

    // Capture command-line arguments
    val inputDirectory = args(0)                 // Logs to be compacted
    val outputDirectory = args(1)                // Compacted logs
    val minFilesToCompact = args(2).toInt        // Minimum number of files required to run compaction
    val minCompactedFileSizeInMB = args(3).toInt // E.g. 128 compacts into ~128 MB files
    val oozieWorkflowID = args(4)                // Correlates this batch with the workflow when logging status

    // Spark session
    val sparkSession: SparkSession = SparkSession.builder().master("yarn").getOrCreate()

    // Recursive directory listing support; when setting the key directly on the Hadoop
    // configuration, it must not carry the "spark.hadoop." prefix
    sparkSession.sparkContext.hadoopConfiguration
      .set("mapreduce.input.fileinputformat.input.dir.recursive", "true")

    // Get the FileStatus array - used to delete the raw log directories after compaction succeeds
    val fs = FileSystem.get(sparkSession.sparkContext.hadoopConfiguration)
    val inputDirFileStatusArr = fs.listStatus(new Path(inputDirectory))

    // Proceed with compaction only if we have enough files/directories to compact
    if (inputDirFileStatusArr.length >= minFilesToCompact) {
      var errorString = ""
      // Attempt compaction
      try {
        // Determine how many files to coalesce to: total input size / target file size, at least 1
        val inputDirSize = fs.getContentSummary(new Path(inputDirectory)).getLength
        val outputFileCount =
          Math.max((inputDirSize / (minCompactedFileSizeInMB.toLong * 1024 * 1024)).toInt, 1)

        // Read, coalesce, and write the compacted output under a workflow-specific subdirectory
        val datasetRDD = sparkSession.sparkContext.textFile(inputDirectory)
        datasetRDD.coalesce(outputFileCount).saveAsTextFile(outputDirectory + "/" + oozieWorkflowID)

        // No errors, so delete the raw logs in the input directory by iterating over the FileStatus array
        inputDirFileStatusArr.foreach(x => {
          val dirToDelete: Path = x.getPath
          fs.delete(dirToDelete, true)
        })
      } catch {
        case e: Exception =>
          // getStackTrace returns an array; mkString yields a readable trace
          errorString = e.getStackTrace.mkString("\n")
      } finally {
        // TODO: Log to integrate into OMS - workflowID | errorString
      }
    }
  }
}
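To illustrate the sizing logic with the sample invocation above (target size 128 MB): a hypothetical input directory totaling roughly 1 GB would yield outputFileCount = floor(1024 / 128) = 8, so the job would write eight part files under /user/akhanolk/data/raw/compacted/oozie-124/ before deleting the source directories. Any input smaller than the target size is coalesced into a single file.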