@hayatoy
Last active August 27, 2018 19:24
Dataflow Shuffle Example
{
"cells": [
{
"cell_type": "markdown",
"metadata": {},
"source": [
"# Dataflow Shuffle Example"
]
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
"Set `PROJECTID` and `BUCKET` to your own GCP project ID and Cloud Storage bucket."
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {
"collapsed": false
},
"outputs": [],
"source": [
"PROJECTID = 'PROJECTID'\n",
"BUCKET = 'gs://BUCKET'"
]
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
"The Dataflow Shuffle service requires the Dataflow SDK for Python version 2.1 or later.  \n",
"If you run this notebook on Datalab, run the following command first.\n",
"```\n",
"!pip install google-cloud-dataflow --ignore-installed\n",
"```"
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {
"collapsed": false
},
"outputs": [],
"source": [
"import apache_beam as beam\n",
"\n",
"beam.__version__"
]
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
"Dataflow options"
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {
"collapsed": false
},
"outputs": [],
"source": [
"options = beam.options.pipeline_options.PipelineOptions()\n",
"gcloud_options = options.view_as(\n",
" beam.options.pipeline_options.GoogleCloudOptions)\n",
"gcloud_options.job_name = 'dataflow-shuffle-test'\n",
"gcloud_options.project = PROJECTID\n",
"gcloud_options.staging_location = BUCKET + '/staging'\n",
"gcloud_options.temp_location = BUCKET + '/temp'"
]
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
"This step is **required** if you want to enable the Dataflow Shuffle service.  \n",
"Set `experiments` to a **list** containing `'shuffle_mode=service'`."
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {
"collapsed": false
},
"outputs": [],
"source": [
"debug_options = options.view_as(beam.options.pipeline_options.DebugOptions)\n",
"debug_options.experiments = ['shuffle_mode=service']"
]
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
"Change the worker options if needed.  \n",
"Note that, at the time of writing, the Dataflow Shuffle service is available only in `us-central1` and `europe-west1`."
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {
"collapsed": false
},
"outputs": [],
"source": [
"# worker_options = options.view_as(beam.options.pipeline_options.WorkerOptions)\n",
"# worker_options.disk_size_gb = 20\n",
"# worker_options.max_num_workers = 10\n",
"# worker_options.machine_type = 'n1-standard-8'\n",
"# The zone must be in a region where Dataflow Shuffle is available.\n",
"# worker_options.zone = 'us-central1-a'"
]
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
"Choose a runner."
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {
"collapsed": false
},
"outputs": [],
"source": [
"# options.view_as(beam.options.pipeline_options.StandardOptions).runner = 'DirectRunner'\n",
"options.view_as(beam.options.pipeline_options.StandardOptions).runner = 'DataflowRunner'"
]
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
"Finally, run the pipeline.  \n",
"This simple example sums the number of newborns per name in the US.  \n",
"The Dataflow Shuffle service handles the `GroupByKey` operation.  \n",
"(Yes, this pipeline could be replaced with a single SQL query. :p)"
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {
"collapsed": false
},
"outputs": [],
"source": [
"def sum_of_grouped_count(kvpair):\n",
"    # kvpair is (name, iterable of counts), as emitted by GroupByKey\n",
"    return {'name': kvpair[0],\n",
"            'sum': sum(kvpair[1])}\n",
"\n",
"p = beam.Pipeline(options=options)\n",
"\n",
"# The table reference uses legacy SQL syntax, hence use_standard_sql=False\n",
"query = 'SELECT * FROM [bigquery-public-data:usa_names.usa_1910_current]'\n",
"(p | 'read' >> beam.io.Read(beam.io.BigQuerySource(project=PROJECTID,\n",
"                                                   use_standard_sql=False,\n",
"                                                   query=query))\n",
"   | 'pair' >> beam.Map(lambda x: (x['name'], x['number']))\n",
"   | 'groupby' >> beam.GroupByKey()\n",
"   | 'count' >> beam.Map(sum_of_grouped_count)\n",
"   | 'write' >> beam.io.WriteToText(BUCKET + '/test.txt', num_shards=1)\n",
" )\n",
"\n",
"p.run()  # append .wait_until_finish() to block until the job completes"
]
}
],
"metadata": {
"kernelspec": {
"display_name": "Python 2",
"language": "python",
"name": "python2"
},
"language_info": {
"codemirror_mode": {
"name": "ipython",
"version": 2
},
"file_extension": ".py",
"mimetype": "text/x-python",
"name": "python",
"nbconvert_exporter": "python",
"pygments_lexer": "ipython2",
"version": "2.7.12"
}
},
"nbformat": 4,
"nbformat_minor": 2
}
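
As a rough illustration of what the `pair`, `groupby`, and `count` steps in the notebook compute, here is a minimal plain-Python sketch that runs without Beam or GCP. The `group_by_key` helper and the sample `rows` are hypothetical stand-ins, not part of the Beam API and not real data from the table. On Dataflow the grouping is distributed across workers, and with `shuffle_mode=service` it is the part handed to the Shuffle service.

```python
from collections import defaultdict


def sum_of_grouped_count(kvpair):
    # Same shape as the notebook's function: (name, counts) -> dict
    return {'name': kvpair[0], 'sum': sum(kvpair[1])}


def group_by_key(pairs):
    # Hypothetical local stand-in for beam.GroupByKey:
    # collect every value that shares a key into one list.
    grouped = defaultdict(list)
    for key, value in pairs:
        grouped[key].append(value)
    return list(grouped.items())


# Made-up rows, shaped like the records the BigQuery read step yields.
rows = [
    {'name': 'Mary', 'number': 10},
    {'name': 'John', 'number': 7},
    {'name': 'Mary', 'number': 5},
]

pairs = [(row['name'], row['number']) for row in rows]            # 'pair'
results = [sum_of_grouped_count(kv) for kv in group_by_key(pairs)]  # 'groupby' + 'count'
print(results)
```

Each output record holds one name and the total of its `number` values, which is what the pipeline writes to `test.txt`. The order of the records is not something to rely on in the real pipeline.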