@harshavardhana
Last active March 8, 2026 06:20
PySpark parquet overwrite pattern — tests partition prefix visibility after overwrite on S3/MinIO

PySpark Parquet Overwrite — Partition Prefix Visibility Test

Tests that, after Spark overwrites partitioned Parquet files on S3/MinIO, the date-level partition prefixes remain visible in a delimited ListObjectsV2 listing, so that Spark's partition discovery continues to work correctly.

What the test does

  1. Generates two sample CSV files (batch1.csv, batch2.csv) with the same schema and same date partitions but different values.
  2. Writes batch1 as partitioned Parquet (append mode), creating:
    s3a://your-bucket/data/transactions/date=2024-01-01/part-00000.parquet
    s3a://your-bucket/data/transactions/date=2024-01-02/part-00000.parquet
    
  3. Writes batch2 to the same path in overwrite mode. With the default static partitionOverwriteMode, Spark removes the existing data under the base path and writes new .parquet files under the same date prefixes; dynamic mode would replace only the matching partitions.
  4. Reads back the Parquet and asserts the partition folders are still discoverable. If the object store's delimited listing is broken, Spark returns an empty dataset silently.
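Under the hood, Spark's S3A directory listing issues delimited ListObjectsV2 requests, and each date=.../ partition directory surfaces as a CommonPrefixes entry. A minimal sketch of that listing with boto3 (the endpoint, credentials, and bucket name are placeholder assumptions matching the MinIO defaults used below; the `partition_dates` helper is illustrative, not part of the script):

```python
def partition_dates(common_prefixes, base_prefix):
    """Extract partition values like '2024-01-01' from CommonPrefixes entries."""
    dates = []
    for cp in common_prefixes:
        # Each entry looks like {"Prefix": "data/transactions/date=2024-01-01/"}
        leaf = cp["Prefix"][len(base_prefix):].strip("/")
        if leaf.startswith("date="):
            dates.append(leaf.split("=", 1)[1])
    return sorted(dates)

if __name__ == "__main__":
    import boto3  # placeholder endpoint/credentials -- adjust to your setup

    s3 = boto3.client(
        "s3",
        endpoint_url="http://localhost:9000",
        aws_access_key_id="minioadmin",
        aws_secret_access_key="minioadmin",
    )
    resp = s3.list_objects_v2(
        Bucket="your-bucket",
        Prefix="data/transactions/",
        Delimiter="/",  # the delimiter is what makes partition dirs appear as CommonPrefixes
    )
    print(partition_dates(resp.get("CommonPrefixes", []), "data/transactions/"))
```

If the overwrite breaks delimited listing, the CommonPrefixes list comes back empty even though objects still exist under the prefixes, and Spark's discovery fails the same way.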

Prerequisites

Requirement   Version
Python        3.8+
PySpark       3.3+

Install PySpark with pip install pyspark.

The S3A jars are resolved automatically via --packages, so no manual jar downloads are needed. hadoop-aws pulls in aws-java-sdk-bundle as a transitive dependency from Maven Central. Pick the hadoop-aws version that matches the Hadoop version bundled with your PySpark build.

Running against MinIO

Edit the SparkSession block at the top of the script to point at your MinIO instance:

spark = SparkSession.builder \
    .appName("parquet-overwrite-test") \
    .config("spark.hadoop.fs.s3a.endpoint", "http://localhost:9000") \
    .config("spark.hadoop.fs.s3a.access.key", "minioadmin") \
    .config("spark.hadoop.fs.s3a.secret.key", "minioadmin") \
    .config("spark.hadoop.fs.s3a.path.style.access", "true") \
    .config("spark.hadoop.fs.s3a.impl", "org.apache.hadoop.fs.s3a.S3AFileSystem") \
    .getOrCreate()

Also update BASE_PATH to a bucket that already exists on your MinIO:

BASE_PATH = "s3a://your-bucket/data/transactions"
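Since the bucket must already exist, you can create it up front. A small sketch that splits the bucket name out of an s3a:// URL and creates it via boto3 if it is missing (the `bucket_and_prefix` helper and the MinIO endpoint/credentials are illustrative assumptions, not part of the script):

```python
from urllib.parse import urlparse

def bucket_and_prefix(s3a_url):
    """Split an s3a:// URL into (bucket, key prefix)."""
    parsed = urlparse(s3a_url)
    return parsed.netloc, parsed.path.lstrip("/")

if __name__ == "__main__":
    import boto3  # placeholder endpoint/credentials -- adjust to your MinIO

    bucket, _prefix = bucket_and_prefix("s3a://your-bucket/data/transactions")
    s3 = boto3.client(
        "s3",
        endpoint_url="http://localhost:9000",
        aws_access_key_id="minioadmin",
        aws_secret_access_key="minioadmin",
    )
    try:
        s3.head_bucket(Bucket=bucket)  # raises ClientError if the bucket is absent
    except s3.exceptions.ClientError:
        s3.create_bucket(Bucket=bucket)
```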

Then run:

spark-submit --packages org.apache.hadoop:hadoop-aws:3.3.4 spark_parquet_overwrite_test.py

Running against AWS S3

spark = SparkSession.builder \
    .appName("parquet-overwrite-test") \
    .config("spark.hadoop.fs.s3a.aws.credentials.provider",
            "com.amazonaws.auth.DefaultAWSCredentialsProviderChain") \
    .getOrCreate()

Credentials are picked up from ~/.aws/credentials or environment variables (AWS_ACCESS_KEY_ID, AWS_SECRET_ACCESS_KEY).
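The environment-variable route can also be driven from Python before the SparkSession is created, since the launched JVM inherits the driver's environment. A hedged sketch (the `set_aws_env` helper and the placeholder values are illustrative, not part of the script):

```python
import os

def set_aws_env(access_key, secret_key):
    """Export AWS credentials so DefaultAWSCredentialsProviderChain can find them."""
    os.environ["AWS_ACCESS_KEY_ID"] = access_key
    os.environ["AWS_SECRET_ACCESS_KEY"] = secret_key

if __name__ == "__main__":
    # Placeholder values -- replace with real credentials, or skip this entirely
    # and rely on ~/.aws/credentials.
    set_aws_env("YOUR_ACCESS_KEY", "YOUR_SECRET_KEY")
```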

spark-submit --packages org.apache.hadoop:hadoop-aws:3.3.4 spark_parquet_overwrite_test.py

Expected output

After the overwrite, both reads should return data — not empty results:

+---+-----+------+----------+
| id| name|amount|      date|
+---+-----+------+----------+
|  1|Alice|   110|2024-01-01|
|  2|  Bob|   220|2024-01-01|
...
+----------+
|      date|
+----------+
|2024-01-01|
|2024-01-02|
+----------+

Empty output from the partition listing means the object store is not returning the date=XXXX/ common prefixes correctly after the overwrite.

import csv
import os

from pyspark.sql import SparkSession
from pyspark.sql.types import StructType, StructField, StringType, IntegerType

# ---------------------------------------------------------------------------
# Generate sample CSV files
# ---------------------------------------------------------------------------
def write_csv(path, rows):
    os.makedirs(os.path.dirname(path), exist_ok=True)
    with open(path, "w", newline="") as f:
        writer = csv.writer(f)
        writer.writerows(rows)

# batch1: initial load — two dates, three records each
BATCH1 = [
    [1, "Alice", 100, "2024-01-01"],
    [2, "Bob", 200, "2024-01-01"],
    [3, "Carol", 300, "2024-01-01"],
    [4, "Dave", 400, "2024-01-02"],
    [5, "Eve", 500, "2024-01-02"],
    [6, "Frank", 600, "2024-01-02"],
]

# batch2: updated data for the same two dates (overwrites batch1 partition files)
BATCH2 = [
    [1, "Alice", 110, "2024-01-01"],
    [2, "Bob", 220, "2024-01-01"],
    [3, "Carol", 330, "2024-01-01"],
    [4, "Dave", 440, "2024-01-02"],
    [5, "Eve", 550, "2024-01-02"],
    [6, "Frank", 660, "2024-01-02"],
]

write_csv("/tmp/input/batch1.csv", BATCH1)
write_csv("/tmp/input/batch2.csv", BATCH2)

# ---------------------------------------------------------------------------
# Spark session
# ---------------------------------------------------------------------------
spark = SparkSession.builder \
    .appName("parquet-overwrite-test") \
    .getOrCreate()

schema = StructType([
    StructField("id", IntegerType(), False),
    StructField("name", StringType(), True),
    StructField("amount", IntegerType(), True),
    StructField("date", StringType(), False),
])

BASE_PATH = "s3a://your-bucket/data/transactions"

# ---------------------------------------------------------------------------
# Step 1: Read batch1 CSV and write partitioned parquet (append)
# ---------------------------------------------------------------------------
df1 = spark.read.schema(schema).csv("/tmp/input/batch1.csv")
df1.write \
    .partitionBy("date") \
    .mode("append") \
    .parquet(BASE_PATH)
# At this point:
#   .../transactions/date=2024-01-01/part-00000.parquet
#   .../transactions/date=2024-01-02/part-00000.parquet

# ---------------------------------------------------------------------------
# Step 2: Read batch2 CSV (same schema, same dates, updated amounts)
# ---------------------------------------------------------------------------
df2 = spark.read.schema(schema).csv("/tmp/input/batch2.csv")

# ---------------------------------------------------------------------------
# Step 3: Overwrite. With the default static partitionOverwriteMode, Spark
#         removes the existing data under BASE_PATH and writes new files.
#         The date=XXXX/ prefixes MUST still appear in a delimited
#         ListObjectsV2 on the parent path afterward.
# ---------------------------------------------------------------------------
df2.write \
    .partitionBy("date") \
    .mode("overwrite") \
    .parquet(BASE_PATH)

# ---------------------------------------------------------------------------
# Step 4: Verify partition prefixes are still visible after overwrite.
#         If the object store's delimited listing is broken the partition
#         folders won't be discovered and both reads below return nothing.
# ---------------------------------------------------------------------------
result = spark.read.parquet(BASE_PATH)
result.show()

spark.catalog.refreshByPath(BASE_PATH)
partitions = spark.read.parquet(BASE_PATH).select("date").distinct()
partitions.show()

spark.stop()