{
"cells": [
{
"cell_type": "markdown",
"id": "31ef013c",
"metadata": {},
"source": [
"# Peak Memory Allocations with Dask\n",
"This notebook showcases how to use the dask-cuda worker to find the peak memory usage of a function\n"
]
},
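{
"cell_type": "markdown",
"id": "3a1f9b20",
"metadata": {},
"source": [
"Before wiring this into Dask, here is a minimal single-process sketch (an illustrative addition, not part of the original flow) of the underlying mechanism: wrap the current RMM device resource in a `StatisticsResourceAdaptor`, allocate something on the GPU, and read back `allocation_counts`. The `cudf.DataFrame` below is an arbitrary example allocation."
]
},
{
"cell_type": "code",
"execution_count": null,
"id": "8c4d2e71",
"metadata": {},
"outputs": [],
"source": [
"import rmm\n",
"import cudf\n",
"import numpy as np\n",
"\n",
"# Wrap the current device resource so RMM records allocation statistics\n",
"rmm.mr.set_current_device_resource(\n",
"    rmm.mr.StatisticsResourceAdaptor(rmm.mr.get_current_device_resource())\n",
")\n",
"\n",
"# An arbitrary GPU allocation, then the recorded statistics\n",
"df_local = cudf.DataFrame({'a': np.arange(1_000_000)})\n",
"print(rmm.mr.get_current_device_resource().allocation_counts)"
]
},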
{
"cell_type": "code",
"execution_count": 1,
"id": "9f52bc0d",
"metadata": {},
"outputs": [],
"source": [
"import os\n",
"import cudf\n",
"import time\n",
"from dask_cuda import LocalCUDACluster\n",
"from dask.distributed import Client\n",
"from dask.distributed import wait, default_client\n",
"import dask \n",
"import dask_cudf\n",
"from dask.base import is_dask_collection\n",
"from pprint import pprint"
]
},
{
"cell_type": "code",
"execution_count": 2,
"id": "3fcf080f",
"metadata": {},
"outputs": [
{
"name": "stderr",
"output_type": "stream",
"text": [
"2023-04-20 11:22:10,332 - distributed.diskutils - INFO - Found stale lock file and directory '/tmp/dask-worker-space-10123/worker-ljqoh3cr', purging\n",
"2023-04-20 11:22:10,332 - distributed.preloading - INFO - Creating preload: dask_cuda.initialize\n",
"2023-04-20 11:22:10,332 - distributed.preloading - INFO - Import preload module: dask_cuda.initialize\n",
"2023-04-20 11:22:10,332 - distributed.diskutils - INFO - Found stale lock file and directory '/tmp/dask-worker-space-10123/worker-8ar2wlsw', purging\n",
"2023-04-20 11:22:10,332 - distributed.diskutils - INFO - Found stale lock file and directory '/tmp/dask-worker-space-10123/worker-af4mk6_2', purging\n",
"2023-04-20 11:22:10,333 - distributed.diskutils - INFO - Found stale lock file and directory '/tmp/dask-worker-space-10123/worker-xv1_lbg2', purging\n",
"2023-04-20 11:22:10,333 - distributed.preloading - INFO - Creating preload: dask_cuda.initialize\n",
"2023-04-20 11:22:10,333 - distributed.preloading - INFO - Import preload module: dask_cuda.initialize\n",
"2023-04-20 11:22:10,334 - distributed.preloading - INFO - Creating preload: dask_cuda.initialize\n",
"2023-04-20 11:22:10,334 - distributed.preloading - INFO - Import preload module: dask_cuda.initialize\n",
"2023-04-20 11:22:10,334 - distributed.preloading - INFO - Creating preload: dask_cuda.initialize\n",
"2023-04-20 11:22:10,335 - distributed.preloading - INFO - Import preload module: dask_cuda.initialize\n"
]
}
],
"source": [
"# Create a dask_cuda cluster\n",
"cluster = LocalCUDACluster(CUDA_VISIBLE_DEVICES='4,5,6,7', rmm_pool_size='10GB')\n",
"client = Client(cluster)"
]
},
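{
"cell_type": "markdown",
"id": "5e7a0c12",
"metadata": {},
"source": [
"As a quick sanity check (an addition, not part of the original flow), `client.run` executes a function once on every worker and returns the results keyed by worker address. This is the same mechanism the decorators below use to install and query the statistics adaptor; here it just reports the class name of each worker's current RMM resource, which should reflect the `rmm_pool_size` setting above."
]
},
{
"cell_type": "code",
"execution_count": null,
"id": "6f8b1d23",
"metadata": {},
"outputs": [],
"source": [
"def _describe_current_mr():\n",
"    # Runs on each worker: report the type of its current RMM device resource\n",
"    import rmm\n",
"    return type(rmm.mr.get_current_device_resource()).__name__\n",
"\n",
"client.run(_describe_current_mr)"
]
},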
{
"cell_type": "markdown",
"id": "be8ad7bd",
"metadata": {},
"source": [
"# Get Memory Stats \n",
"Below code gets us the memory statistics using a decorator"
]
},
{
"cell_type": "code",
"execution_count": 3,
"id": "094e4d9c",
"metadata": {},
"outputs": [],
"source": [
"def set_statistics_adaptor():\n",
" \"\"\"\n",
" Sets the current device resource to a StatisticsResourceAdaptor\n",
" \"\"\"\n",
" import rmm\n",
" rmm.mr.set_current_device_resource(rmm.mr.StatisticsResourceAdaptor(rmm.mr.get_current_device_resource()))\n",
" \n",
"def _get_allocation_counts():\n",
" \"\"\"\n",
" Returns the allocation counts from the current device resource\n",
" \"\"\"\n",
" import rmm\n",
" mr = rmm.mr.get_current_device_resource()\n",
" if not hasattr(mr, \"allocation_counts\"):\n",
" if hasattr(mr, \"upstream_mr\"):\n",
" return _get_allocation_counts(mr.upstream_mr)\n",
" else:\n",
" return -1\n",
" else:\n",
" return mr.allocation_counts\n",
"\n",
"def persist_dask_object(arg):\n",
" \"\"\"\n",
" Persist if it is a dask object\n",
" \"\"\"\n",
" if is_dask_collection(arg) or hasattr(arg, 'persist'):\n",
" arg = dask.persist(arg)\n",
" wait(arg)\n",
" arg = arg[0]\n",
" return arg\n",
"\n",
"# Function to convert bytes into human readable format\n",
"def sizeof_fmt(num, suffix='B'):\n",
" for unit in ['','K','M','G','T','P','E','Z']:\n",
" if abs(num) < 1024.0:\n",
" return \"%3.1f%s%s\" % (num, unit, suffix)\n",
" num /= 1024.0\n",
" return \"%.1f%s%s\" % (num, 'Yi', suffix)\n",
"\n",
"\n",
"def _parse_allocation_counts(allocation_counts):\n",
" \"\"\"\n",
" Parses the allocation counts from the current device resource\n",
" into human readable format\n",
" \"\"\"\n",
" return {k:sizeof_fmt(v) for k, v in allocation_counts.items() if 'bytes' in k}\n",
"\n",
"\n",
"# Create a decorator to set the statistics adaptor\n",
"# and calls the allocation_counts function \n",
"def get_allocation_counts_dask_lazy(func):\n",
" def wrapper(*args, **kwargs):\n",
" ## Todo: Expand to local case\n",
" client=default_client()\n",
" client.run(set_statistics_adaptor)\n",
" st = time.time()\n",
" return_val = func(*args, **kwargs)\n",
" et = time.time()\n",
" allocation_counts=client.run(_get_allocation_counts)\n",
" allocation_counts={worker_id: _parse_allocation_counts(worker_allocations)\n",
" for worker_id,worker_allocations in allocation_counts.items()}\n",
" print(f\"function: {func.__name__}\\n\")\n",
" print(f\"function args: {args} kwargs: {kwargs}\", flush=True)\n",
" print(f\"execution_time: {et-st}\", flush=True)\n",
" print(\"allocation_counts\")\n",
" pprint(allocation_counts, indent=4, width=1, compact=True)\n",
" client.run(set_statistics_adaptor)\n",
" return return_val\n",
" return wrapper \n",
"\n",
"def get_allocation_counts_local(func):\n",
" def wrapper(*args, **kwargs):\n",
" set_statistics_adaptor()\n",
" st = time.time( )\n",
" return_val = func(*args, **kwargs)\n",
" et = time.time()\n",
" allocation_counts=_get_allocation_counts()\n",
" allocation_counts=_parse_allocation_counts(allocation_counts)\n",
" print(f\"function: {func.__name__}\\n\")\n",
" print(f\"function args: {args} kwargs: {kwargs}\", flush=True)\n",
" print(f\"execution_time: {et-st}\", flush=True)\n",
" print(\"allocation_counts\")\n",
" pprint(allocation_counts, indent=4, width=1, compact=True)\n",
" set_statistics_adaptor()\n",
" return return_val\n",
" return wrapper\n",
"\n",
"\n",
"def get_allocation_counts_dask_persist(func):\n",
" def wrapper(*args, **kwargs):\n",
" args = [persist_dask_object(a) for a in args]\n",
" kwargs = {k:persist_dask_object(v) for k,v in kwargs.items()}\n",
" client = default_client()\n",
" client.run(set_statistics_adaptor)\n",
" st = time.time()\n",
" return_val = func(*args, **kwargs)\n",
" return_val = persist_dask_object(return_val)\n",
" if isinstance(return_val, (list, tuple)):\n",
" return_val = [persist_dask_object(d) for d in return_val]\n",
" et = time.time()\n",
" allocation_counts=client.run(_get_allocation_counts)\n",
" allocation_counts={worker_id: _parse_allocation_counts(worker_allocations)\n",
" for worker_id,worker_allocations in allocation_counts.items()}\n",
" print(f\"function: {func.__name__}\\n\")\n",
" print(f\"function args: {args} kwargs: {kwargs}\", flush=True)\n",
" print(f\"execution_time: {et-st}\", flush=True)\n",
" print(\"allocation_counts\")\n",
" pprint(allocation_counts, indent=4, width=1, compact=True)\n",
" client.run(set_statistics_adaptor)\n",
" return return_val\n",
" return wrapper\n",
"\n",
" "
]
},
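{
"cell_type": "markdown",
"id": "7a9c2e34",
"metadata": {},
"source": [
"A minimal sketch of the local (single-process) variant, using the `get_allocation_counts_local` decorator defined above. The `make_local_frame` function and its `numpy` column are arbitrary example workloads added for illustration; any function that allocates GPU memory in the client process would do."
]
},
{
"cell_type": "code",
"execution_count": null,
"id": "8b0d3f45",
"metadata": {},
"outputs": [],
"source": [
"import numpy as np\n",
"\n",
"@get_allocation_counts_local\n",
"def make_local_frame(n):\n",
"    # Arbitrary example workload: one single-GPU cudf allocation\n",
"    return cudf.DataFrame({'x': np.arange(n)})\n",
"\n",
"_ = make_local_frame(1_000_000)"
]
},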
{
"cell_type": "markdown",
"id": "4cc63322",
"metadata": {},
"source": [
"## Example showing decorator"
]
},
{
"cell_type": "code",
"execution_count": 4,
"id": "14d5965e",
"metadata": {},
"outputs": [],
"source": [
"@get_allocation_counts_dask_lazy\n",
"def create_timeseries_with_persist(start='2000-01-01', end='2000-01-31'):\n",
" '''\n",
" Create a timeseries dataset from start to end\n",
" and persist it\n",
" '''\n",
" df = dask.datasets.timeseries(start=start, end=end)\n",
" df = dask_cudf.from_dask_dataframe(df)\n",
" df['id']=df['id']*100\n",
" # Have to persist and wait\n",
" # to force the computation\n",
" # and get the allocation counts\n",
" df = df.persist()\n",
" wait(df)\n",
" return df\n",
"\n",
"\n",
"@get_allocation_counts_dask_persist\n",
"def create_timeseries_without_persist(start='2000-01-01', end='2000-01-31'):\n",
" '''\n",
" Create a timeseries dataset from start to end\n",
" and persist it\n",
" '''\n",
" df = dask.datasets.timeseries(start=start, end=end)\n",
" df = dask_cudf.from_dask_dataframe(df)\n",
" df['id']=df['id']*100\n",
" return df"
]
},
{
"cell_type": "markdown",
"id": "7c799e5a",
"metadata": {},
"source": [
"##### 1 Month"
]
},
{
"cell_type": "code",
"execution_count": 5,
"id": "276a77d6",
"metadata": {},
"outputs": [
{
"name": "stdout",
"output_type": "stream",
"text": [
"function: create_timeseries_with_persist\n",
"\n",
"function args: () kwargs: {'start': '2000-01-01', 'end': '2000-01-31'}\n",
"execution_time: 5.768169403076172\n",
"allocation_counts\n",
"{ 'tcp://127.0.0.1:38805': { 'current_bytes': '20.4MB',\n",
" 'peak_bytes': '23.1MB',\n",
" 'total_bytes': '36.3MB'},\n",
" 'tcp://127.0.0.1:40743': { 'current_bytes': '23.8MB',\n",
" 'peak_bytes': '26.5MB',\n",
" 'total_bytes': '42.3MB'},\n",
" 'tcp://127.0.0.1:42893': { 'current_bytes': '40.9MB',\n",
" 'peak_bytes': '43.5MB',\n",
" 'total_bytes': '72.5MB'},\n",
" 'tcp://127.0.0.1:45159': { 'current_bytes': '17.0MB',\n",
" 'peak_bytes': '19.7MB',\n",
" 'total_bytes': '30.2MB'}}\n",
"function: create_timeseries_without_persist\n",
"\n",
"function args: [] kwargs: {'start': '2000-01-01', 'end': '2000-01-31'}\n",
"execution_time: 0.2883937358856201\n",
"allocation_counts\n",
"{ 'tcp://127.0.0.1:38805': { 'current_bytes': '23.8MB',\n",
" 'peak_bytes': '26.5MB',\n",
" 'total_bytes': '42.3MB'},\n",
" 'tcp://127.0.0.1:40743': { 'current_bytes': '27.3MB',\n",
" 'peak_bytes': '29.9MB',\n",
" 'total_bytes': '48.3MB'},\n",
" 'tcp://127.0.0.1:42893': { 'current_bytes': '27.3MB',\n",
" 'peak_bytes': '29.9MB',\n",
" 'total_bytes': '48.3MB'},\n",
" 'tcp://127.0.0.1:45159': { 'current_bytes': '23.8MB',\n",
" 'peak_bytes': '26.5MB',\n",
" 'total_bytes': '42.3MB'}}\n"
]
}
],
"source": [
"df = create_timeseries_with_persist(start='2000-01-01', end='2000-01-31')\n",
"df = create_timeseries_without_persist(start='2000-01-01', end='2000-01-31')"
]
},
{
"cell_type": "markdown",
"id": "08379bcb",
"metadata": {},
"source": [
"##### 10 Months"
]
},
{
"cell_type": "code",
"execution_count": 6,
"id": "2d94af1c",
"metadata": {},
"outputs": [
{
"name": "stdout",
"output_type": "stream",
"text": [
"function: create_timeseries_with_persist\n",
"\n",
"function args: () kwargs: {'start': '2000-01-01', 'end': '2000-11-01'}\n",
"execution_time: 2.5816571712493896\n",
"allocation_counts\n",
"{ 'tcp://127.0.0.1:38805': { 'current_bytes': '258.9MB',\n",
" 'peak_bytes': '261.6MB',\n",
" 'total_bytes': '459.3MB'},\n",
" 'tcp://127.0.0.1:40743': { 'current_bytes': '258.9MB',\n",
" 'peak_bytes': '261.6MB',\n",
" 'total_bytes': '459.3MB'},\n",
" 'tcp://127.0.0.1:42893': { 'current_bytes': '258.9MB',\n",
" 'peak_bytes': '261.6MB',\n",
" 'total_bytes': '459.3MB'},\n",
" 'tcp://127.0.0.1:45159': { 'current_bytes': '262.3MB',\n",
" 'peak_bytes': '265.0MB',\n",
" 'total_bytes': '465.3MB'}}\n",
"function: create_timeseries_without_persist\n",
"\n",
"function args: [] kwargs: {'start': '2000-01-01', 'end': '2000-11-01'}\n",
"execution_time: 2.6825990676879883\n",
"allocation_counts\n",
"{ 'tcp://127.0.0.1:38805': { 'current_bytes': '265.7MB',\n",
" 'peak_bytes': '268.4MB',\n",
" 'total_bytes': '471.4MB'},\n",
" 'tcp://127.0.0.1:40743': { 'current_bytes': '258.9MB',\n",
" 'peak_bytes': '261.6MB',\n",
" 'total_bytes': '459.3MB'},\n",
" 'tcp://127.0.0.1:42893': { 'current_bytes': '248.7MB',\n",
" 'peak_bytes': '251.3MB',\n",
" 'total_bytes': '441.2MB'},\n",
" 'tcp://127.0.0.1:45159': { 'current_bytes': '265.7MB',\n",
" 'peak_bytes': '268.4MB',\n",
" 'total_bytes': '471.4MB'}}\n"
]
}
],
"source": [
"df = create_timeseries_with_persist(start='2000-01-01', end='2000-11-01')\n",
"df = create_timeseries_without_persist(start='2000-01-01', end='2000-11-01')"
]
},
{
"cell_type": "markdown",
"id": "e0a59ca8",
"metadata": {},
"source": [
"##### 15 days"
]
},
{
"cell_type": "code",
"execution_count": 7,
"id": "331fc573",
"metadata": {},
"outputs": [
{
"name": "stdout",
"output_type": "stream",
"text": [
"function: create_timeseries_with_persist\n",
"\n",
"function args: () kwargs: {'start': '2000-01-01', 'end': '2000-01-16'}\n",
"execution_time: 0.27789306640625\n",
"allocation_counts\n",
"{ 'tcp://127.0.0.1:38805': { 'current_bytes': '13.6MB',\n",
" 'peak_bytes': '16.3MB',\n",
" 'total_bytes': '24.2MB'},\n",
" 'tcp://127.0.0.1:40743': { 'current_bytes': '10.2MB',\n",
" 'peak_bytes': '12.9MB',\n",
" 'total_bytes': '18.1MB'},\n",
" 'tcp://127.0.0.1:42893': { 'current_bytes': '13.6MB',\n",
" 'peak_bytes': '16.3MB',\n",
" 'total_bytes': '24.2MB'},\n",
" 'tcp://127.0.0.1:45159': { 'current_bytes': '13.6MB',\n",
" 'peak_bytes': '16.3MB',\n",
" 'total_bytes': '24.2MB'}}\n",
"function: create_timeseries_without_persist\n",
"\n",
"function args: [] kwargs: {'start': '2000-01-01', 'end': '2000-01-16'}\n",
"execution_time: 0.16781949996948242\n",
"allocation_counts\n",
"{ 'tcp://127.0.0.1:38805': { 'current_bytes': '10.2MB',\n",
" 'peak_bytes': '12.9MB',\n",
" 'total_bytes': '18.1MB'},\n",
" 'tcp://127.0.0.1:40743': { 'current_bytes': '13.6MB',\n",
" 'peak_bytes': '16.3MB',\n",
" 'total_bytes': '24.2MB'},\n",
" 'tcp://127.0.0.1:42893': { 'current_bytes': '13.6MB',\n",
" 'peak_bytes': '16.3MB',\n",
" 'total_bytes': '24.2MB'},\n",
" 'tcp://127.0.0.1:45159': { 'current_bytes': '13.6MB',\n",
" 'peak_bytes': '16.3MB',\n",
" 'total_bytes': '24.2MB'}}\n"
]
}
],
"source": [
"df = create_timeseries_with_persist(start='2000-01-01', end='2000-01-16')\n",
"\n",
"df = create_timeseries_without_persist(start='2000-01-01', end='2000-01-16')"
]
}
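,
{
"cell_type": "markdown",
"id": "9c1e4a56",
"metadata": {},
"source": [
"Optional cleanup (an addition): shut down the client and cluster so the worker GPUs and their RMM pools are released."
]
},
{
"cell_type": "code",
"execution_count": null,
"id": "0d2f5b67",
"metadata": {},
"outputs": [],
"source": [
"# Release the workers and their 10GB RMM pools\n",
"client.close()\n",
"cluster.close()"
]
}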
],
"metadata": {
"kernelspec": {
"display_name": "Python 3.10.9 ('cudf-ucx-23.04-march-14')",
"language": "python",
"name": "python3"
},
"language_info": {
"codemirror_mode": {
"name": "ipython",
"version": 3
},
"file_extension": ".py",
"mimetype": "text/x-python",
"name": "python",
"nbconvert_exporter": "python",
"pygments_lexer": "ipython3",
"version": "3.10.9"
},
"vscode": {
"interpreter": {
"hash": "64961b937fb34456916b4ee9fe53025e0ff0d0365c0e31beae471aff85d7efc8"
}
}
},
"nbformat": 4,
"nbformat_minor": 5
}