Spark app with magic committer
# These extra configs enable the S3A "magic" committer. When they take
# effect, the job writes a _SUCCESS file containing JSON data; a zero-byte
# _SUCCESS file indicates that the magic committer did NOT get enabled.
import time

extra_configs = {
    "spark.sql.sources.commitProtocolClass": "org.apache.spark.internal.io.cloud.PathOutputCommitProtocol",
    "spark.sql.parquet.output.committer.class": "org.apache.hadoop.mapreduce.lib.output.BindingPathOutputCommitter",
    "spark.hadoop.mapreduce.outputcommitter.factory.scheme.s3a": "org.apache.hadoop.fs.s3a.commit.S3ACommitterFactory",
    "spark.hadoop.fs.s3a.committer.name": "magic",
    "spark.hadoop.fs.s3a.committer.magic.enabled": "true",
}

query = "<some-SELECT-sql-statement>"

# create_spark_session, log, and file_path_spark3 are defined elsewhere in
# the application; create_spark_session builds a SparkSession from the two
# config dicts.
with create_spark_session(
    override_configs=extra_configs,
    common_configs={
        "spark.hadoop.mapreduce.fileoutputcommitter.marksuccessfuljobs": "true",
        "spark.dynamicAllocation.enabled": "false",
        "spark.executor.instances": 30,
    },
) as session:
    log.info(f"Running on Spark3 - query:\n{query}")
    data = session.sql(query)
    data = data.coalesce(30)
    log.info(f"Saving datapoints to {file_path_spark3}")

    # Write the data and time the job
    t = time.time()
    data.write.parquet(file_path_spark3, mode="overwrite")
    print(f"Took {time.time() - t}s")