Skip to content

Instantly share code, notes, and snippets.

@monocongo
Created December 18, 2019 15:37
Show Gist options
  • Save monocongo/0aa4ca95f63dc5627b80a36793fbb64c to your computer and use it in GitHub Desktop.
Save monocongo/0aa4ca95f63dc5627b80a36793fbb64c to your computer and use it in GitHub Desktop.
Example of using ray for parallization, not working as expected (10x slower than comparable loop implementation)
Display the source blob
Display the rendered blob
Raw
{
"cells": [
{
"cell_type": "code",
"execution_count": 1,
"metadata": {},
"outputs": [],
"source": [
"import numpy as np\n",
"import psutil\n",
"import ray"
]
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
"#### Initialize ray"
]
},
{
"cell_type": "code",
"execution_count": 2,
"metadata": {},
"outputs": [
{
"name": "stderr",
"output_type": "stream",
"text": [
"2019-12-18 10:25:30,151\tWARNING services.py:597 -- setpgrp failed, processes may not be cleaned up properly: [Errno 1] Operation not permitted.\n",
"2019-12-18 10:25:30,152\tINFO resource_spec.py:216 -- Starting Ray with 13.92 GiB memory available for workers and up to 6.98 GiB for objects. You can adjust these settings with ray.init(memory=<bytes>, object_store_memory=<bytes>).\n"
]
},
{
"data": {
"text/plain": [
"{'node_ip_address': '192.168.86.189',\n",
" 'redis_address': '192.168.86.189:25030',\n",
" 'object_store_address': '/tmp/ray/session_2019-12-18_10-25-30_151049_20193/sockets/plasma_store',\n",
" 'raylet_socket_name': '/tmp/ray/session_2019-12-18_10-25-30_151049_20193/sockets/raylet',\n",
" 'webui_url': None,\n",
" 'session_dir': '/tmp/ray/session_2019-12-18_10-25-30_151049_20193'}"
]
},
"execution_count": 2,
"metadata": {},
"output_type": "execute_result"
}
],
"source": [
"num_cpus = psutil.cpu_count(logical=False)\n",
"ray.init(num_cpus=num_cpus, ignore_reinit_error=True)"
]
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
"#### Define functions that will perform a simple computation on a slice of an array, to be run serially"
]
},
{
"cell_type": "code",
"execution_count": 3,
"metadata": {},
"outputs": [],
"source": [
"def add_average(\n",
" in_ary: np.ndarray,\n",
" out_ary: np.ndarray,\n",
" x_index: int,\n",
" y_index: int,\n",
"):\n",
" \"\"\"\n",
" Adds the mean of an array slice to each element of the slice and assigns this slice into an output array.\n",
" \"\"\"\n",
" \n",
" ary = in_ary[x_index, y_index]\n",
" out_ary[x_index, y_index] = ary + np.mean(ary)\n",
" \n",
" \n",
"def compute_with_loop(\n",
" input_array: np.ndarray,\n",
") -> np.ndarray:\n",
" \"\"\"\n",
" Applies the add_average() function to each 1-D slice of a 3-D array and returns the result. \n",
" \"\"\"\n",
" output_array = np.full(shape=input_array.shape, fill_value=np.NaN)\n",
" for x in range(input_array.shape[0]):\n",
" for y in range(input_array.shape[1]):\n",
" add_average(input_array, output_array, x, y)\n",
" return output_array"
]
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
"#### Define functions that will perform a simple computation on a slice of an array, to be run in parallel using ray"
]
},
{
"cell_type": "code",
"execution_count": 4,
"metadata": {},
"outputs": [],
"source": [
"@ray.remote\n",
"def add_average_ray(\n",
" in_ary: np.ndarray,\n",
" x_index: int,\n",
" y_index: int,\n",
"):\n",
" \"\"\"\n",
" Adds the mean of an array slice to each element of the slice and returns the resulting \n",
" 1-D array slice along with the slice's indices.\n",
" \"\"\"\n",
"\n",
" ary = in_ary[x_index, y_index]\n",
" return (ary + np.mean(ary), x_index, y_index)\n",
"\n",
"\n",
"def compute_with_ray(\n",
" input_array: np.ndarray,\n",
") -> np.ndarray:\n",
" \"\"\"\n",
" Applies the add_average_ray() function to each 1-D slice of a 3-D array and returns the result. \n",
" \"\"\"\n",
" output_array = np.full(shape=input_array.shape, fill_value=np.NaN)\n",
" in_array_id = ray.put(input_array)\n",
" futures = []\n",
" for x in range(input_array.shape[0]):\n",
" for y in range(input_array.shape[1]):\n",
" futures.append(add_average_ray.remote(in_array_id, x, y))\n",
"\n",
" results = ray.get(futures)\n",
" for result in results:\n",
" slice_array = result[0]\n",
" x_index = result[1]\n",
" y_index = result[2]\n",
" output_array[x_index, y_index] = slice_array\n",
" \n",
" return output_array"
]
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
"#### Create an input array"
]
},
{
"cell_type": "code",
"execution_count": 5,
"metadata": {},
"outputs": [],
"source": [
"input_array = np.ones(shape=(400, 400, 1200))"
]
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
"#### Run the serial processing and report the execution time"
]
},
{
"cell_type": "code",
"execution_count": 6,
"metadata": {},
"outputs": [
{
"name": "stdout",
"output_type": "stream",
"text": [
"CPU times: user 1.74 s, sys: 68.1 ms, total: 1.81 s\n",
"Wall time: 1.77 s\n"
]
}
],
"source": [
"%%time\n",
"average_added_loop = compute_with_loop(input_array)"
]
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
"#### Run the parallel processing and report the execution time"
]
},
{
"cell_type": "code",
"execution_count": 7,
"metadata": {},
"outputs": [
{
"name": "stdout",
"output_type": "stream",
"text": [
"CPU times: user 21.3 s, sys: 4.57 s, total: 25.9 s\n",
"Wall time: 18.2 s\n"
]
}
],
"source": [
"%%time\n",
"average_added_ray = compute_with_ray(input_array)"
]
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
"#### Verify that the two codes produce the same result"
]
},
{
"cell_type": "code",
"execution_count": 8,
"metadata": {},
"outputs": [
{
"data": {
"text/plain": [
"True"
]
},
"execution_count": 8,
"metadata": {},
"output_type": "execute_result"
}
],
"source": [
"np.allclose(average_added_loop, average_added_ray)"
]
}
],
"metadata": {
"kernelspec": {
"display_name": "Python 3",
"language": "python",
"name": "python3"
},
"language_info": {
"codemirror_mode": {
"name": "ipython",
"version": 3
},
"file_extension": ".py",
"mimetype": "text/x-python",
"name": "python",
"nbconvert_exporter": "python",
"pygments_lexer": "ipython3",
"version": "3.7.3"
}
},
"nbformat": 4,
"nbformat_minor": 2
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment