Skip to content

Instantly share code, notes, and snippets.

@arg0
Last active September 5, 2016 07:47
Show Gist options
  • Save arg0/1e4d7c1ca094f1d357ece0d95d7177f6 to your computer and use it in GitHub Desktop.
class PartitionedVariablesBenchmark(tf.test.Benchmark):
  """Benchmarks reading heavily partitioned variables off parameter servers.

  Builds variables sharded into 1000 partitions spread over 100 PS tasks,
  then times the concat-read (`tf.convert_to_tensor`) of each one.
  """

  def benchmark_create_1000_partitions_with_100_parameter_servers(self):
    """Times the read/concat of 1000-way partitioned variables.

    For each size in `partition_sizes`, a variable of 1000 * size float32s
    is created with a partitioner tuned to yield exactly 1000 shards, and
    the benchmark measures fetching the concatenated tensor from the PS
    tasks through a single worker session.
    """
    # NOTE(review): `create_local_cluster` is assumed to be defined elsewhere
    # in this module -- presumably it starts an in-process cluster with the
    # requested worker/PS counts; confirm against the surrounding file.
    workers, _ = create_local_cluster(num_workers=1, num_ps=100)
    worker_sessions = [tf.Session(w.target) for w in workers]
    worker = worker_sessions[0]
    partition_sizes = (1, 512, 1024*32, 1024*128)
    partitioned = []
    for partition_size in partition_sizes:
      # max_shard_bytes is 4 * partition_size, shape is 1000*partition_size
      # float32s (4 bytes each), which should partition into 1000 shards,
      # each containing partition_size float32s.
      print("Building partitioned variable with %d floats per partition"
            % partition_size)
      # Place the variable shards round-robin across the 100 PS tasks.
      with tf.device(tf.train.replica_device_setter(ps_tasks=100)):
        partitioned_ix = tf.get_variable(
            "partitioned_%d" % partition_size,
            shape=[1000 * partition_size],
            dtype=tf.float32,
            # Each partition to have exactly N float32s
            partitioner=tf.variable_axis_size_partitioner(
                max_shard_bytes=4 * partition_size))
        # Concatenates along axis 0; evaluating this tensor pulls every
        # shard from the parameter servers, which is what we benchmark.
        partitioned.append(tf.convert_to_tensor(partitioned_ix))
    tf.initialize_all_variables().run(session=worker)
    for ix, partition_size in enumerate(partition_sizes):
      print("Running benchmark having partitions with %d floats"
            % partition_size)
      self.run_op_benchmark(
          worker,
          partitioned[ix],
          name=("read_concat_1000_partitions_from_"
                "100_parameter_servers_partsize_%d_floats" % partition_size))
###################################################################################
def testMultivalentCrossUsageInPredictionsWithPartition(self):
  """Tests that a crossed column's weights can be partitioned and still score.

  Builds country/language hash-bucket columns large enough to force
  sharding, crosses them, partitions the weight variables with
  `min_max_variable_partitioner`, and checks both the per-column partition
  counts and the final weighted-sum prediction.
  """
  # Bucket size has to be big enough to allow sharding.
  language = tf.contrib.layers.sparse_column_with_hash_bucket(
      "language", hash_bucket_size=64 << 19)
  country = tf.contrib.layers.sparse_column_with_hash_bucket(
      "country", hash_bucket_size=64 << 18)
  country_language = tf.contrib.layers.crossed_column(
      [language, country], hash_bucket_size=64 << 18)
  with tf.Graph().as_default():
    # One example with two values per feature; crossing them yields
    # 2 x 2 = 4 (country, language) pairs for the single example.
    features = {
        "language": tf.SparseTensor(values=["english", "spanish"],
                                    indices=[[0, 0], [0, 1]],
                                    shape=[1, 2]),
        "country": tf.SparseTensor(values=["US", "SV"],
                                   indices=[[0, 0], [0, 1]],
                                   shape=[1, 2])
    }
    # NOTE(review): `features.values()` occupies variable_scope's second
    # positional slot (`default_name` in this TF version), which is ignored
    # when an explicit name is given -- confirm `values=` was the intent.
    with tf.variable_scope(
        "weighted_sum_from_feature_columns",
        features.values(),
        partitioner=tf.min_max_variable_partitioner(
            max_partitions=10, min_slice_size=((64 << 20) - 1))) as scope:
      output, column_to_variable, _ = (
          tf.contrib.layers.weighted_sum_from_feature_columns(
              features, [country, language, country_language],
              num_outputs=1,
              scope=scope))
    with self.test_session() as sess:
      tf.initialize_all_variables().run()
      tf.initialize_all_tables().run()
      # Each column's weight variable should have been split into the
      # asserted number of partitions by min_max_variable_partitioner.
      self.assertEqual(2, len(column_to_variable[country]))
      self.assertEqual(3, len(column_to_variable[language]))
      self.assertEqual(2, len(column_to_variable[country_language]))
      # Set every weight of the crossed column to 0.4 across all partitions.
      weights = column_to_variable[country_language]
      for partition_variable in weights:
        sess.run(partition_variable.assign(partition_variable + 0.4))
      # There are four crosses each with 0.4 weight.
      # score = 0.4 + 0.4 + 0.4 + 0.4
      self.assertAllClose(output.eval(), [[1.6]])
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment