With Horovod, you basically run N independent instances (so it is a form of between-graph replication), and they communicate via special Horovod ops (basically broadcast + reduce).
Now let's say either instance 0, or some other external instance, loads your data (via tf.data.Dataset). How would you distribute the batches from iterator.get_next() to each instance?
to each instance? Using Horovod broadcast would be inefficient, as you would copy all the data to all instances.
Having the dataset in every instance, doing all the loading there, and then using shard on the dataset (roughly as sketched below) would also be inefficient, as you would load the data everywhere and then throw away (N-1)/N of it.
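(To make that concrete, this is roughly what the per-instance sharding variant would look like; just a sketch, assuming the usual hvd.rank()/hvd.size() and a placeholder TFRecord file:)

```python
# Sketch of the sharding approach I want to avoid: every instance reads the
# whole file and discards (N-1)/N of the records. Filename is a placeholder.
import tensorflow as tf
import horovod.tensorflow as hvd

hvd.init()

dataset = tf.data.TFRecordDataset(["train.tfrecords"])  # placeholder filename
dataset = dataset.shard(num_shards=hvd.size(), index=hvd.rank())
dataset = dataset.batch(32)
next_batch = dataset.make_one_shot_iterator().get_next()
```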
So that's why you would also not want sharding, and instead have the dataset loading only in a single (producer/dataset worker) instance, which then distributes the batches on all the train workers.
I guess the TF MultiDeviceIterator provides some similar functionality (or basically exactly that), but I'm not sure whether that works together with Horovod, and how you would set it up?
Or maybe you can make the distribution somehow via TF workers (guide)? (Maybe that is how you would configure MultiDeviceIterator as well?)
If possible, this should be via TensorFlow operations / functions (there are many related functions which might already give me this, but I might not know about them, or have misunderstood how it works). Or maybe the answer is that TensorFlow does not provide any such functionality yet? (This would still be useful to know. Then I would implement my own solution in C++, wrapped as a TensorFlow op. But before doing so, it would be good to know whether this is really necessary.)
(Related is also this Horovod issue.)
(This question is actually a bit more generic than just Horovod, although Horovod might be a good example. Wouldn't you always have this problem with distributed TensorFlow?)
(I collected an overview of all the distributed TensorFlow terminology and aspects here, mostly for clarification.)
(Related are (maybe?) also these questions: this, this, this, this, or this.)
As you said, copying all the data to every instance, or loading it everywhere and sharding it per instance, would be impractical.
One solution would then be to separate the data in a data process and have each instance pull data from the data process as shown in the figure below. For example, this communication can be established using a queue.
In such a system, the data process would load the dataset, preprocess it into batches, and push the batches into a queue. Each training instance would then pull batches from this queue. For example, you could wrap the queue in a generator and feed it to the dataset API (see tf.data.Dataset.from_generator). Also, if batches are not produced fast enough, you can create more data processes to increase the batch throughput.
Depending on your use case, the implementation specifics will vary. For more information, you can look up Networking and Interprocess communication and Multiprocessing pipes and queues.
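A minimal single-machine sketch of this idea, using Python's multiprocessing; the batch shape and the contents of produce_batches are placeholders for your actual loading code:

```python
# Sketch only: one data process fills a multiprocessing.Queue with ready batches,
# and the training process exposes that queue to TensorFlow via from_generator.
import multiprocessing as mp
import numpy as np
import tensorflow as tf

def produce_batches(queue):
    # Data process: load and preprocess the data, then push finished batches.
    while True:
        batch = np.random.rand(32, 10).astype(np.float32)  # stand-in for real loading
        queue.put(batch)

def batch_generator(queue):
    # Training side: pull batches out of the shared queue.
    while True:
        yield queue.get()

if __name__ == "__main__":
    queue = mp.Queue(maxsize=16)
    mp.Process(target=produce_batches, args=(queue,), daemon=True).start()

    dataset = tf.data.Dataset.from_generator(
        lambda: batch_generator(queue),
        output_types=tf.float32,
        output_shapes=tf.TensorShape([32, 10]),
    )
    next_batch = dataset.make_one_shot_iterator().get_next()
    with tf.Session() as sess:
        print(sess.run(next_batch).shape)  # (32, 10)
```

Across machines you would replace the multiprocessing queue with some network transport (sockets, gRPC, ZeroMQ, ...), which is what the networking references above cover.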
For a TensorFlow implementation, you could use tf.data.Dataset.shard with tf.data.TFRecordDataset. The documentation addresses your inefficiency concern for TFRecords: it recommends applying shard early in the pipeline, ideally on the filenames before the records are parsed, so that each worker only reads its own subset of the files instead of the whole dataset.
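A sketch of that pattern, assuming the data is already split across several TFRecord files and that Horovod provides the worker index (the file pattern is a placeholder):

```python
# Sketch: shard at the filename level before reading, so each worker only opens
# its own subset of the TFRecord files.
import tensorflow as tf
import horovod.tensorflow as hvd

hvd.init()

files = tf.data.Dataset.list_files("/path/to/train-*.tfrecords", shuffle=False)
files = files.shard(num_shards=hvd.size(), index=hvd.rank())  # shard before reading
dataset = tf.data.TFRecordDataset(files)                      # only this worker's files
dataset = dataset.shuffle(10000).batch(32)
next_batch = dataset.make_one_shot_iterator().get_next()
```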