-
-
Save relud/7b5d9a48823e0d8e140ab94a96b0aa9a to your computer and use it in GitHub Desktop.
Test how repartitioning affects the number of files written per partition when writing Parquet data.
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
import java.nio.charset.StandardCharsets
import java.util.UUID.randomUUID
import java.util.zip.CRC32

import scala.sys.process._

import com.mozilla.telemetry.utils.getOrCreateSparkSession
// Spark session for this experiment; WARN level keeps the console output readable.
val spark = getOrCreateSparkSession("test")
spark.sparkContext.setLogLevel("WARN")
// Implicit encoders needed for `.toDF()` on local Scala collections.
import spark.implicits._
/** Maps a row to an output-file bucket:
  * `sampleId * filesPerPartition + (CRC32(clientId) mod filesPerPartition)`.
  *
  * @param clientId          client identifier; `null` is hashed as the empty string
  * @param sampleId          sample bucket of the row
  * @param filesPerPartition number of file buckets per sample id (must be non-zero,
  *                          otherwise an ArithmeticException is thrown)
  * @return bucket id in `[sampleId * filesPerPartition, (sampleId + 1) * filesPerPartition)`
  *         for positive `filesPerPartition`
  */
def getPartitionId(clientId: String, sampleId: Int, filesPerPartition: Int): Long = {
  val crc = new CRC32
  // Hash with an explicit charset so bucket ids do not depend on the JVM's default encoding.
  crc.update(Option(clientId).getOrElse("").getBytes(StandardCharsets.UTF_8))
  // Widen before multiplying so large sample ids cannot overflow Int.
  // CRC32.getValue is an unsigned 32-bit value held in a Long, so the remainder is never negative.
  sampleId.toLong * filesPerPartition + crc.getValue % filesPerPartition
}
// Number of distinct synthetic sample_id values generated for the test data.
val sampleIdModulus = 100
// Target number of output files per sample_id directory.
val filesPerPartition = 4
val partitionId = (c: String, s: Int) => getPartitionId(c, s, filesPerPartition)
// Expose the bucketing function to SQL expressions as `partid(client_id, sample_id)`.
// NOTE(review): per Spark's UDF docs, a null input for a primitive (Int) parameter makes the
// UDF return null without invoking it, so rows with a null sample_id get a null part_id —
// confirm for the Spark version in use.
spark.udf.register("partid", partitionId)
// Test with null sample_id (and null client_id / document_id) mixed into ordinary rows.
case class Input(sample_id: Option[Long], client_id: Option[String], document_id: Option[String])

val outputPath = "/tmp/test-output"

val df = spark.createDataFrame(
  List(
    Input(None, None, None),
    Input(Some(1), None, None),
    Input(None, Some("client"), None),
    Input(None, None, Some("document")),
    Input(Some(1), Some("client"), None),
    Input(Some(1), None, Some("document")),
    Input(None, Some("client"), Some("document"))
  )
).union(
  // union is positional: (_1, _2, _3) line up with (sample_id, client_id, document_id).
  (1 to 200000)
    .map { e => (e % sampleIdModulus, (e % 10000).toString, randomUUID().toString) }
    .toDF()
)

val df2 = df.selectExpr("*", "partid(client_id, sample_id) as part_id")

df2
  .repartition(df2.col("part_id"))
  .drop("part_id")
  .write
  .partitionBy("sample_id")
  .mode("overwrite")
  .parquet(outputPath)

// Count the parquet files written. scala.sys.process does not go through a shell, so
// `*.parquet` reaches `find` unexpanded; `.!` runs the pipeline and prints wc's output.
(s"find $outputPath -type f -name *.parquet" #| "wc -l").!

// Observed: 397 files. Four of the sample_id directories have only three files, and one file
// is for a null sample_id.
// NOTE(review): likely not CRC collisions. repartition(col) hash-partitions the ~400 distinct
// part_id values into spark.sql.shuffle.partitions (200 by default) shuffle partitions, and each
// shuffle task writes one file per sample_id it holds. When two part_ids with the same sample_id
// hash to the same shuffle partition they end up in a single file — confirm.
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment