Created
January 15, 2018 18:15
-
-
Save mreid-moz/7374496e9d415ca85b42a95b011ac852 to your computer and use it in GitHub Desktop.
Test repartitioning behaviour when writing Parquet data.
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
import java.util.UUID.randomUUID | |
import scala.sys.process._ | |
import com.mozilla.telemetry.utils.getOrCreateSparkSession | |
import java.util.zip.CRC32 | |
// Bring up a local Spark session for this experiment and silence INFO-level log noise.
val spark = getOrCreateSparkSession("test")
import spark.implicits._
spark.sparkContext.setLogLevel("WARN")
/**
 * Map a client id to a stable bucket in [0, modulus) using a CRC32 hash.
 *
 * @param clientId the client identifier; a null id cannot be hashed, so it is
 *                 bucketed deterministically at 0 instead of throwing an NPE
 *                 (resolves the old TODO).
 * @param modulus  number of buckets; the result is `crc32(clientId) % modulus`
 * @return bucket index in [0, modulus) as a Long
 */
def getSampleId(clientId: String, modulus: Int): Long = {
  import java.nio.charset.StandardCharsets
  Option(clientId).fold(0L) { id =>
    val crc = new CRC32
    // Pin UTF-8 explicitly: the no-arg getBytes uses the platform default
    // charset, which would make bucket assignment vary across JVMs.
    crc.update(id.getBytes(StandardCharsets.UTF_8))
    crc.getValue % modulus
  }
}
// Bucketing configuration: 100 sample_id buckets, with enough sub-buckets to
// target 4 output files per sample_id partition.
val sampleIdModulus = 100
val filesPerPartition = 4

// Expose the bucketing functions to Spark SQL as UDFs.
val sampleId = (c: String) => getSampleId(c, sampleIdModulus)
spark.sqlContext.udf.register("sampleid", sampleId)
val partitionId = (c: String) => getSampleId(c, filesPerPartition * sampleIdModulus)
spark.sqlContext.udf.register("partid", partitionId)

// Synthetic dataset: 200k documents spread across 10k distinct client ids.
val df = (1 to 200000).toList
  .map(e => ((e % 10000).toString, randomUUID().toString))
  .toDF("client_id", "document_id")

// Attach both bucket columns, then cluster rows by part_id before writing so
// each sample_id directory should end up with roughly filesPerPartition files.
val df2 = df.selectExpr("*", "sampleid(client_id) as sample_id", "partid(client_id) as part_id")
df2.repartition(df2.col("part_id"))
  .drop("part_id")
  .write
  .partitionBy("sample_id")
  .mode("overwrite")
  .parquet("/tmp/test-output")

// Count the parquet files that were produced.
"find /tmp/test-output -type f -name *.parquet" #| "wc -l" !

// 396 files. Four of the sample_id prefixes have only three files. CRC collisions maybe?
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment