JSON filtering vs text casting
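This benchmark compares two ways of matching a value stored in a PostgreSQL jsonb column: filtering through the ->> extraction operator (params->>'unique_id' = '...') versus casting the whole document to text and pattern-matching it (params::text LIKE '%...%'). Every query also pins the row down by its primary key, so both variants fetch a single row via an index scan and the measured difference comes down to evaluating the predicate itself.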
import json
import random
import re
import time

import psycopg2

# Change these before running locally.
DB_NAME = "json_test"
USER = "marcelo.fernandes"
PASSWORD = ""
HOST = "localhost"
PORT = 5415

NUM_OF_ROWS = 10_000_000
INSERT_ROWS_PER_BATCH = 1_000_000
NUM_OF_QUERIES = 30_000

# Uncomment these on local dev for tests to run fast.
# NUM_OF_ROWS = 10_000
# INSERT_ROWS_PER_BATCH = 1000
# NUM_OF_QUERIES = 1
def get_cursor_and_connection():
    conn = psycopg2.connect(
        dbname=DB_NAME,
        user=USER,
        password=PASSWORD,
        host=HOST,
        port=PORT,
    )
    # Autocommit so DDL and VACUUM run outside an explicit transaction block.
    conn.autocommit = True
    return conn.cursor(), conn
def create_table(cursor):
    print("- Creating normalised table...")
    cursor.execute("""
        -- Idempotency for convenience.
        DROP TABLE IF EXISTS json_test;
        CREATE TABLE json_test (
          id SERIAL PRIMARY KEY,
          params jsonb
        );
        -- Disable autovacuum to not interfere with results.
        ALTER TABLE json_test SET (autovacuum_enabled = false);
    """)
def vacuum_table(cursor):
    print("- Vacuuming table...")
    start_time = time.time()
    cursor.execute("VACUUM ANALYZE json_test;")
    duration = time.time() - start_time
    print(f"- Vacuum took: {duration:.2f} seconds.")
def populate_table(cursor):
    print("- Populating table...")
    start_time = time.time()
    for batch in range(NUM_OF_ROWS // INSERT_ROWS_PER_BATCH):
        rows = []
        for i in range(INSERT_ROWS_PER_BATCH):
            val = i + (batch * INSERT_ROWS_PER_BATCH)
            # Each row carries a unique, zero-padded value inside the jsonb payload.
            json_val = json.dumps({"unique_id": str(val).zfill(10)})
            rows.append(f"('{json_val}')")
        values = ", ".join(rows)
        cursor.execute(f"INSERT INTO json_test (params) VALUES {values};")
    duration = time.time() - start_time
    print(f"- Populating took: {duration:.2f} seconds.")
def benchmark_json_queries(cursor, values):
    print(
        f"- Benchmarking {NUM_OF_QUERIES:,} filtered queries against json_test table..."
    )
    explain_outputs = []
    for i in range(NUM_OF_QUERIES):
        val = str(values[i]).zfill(10)
        # SERIAL ids start at 1, so the row holding unique_id "val" has
        # id = val + 1.
        query = (
            f"SELECT params FROM json_test "
            f"WHERE params->>'unique_id' = '{val}' AND id = {values[i] + 1} LIMIT 1;"
        )
        cursor.execute(f"EXPLAIN (ANALYZE, BUFFERS, COSTS OFF, SUMMARY OFF) {query}")
        explain_outputs.append(cursor.fetchall())
    print_explain_info(explain_outputs)
def benchmark_cast_queries(cursor, values):
    print(
        f"- Benchmarking {NUM_OF_QUERIES:,} casted queries against json_test table..."
    )
    explain_outputs = []
    for i in range(NUM_OF_QUERIES):
        val = str(values[i]).zfill(10)
        # Same off-by-one adjustment as the filtered variant above.
        query = (
            f"SELECT params FROM json_test "
            f"WHERE params::text LIKE '%{val}%' AND id = {values[i] + 1} LIMIT 1;"
        )
        cursor.execute(f"EXPLAIN (ANALYZE, BUFFERS, COSTS OFF, SUMMARY OFF) {query}")
        explain_outputs.append(cursor.fetchall())
    print_explain_info(explain_outputs)
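Note that both variants locate the row through the primary-key index and only then evaluate the JSON or text predicate, so the benchmark isolates predicate-evaluation cost. To filter on the JSON value alone at this table size, an expression index would be required; a sketch, with an assumed index name:

def create_unique_id_index(cursor):
    # Btree expression index over the extracted value, so that
    # "params->>'unique_id' = ..." can use an index scan on its own.
    cursor.execute(
        "CREATE INDEX json_test_unique_id_idx "
        "ON json_test ((params->>'unique_id'));"
    )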
def print_explain_info(explain_outputs):
    shared_hit = 0
    shared_read = 0
    shared_written = 0
    total_timing = (0, 0)
    for explain_output in explain_outputs:
        found_buffers = False
        for line in explain_output:
            line = line[0]
            # Only count the first "Buffers:" line (the top plan node).
            if line.lstrip().startswith("Buffers:") and not found_buffers:
                found_buffers = True
                # Look for buffer stats in the output using regular expressions.
                match = re.search(r"shared hit=(\d+)", line)
                if match:
                    shared_hit += int(match.group(1))
                match = re.search(r"read=(\d+)", line)
                if match:
                    shared_read += int(match.group(1))
                match = re.search(r"written=(\d+)", line)
                if match:
                    shared_written += int(match.group(1))
        # The first plan line holds "(actual time=<start>..<total> ...)".
        timing = explain_output[0][0].split("actual time=")[1].split(" ")[0].split("..")
        total_timing = (
            total_timing[0] + float(timing[0]),
            total_timing[1] + float(timing[1]),
        )
    size = len(explain_outputs)
    print(f" - average timing = {total_timing[0]/size}..{total_timing[1]/size}")
    print(f" - average buffers hit {shared_hit/size}")
    print(f" - average buffers read {shared_read/size}")
    print(f" - average buffers written {shared_written/size}")
def run_tests():
    print(
        f"\nReport details:\n"
        f" - rows in each table: {NUM_OF_ROWS:,}\n"
        f" - number of queries to benchmark: {NUM_OF_QUERIES:,}\n"
    )
    cursor, conn = get_cursor_and_connection()
    create_table(cursor)
    populate_table(cursor)
    vacuum_table(cursor)
    # Values are 0-based to match the generated "unique_id" payloads.
    query_values = [random.randint(0, NUM_OF_ROWS - 1) for _ in range(NUM_OF_QUERIES)]
    # Run a first time just to warm the cache.
    benchmark_json_queries(cursor, query_values)
    # Then run both twice to see if results approximate.
    benchmark_json_queries(cursor, query_values)
    benchmark_json_queries(cursor, query_values)
    benchmark_cast_queries(cursor, query_values)
    benchmark_cast_queries(cursor, query_values)
    cursor.close()
    conn.close()


if __name__ == "__main__":
    run_tests()
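To reproduce locally, point the connection constants at a scratch database (the script drops and recreates json_test on every run), optionally switch to the smaller commented-out constants for a quick pass, and run the file with a Python 3 interpreter that has psycopg2 installed.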
Output from a full run:

Report details:
 - rows in each table: 10,000,000
 - number of queries to benchmark: 30,000

- Creating normalised table...
- Populating table...
- Populating took: 65.19 seconds.
- Vacuuming table...
- Vacuum took: 1.11 seconds.
- Benchmarking 30,000 filtered queries against json_test table...
- Benchmarking 30,000 filtered queries against json_test table...
- Benchmarking 30,000 filtered queries against json_test table...
- Benchmarking 30,000 casted queries against json_test table...
- Benchmarking 30,000 casted queries against json_test table...