-
-
Save vncsna/ca12e4addbcabb888ef5662634ceb891 to your computer and use it in GitHub Desktop.
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
import asyncio | |
import time | |
from pathlib import Path | |
from uuid import uuid4 | |
import os | |
import gc | |
from aiofile import AIOFile | |
from aiofiles import open as aio_open | |
from anyio import open_file as anyio_open | |
from tabulate import tabulate | |
from tqdm import tqdm | |
# Payload sizes (in characters of "x") to benchmark, keyed by the label that
# appears as a column header in the results table.
FILE_SIZES = {
    "1mb": 1024 * 1024,  # Small text-only PDFs
    "5mb": 5 * 1024 * 1024,  # PDFs with some images
    "10mb": 10 * 1024 * 1024,  # PDFs with many images/graphics
    "20mb": 20 * 1024 * 1024,  # PDFs with many images/graphics
    "50mb": 50 * 1024 * 1024,  # Complex PDFs with high-res images
    "100mb": 100 * 1024 * 1024,  # Large technical documents/manuals
}

WORKERS = 40  # Number of read/write calls launched per iteration
ITERATIONS = 10  # Iterations per (function, size); reported time is their mean
SORT_RESULTS = True  # Sort collected results by function name, then time
RESULTS = []  # (name, size_label, average_seconds) tuples added by finish_run()

# Scratch directory for the benchmark; created at import time if missing.
TMP_DIR = Path("tmp")
TMP_DIR.mkdir(exist_ok=True)

# The file every *_read function opens. The path is derived from READFILE_NAME
# so the name start_run() preserves and the path the readers open cannot drift.
READFILE_NAME = "readfile.txt"
READFILE_PATH = str(TMP_DIR / READFILE_NAME)
async def aiofile_read():
    """Read the shared benchmark file through ``aiofile`` and return its text."""
    async with AIOFile(READFILE_PATH, "r") as handle:
        return await handle.read()
async def aiofile_write():
    """Write ``DATA`` to a new, uniquely named ``.txt`` file using ``aiofile``."""
    # Build the target from TMP_DIR instead of a literal "tmp/" so the files
    # always land in the directory that start_run() cleans between runs.
    target = str(TMP_DIR / f"{uuid4().hex}.txt")
    async with AIOFile(target, "w") as afp:
        await afp.write(DATA)
async def aiofiles_read():
    """Read the shared benchmark file through ``aiofiles`` and return its text."""
    # The read file is written as UTF-8 and stdlib_read() decodes it as UTF-8;
    # pin the same encoding here so the comparison does not depend on the
    # platform's locale default.
    async with aio_open(READFILE_PATH, "r", encoding="UTF-8") as afp:
        data = await afp.read()
    return data
async def aiofiles_write():
    """Write ``DATA`` to a new, uniquely named ``.txt`` file using ``aiofiles``."""
    # Target is derived from TMP_DIR (the directory start_run() cleans) and the
    # encoding matches stdlib_write() so both do the same encoding work.
    target = str(TMP_DIR / f"{uuid4().hex}.txt")
    async with aio_open(target, "w", encoding="UTF-8") as afp:
        await afp.write(DATA)
async def anyio_read():
    """Read the shared benchmark file through ``anyio`` and return its text."""
    # open_file() is awaited to obtain the async file object, which is then
    # used as an async context manager. Encoding is pinned to match
    # stdlib_read() and the UTF-8 file the benchmark writes.
    async with await anyio_open(READFILE_PATH, "r", encoding="UTF-8") as afp:
        data = await afp.read()
    return data
async def anyio_write():
    """Write ``DATA`` to a new, uniquely named ``.txt`` file using ``anyio``."""
    # Target is derived from TMP_DIR (the directory start_run() cleans) and the
    # encoding matches stdlib_write() so both do the same encoding work.
    target = str(TMP_DIR / f"{uuid4().hex}.txt")
    async with await anyio_open(target, "w", encoding="UTF-8") as afp:
        await afp.write(DATA)
def stdlib_read():
    """Read the shared benchmark file with the built-in ``open`` and return its text."""
    with open(READFILE_PATH, "r", encoding="UTF-8") as handle:
        return handle.read()
def stdlib_write():
    """Write ``DATA`` to a new, uniquely named ``.txt`` file with the built-in ``open``."""
    # Build the target from TMP_DIR instead of a literal "tmp/" so the files
    # always land in the directory that start_run() cleans between runs.
    target = TMP_DIR / f"{uuid4().hex}.txt"
    with open(target, "w", encoding="UTF-8") as fp:
        fp.write(DATA)
def start_run():
    """Reset state before a timed run.

    Removes every entry in ``TMP_DIR`` except the shared read file, then
    triggers a garbage-collection pass.
    """
    leftovers = [entry for entry in TMP_DIR.iterdir() if entry.name != READFILE_NAME]
    for entry in leftovers:
        entry.unlink()
    gc.collect()
def finish_run(name, start_time, size_label):
    """Record the mean per-iteration wall-clock time of a finished run.

    Args:
        name: Label of the benchmarked function.
        start_time: ``time.time()`` value captured when the run began.
        size_label: Key of ``FILE_SIZES`` the run was executed with.
    """
    per_iteration = (time.time() - start_time) / ITERATIONS
    RESULTS.append((name, size_label, per_iteration))
if __name__ == "__main__":

    async def run_benchmark():
        """Time every reader/writer at every size in FILE_SIZES and print a table.

        For each size the payload ``DATA`` and the shared read file are
        regenerated, then three groups are timed: the async library functions
        (run concurrently with ``asyncio.gather``), the stdlib functions called
        sequentially, and the stdlib functions dispatched through
        ``asyncio.to_thread``. Each group runs ITERATIONS iterations of WORKERS
        calls; finish_run() stores the mean time per iteration.
        """
        # DATA is read as a module global by the *_write functions.
        global DATA

        funcs = [
            aiofile_read,
            aiofile_write,
            aiofiles_read,
            aiofiles_write,
            anyio_read,
            anyio_write,
        ]
        stdlib_funcs = [stdlib_read, stdlib_write]

        # The stdlib functions are timed twice (direct and via to_thread);
        # the progress bar advances once per iteration.
        total_tests = (
            (len(funcs) + 2 * len(stdlib_funcs)) * ITERATIONS * len(FILE_SIZES)
        )

        with tqdm(total=total_tests, desc="Running benchmarks") as pbar:
            for size_label, size in FILE_SIZES.items():
                # Update DATA size for current test
                DATA = "x" * size

                # Update read file with current size
                with open(READFILE_PATH, "w", encoding="UTF-8") as fp:
                    fp.write(DATA)

                # Async libraries: WORKERS coroutines gathered per iteration.
                for func in funcs:
                    name = func.__name__
                    start_run()
                    start = time.time()
                    for _ in range(ITERATIONS):
                        await asyncio.gather(*[func() for _ in range(WORKERS)])
                        pbar.update(1)
                    finish_run(name, start, size_label)

                # Stdlib baseline: WORKERS blocking calls, one after another.
                for func in stdlib_funcs:
                    name = func.__name__
                    start_run()
                    start = time.time()
                    for _ in range(ITERATIONS):
                        for _ in range(WORKERS):
                            func()
                        pbar.update(1)
                    finish_run(name, start, size_label)

                # Stdlib functions offloaded to threads and gathered.
                for func in stdlib_funcs:
                    name = f"{func.__name__}.to_thread"
                    start_run()
                    start = time.time()
                    for _ in range(ITERATIONS):
                        await asyncio.gather(
                            *[asyncio.to_thread(func) for _ in range(WORKERS)]
                        )
                        pbar.update(1)
                    finish_run(name, start, size_label)

                # Drop the payload before the next (larger) one is allocated.
                del DATA
                gc.collect()

        # NOTE: start_run() only cleans before a run, so the files written by
        # the final run remain in TMP_DIR after the benchmark ends.

        if SORT_RESULTS:
            RESULTS.sort(key=lambda x: (x[0], x[2]))  # Sort by name then time

        os.system("cls" if os.name == "nt" else "clear")

        # Group results by function name
        grouped_results = {}
        for name, label, time_taken in RESULTS:
            row_cells = grouped_results.setdefault(name, {"name": name})
            row_cells[label] = f"{time_taken:.2f}"

        # Prepare table data
        headers = ["Function"] + list(FILE_SIZES)
        table_data = []
        for result in grouped_results.values():
            row = [result["name"]]
            row.extend(result.get(label, "N/A") for label in FILE_SIZES)
            table_data.append(row)

        print(tabulate(table_data, headers=headers, tablefmt="github"))

    # asyncio.run() already uses the default event loop policy; the explicit
    # set_event_loop_policy(DefaultEventLoopPolicy()) call was a no-op and the
    # policy API is deprecated as of Python 3.14, so it is omitted.
    asyncio.run(run_benchmark())
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment
For personal use only; this is not a serious attempt at benchmarking.