Postgres bulk loading tests
# /// script
# requires-python = ">=3.12"
# dependencies = [
#     "duckdb",
#     "polars",
#     "psycopg2-binary",
# ]
# ///
"""Benchmarking different ways to insert data into Postgres. | |
1. Open a file buffer, then stream it to Postgres via STDIN COPY | |
2. Create a staging table, then stream it to Postgres via STDIN COPY, then | |
insert into the target table (this is meant to simulate having to load to a | |
temp table and then loading to a more cleaned up table officially, since COPY | |
is strict about the table schema and we are likely to need to do this) | |
3. Create a staging table, then stream it to Postgres via STDIN COPY, then | |
insert into the target table which has a few triggers (simulating our audit | |
table) | |
4. Load to a Polars DataFrame, then iterate over rows and insert via INSERT INTO | |
... VALUES ... with executemany() (simulation of the current delphi-epidata | |
approach and the approach in base Airflow S3ToSQLOperator) | |
5. Load to a Polars DataFrame, then write as CSV to memory byte buffer, then | |
insert via STDIN COPY | |
This uses a 250MB/600k row/scale factor 0.1 TCP-H test dataset generated by | |
DuckDB. You can find the documentation for that here | |
https://duckdb.org/docs/stable/core_extensions/tpch#pre-generated-data-sets. | |
(The smallest download they offer there is scale factor 1, which is a 6M row | |
file and that proved to be too long for some quick tests I wanted to run.) | |
The timings on my machine were (NVMe drive, Unix domain socket w/ Docker, db | |
driver is the Postgres standard psycopg2): | |
1. COPY direct: 1.34 s | |
2. COPY → staging → INSERT SELECT: 2.30 s | |
3. TODO | |
4. INSERT INTO ... VALUES ... with executemany(): 230. s | |
5. LOAD to Polars and then insert via STDIN COPY: 1.55 s | |
Usage: | |
``` | |
# With uv (https://github.com/astral-sh/uv) | |
uv run benchmark.py | |
# With pip | |
python -m venv .test-venv | |
source .test-venv/bin/activate | |
pip install -r duckdb polars psycopg2-binary | |
python benchmark.py | |
``` | |
""" | |
import io
import time
from contextlib import contextmanager
from pathlib import Path

import duckdb
import polars as pl
import psycopg2

@contextmanager
def timer(description):
    """Print the elapsed wall-clock time for the enclosed block."""
    start = time.time()
    yield
    print(f"{description}: {time.time() - start:.2f}s")

def create_duckdb_table(file_name):
    """Generate the TPC-H lineitem table with DuckDB and dump it to a CSV file."""
    conn = duckdb.connect(":memory:")
    conn.sql("INSTALL tpch; LOAD tpch;")
    conn.sql("CALL dbgen(sf = 0.1);")
    conn.sql(f"COPY (SELECT * FROM lineitem) TO '{file_name}' (FORMAT CSV, HEADER);")
    conn.close()

# Connect to Postgres and set up the database for testing.
conn = psycopg2.connect(
    host="localhost",
    database="postgres",
    user="postgres",
    password="postgres",
)
cursor = conn.cursor()
cursor.execute("DROP SCHEMA IF EXISTS test CASCADE;")
cursor.execute("CREATE SCHEMA test;")
cursor.execute("SET search_path TO test;")
conn.commit()  # Ensure the schema is committed

if Path("lineitem.csv").exists():
    print("lineitem.csv already exists, skipping creation.")
else:
    create_duckdb_table("lineitem.csv")
    print("lineitem.csv created.")

def create_table(cursor, table_name, schema="test"):
    """(Re)create an empty lineitem-shaped table to load into."""
    cursor.execute(f"DROP TABLE IF EXISTS {schema}.{table_name};")
    cursor.execute(f"""
        CREATE TABLE {schema}.{table_name} (
            l_orderkey INT, l_partkey INT, l_suppkey INT, l_linenumber INT,
            l_quantity DECIMAL(10,2), l_extendedprice DECIMAL(10,2),
            l_discount DECIMAL(10,2), l_tax DECIMAL(10,2),
            l_returnflag CHAR(1), l_linestatus CHAR(1),
            l_shipdate DATE, l_commitdate DATE, l_receiptdate DATE,
            l_shipinstruct VARCHAR(25), l_shipmode VARCHAR(10), l_comment VARCHAR(44)
        );
    """)
    conn.commit()

# Read the dataset into a Polars DataFrame for future use.
df = pl.read_csv("lineitem.csv")
print(f"Dataset: {len(df)} rows, {df.estimated_size('mb'):.1f} MB")

# Test 1: COPY direct
with timer("COPY direct"), conn.cursor() as cursor, open("lineitem.csv", "rb") as f:
    create_table(cursor, "test1")
    cursor.copy_expert("COPY test1 FROM stdin (FORMAT CSV, HEADER);", f)

# Test 2: COPY → staging → INSERT SELECT
with timer("COPY → staging → INSERT SELECT"), conn.cursor() as cursor, open("lineitem.csv", "rb") as f:
    create_table(cursor, "test2")
    create_table(cursor, "staging")
    cursor.copy_expert("COPY staging FROM stdin (FORMAT CSV, HEADER);", f)
    cursor.execute("INSERT INTO test2 SELECT * FROM staging;")

# Test 3: COPY → staging → INSERT SELECT + TRIGGER
# TODO: Add a trigger to the target table to test the performance of the trigger.
# A sketch of a possible implementation follows below.
# with timer("COPY → staging → INSERT SELECT + TRIGGER"):
#     conn.execute("DROP TABLE IF EXISTS test3, staging; CREATE TABLE test3 AS SELECT * FROM lineitem LIMIT 0;")
#     conn.execute("CREATE TABLE staging AS SELECT * FROM lineitem LIMIT 0;")
#     conn.execute("COPY staging FROM 'lineitem.csv' (FORMAT CSV, HEADER);")
#     conn.execute("INSERT INTO test3 SELECT * FROM staging;")

# Test 4: INSERT INTO ... VALUES ... with executemany()
# Don't run this test by default; it's too slow (it takes ~230 seconds).
EXECUTE_MANY = False
if EXECUTE_MANY:
    with timer("INSERT INTO executemany() simulation"), conn.cursor() as cursor:
        create_table(cursor, "test3")
        # Insert in slices of up to 1 million rows (the whole dataset fits in one slice).
        for frame in df.iter_slices(10**6):
            cursor.executemany(
                "INSERT INTO test3 VALUES (%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s)",
                frame.iter_rows(),
            )
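
# Side note, not part of the original benchmark and not timed here: psycopg2
# also ships psycopg2.extras.execute_values, which folds many rows into a
# single multi-row VALUES statement and is usually much faster than
# executemany(). A minimal sketch, reusing the test3 table from above; the
# RUN_EXECUTE_VALUES flag name is an assumption.
RUN_EXECUTE_VALUES = False
if RUN_EXECUTE_VALUES:
    from psycopg2.extras import execute_values

    with timer("INSERT INTO with execute_values()"), conn.cursor() as cursor:
        create_table(cursor, "test3")
        for frame in df.iter_slices(10**6):
            execute_values(cursor, "INSERT INTO test3 VALUES %s", frame.iter_rows(), page_size=10_000)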

# Test 5: Load to Polars, write as CSV to an in-memory byte buffer, then insert via STDIN COPY
with timer("LOAD to Polars and then insert via STDIN COPY"), conn.cursor() as cursor:
    df = pl.read_csv("lineitem.csv")
    buffer = io.BytesIO()
    df.write_csv(buffer)
    buffer.seek(0)
    create_table(cursor, "test4")
    cursor.copy_expert("COPY test4 FROM stdin (FORMAT CSV, HEADER);", buffer)

with conn.cursor() as cursor:
    # Sanity check: make sure the first 100 rows of all the tables are equivalent.
    cursor.execute("SELECT * FROM test1 LIMIT 100;")
    rows1 = cursor.fetchall()
    cursor.execute("SELECT * FROM test2 LIMIT 100;")
    rows2 = cursor.fetchall()
    if EXECUTE_MANY:
        cursor.execute("SELECT * FROM test3 LIMIT 100;")
        rows3 = cursor.fetchall()
    cursor.execute("SELECT * FROM test4 LIMIT 100;")
    rows4 = cursor.fetchall()
    assert len(rows1) == 100
    if EXECUTE_MANY:
        print(rows1 == rows2 == rows3 == rows4)
    else:
        print(rows1 == rows2 == rows4)
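
# Housekeeping not present in the original script (an assumption on my part):
# commit so the loaded test tables persist after the script exits, then close
# the connection.
conn.commit()
conn.close()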