How to train mnist data with tensorflow ParameterServerStrategy distributed training?


I'm trying to train on the MNIST dataset using ParameterServerStrategy. As a beginner, I find the documentation confusing, especially the section "Clusters in the real world". This is the tutorial I'm following: https://colab.research.google.com/github/tensorflow/docs/blob/master/site/en/tutorials/distribute/parameter_server_training.ipynb#scrollTo=zyby6M2Jqg6J&uniqifier=1 So far I have this:


#this is mnist_setup.py

import os
import tensorflow as tf
import numpy as np

def mnist_dataset(batch_size):
  (x_train, y_train), _ = tf.keras.datasets.mnist.load_data()
  # The `x` arrays are in uint8 and have values in the [0, 255] range.
  # You need to convert them to float32 with values in the [0, 1] range.
  x_train = x_train / np.float32(255)
  y_train = y_train.astype(np.int64)
  train_dataset = tf.data.Dataset.from_tensor_slices(
      (x_train, y_train)).shuffle(60000).repeat().batch(batch_size)
  return train_dataset

def build_and_compile_model():
  model = tf.keras.Sequential([
    tf.keras.layers.Flatten(),
    tf.keras.layers.Dense(10, activation=tf.nn.softmax)
  ])
  model.compile(
      loss=tf.keras.losses.SparseCategoricalCrossentropy(from_logits=True),
      optimizer=tf.keras.optimizers.SGD(),
      metrics=['accuracy'])
  return model

#this is main.py

import os
import json

import tensorflow as tf
import mnist_setup

per_worker_batch_size = 64

os.environ["TF_CONFIG"] = json.dumps({
    "cluster": {
        "worker": ['ip_of_deeplearning_VM:port'], #worker1
        "ps": ['ip_of_deeplearning_VM:port'], #worker2
        "chief": ['ip_of_deeplearning_VM:port'] #masterz
    },
    "task": {"type": "chief", "index": 0}
})

cluster_spec = tf.train.ClusterSpec({
    'ps':['ip_of_deeplearning_VM:port'], #worker2
    'worker': ['ip_of_deeplearning_VM:port'] #worker1
})

cluster_resolver = tf.distribute.cluster_resolver.SimpleClusterResolver(cluster_spec, task_type="ps", task_id=0)

tf_config = json.loads(os.environ['TF_CONFIG'])
num_workers = len(tf_config['cluster']['worker'])

strategy = tf.distribute.experimental.ParameterServerStrategy(cluster_resolver)

global_batch_size = per_worker_batch_size * num_workers
multi_worker_dataset = mnist_setup.mnist_dataset(global_batch_size)

with strategy.scope():
  # Model building/compiling need to be within `strategy.scope()`.
  multi_worker_model = mnist_setup.build_and_compile_model()
    
print("chief gets called!")
result = multi_worker_model.fit(multi_worker_dataset, epochs=3)

I copy these files to the worker and ps VMs, change the task index for each machine, and run "main.py" on all of them at the same time. I get a message saying that a server was started at the given IP address, but that's about it. Would anyone please show me what I need to do to get this working?
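To be concrete, this is roughly what I set on the worker VM; the cluster definition stays the same on every machine and only the "task" entry changes (e.g. "type": "ps" on the ps VM):

import os
import json

os.environ["TF_CONFIG"] = json.dumps({
    "cluster": {
        "worker": ['ip_of_deeplearning_VM:port'], #worker1
        "ps": ['ip_of_deeplearning_VM:port'], #worker2
        "chief": ['ip_of_deeplearning_VM:port'] #master
    },
    "task": {"type": "worker", "index": 0}  # this part differs per VM
})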

1 Answer

The distributed strategy has not been instantiated after the server has started: the following code needs to be added after you configure your worker and PS IP addresses.

NUM_PS = 1  # number of "ps" tasks in your cluster

variable_partitioner = (
    tf.distribute.experimental.partitioners.MinSizePartitioner(
        min_shard_bytes=(256 << 10),
        max_shards=NUM_PS))

strategy = tf.distribute.ParameterServerStrategy(
    cluster_resolver,
    variable_partitioner=variable_partitioner)
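For this to work, main.py also has to branch on the task type: the worker and ps processes only start a server and wait, while the chief (coordinator) creates the strategy and runs training. Here is a minimal sketch of that branching, following the in-process cluster approach from the linked tutorial (not necessarily the exact code in the gist):

import tensorflow as tf

# Build the resolver from the TF_CONFIG environment variable set earlier.
cluster_resolver = tf.distribute.cluster_resolver.TFConfigClusterResolver()

if cluster_resolver.task_type in ("worker", "ps"):
  # Workers and parameter servers just run a server and block forever.
  server = tf.distribute.Server(
      cluster_resolver.cluster_spec(),
      job_name=cluster_resolver.task_type,
      task_index=cluster_resolver.task_id,
      protocol=cluster_resolver.rpc_layer or "grpc",
      start=True)
  server.join()
else:
  # Only the chief reaches this point: create the strategy (as above),
  # then build, compile and fit the model inside strategy.scope().
  strategy = tf.distribute.ParameterServerStrategy(cluster_resolver)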

There is also a mismatch between the guide you linked and your code: the tutorial's model is a single Dense(10) layer with no softmax activation, which matches the from_logits=True loss.

model = tf.keras.models.Sequential([tf.keras.layers.Dense(10)])

Once you instantiate a GCP VM, SSH into it, install the latest CUDA/cuDNN dependencies, and copy the code I'm providing.

Please copy all the code into one file and run it all at once from the terminal.

I have written the corrected code in the gist here; you can refer to that. Use Ubuntu, as that is what I used.
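One detail that is easy to miss (this sketch follows the tutorial's Model.fit section; the batch size and step count are just illustrative): because mnist_dataset() calls .repeat(), Model.fit needs an explicit steps_per_epoch, and under ParameterServerStrategy the tutorial wraps the input in a DatasetCreator so that each worker builds its own input pipeline:

import tensorflow as tf
import mnist_setup

# `strategy` is the ParameterServerStrategy created earlier on the chief.

def dataset_fn(input_context):
  # Each worker builds its own copy of the input pipeline, using a
  # per-replica batch size derived from a global batch size of 64.
  batch_size = input_context.get_per_replica_batch_size(64)
  return mnist_setup.mnist_dataset(batch_size)

with strategy.scope():
  model = mnist_setup.build_and_compile_model()

model.fit(
    tf.keras.utils.experimental.DatasetCreator(dataset_fn),
    epochs=3,
    steps_per_epoch=100)  # required because the dataset repeats indefinitely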

A cluster orchestrator is normally expected to terminate the PS processes when the job is done and to handle failed workers by respawning them. Without one, the PS processes keep running after training finishes, so you will have to find them with ps aux and kill them manually.