Skip to content

Instantly share code, notes, and snippets.

@jakirkham
Last active February 21, 2020 00:07
Show Gist options
  • Save jakirkham/7ec3b48721c47f60bea06ee38b78de01 to your computer and use it in GitHub Desktop.
Save jakirkham/7ec3b48721c47f60bea06ee38b78de01 to your computer and use it in GitHub Desktop.
dask-cudf_multi_partition_merge
Display the source blob
Display the rendered blob
Raw
{
"cells": [
{
"cell_type": "code",
"execution_count": null,
"metadata": {},
"outputs": [],
"source": [
"import dask_cudf\n",
"import cudf\n",
"import cupy\n",
"import os\n",
"import time\n",
"import dask\n",
"import dask.dataframe as dd\n",
"import dask.array as da\n",
"from dask.distributed import performance_report\n",
"\n",
"import rmm\n",
"\n",
"from dask_cuda import LocalCUDACluster\n",
"from dask.distributed import Client,wait\n",
"\n",
"\n",
"def create_random_data(n_rows=1_000,\n",
"                       n_parts=10,\n",
"                       n_keys_index_1=100_000,\n",
"                       n_keys_index_2=100,\n",
"                       n_keys_index_3=100,\n",
"                       col_prefix='a'):\n",
"    \"\"\"Build a persisted, cuDF-backed Dask DataFrame of random data.\n",
"\n",
"    The frame has three float columns named\n",
"    ``<col_prefix>_non_merge_1`` .. ``<col_prefix>_non_merge_3`` generated\n",
"    with ``da.random.random`` and three integer key columns named\n",
"    ``<col_prefix>_0`` .. ``<col_prefix>_2`` generated with\n",
"    ``da.random.randint(0, n_keys_index_k)``.\n",
"\n",
"    Parameters\n",
"    ----------\n",
"    n_rows : int\n",
"        Total number of rows to generate.\n",
"    n_parts : int\n",
"        Requested number of partitions. The result has at most this many\n",
"        partitions (fewer only when ``n_rows`` is too small to fill them).\n",
"    n_keys_index_1, n_keys_index_2, n_keys_index_3 : int\n",
"        Upper bounds passed to ``da.random.randint`` for the key columns\n",
"        ``_0``, ``_1`` and ``_2`` respectively.\n",
"    col_prefix : str\n",
"        Prefix prepended to every column name.\n",
"\n",
"    Returns\n",
"    -------\n",
"    Dask DataFrame whose partitions were converted with\n",
"    ``cudf.from_pandas``; it is persisted and waited on before returning.\n",
"    \"\"\"\n",
"\n",
"    # Ceiling division: flooring here leaves a small remainder chunk whenever\n",
"    # n_rows is not a multiple of n_parts, which yields n_parts + 1 partitions\n",
"    # instead of the requested n_parts. Clamp to 1 so the chunk size is never 0.\n",
"    chunks = max(1, -(-n_rows // n_parts))\n",
"\n",
"    # Every column uses the same chunking so the pieces line up for the\n",
"    # column-wise (axis=1) concatenation.\n",
"    df = dd.concat([\n",
"        da.random.random(n_rows, chunks=chunks).to_dask_dataframe(columns=(col_prefix + '_non_merge_1')),\n",
"        da.random.random(n_rows, chunks=chunks).to_dask_dataframe(columns=(col_prefix + '_non_merge_2')),\n",
"        da.random.random(n_rows, chunks=chunks).to_dask_dataframe(columns=(col_prefix + '_non_merge_3')),\n",
"        da.random.randint(0, n_keys_index_1, size=n_rows, chunks=chunks).to_dask_dataframe(columns=(col_prefix + '_0')),\n",
"        da.random.randint(0, n_keys_index_2, size=n_rows, chunks=chunks).to_dask_dataframe(columns=(col_prefix + '_1')),\n",
"        da.random.randint(0, n_keys_index_3, size=n_rows, chunks=chunks).to_dask_dataframe(columns=(col_prefix + '_2')),\n",
"    ], axis=1).persist()\n",
"\n",
"    # Convert each partition to cuDF, then block until the data is\n",
"    # materialised so callers do not time data generation by accident.\n",
"    gdf = df.map_partitions(cudf.from_pandas)\n",
"    gdf = gdf.persist()\n",
"    _ = wait(gdf)\n",
"    return gdf"
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {},
"outputs": [],
"source": [
"dask.config.set({\n",
" \"distributed.dashboard.link\": \"/proxy/{port}/status\"\n",
"})\n",
"cluster = LocalCUDACluster(\n",
" local_directory=\"/datasets/jkirkham\",\n",
" rmm_pool_size=\"28GB\",\n",
" protocol=\"ucx\",\n",
" enable_nvlink=True,\n",
")\n",
"client = Client(cluster)\n",
"client.run(cupy.cuda.set_allocator, rmm.rmm_cupy_allocator);"
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {},
"outputs": [],
"source": [
"client"
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {},
"outputs": [],
"source": [
"cluster"
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {},
"outputs": [],
"source": [
"rows_1, parts_1 = 140_176_770, 245\n",
"rows_2, parts_2 = 21_004_393, 171\n",
"\n",
"with performance_report(filename=\"dask-report.html\"):\n",
" df_1 = create_random_data(n_rows=rows_1, n_parts=parts_1, col_prefix='a')\n",
" df_2 = create_random_data(n_rows=rows_2, n_parts=parts_2, col_prefix='b')\n",
"\n",
" merged_df = df_1.merge(df_2,\n",
" left_on=['a_0', 'a_1', 'a_2'],\n",
" right_on=['b_0', 'b_1', 'b_2'])\n",
" %time len(merged_df)"
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {},
"outputs": [],
"source": [
"client.shutdown()"
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {},
"outputs": [],
"source": []
}
],
"metadata": {
"kernelspec": {
"display_name": "Python 3",
"language": "python",
"name": "python3"
},
"language_info": {
"codemirror_mode": {
"name": "ipython",
"version": 3
},
"file_extension": ".py",
"mimetype": "text/x-python",
"name": "python",
"nbconvert_exporter": "python",
"pygments_lexer": "ipython3",
"version": "3.7.6"
},
"widgets": {
"application/vnd.jupyter.widget-state+json": {
"state": {},
"version_major": 2,
"version_minor": 0
}
}
},
"nbformat": 4,
"nbformat_minor": 4
}
This file has been truncated, but you can view the full file.
<!DOCTYPE html>
<html lang="en">
<head>
<meta charset="utf-8">
<title>Dask Performance Report</title>
<script type="text/javascript" src="https://cdn.pydata.org/bokeh/release/bokeh-1.4.0.min.js"></script>
<script type="text/javascript" src="https://cdn.pydata.org/bokeh/release/bokeh-widgets-1.4.0.min.js"></script>
<script type="text/javascript">
Bokeh.set_log_level("info");
</script>
</head>
<body>
<div class="bk-root" id="28d486b0-f130-414f-b5c4-41207af45c08" data-root-id="11327"></div>
<script type="application/json" id="11944">
View raw

(Sorry about that, but we can’t show files that are this big right now.)

View raw

(Sorry about that, but we can’t show files that are this big right now.)

View raw

(Sorry about that, but we can’t show files that are this big right now.)

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment