example for "Prediction at Scale with scikit-learn and PySpark Pandas UDFs" (https://medium.com/civis-analytics/prediction-at-scale-with-scikit-learn-and-pyspark-pandas-udfs-51d5ebfb2cd8)
{
"cells": [
{
"cell_type": "markdown",
"metadata": {},
"source": [
"### Set things up and generate some data"
]
},
{
"cell_type": "code",
"execution_count": 1,
"metadata": {},
"outputs": [],
"source": [
"import numpy as np\n",
"from sklearn.datasets import make_classification\n",
"from sklearn.model_selection import train_test_split\n",
"from sklearn.ensemble import RandomForestClassifier\n",
"from sklearn.model_selection import GridSearchCV\n",
"import pandas as pd\n",
"import pyspark\n",
"import pyspark.sql.functions as F\n",
"from pyspark.sql.types import DoubleType, StringType, ArrayType"
]
},
{
"cell_type": "code",
"execution_count": 2,
"metadata": {},
"outputs": [],
"source": [
"# Make some fake data for training and scoring.\n",
"n_samples_test = 100000\n",
"n_samples_train = 1000\n",
"n_samples_all = n_samples_train + n_samples_test\n",
"n_features = 50\n",
"\n",
"X, y = make_classification(n_samples=n_samples_all, n_features=n_features, random_state=123)\n",
"X_train, X_test, y_train, y_test = \\\n",
" train_test_split(X, y, test_size=n_samples_test, random_state=45)\n",
"\n",
"# Use pandas to put the test data in Parquet format to illustrate how to load it up later.\n",
"# In real usage, the data might be on S3, Azure Blob Storage, HDFS, etc.\n",
"column_names = [f'feature{i}' for i in range(n_features)]\n",
"(\n",
" pd.DataFrame(X_test, columns=column_names)\n",
" .reset_index()\n",
" .rename(columns={'index': 'id'})\n",
" .to_parquet('unlabeled_data')\n",
")"
]
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
"### Train a model with scikit-learn"
]
},
{
"cell_type": "code",
"execution_count": 3,
"metadata": {},
"outputs": [
{
"name": "stdout",
"output_type": "stream",
"text": [
"ROC AUC: 0.959\n"
]
}
],
"source": [
"param_grid = {'n_estimators': [100], 'max_depth': [2, 4, None]}\n",
"gs_rf = GridSearchCV(\n",
" RandomForestClassifier(random_state=42),\n",
" param_grid=param_grid,\n",
" scoring='roc_auc'\n",
").fit(X_train, y_train)\n",
"print('ROC AUC: %.3f' % gs_rf.best_score_)"
]
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
"### Set up a Spark environment"
]
},
{
"cell_type": "code",
"execution_count": 4,
"metadata": {},
"outputs": [],
"source": [
"sc = pyspark.SparkContext(appName=\"foo\")\n",
"sqlContext = pyspark.SQLContext(sc)"
]
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
"### Now load the data and make predictions.\n",
"\n",
"In real usage, we might be doing a bunch of ETL after reading raw data, but here, we'll just load it up."
]
},
{
"cell_type": "code",
"execution_count": 5,
"metadata": {},
"outputs": [
{
"data": {
"text/plain": [
"DataFrame[id: bigint, feature0: double, feature1: double, feature2: double, feature3: double, feature4: double, feature5: double, feature6: double, feature7: double, feature8: double, feature9: double, feature10: double, feature11: double, feature12: double, feature13: double, feature14: double, feature15: double, feature16: double, feature17: double, feature18: double, feature19: double, feature20: double, feature21: double, feature22: double, feature23: double, feature24: double, feature25: double, feature26: double, feature27: double, feature28: double, feature29: double, feature30: double, feature31: double, feature32: double, feature33: double, feature34: double, feature35: double, feature36: double, feature37: double, feature38: double, feature39: double, feature40: double, feature41: double, feature42: double, feature43: double, feature44: double, feature45: double, feature46: double, feature47: double, feature48: double, feature49: double, __index_level_0__: bigint]"
]
},
"execution_count": 5,
"metadata": {},
"output_type": "execute_result"
}
],
"source": [
"df_unlabeled = sqlContext.read.parquet('unlabeled_data')\n",
"df_unlabeled"
]
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
"### Make predictions with a regular UDF\n",
"\n",
"First, we'll try a regular UDF. This will deserialize one row (i.e., instance, sample, record) at a time, make a prediction with the model, and return that prediction, which will be serialized and sent back to Spark to combine with all the other predictions."
]
},
{
"cell_type": "code",
"execution_count": 6,
"metadata": {},
"outputs": [
{
"data": {
"text/plain": [
"[Row(id=0, prediction=0.96),\n",
" Row(id=1, prediction=0.13),\n",
" Row(id=2, prediction=0.95),\n",
" Row(id=3, prediction=0.43),\n",
" Row(id=4, prediction=0.95)]"
]
},
"execution_count": 6,
"metadata": {},
"output_type": "execute_result"
}
],
"source": [
"@F.udf(returnType=DoubleType())\n",
"def predict_udf(*cols):\n",
" # cols will be a tuple of floats here.\n",
" return float(gs_rf.predict_proba((cols,))[0, 1])\n",
"\n",
"df_pred_a = df_unlabeled.select(\n",
" F.col('id'),\n",
" predict_udf(*column_names).alias('prediction')\n",
")\n",
"df_pred_a.take(5)"
]
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
"### Make predictions with a Pandas UDF\n",
"\n",
"Now we'll use a Pandas UDF (i.e., vectorized UDF). In this case, Spark will send a tuple of pandas Series objects with multiple rows at a time. The tuple will have one Series per column/feature, in the order they are passed to the UDF. Note that each Series holds only a batch of rows rather than the whole column, because Spark partitions datasets across workers and hands each partition to the UDF in Arrow record batches. The batch size can be tuned (via `spark.sql.execution.arrow.maxRecordsPerBatch`), but we'll just use defaults here."
]
},
{
"cell_type": "code",
"execution_count": 7,
"metadata": {},
"outputs": [
{
"data": {
"text/plain": [
"[Row(id=0, prediction=0.96),\n",
" Row(id=1, prediction=0.13),\n",
" Row(id=2, prediction=0.95),\n",
" Row(id=3, prediction=0.43),\n",
" Row(id=4, prediction=0.95)]"
]
},
"execution_count": 7,
"metadata": {},
"output_type": "execute_result"
}
],
"source": [
"@F.pandas_udf(returnType=DoubleType())\n",
"def predict_pandas_udf(*cols):\n",
" # cols will be a tuple of pandas.Series here.\n",
" X = pd.concat(cols, axis=1)\n",
" return pd.Series(gs_rf.predict_proba(X)[:, 1])\n",
"\n",
"df_pred_b = df_unlabeled.select(\n",
" F.col('id'),\n",
" predict_pandas_udf(*column_names).alias('prediction')\n",
")\n",
"df_pred_b.take(5)"
]
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
"### Making multiclass predictions\n",
"\n",
"Above, we're just returning a single series of predictions for the positive class, which works for a single binary or continuous dependent variable. One can also put multiclass or multilabel models in Pandas UDFs: the UDF just returns a series of lists of numbers instead of a series of numbers."
]
},
{
"cell_type": "code",
"execution_count": 8,
"metadata": {},
"outputs": [
{
"data": {
"text/plain": [
"[Row(id=0, prediction_0=0.04, prediction_1=0.96),\n",
" Row(id=1, prediction_0=0.87, prediction_1=0.13),\n",
" Row(id=2, prediction_0=0.05, prediction_1=0.95),\n",
" Row(id=3, prediction_0=0.57, prediction_1=0.43),\n",
" Row(id=4, prediction_0=0.05, prediction_1=0.95)]"
]
},
"execution_count": 8,
"metadata": {},
"output_type": "execute_result"
}
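],
"source": [
"@F.pandas_udf(returnType=ArrayType(DoubleType()))\n",
"def predict_pandas_udf(*cols):\n",
" # cols will be a tuple of pandas.Series, one per feature column.\n",
" X = pd.concat(cols, axis=1)\n",
" # Return one list of class probabilities per row, ordered as in gs_rf.classes_.\n",
" return pd.Series(row.tolist() for row in gs_rf.predict_proba(X))\n",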
"\n",
"df_pred_multi = (\n",
" df_unlabeled.select(\n",
" F.col('id'),\n",
" predict_pandas_udf(*column_names).alias('predictions')\n",
" )\n",
" # Select each item of the prediction array into its own column.\n",
" .select(\n",
" F.col('id'),\n",
" *[F.col('predictions')[i].alias(f'prediction_{c}')\n",
" for i, c in enumerate(gs_rf.classes_)]\n",
" )\n",
")\n",
"df_pred_multi.take(5)"
]
}
],
"metadata": {
"kernelspec": {
"display_name": "Python 3",
"language": "python",
"name": "python3"
},
"language_info": {
"codemirror_mode": {
"name": "ipython",
"version": 3
},
"file_extension": ".py",
"mimetype": "text/x-python",
"name": "python",
"nbconvert_exporter": "python",
"pygments_lexer": "ipython3",
"version": "3.6.6"
}
},
"nbformat": 4,
"nbformat_minor": 2
}
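
Because the body of a Pandas UDF is ordinary pandas code, its logic can be checked locally before involving Spark. The following is a minimal sketch and not part of the original notebook: `predict_batch` and `predict_batch_multi` are hypothetical helper names, the small random forest is a stand-in for `gs_rf`, and the 8-row batch only imitates the tuple of Series that Spark would pass to the UDF.

```python
import numpy as np
import pandas as pd
from sklearn.datasets import make_classification
from sklearn.ensemble import RandomForestClassifier

# Small stand-in model; the sizes are illustrative, not the notebook's.
X, y = make_classification(n_samples=200, n_features=5, random_state=0)
model = RandomForestClassifier(n_estimators=10, random_state=0).fit(X, y)


def predict_batch(*cols):
    """Binary case: one positive-class probability per row."""
    # cols is a tuple of pandas.Series, one per feature column.
    features = pd.concat(cols, axis=1)
    return pd.Series(model.predict_proba(features.to_numpy())[:, 1])


def predict_batch_multi(*cols):
    """Multiclass case: one list of class probabilities per row."""
    features = pd.concat(cols, axis=1)
    proba = model.predict_proba(features.to_numpy())
    return pd.Series([row.tolist() for row in proba])


# Imitate a single batch of 8 rows: a tuple with one Series per feature.
batch = tuple(pd.Series(X[:8, j], name=f"feature{j}") for j in range(X.shape[1]))
single = predict_batch(*batch)
multi = predict_batch_multi(*batch)
```

Each function returns a Series with exactly one entry per input row, which is the contract a Pandas UDF has to satisfy. The same function bodies can then be wrapped with `F.pandas_udf` as in the notebook cells.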