Created
November 22, 2019 03:26
-
-
Save chck/011b87b82ba378878e11973965644c28 to your computer and use it in GitHub Desktop.
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
{ | |
"cells": [ | |
{ | |
"cell_type": "markdown", | |
"metadata": {}, | |
"source": [ | |
"# Multi-worker training with Estimator\n", | |
"- https://www.tensorflow.org/tutorials/distribute/multi_worker_with_estimator" | |
] | |
}, | |
{ | |
"cell_type": "code", | |
"execution_count": 3, | |
"metadata": {}, | |
"outputs": [], | |
"source": [ | |
"# !pip3 install tensorflow_datasets" | |
] | |
}, | |
{ | |
"cell_type": "code", | |
"execution_count": 1, | |
"metadata": {}, | |
"outputs": [], | |
"source": [ | |
"import tensorflow_datasets as tfds\n", | |
"import tensorflow as tf\n", | |
"tfds.disable_progress_bar()\n", | |
"\n", | |
"import os, json" | |
] | |
}, | |
{ | |
"cell_type": "markdown", | |
"metadata": {}, | |
"source": [ | |
"## Input function" | |
] | |
}, | |
{ | |
"cell_type": "code", | |
"execution_count": 2, | |
"metadata": {}, | |
"outputs": [], | |
"source": [ | |
"BUFFER_SIZE = 10000\n", | |
"BATCH_SIZE = 64\n", | |
"\n", | |
"def input_fn(mode, input_context=None):\n", | |
" datasets, info = tfds.load(\n", | |
" name='mnist',\n", | |
" with_info=True,\n", | |
" as_supervised=True)\n", | |
" mnist_dataset = (datasets['train'] if mode == tf.estimator.ModeKeys.TRAIN else\n", | |
" datasets['test'])\n", | |
" \n", | |
" def scale(image, label):\n", | |
" image = tf.cast(image, tf.float32)\n", | |
" image /= 255\n", | |
" return image, label\n", | |
" \n", | |
" if input_context:\n", | |
" mnist_dataset = mnist_dataset.shard(input_context.num_input_pipelines,\n", | |
" input_context.input_pipeline_id)\n", | |
" \n", | |
" return mnist_dataset.map(scale).cache().shuffle(BUFFER_SIZE).batch(BATCH_SIZE)" | |
] | |
}, | |
{ | |
"cell_type": "markdown", | |
"metadata": {}, | |
"source": [ | |
"## Multi-worker configuration" | |
] | |
}, | |
{ | |
"cell_type": "code", | |
"execution_count": 3, | |
"metadata": {}, | |
"outputs": [], | |
"source": [ | |
"os.environ['TF_CONFIG'] = json.dumps({\n", | |
" 'cluster': {\n", | |
" 'worker': ['localhost:12345', 'localhost:23456']\n", | |
" },\n", | |
" 'task': {'type': 'worker', 'index': 0}\n", | |
"})" | |
] | |
}, | |
{ | |
"cell_type": "markdown", | |
"metadata": {}, | |
"source": [ | |
"## Define the model" | |
] | |
}, | |
{ | |
"cell_type": "code", | |
"execution_count": 7, | |
"metadata": {}, | |
"outputs": [], | |
"source": [ | |
"LEARNING_RATE = 1e-4\n", | |
"layers = tf.keras.layers\n", | |
"\n", | |
"def model_fn(features, labels, mode):\n", | |
" model = tf.keras.Sequential([\n", | |
" layers.Conv2D(32, 3, activation='relu', input_shape=(28, 28, 1)),\n", | |
" layers.MaxPooling2D(),\n", | |
" layers.Flatten(),\n", | |
" layers.Dense(64, activation='relu'),\n", | |
" layers.Dense(10),\n", | |
" ])\n", | |
" logits = model(features, training=False)\n", | |
" \n", | |
" if mode == tf.estimator.ModeKeys.PREDICT:\n", | |
" predictions = {'logits': logits}\n", | |
" return tf.estimator.EstimatorSpec(labels=labels, predictions=predictions)\n", | |
" \n", | |
" optimizer = tf.compat.v1.train.GradientDescentOptimizer(learning_rate=LEARNING_RATE)\n", | |
" loss = tf.keras.losses.SparseCategoricalCrossentropy(from_logits=True, reduction=tf.keras.losses.Reduction.NONE)(labels, logits)\n", | |
" loss = tf.reduce_sum(loss) * (1. / BATCH_SIZE)\n", | |
" if mode == tf.estimator.ModeKeys.EVAL:\n", | |
" return tf.estimator.EstimatorSpec(mode, loss=loss)\n", | |
" \n", | |
" return tf.estimator.EstimatorSpec(\n", | |
" mode=mode,\n", | |
" loss=loss,\n", | |
" train_op=optimizer.minimize(loss, tf.compat.v1.train.get_or_create_global_step()))" | |
] | |
}, | |
{ | |
"cell_type": "markdown", | |
"metadata": {}, | |
"source": [ | |
"## MultiWorkerMirroredStrategy" | |
] | |
}, | |
{ | |
"cell_type": "code", | |
"execution_count": 5, | |
"metadata": {}, | |
"outputs": [ | |
{ | |
"name": "stdout", | |
"output_type": "stream", | |
"text": [ | |
"INFO:tensorflow:Enabled multi-worker collective ops with available devices: ['/job:worker/replica:0/task:0/device:CPU:0', '/job:worker/replica:0/task:0/device:XLA_CPU:0']\n", | |
"INFO:tensorflow:Multi-worker CollectiveAllReduceStrategy with cluster_spec = {'worker': ['localhost:12345', 'localhost:23456']}, task_type = 'worker', task_id = 0, num_workers = 2, local_devices = ('/job:worker/task:0',), communication = CollectiveCommunication.AUTO\n" | |
] | |
} | |
], | |
"source": [ | |
"strategy = tf.distribute.experimental.MultiWorkerMirroredStrategy()" | |
] | |
}, | |
{ | |
"cell_type": "markdown", | |
"metadata": {}, | |
"source": [ | |
"## Train and evaluate the model" | |
] | |
}, | |
{ | |
"cell_type": "code", | |
"execution_count": null, | |
"metadata": {}, | |
"outputs": [ | |
{ | |
"name": "stdout", | |
"output_type": "stream", | |
"text": [ | |
"INFO:tensorflow:TF_CONFIG environment variable: {'cluster': {'worker': ['localhost:12345', 'localhost:23456']}, 'task': {'type': 'worker', 'index': 0}}\n" | |
] | |
}, | |
{ | |
"name": "stderr", | |
"output_type": "stream", | |
"text": [ | |
"INFO:tensorflow:TF_CONFIG environment variable: {'cluster': {'worker': ['localhost:12345', 'localhost:23456']}, 'task': {'type': 'worker', 'index': 0}}\n" | |
] | |
}, | |
{ | |
"name": "stdout", | |
"output_type": "stream", | |
"text": [ | |
"INFO:tensorflow:Initializing RunConfig with distribution strategies.\n" | |
] | |
}, | |
{ | |
"name": "stderr", | |
"output_type": "stream", | |
"text": [ | |
"INFO:tensorflow:Initializing RunConfig with distribution strategies.\n" | |
] | |
}, | |
{ | |
"name": "stdout", | |
"output_type": "stream", | |
"text": [ | |
"INFO:tensorflow:RunConfig initialized for Distribute Coordinator with INDEPENDENT_WORKER mode\n" | |
] | |
}, | |
{ | |
"name": "stderr", | |
"output_type": "stream", | |
"text": [ | |
"INFO:tensorflow:RunConfig initialized for Distribute Coordinator with INDEPENDENT_WORKER mode\n" | |
] | |
}, | |
{ | |
"name": "stdout", | |
"output_type": "stream", | |
"text": [ | |
"INFO:tensorflow:Using config: {'_model_dir': '/tmp/multiworker', '_tf_random_seed': None, '_save_summary_steps': 100, '_save_checkpoints_steps': None, '_save_checkpoints_secs': 600, '_session_config': allow_soft_placement: true\n", | |
"graph_options {\n", | |
" rewrite_options {\n", | |
" meta_optimizer_iterations: ONE\n", | |
" }\n", | |
"}\n", | |
", '_keep_checkpoint_max': 5, '_keep_checkpoint_every_n_hours': 10000, '_log_step_count_steps': 100, '_train_distribute': <tensorflow.python.distribute.collective_all_reduce_strategy.CollectiveAllReduceStrategy object at 0x7fe134567e80>, '_device_fn': None, '_protocol': None, '_eval_distribute': None, '_experimental_distribute': None, '_experimental_max_worker_delay_secs': None, '_session_creation_timeout_secs': 7200, '_service': None, '_cluster_spec': <tensorflow.python.training.server_lib.ClusterSpec object at 0x7fe13ddf85f8>, '_task_type': 'worker', '_task_id': 0, '_global_id_in_cluster': 0, '_master': '', '_evaluation_master': '', '_is_chief': True, '_num_ps_replicas': 0, '_num_worker_replicas': 1, '_distribute_coordinator_mode': 'independent_worker'}\n" | |
] | |
}, | |
{ | |
"name": "stderr", | |
"output_type": "stream", | |
"text": [ | |
"INFO:tensorflow:Using config: {'_model_dir': '/tmp/multiworker', '_tf_random_seed': None, '_save_summary_steps': 100, '_save_checkpoints_steps': None, '_save_checkpoints_secs': 600, '_session_config': allow_soft_placement: true\n", | |
"graph_options {\n", | |
" rewrite_options {\n", | |
" meta_optimizer_iterations: ONE\n", | |
" }\n", | |
"}\n", | |
", '_keep_checkpoint_max': 5, '_keep_checkpoint_every_n_hours': 10000, '_log_step_count_steps': 100, '_train_distribute': <tensorflow.python.distribute.collective_all_reduce_strategy.CollectiveAllReduceStrategy object at 0x7fe134567e80>, '_device_fn': None, '_protocol': None, '_eval_distribute': None, '_experimental_distribute': None, '_experimental_max_worker_delay_secs': None, '_session_creation_timeout_secs': 7200, '_service': None, '_cluster_spec': <tensorflow.python.training.server_lib.ClusterSpec object at 0x7fe13ddf85f8>, '_task_type': 'worker', '_task_id': 0, '_global_id_in_cluster': 0, '_master': '', '_evaluation_master': '', '_is_chief': True, '_num_ps_replicas': 0, '_num_worker_replicas': 1, '_distribute_coordinator_mode': 'independent_worker'}\n" | |
] | |
}, | |
{ | |
"name": "stdout", | |
"output_type": "stream", | |
"text": [ | |
"INFO:tensorflow:Running `train_and_evaluate` with Distribute Coordinator.\n" | |
] | |
}, | |
{ | |
"name": "stderr", | |
"output_type": "stream", | |
"text": [ | |
"INFO:tensorflow:Running `train_and_evaluate` with Distribute Coordinator.\n" | |
] | |
}, | |
{ | |
"name": "stdout", | |
"output_type": "stream", | |
"text": [ | |
"INFO:tensorflow:Running Distribute Coordinator with mode = 'independent_worker', cluster_spec = {'worker': ['localhost:12345', 'localhost:23456']}, task_type = 'worker', task_id = 0, environment = None, rpc_layer = 'grpc'\n" | |
] | |
}, | |
{ | |
"name": "stderr", | |
"output_type": "stream", | |
"text": [ | |
"INFO:tensorflow:Running Distribute Coordinator with mode = 'independent_worker', cluster_spec = {'worker': ['localhost:12345', 'localhost:23456']}, task_type = 'worker', task_id = 0, environment = None, rpc_layer = 'grpc'\n" | |
] | |
}, | |
{ | |
"name": "stdout", | |
"output_type": "stream", | |
"text": [ | |
"WARNING:tensorflow:`eval_strategy` is not passed in. No distribution strategy will be used for evaluation.\n" | |
] | |
}, | |
{ | |
"name": "stderr", | |
"output_type": "stream", | |
"text": [ | |
"WARNING:tensorflow:`eval_strategy` is not passed in. No distribution strategy will be used for evaluation.\n" | |
] | |
}, | |
{ | |
"name": "stdout", | |
"output_type": "stream", | |
"text": [ | |
"INFO:tensorflow:Multi-worker CollectiveAllReduceStrategy with cluster_spec = {'worker': ['localhost:12345', 'localhost:23456']}, task_type = 'worker', task_id = 0, num_workers = 2, local_devices = ('/job:worker/task:0',), communication = CollectiveCommunication.AUTO\n" | |
] | |
}, | |
{ | |
"name": "stderr", | |
"output_type": "stream", | |
"text": [ | |
"INFO:tensorflow:Multi-worker CollectiveAllReduceStrategy with cluster_spec = {'worker': ['localhost:12345', 'localhost:23456']}, task_type = 'worker', task_id = 0, num_workers = 2, local_devices = ('/job:worker/task:0',), communication = CollectiveCommunication.AUTO\n" | |
] | |
}, | |
{ | |
"name": "stdout", | |
"output_type": "stream", | |
"text": [ | |
"INFO:tensorflow:Multi-worker CollectiveAllReduceStrategy with cluster_spec = {'worker': ['localhost:12345', 'localhost:23456']}, task_type = 'worker', task_id = 0, num_workers = 2, local_devices = ('/job:worker/task:0',), communication = CollectiveCommunication.AUTO\n" | |
] | |
}, | |
{ | |
"name": "stderr", | |
"output_type": "stream", | |
"text": [ | |
"INFO:tensorflow:Multi-worker CollectiveAllReduceStrategy with cluster_spec = {'worker': ['localhost:12345', 'localhost:23456']}, task_type = 'worker', task_id = 0, num_workers = 2, local_devices = ('/job:worker/task:0',), communication = CollectiveCommunication.AUTO\n" | |
] | |
}, | |
{ | |
"name": "stdout", | |
"output_type": "stream", | |
"text": [ | |
"INFO:tensorflow:Updated config: {'_model_dir': '/tmp/multiworker', '_tf_random_seed': None, '_save_summary_steps': 100, '_save_checkpoints_steps': None, '_save_checkpoints_secs': 600, '_session_config': allow_soft_placement: true\n", | |
"graph_options {\n", | |
" rewrite_options {\n", | |
" meta_optimizer_iterations: ONE\n", | |
" }\n", | |
"}\n", | |
", '_keep_checkpoint_max': 5, '_keep_checkpoint_every_n_hours': 10000, '_log_step_count_steps': 100, '_train_distribute': <tensorflow.python.distribute.collective_all_reduce_strategy.CollectiveAllReduceStrategy object at 0x7fe13457d588>, '_device_fn': None, '_protocol': None, '_eval_distribute': None, '_experimental_distribute': None, '_experimental_max_worker_delay_secs': None, '_session_creation_timeout_secs': 7200, '_service': None, '_cluster_spec': <tensorflow.python.training.server_lib.ClusterSpec object at 0x7fe13457b400>, '_task_type': 'worker', '_task_id': 0, '_global_id_in_cluster': 0, '_master': 'grpc://localhost:12345', '_evaluation_master': 'grpc://localhost:12345', '_is_chief': True, '_num_ps_replicas': 0, '_num_worker_replicas': 2, '_distribute_coordinator_mode': 'independent_worker'}\n" | |
] | |
}, | |
{ | |
"name": "stderr", | |
"output_type": "stream", | |
"text": [ | |
"INFO:tensorflow:Updated config: {'_model_dir': '/tmp/multiworker', '_tf_random_seed': None, '_save_summary_steps': 100, '_save_checkpoints_steps': None, '_save_checkpoints_secs': 600, '_session_config': allow_soft_placement: true\n", | |
"graph_options {\n", | |
" rewrite_options {\n", | |
" meta_optimizer_iterations: ONE\n", | |
" }\n", | |
"}\n", | |
", '_keep_checkpoint_max': 5, '_keep_checkpoint_every_n_hours': 10000, '_log_step_count_steps': 100, '_train_distribute': <tensorflow.python.distribute.collective_all_reduce_strategy.CollectiveAllReduceStrategy object at 0x7fe13457d588>, '_device_fn': None, '_protocol': None, '_eval_distribute': None, '_experimental_distribute': None, '_experimental_max_worker_delay_secs': None, '_session_creation_timeout_secs': 7200, '_service': None, '_cluster_spec': <tensorflow.python.training.server_lib.ClusterSpec object at 0x7fe13457b400>, '_task_type': 'worker', '_task_id': 0, '_global_id_in_cluster': 0, '_master': 'grpc://localhost:12345', '_evaluation_master': 'grpc://localhost:12345', '_is_chief': True, '_num_ps_replicas': 0, '_num_worker_replicas': 2, '_distribute_coordinator_mode': 'independent_worker'}\n" | |
] | |
}, | |
{ | |
"name": "stdout", | |
"output_type": "stream", | |
"text": [ | |
"INFO:tensorflow:The `input_fn` accepts an `input_context` which will be given by DistributionStrategy\n" | |
] | |
}, | |
{ | |
"name": "stderr", | |
"output_type": "stream", | |
"text": [ | |
"INFO:tensorflow:The `input_fn` accepts an `input_context` which will be given by DistributionStrategy\n" | |
] | |
}, | |
{ | |
"name": "stdout", | |
"output_type": "stream", | |
"text": [ | |
"INFO:tensorflow:Calling model_fn.\n" | |
] | |
}, | |
{ | |
"name": "stderr", | |
"output_type": "stream", | |
"text": [ | |
"INFO:tensorflow:Calling model_fn.\n" | |
] | |
}, | |
{ | |
"name": "stdout", | |
"output_type": "stream", | |
"text": [ | |
"INFO:tensorflow:Collective batch_all_reduce: 6 all-reduces, num_workers = 2\n" | |
] | |
}, | |
{ | |
"name": "stderr", | |
"output_type": "stream", | |
"text": [ | |
"INFO:tensorflow:Collective batch_all_reduce: 6 all-reduces, num_workers = 2\n" | |
] | |
}, | |
{ | |
"name": "stdout", | |
"output_type": "stream", | |
"text": [ | |
"INFO:tensorflow:Done calling model_fn.\n" | |
] | |
}, | |
{ | |
"name": "stderr", | |
"output_type": "stream", | |
"text": [ | |
"INFO:tensorflow:Done calling model_fn.\n" | |
] | |
}, | |
{ | |
"name": "stdout", | |
"output_type": "stream", | |
"text": [ | |
"INFO:tensorflow:Collective batch_all_reduce: 1 all-reduces, num_workers = 2\n" | |
] | |
}, | |
{ | |
"name": "stderr", | |
"output_type": "stream", | |
"text": [ | |
"INFO:tensorflow:Collective batch_all_reduce: 1 all-reduces, num_workers = 2\n" | |
] | |
}, | |
{ | |
"name": "stdout", | |
"output_type": "stream", | |
"text": [ | |
"WARNING:tensorflow:Collective ops may deadlock with `save_checkpoints_secs` please use `save_checkpoint_steps` instead. Clearing `save_checkpoint_secs` and setting `save_checkpoint_steps` to 1000 now.\n" | |
] | |
}, | |
{ | |
"name": "stderr", | |
"output_type": "stream", | |
"text": [ | |
"WARNING:tensorflow:Collective ops may deadlock with `save_checkpoints_secs` please use `save_checkpoint_steps` instead. Clearing `save_checkpoint_secs` and setting `save_checkpoint_steps` to 1000 now.\n" | |
] | |
}, | |
{ | |
"name": "stdout", | |
"output_type": "stream", | |
"text": [ | |
"INFO:tensorflow:Create CheckpointSaverHook.\n" | |
] | |
}, | |
{ | |
"name": "stderr", | |
"output_type": "stream", | |
"text": [ | |
"INFO:tensorflow:Create CheckpointSaverHook.\n" | |
] | |
}, | |
{ | |
"name": "stdout", | |
"output_type": "stream", | |
"text": [ | |
"INFO:tensorflow:all_hooks [<tensorflow_estimator.python.estimator.util.DistributedIteratorInitializerHook object at 0x7fe13457bb00>, <tensorflow.python.training.basic_session_run_hooks.NanTensorHook object at 0x7fe1340ddf60>, <tensorflow.python.training.basic_session_run_hooks.LoggingTensorHook object at 0x7fe13457ba90>, <tensorflow.python.training.basic_session_run_hooks.StepCounterHook object at 0x7fe13417a400>, <tensorflow.python.training.basic_session_run_hooks.SummarySaverHook object at 0x7fe134160940>, <tensorflow.python.training.basic_session_run_hooks.CheckpointSaverHook object at 0x7fe1341609e8>]\n" | |
] | |
}, | |
{ | |
"name": "stderr", | |
"output_type": "stream", | |
"text": [ | |
"INFO:tensorflow:all_hooks [<tensorflow_estimator.python.estimator.util.DistributedIteratorInitializerHook object at 0x7fe13457bb00>, <tensorflow.python.training.basic_session_run_hooks.NanTensorHook object at 0x7fe1340ddf60>, <tensorflow.python.training.basic_session_run_hooks.LoggingTensorHook object at 0x7fe13457ba90>, <tensorflow.python.training.basic_session_run_hooks.StepCounterHook object at 0x7fe13417a400>, <tensorflow.python.training.basic_session_run_hooks.SummarySaverHook object at 0x7fe134160940>, <tensorflow.python.training.basic_session_run_hooks.CheckpointSaverHook object at 0x7fe1341609e8>]\n" | |
] | |
}, | |
{ | |
"name": "stdout", | |
"output_type": "stream", | |
"text": [ | |
"INFO:tensorflow:Creating chief session creator with config: device_filters: \"/job:worker/task:0\"\n", | |
"allow_soft_placement: true\n", | |
"graph_options {\n", | |
" rewrite_options {\n", | |
" meta_optimizer_iterations: ONE\n", | |
" scoped_allocator_optimization: ON\n", | |
" scoped_allocator_opts {\n", | |
" enable_op: \"CollectiveReduce\"\n", | |
" }\n", | |
" }\n", | |
"}\n", | |
"experimental {\n", | |
" collective_group_leader: \"/job:worker/replica:0/task:0\"\n", | |
"}\n", | |
"\n" | |
] | |
}, | |
{ | |
"name": "stderr", | |
"output_type": "stream", | |
"text": [ | |
"INFO:tensorflow:Creating chief session creator with config: device_filters: \"/job:worker/task:0\"\n", | |
"allow_soft_placement: true\n", | |
"graph_options {\n", | |
" rewrite_options {\n", | |
" meta_optimizer_iterations: ONE\n", | |
" scoped_allocator_optimization: ON\n", | |
" scoped_allocator_opts {\n", | |
" enable_op: \"CollectiveReduce\"\n", | |
" }\n", | |
" }\n", | |
"}\n", | |
"experimental {\n", | |
" collective_group_leader: \"/job:worker/replica:0/task:0\"\n", | |
"}\n", | |
"\n" | |
] | |
}, | |
{ | |
"name": "stdout", | |
"output_type": "stream", | |
"text": [ | |
"INFO:tensorflow:Graph was finalized.\n" | |
] | |
}, | |
{ | |
"name": "stderr", | |
"output_type": "stream", | |
"text": [ | |
"INFO:tensorflow:Graph was finalized.\n" | |
] | |
} | |
], | |
"source": [ | |
"config = tf.estimator.RunConfig(train_distribute=strategy)\n", | |
"\n", | |
"classifier = tf.estimator.Estimator(model_fn=model_fn, model_dir='/tmp/multiworker', config=config)\n", | |
"tf.estimator.train_and_evaluate(\n", | |
" classifier,\n", | |
" train_spec=tf.estimator.TrainSpec(input_fn=input_fn),\n", | |
" eval_spec=tf.estimator.EvalSpec(input_fn=input_fn),\n", | |
")" | |
] | |
}, | |
{ | |
"cell_type": "code", | |
"execution_count": null, | |
"metadata": {}, | |
"outputs": [], | |
"source": [] | |
} | |
], | |
"metadata": { | |
"kernelspec": { | |
"display_name": "Python 3", | |
"language": "python", | |
"name": "python3" | |
}, | |
"language_info": { | |
"codemirror_mode": { | |
"name": "ipython", | |
"version": 3 | |
}, | |
"file_extension": ".py", | |
"mimetype": "text/x-python", | |
"name": "python", | |
"nbconvert_exporter": "python", | |
"pygments_lexer": "ipython3", | |
"version": "3.7.1" | |
} | |
}, | |
"nbformat": 4, | |
"nbformat_minor": 4 | |
} |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment