Skip to content

Instantly share code, notes, and snippets.

@hayatoy
Last active March 7, 2017 14:06
Show Gist options
  • Save hayatoy/fa742c7ca2f32cdb33b4a43a5a2c10a8 to your computer and use it in GitHub Desktop.
Save hayatoy/fa742c7ca2f32cdb33b4a43a5a2c10a8 to your computer and use it in GitHub Desktop.
Training Multiple TensorFlow Models Using Dataflow
import apache_beam as beam
import apache_beam.transforms.window as window
# Pipeline configuration. '{PROJECTID}' and '{BUCKET_NAME}' are placeholders
# that must be replaced with a real GCP project id and GCS bucket name.
options = beam.utils.pipeline_options.PipelineOptions()

# Google Cloud settings: project, job name, and the GCS prefixes used for
# staged binaries and temporary files.
google_cloud_options = options.view_as(
    beam.utils.pipeline_options.GoogleCloudOptions)
google_cloud_options.project = '{PROJECTID}'
google_cloud_options.job_name = 'tensorflow-gs'
google_cloud_options.staging_location = 'gs://{BUCKET_NAME}/binaries'
google_cloud_options.temp_location = 'gs://{BUCKET_NAME}/temp'

# Worker pool: start with 6 workers, allow at most 6, 20 GB disk each.
# (A larger machine could be requested via machine_type, e.g. 'n1-standard-16'.)
worker_options = options.view_as(beam.utils.pipeline_options.WorkerOptions)
worker_options.num_workers = 6
worker_options.max_num_workers = 6
worker_options.disk_size_gb = 20

# Execute on Cloud Dataflow; set the runner to 'DirectRunner' to run locally.
options.view_as(beam.utils.pipeline_options.StandardOptions).runner = (
    'DataflowRunner')

p = beam.Pipeline(options=options)
import itertools
# Hyper-parameter grid: every combination of these values is trained as a
# separate model (3 * 4 * 3 = 36 combinations).
param_grid = {'hidden_units': [[10, 20, 10], [20, 40, 20], [100, 200, 100]],
              'dropout': [0.1, 0.2, 0.5, 0.8],
              'steps': [20000, 50000, 100000]}


def dict_product(param):
    """Yield one dict for each combination of the value lists in *param*.

    ``dict_product({'a': [1, 2], 'b': [3]})`` yields ``{'a': 1, 'b': 3}``
    and then ``{'a': 2, 'b': 3}``. An empty *param* yields a single empty
    dict. If any value list is empty, nothing is yielded.

    Works on both Python 2 and Python 3 because it avoids the Python 2-only
    ``itertools.izip`` and ``dict.itervalues``. Each key is paired
    explicitly with its own value list, so correctness does not depend on
    keys and values iterating in the same order.
    """
    keys = list(param)
    for values in itertools.product(*(param[k] for k in keys)):
        yield dict(zip(keys, values))


# Materialized list of all combinations, one element per model to train.
params = list(dict_product(param_grid))
def train(param):
    """Train and evaluate one DNNClassifier on the iris dataset.

    Args:
        param: dict with keys ``'hidden_units'`` (list of layer sizes),
            ``'dropout'`` and ``'steps'``, i.e. one element of
            ``dict_product(param_grid)``.

    Returns:
        dict with ``accuracy`` (float), ``loss`` (float), ``model_id`` (str,
        a random UUID that also names the model's GCS directory) and
        ``param`` (the hyper-parameters serialized as a JSON string). These
        keys match the schema of the BigQuery table the pipeline writes to.
    """
    # Imports are function-local, presumably so they are resolved on the
    # Dataflow worker that executes this function -- confirm if refactoring.
    import uuid
    import json
    import tensorflow as tf
    try:
        # scikit-learn >= 0.18
        from sklearn.model_selection import train_test_split
    except ImportError:
        # Older scikit-learn. This module was deprecated in 0.18 and removed in 0.20.
        from sklearn.cross_validation import train_test_split

    model_id = str(uuid.uuid4())

    # Load iris and hold out 20% for evaluation. The fixed random_state gives
    # every parameter combination the same split, so scores are comparable.
    iris = tf.contrib.learn.datasets.base.load_iris()
    train_x, test_x, train_y, test_y = train_test_split(
        iris.data, iris.target, test_size=0.2, random_state=0)

    # https://www.tensorflow.org/get_started/tflearn
    feature_columns = [tf.contrib.layers.real_valued_column("", dimension=4)]
    classifier = tf.contrib.learn.DNNClassifier(
        feature_columns=feature_columns,
        hidden_units=param['hidden_units'],
        dropout=param['dropout'],
        n_classes=3,
        # Each run gets its own model directory, keyed by model_id.
        model_dir='gs://{BUCKET_NAME}/models/%s' % model_id)
    classifier.fit(x=train_x, y=train_y, steps=param['steps'], batch_size=50)
    result = classifier.evaluate(x=test_x, y=test_y)

    return {'accuracy': float(result['accuracy']),
            'loss': float(result['loss']),
            'model_id': model_id,
            # sort_keys makes the serialized string deterministic (dict key
            # order is arbitrary on Python 2), so identical settings produce
            # identical strings.
            'param': json.dumps(param, sort_keys=True)}
# Assemble the pipeline:
#   1. turn the list of parameter combinations into a PCollection,
#   2. train one model per element,
#   3. append each result row to BigQuery, creating the table if needed.
# 'project:dataset.table' appears to be a placeholder for the destination table.
(p
 | 'init' >> beam.Create(params)
 | 'train' >> beam.Map(train)
 | 'output' >> beam.Write(beam.io.BigQuerySink(
     'project:dataset.table',
     schema="accuracy:FLOAT, loss:FLOAT, model_id:STRING, param:STRING",
     write_disposition=beam.io.BigQueryDisposition.WRITE_APPEND,
     create_disposition=beam.io.BigQueryDisposition.CREATE_IF_NEEDED)))

# Submit the pipeline to the configured runner.
p.run()
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment