Last active
February 3, 2025 13:20
-
-
Save rjurney/300a897a93cd1a713570ddaf8f2d344f to your computer and use it in GitHub Desktop.
A proposed monkey patch to save PySpark DataFrames into a single CSV file
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
import os | |
import glob | |
import shutil | |
import uuid | |
from pyspark.sql.readwriter import DataFrameWriter | |
def csv_single(self, path, **options):
    """
    Write the DataFrame as a single CSV file at the specified path.

    Spark always writes a directory of part files, so this method coalesces
    the DataFrame to a single partition, writes it to a uniquely named
    temporary directory next to ``path``, moves the resulting part file to
    ``path`` and removes the temporary directory, even when a step fails.

    The part file is located and moved with ``glob``/``os``/``shutil``, so
    ``path`` must be a plain path on a filesystem the driver process can
    access directly (not a ``hdfs://`` or ``s3://`` URI).

    Only the keyword ``options`` are forwarded to the CSV writer. Settings
    applied earlier on this writer through ``.mode()`` / ``.option()`` are
    not carried over, because a new writer is created from the coalesced
    DataFrame.

    Parameters:
        path (str): The target CSV file path.
        **options: Additional options for the csv writer
            (e.g. header="true", mode="overwrite").

    Save modes (the ``mode`` option), applied to the single target file:
        "overwrite": replace an existing file at ``path``.
        "ignore": do nothing if ``path`` already exists.
        "append": rejected if ``path`` already exists (see Raises).
        anything else / omitted: fail if ``path`` already exists.

    Raises:
        FileExistsError: ``path`` exists and the mode is not "overwrite",
            "ignore" or "append".
        ValueError: ``path`` exists and the mode is "append"; appending to a
            single CSV file is not supported.
        FileNotFoundError: Spark produced no part file in the temporary
            directory.

    Example:
        df.write.csv_single("/tmp/counts.csv", header=True, mode="overwrite")
    """
    # ``mode`` may be missing or explicitly None; both mean "error if exists".
    mode = str(options.get("mode") or "errorifexists").lower()

    # Decide what to do about an existing target *before* running the Spark
    # job, so nothing is computed when the result could not be stored anyway.
    if os.path.exists(path):
        if mode == "ignore":
            return
        if mode == "append":
            raise ValueError(
                f"mode='append' is not supported for single-file output: {path}"
            )
        if mode != "overwrite":
            raise FileExistsError(f"Target path {path} already exists")

    # Unique sibling directory: a sibling normally shares the target's
    # filesystem, which lets the final move be a rename instead of a copy.
    tmp_dir = f"{path}_{uuid.uuid4().hex}"
    try:
        # ``self`` is the DataFrameWriter, which has no ``coalesce``; the
        # DataFrame it wraps is reduced to one partition and written through
        # a fresh writer.
        self._df.coalesce(1).write.csv(tmp_dir, **options)

        # Match "part-*" rather than "part-*.csv" so compressed output
        # (e.g. part-*.csv.gz) is found too; the hidden ".part-*.crc"
        # checksum files start with a dot and are therefore not matched.
        # The directory name is escaped in case ``path`` contains glob
        # metacharacters.
        part_files = sorted(
            glob.glob(os.path.join(glob.escape(tmp_dir), "part-*"))
        )
        if not part_files:
            raise FileNotFoundError(
                f"No CSV file found in temporary directory {tmp_dir}"
            )

        # Remove the old file only now, after the new data was written
        # successfully, so a failed job never destroys the existing output.
        if mode == "overwrite" and os.path.exists(path):
            os.remove(path)

        # Move the CSV file to the desired location.
        shutil.move(part_files[0], path)
    finally:
        # Always clean up; the directory may not exist if the write failed
        # early, hence ignore_errors.
        shutil.rmtree(tmp_dir, ignore_errors=True)
# Register csv_single as a method on DataFrameWriter so it can be called as
# df.write.csv_single(...).
setattr(DataFrameWriter, "csv_single", csv_single)
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment