{
"cells": [
{
"cell_type": "markdown",
"metadata": {},
"source": [
"# Dataflow Shuffle Example"
]
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
"Change PROJECTID and BUCKET"
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {
"collapsed": false
},
"outputs": [],
"source": [
"PROJECTID = 'PROJECTID'\n",
"BUCKET = 'gs://BUCKET'"
]
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
"Dataflow Shuffle service requires Dataflow SDK for Python version 2.1 or later. \n",
"if you run this notebook on Datalab, run the following code first.\n",
"```\n",
"!pip install google-cloud-dataflow --ignore-installed\n",
"```"
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {
"collapsed": false
},
"outputs": [],
"source": [
"import apache_beam as beam\n",
"\n",
"beam.__version__"
]
},
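{
"cell_type": "markdown",
"metadata": {},
"source": [
"As a rough sketch, the version requirement can also be checked in code instead of by eye.  \n",
"`version_tuple` and `MIN_VERSION` are hypothetical names introduced here, and the check assumes `beam.__version__` begins with dot-separated numbers such as `2.1.0`."
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {
"collapsed": false
},
"outputs": [],
"source": [
"def version_tuple(version):\n",
"    # Keep only the leading all-numeric components, e.g. '2.1.0.dev' -> (2, 1, 0).\n",
"    parts = []\n",
"    for piece in version.split('.'):\n",
"        if not piece.isdigit():\n",
"            break\n",
"        parts.append(int(piece))\n",
"    return tuple(parts)\n",
"\n",
"MIN_VERSION = (2, 1)  # minimum SDK version stated above\n",
"version_tuple(beam.__version__) >= MIN_VERSION"
]
},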
{
"cell_type": "markdown",
"metadata": {},
"source": [
"Dataflow options"
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {
"collapsed": false
},
"outputs": [],
"source": [
"options = beam.options.pipeline_options.PipelineOptions()\n",
"gcloud_options = options.view_as(\n",
" beam.options.pipeline_options.GoogleCloudOptions)\n",
"gcloud_options.job_name = 'dataflow-shuffle-test'\n",
"gcloud_options.project = PROJECTID\n",
"gcloud_options.staging_location = BUCKET + '/staging'\n",
"gcloud_options.temp_location = BUCKET + '/temp'"
]
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
"This is **important** if you want to activate Dataflow Shuffle service. \n",
"Set `'shuffle_mode=service'` as **list** to `experiments`."
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {
"collapsed": false
},
"outputs": [],
"source": [
"debug_options = options.view_as(beam.options.pipeline_options.DebugOptions)\n",
"debug_options.experiments = ['shuffle_mode=service']"
]
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
"Change worker options if needed. \n",
"Note that Dataflow Shuffle service is available only in `us-central1` and `europe-west1`."
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {
"collapsed": false
},
"outputs": [],
"source": [
"# worker_options = options.view_as(beam.options.pipeline_options.WorkerOptions)\n",
"# worker_options.disk_size_gb = 20\n",
"# worker_options.max_num_workers = 10\n",
"# worker_options.machine_type = 'n1-standard-8'\n",
"# worker_options.zone = 'asia-northeast1-a'"
]
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
"Choose a runner."
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {
"collapsed": false
},
"outputs": [],
"source": [
"# options.view_as(beam.options.pipeline_options.StandardOptions).runner = 'DirectRunner'\n",
"options.view_as(beam.options.pipeline_options.StandardOptions).runner = 'DataflowRunner'"
]
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
"Finally run a pipeline. \n",
"This is a simple example to count the number of names of newborns in the US. \n",
"Dataflow Shuffle service takes the GroupByKey operation. \n",
"(Don't say that we can replace this pipeline into SQL.. :p )"
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {
"collapsed": false
},
"outputs": [],
"source": [
"def sum_of_grouped_count(kvpair):\n",
" return {'name': kvpair[0],\n",
" 'sum': sum(kvpair[1])\n",
" }\n",
"\n",
"p = beam.Pipeline(options=options)\n",
"\n",
"query = 'SELECT * FROM [bigquery-public-data:usa_names.usa_1910_current]'\n",
"(p | 'read' >> beam.io.Read(beam.io.BigQuerySource(project=PROJECTID,\n",
" use_standard_sql=False,\n",
" query=query))\n",
" | 'pair' >> beam.Map(lambda x: (x['name'], x['number']))\n",
" | \"groupby\" >> beam.GroupByKey()\n",
" | 'count' >> beam.Map(sum_of_grouped_count)\n",
" | 'write' >> beam.io.WriteToText(BUCKET + '/test.txt', num_shards=1)\n",
" )\n",
"\n",
"p.run() #.wait_until_finish()"
]
}
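,
{
"cell_type": "markdown",
"metadata": {},
"source": [
"To see what the `pair`, `groupby` and `count` steps compute without submitting a job, the same logic can be sketched in plain Python.  \n",
"This is only an illustration: the `rows` below are made-up sample values, not data from the table, and `group_by_key` is a hypothetical stand-in for `beam.GroupByKey`, which on Dataflow performs this grouping as a distributed shuffle."
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {
"collapsed": false
},
"outputs": [],
"source": [
"from collections import defaultdict\n",
"\n",
"def group_by_key(pairs):\n",
"    # Collect every value that shares a key into one list per key.\n",
"    grouped = defaultdict(list)\n",
"    for key, value in pairs:\n",
"        grouped[key].append(value)\n",
"    return grouped.items()\n",
"\n",
"# Made-up sample rows with the same fields the pipeline reads.\n",
"rows = [{'name': 'Mary', 'number': 3},\n",
"        {'name': 'John', 'number': 5},\n",
"        {'name': 'Mary', 'number': 4}]\n",
"\n",
"pairs = [(row['name'], row['number']) for row in rows]\n",
"sorted((name, sum(counts)) for name, counts in group_by_key(pairs))"
]
}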
],
"metadata": {
"kernelspec": {
"display_name": "Python 2",
"language": "python",
"name": "python2"
},
"language_info": {
"codemirror_mode": {
"name": "ipython",
"version": 2
},
"file_extension": ".py",
"mimetype": "text/x-python",
"name": "python",
"nbconvert_exporter": "python",
"pygments_lexer": "ipython2",
"version": "2.7.12"
}
},
"nbformat": 4,
"nbformat_minor": 2
}