PyTorch: how to use num_workers > 0 in DataLoader when using multiple GPUs


I am using differential_evolution from scipy to do optimization. The likelihood is vectorized to evaluate all population members at once. I have 36 parameters and popsize=35. draws is also a large matrix, of shape (1000, 112, 7) for each population member, consisting of random draws from some distributions.
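
For context, the way I call the optimizer is roughly like this (the bounds and the dummy likelihood below are placeholders, not my real ones; vectorized=True needs scipy >= 1.9):

import numpy as np
from scipy.optimize import differential_evolution

def neg_loglik(x):
    # with vectorized=True, scipy passes x with shape (36, S), one column per
    # population member, and expects an array of S values back
    return np.sum(x**2, axis=0)  # dummy stand-in for my real likelihood

bounds = [(-5.0, 5.0)] * 36  # placeholder bounds for the 36 parameters
result = differential_evolution(neg_loglik, bounds, popsize=35, vectorized=True)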

By following the PyTorch tutorial I am now able to use multiple GPUs. I want to use a DataLoader to take the draws in parallel. This works when I run it without multiple GPUs.

If I run them together as in the following code, I get an error. I don't know how to interpret this error, since this part of the code actually runs on the CPU, not the GPU. Also, I set num_workers=4 but only 2 processes are actually created, which I guess is expected because I set world_size=2. But how can I use, for example, 2 GPU processes while each of them also uses multiple CPUs to do some parallel work? (A rough sketch of the layout I am aiming for is included after the traceback.)

import sys
import numpy as np
import torch
from torch.utils.data import DataLoader
from scipy.optimize import differential_evolution

class MapDataset(torch.utils.data.Dataset):
    def __init__(self, b, L):
        self.b = b
        self.L = L

    def __len__(self):
        return self.b.shape[1]

    def __getitem__(self, idx):
        b = self.b[:, idx]
        L = self.L[idx, :]
        obs = np.vstack((b, L))
        return obs
class Mymodel:

    def convert_draws(self, b, L):
        # converts one observation's b and L into draws (body omitted);
        # this is the work I want the DataLoader workers to do in parallel
        return draws
    
    def li(self, draws, params):
        # computes the likelihood with pytorch on the GPU (body omitted)
        return li.cpu().numpy()
    
    def collate_fn(self, data):
        # ignore the batch passed in and split the whole dataset across workers by hand
        worker_info = torch.utils.data.get_worker_info()
        data = np.array(worker_info.dataset)
        worker_id = worker_info.id
        num_workers = worker_info.num_workers

        data_chunk = np.split(data, num_workers)[worker_id]
        b = data_chunk[:, 0, :]
        L = data_chunk[:, 1:, :]
        draw_list = []
        for i in range(b.shape[0]):
            draws = self.convert_draws(b[i, :], L[i, :])
            draw_list.append(draws)
        data = torch.tensor(np.array(draw_list))
        return data


    def likelihood(self, params):
        popinit_data = MapDataset(b, L)
        draws = DataLoader(
            popinit_data,
            batch_size=None,
            shuffle=False,
            sampler=None,
            batch_sampler=None,
            num_workers=4,
            collate_fn=self.collate_fn,
            worker_init_fn=None,
            prefetch_factor=1,
            generator=torch.Generator().manual_seed(1),
        )
        draws = torch.vstack(list(draws))
        li = self.li(draws, params)
        ll = np.log(li).sum()
        return -ll

def min_de(rank,world_size):
    model=Mymodel()
    results = differential_evolution(model.likelihood)  # bounds etc. omitted here


import torch.multiprocessing as mp
if __name__ == '__main__':
    world_size = int(sys.argv[1])
    processes = []
    mp.set_start_method("spawn")
    for rank in range(world_size):
        # init_processes sets up torch.distributed and then calls min_de (defined elsewhere in my script)
        p = mp.Process(target=init_processes, args=(rank, world_size, min_de, 'nccl'))
        p.start()
        processes.append(p)
    for p in processes:
        p.join()

The output from the two spawned processes was interleaved; the traceback is:

Traceback (most recent call last):
  File "/Library/Developer/CommandLineTools/Library/Frameworks/Python3.framework/Versions/3.9/lib/python3.9/multiprocessing/process.py", line 315, in _bootstrap
    self.run()
  File "/Library/Developer/CommandLineTools/Library/Frameworks/Python3.framework/Versions/3.9/lib/python3.9/multiprocessing/process.py", line 108, in run
    self._target(*self._args, **self._kwargs)
  File "/multigpu.py", line 446, in init_processes
    fn(rank, size)
  File "/multigpu.py", line 473, in min_de
    model.test(x0=popinit.T)
  File "/multigpu.py", line 389, in test
    print("test",self.likelihood(x0))
  File "/multigpu.py", line 344, in likelihood
    draws = torch.vstack(list(draws))
  File "/Library/Python/3.9/site-packages/torch/utils/data/dataloader.py", line 439, in __iter__
    return self._get_iterator()
  File "/Library/Python/3.9/site-packages/torch/utils/data/dataloader.py", line 387, in _get_iterator
    return _MultiProcessingDataLoaderIter(self)
  File "/Library/Python/3.9/site-packages/torch/utils/data/dataloader.py", line 1040, in __init__
    w.start()
  File "/Library/Developer/CommandLineTools/Library/Frameworks/Python3.framework/Versions/3.9/lib/python3.9/multiprocessing/process.py", line 121, in start
    self._popen = self._Popen(self)
  File "/Library/Developer/CommandLineTools/Library/Frameworks/Python3.framework/Versions/3.9/lib/python3.9/multiprocessing/context.py", line 224, in _Popen
    return _default_context.get_context().Process._Popen(process_obj)
  File "/Library/Developer/CommandLineTools/Library/Frameworks/Python3.framework/Versions/3.9/lib/python3.9/multiprocessing/context.py", line 284, in _Popen
    return Popen(process_obj)
  File "/Library/Developer/CommandLineTools/Library/Frameworks/Python3.framework/Versions/3.9/lib/python3.9/multiprocessing/popen_spawn_posix.py", line 32, in __init__
    super().__init__(process_obj)
  File "/Library/Developer/CommandLineTools/Library/Frameworks/Python3.framework/Versions/3.9/lib/python3.9/multiprocessing/popen_fork.py", line 19, in __init__
    self._launch(process_obj)
  File "/Library/Developer/CommandLineTools/Library/Frameworks/Python3.framework/Versions/3.9/lib/python3.9/multiprocessing/popen_spawn_posix.py", line 47, in _launch
    reduction.dump(process_obj, fp)
  File "/Library/Developer/CommandLineTools/Library/Frameworks/Python3.framework/Versions/3.9/lib/python3.9/multiprocessing/reduction.py", line 60, in dump
    ForkingPickler(file, protocol).dump(obj)
  File "/Library/Python/3.9/site-packages/torch/multiprocessing/reductions.py", line 557, in reduce_storage
    metadata = storage._share_filename_cpu_()
  File "/Library/Python/3.9/site-packages/torch/storage.py", line 294, in wrapper
    return fn(self, *args, **kwargs)
  File "/Library/Python/3.9/site-packages/torch/storage.py", line 368, in _share_filename_cpu_
    return super()._share_filename_cpu_(*args, **kwargs)
RuntimeError: _share_filename_: only available on CPU
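
To make the question concrete, this is the layout I am trying to achieve, as a rough sketch (ignoring the torch.distributed setup; the dataset, shapes, and batch size here are placeholders, not my real code): each spawned process drives one GPU, and inside each of those processes a DataLoader uses several CPU workers.

import torch
import torch.multiprocessing as mp
from torch.utils.data import DataLoader, TensorDataset

def worker(rank, world_size):
    # one process per GPU; falls back to the CPU when no GPU is visible
    device = torch.device(f"cuda:{rank}" if torch.cuda.is_available() else "cpu")
    dataset = TensorDataset(torch.randn(112, 8))                 # placeholder data
    loader = DataLoader(dataset, batch_size=16, num_workers=4)   # 4 CPU workers inside this rank
    for (batch,) in loader:
        batch = batch.to(device)                                 # GPU work happens here

if __name__ == "__main__":
    world_size = 2
    mp.set_start_method("spawn")
    processes = []
    for rank in range(world_size):
        p = mp.Process(target=worker, args=(rank, world_size))
        p.start()
        processes.append(p)
    for p in processes:
        p.join()

That is, world_size GPU processes, each with its own num_workers CPU workers.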

UPDATE: I guess I should not use gloo when testing on the M1. It works on Linux using nccl.
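
For completeness, a minimal version of the init_processes helper referenced above could look like this (a sketch I am adding for context, not my exact code; MASTER_ADDR/MASTER_PORT and the backend default are placeholders):

import os
import torch.distributed as dist

def init_processes(rank, world_size, fn, backend="nccl"):
    # placeholder rendezvous settings for the default env:// init method
    os.environ["MASTER_ADDR"] = "127.0.0.1"
    os.environ["MASTER_PORT"] = "29500"
    dist.init_process_group(backend, rank=rank, world_size=world_size)
    fn(rank, world_size)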
