Distributed TensorFlow: good example for synchronous training on CPUs


I am new to distributed TensorFlow and am looking for a good example to perform synchronous training on CPUs.

I have already tried the Distributed Tensorflow Example, and it performs asynchronous training successfully over 1 parameter server (1 machine with 1 CPU) and 3 workers (each worker = 1 machine with 1 CPU). However, when it comes to synchronous training, I am not able to get it to run correctly, even though I have followed the tutorial for SyncReplicasOptimizer (V1.0 and V2.0).

I have inserted the official SyncReplicasOptimizer code into the working asynchronous training example, but the training process is still asynchronous. My detailed code is as follows; any code related to synchronous training is inside the blocks marked with ******.

import tensorflow as tf
import sys
import time

# cluster specification ----------------------------------------------------------------------
parameter_servers = ["xx1.edu:2222"]
workers = ["xx2.edu:2222", "xx3.edu:2222", "xx4.edu:2222"]
cluster = tf.train.ClusterSpec({"ps":parameter_servers, "worker":workers})

# input flags
tf.app.flags.DEFINE_string("job_name", "", "Either 'ps' or 'worker'")
tf.app.flags.DEFINE_integer("task_index", 0, "Index of task within the job")
FLAGS = tf.app.flags.FLAGS

# start a server for a specific task
server = tf.train.Server(cluster, job_name=FLAGS.job_name, task_index=FLAGS.task_index)

# Parameters  ----------------------------------------------------------------------
N = 3 # number of replicas
learning_rate = 0.001
training_epochs = int(21/N)
batch_size = 100

# Network Parameters
n_input = 784 # MNIST data input (img shape: 28*28)
n_hidden_1 = 256 # 1st layer number of features
n_hidden_2 = 256 # 2nd layer number of features
n_classes = 10 # MNIST total classes (0-9 digits)

if FLAGS.job_name == "ps":
    print("--- Parameter Server Ready ---")
    server.join()  # blocks forever, serving the workers
elif FLAGS.job_name == "worker":
    # Import MNIST data
    from tensorflow.examples.tutorials.mnist import input_data
    mnist = input_data.read_data_sets("/tmp/data/", one_hot=True)
    # Between-graph replication
    with tf.device(tf.train.replica_device_setter(
        worker_device="/job:worker/task:%d" % FLAGS.task_index,
        cluster=cluster)):
        # count the number of updates
        global_step = tf.get_variable('global_step', [], 
                                      initializer = tf.constant_initializer(0), 
                                      trainable = False,
                                      dtype = tf.int32)
        # tf Graph input
        x = tf.placeholder("float", [None, n_input])
        y = tf.placeholder("float", [None, n_classes])

        # Create model
        def multilayer_perceptron(x, weights, biases):
            # Hidden layer with RELU activation
            layer_1 = tf.add(tf.matmul(x, weights['h1']), biases['b1'])
            layer_1 = tf.nn.relu(layer_1)
            # Hidden layer with RELU activation
            layer_2 = tf.add(tf.matmul(layer_1, weights['h2']), biases['b2'])
            layer_2 = tf.nn.relu(layer_2)
            # Output layer with linear activation
            out_layer = tf.matmul(layer_2, weights['out']) + biases['out']
            return out_layer

        # Store layers weight & bias
        weights = {
            'h1': tf.Variable(tf.random_normal([n_input, n_hidden_1])),
            'h2': tf.Variable(tf.random_normal([n_hidden_1, n_hidden_2])),
            'out': tf.Variable(tf.random_normal([n_hidden_2, n_classes]))
        }
        biases = {
            'b1': tf.Variable(tf.random_normal([n_hidden_1])),
            'b2': tf.Variable(tf.random_normal([n_hidden_2])),
            'out': tf.Variable(tf.random_normal([n_classes]))
        }

        # Construct model
        pred = multilayer_perceptron(x, weights, biases)

        # Define loss and optimizer
        cost = tf.reduce_mean(tf.nn.softmax_cross_entropy_with_logits(pred, y))

        # ************************* SyncReplicasOpt Version 1.0 *****************************************************
        ''' This optimizer collects gradients from all replicas, "summing" them, 
        then applying them to the variables in one shot, after which replicas can fetch the new variables and continue. '''
        # Create any optimizer to update the variables, say a simple SGD
        opt = tf.train.AdamOptimizer(learning_rate=learning_rate)

        # Wrap the optimizer with sync_replicas_optimizer with N replicas: at each step the optimizer collects N gradients before applying to variables.
        opt = tf.train.SyncReplicasOptimizer(opt, replicas_to_aggregate=N,
                                        replica_id=FLAGS.task_index, total_num_replicas=N)

        # Now you can call `minimize()` or `compute_gradients()` and `apply_gradients()` normally
        train = opt.minimize(cost, global_step=global_step)

        # You can now call get_init_tokens_op() and get_chief_queue_runner().
        # Note that get_init_tokens_op() must be called before creating session
        # because it modifies the graph.
        init_token_op = opt.get_init_tokens_op()
        chief_queue_runner = opt.get_chief_queue_runner()
        # **************************************************************************************

        # Test model
        correct = tf.equal(tf.argmax(pred, 1), tf.argmax(y, 1))
        accuracy = tf.reduce_mean(tf.cast(correct, "float"))

        # Initializing the variables
        init_op = tf.initialize_all_variables()
        print("---Variables initialized---")

    # **************************************************************************************
    is_chief = (FLAGS.task_index == 0)
    # Create a "supervisor", which oversees the training process.
    sv = tf.train.Supervisor(is_chief=is_chief,
                             logdir="/tmp/train_logs",
                             init_op=init_op,
                             global_step=global_step,
                             save_model_secs=600)
    # **************************************************************************************

    with sv.prepare_or_wait_for_session(server.target) as sess:
        # **************************************************************************************        
        # After the session is created by the Supervisor and before the main while loop:
        if is_chief:
            sv.start_queue_runners(sess, [chief_queue_runner])
            # Insert initial tokens to the queue.
            sess.run(init_token_op)
        # **************************************************************************************
        # Statistics
        net_train_t = 0
        # Training
        for epoch in range(training_epochs):
            total_batch = int(mnist.train.num_examples/batch_size)
            # Loop over all batches
            for i in range(total_batch):
                batch_x, batch_y = mnist.train.next_batch(batch_size)
                # ======== net training time ========
                begin_t = time.time()
                sess.run(train, feed_dict={x: batch_x, y: batch_y})
                end_t = time.time()
                net_train_t += (end_t - begin_t)
                # ===================================
            # Calculate training accuracy
            # acc = sess.run(accuracy, feed_dict={x: mnist.train.images, y: mnist.train.labels})
            # print("Epoch:", '%04d' % (epoch+1), " Train Accuracy =", acc)
            print("Epoch:", '%04d' % (epoch+1))
        print("Training Finished!")
        print("Net Training Time: ", net_train_t, "second")
        # Testing
        print("Testing Accuracy = ", accuracy.eval({x: mnist.test.images, y: mnist.test.labels}))

    sv.stop()
    print("done")

Is there anything wrong with my code? Or can anyone point me to a good example to follow?

3 Answers

Answer 1:

You might be interested in user-transparent distributed TensorFlow, which uses MPI in the backend. We have recently developed one such version, MaTEx: https://github.com/matex-org/matex.

With it, you would not need to write any SyncReplicasOptimizer code for distributed TensorFlow, since all the changes are abstracted away from the user.

Hope this helps.

Answer 2:

I think your question is answered by the comments on TensorFlow issue #9596. The problem is caused by bugs in the new version of tf.train.SyncReplicasOptimizer(); you can use the old version of this API to avoid it.
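For reference, the two versions are easy to tell apart by their constructors. This is a rough sketch only (assuming TF 1.x-era APIs; opt, N, FLAGS, and is_chief are the same names used in the question's code):

# Old (V1.0) interface, as used in the question: it takes a replica_id.
opt_v1 = tf.train.SyncReplicasOptimizer(
    opt, replicas_to_aggregate=N, replica_id=FLAGS.task_index, total_num_replicas=N)

# Newer (V2.0) interface: no replica_id; synchronization is driven by a hook
# passed to tf.train.MonitoredTrainingSession instead of hand-managing the
# chief queue runner and init tokens.
opt_v2 = tf.train.SyncReplicasOptimizer(
    opt, replicas_to_aggregate=N, total_num_replicas=N)
sync_replicas_hook = opt_v2.make_session_run_hook(is_chief)

With the hook-based interface, the get_init_tokens_op() and get_chief_queue_runner() bookkeeping from the question is handled for you.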

Another solution comes from the TensorFlow Distributed Benchmarks. If you take a look at the source code, you will find that they synchronize workers manually through queues in TensorFlow. In my experiments, this benchmark runs exactly as you would expect.
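The idea behind that manual synchronization can be sketched roughly like this (a sketch only, assuming a TF 1.x-style graph; num_workers, task_index, and the /job:ps/task:0 device correspond to the cluster spec in the question, and sync_barrier is just a hypothetical helper name, not the benchmarks' actual code):

import tensorflow as tf

def sync_barrier(num_workers, task_index, name):
    """Queue-based barrier between workers, in the spirit of the benchmarks:
    each worker puts a token into every other worker's queue, then waits
    until it has received a token from each of the other workers."""
    with tf.device("/job:ps/task:0"):
        # One shared queue per worker, hosted on the parameter server.
        queues = [tf.FIFOQueue(num_workers, tf.bool, shapes=[[]],
                               shared_name="%s_%d" % (name, i),
                               name="%s_%d" % (name, i))
                  for i in range(num_workers)]
    token = tf.constant(False)
    # Tell every other worker that this worker has reached the barrier.
    enqueue_ops = [q.enqueue(token)
                   for i, q in enumerate(queues) if i != task_index]
    with tf.control_dependencies(enqueue_ops):
        # Block until every other worker has signalled this worker's queue.
        return queues[task_index].dequeue_many(num_workers - 1)

Each worker would build this op once, e.g. barrier = sync_barrier(len(workers), FLAGS.task_index, "step_barrier"), and run sess.run(barrier) after each training step, so that no worker starts step k+1 before every worker has finished step k.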

Hope these comments and resources can help you solve your problem. Thanks!

Answer 3:

One issue is that you need to specify an aggregation_method in the minimize() call for it to run synchronously:

train = opt.minimize(cost, global_step=global_step, aggregation_method=tf.AggregationMethod.ADD_N)