I have a dataset of 2.7 million samples on which I need to test my ML model. My laptop has 8 cores, and I want to try parallelizing the testing code to save time. This is the test function:
def testMTGP(x_sample, y_sample, ind, model, likelihood):
    x_sample = x_sample.view(1, -1)
    y_sample = y_sample.view(1, -1)
    model.eval()
    likelihood.eval()
    with torch.no_grad():
        prediction = likelihood(model(x_sample))
        mean = (prediction.mean).detach().numpy()
        prewhiten_error = (y_sample.detach().numpy()) - mean
        cov_matrix = (prediction.covariance_matrix).detach().numpy()
        white_error, matcheck = Whiten(prewhiten_error, cov_matrix)
    return (
        ind,
        {
            "prediction": mean,
            "prewhiten_error": prewhiten_error,
            "white_error": white_error,
            "cov_matrix": cov_matrix,
            "matcheck": matcheck,
        },
    )
I return the index of the sample being tested, along with a dictionary of data related to the computations the model does during testing. The function Whiten(prewhiten_error, cov_matrix) is also defined by me and was imported at the beginning of the code file, so it is available globally. It takes the inputs, transforms cov_matrix, multiplies it with prewhiten_error, and returns the answer along with a variable that indicates some state information about cov_matrix.
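For context, a stand-in with the same interface would look something like this (the actual transformation is different, so treat the Cholesky-based whitening below as a placeholder only):

import numpy as np

def Whiten(prewhiten_error, cov_matrix):
    # Hypothetical stand-in, not the real implementation.
    # Whitens the error by solving L @ white = error, with L the
    # Cholesky factor of the covariance matrix.
    try:
        L = np.linalg.cholesky(cov_matrix)
        white_error = np.linalg.solve(L, prewhiten_error.T).T
        matcheck = 0  # factorization succeeded
    except np.linalg.LinAlgError:
        white_error = prewhiten_error
        matcheck = 1  # covariance not positive definite
    return white_error, matcheck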
For the multiprocessing, the idea is to first divide the dataset into roughly equal-sized chunks, then take each chunk and send one sample to every core for processing. I am using pool.apply_async. This is the code:
import time
import multiprocessing as mp
import torch

test_X = torch.load(test_X_filename)  # torch tensor of shape 2.7M x 3
test_Y = torch.load(test_Y_filename)  # torch tensor of shape 2.7M x 3

cores = mp.cpu_count()
chunk_size = int(test_X.shape[0] / cores)

start_time = time.time()
parent_list = []
for start_ind in range(0, test_X.shape[0], chunk_size):
    pool = mp.Pool(processes=cores)
    proc_data_size = int(chunk_size / cores)
    stop_ind = min(test_X.shape[0], start_ind + chunk_size)
    results = [
        pool.apply_async(
            testMTGP,
            (test_X[i].detach(), test_Y[i].detach(), i, model, likelihood),
        )
        for i in range(start_ind, stop_ind)
    ]
    for res in results:
        print("Length of results list = ", len(results))
        print("Data type of res is: ", type(res))
        res_dict = res.get()
        parent_list.append(res_dict)
    pool.close()
test_X[i] and test_Y[i] are both tensors with shape (3,). On executing the code I get:
Traceback (most recent call last):
  File "multiproc_async.py", line 288, in <module>
    res_dict = res.get()  # [1]
  File "/home/aman/anaconda3/envs/thesis/lib/python3.8/multiprocessing/pool.py", line 771, in get
    raise self._value
  File "/home/aman/anaconda3/envs/thesis/lib/python3.8/multiprocessing/pool.py", line 537, in _handle_tasks
    put(task)
  File "/home/aman/anaconda3/envs/thesis/lib/python3.8/multiprocessing/connection.py", line 206, in send
    self._send_bytes(_ForkingPickler.dumps(obj))
  File "/home/aman/anaconda3/envs/thesis/lib/python3.8/multiprocessing/reduction.py", line 51, in dumps
    cls(buf, protocol).dump(obj)
AttributeError: Can't pickle local object 'MultitaskGaussianLikelihood.__init__.<locals>.<lambda>'
I am new to multiprocessing, and googling this error did not really help (some results were not relevant and some were beyond my understanding). Can someone please help me understand what mistake I am making?
Let's simplify your problem to its root cause. The last line of the traceback tells you what went wrong: apply_async pickles every argument it sends to a worker process, your model and likelihood contain a lambda created inside MultitaskGaussianLikelihood.__init__, and such local objects cannot be pickled. We need a working example for the multiprocessing part first, otherwise we don't have a reproducible example to help you with. Then you can patch the actual model evaluation back in.
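You can reproduce that failure with nothing but the standard library; make_local_lambda below is just an illustrative name, not anything from gpytorch:

import pickle

def make_local_lambda():
    # A lambda defined inside a function is a "local object",
    # just like the one created in MultitaskGaussianLikelihood.__init__.
    return lambda x: x

pickle.dumps(make_local_lambda())
# AttributeError: Can't pickle local object 'make_local_lambda.<locals>.<lambda>'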
Let's use a dummy function, a minimal sketch that keeps your signature and return structure but passes only plain strings between processes, so everything pickles cleanly:
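def testMTGP(x_sample, y_sample, ind, model, likelihood):
    # Same signature and return structure as the real function,
    # but no torch objects cross the process boundary.
    return (
        ind,
        {
            "prediction": "test",
            "prewhiten_error": "test",
            "white_error": "test",
            "cov_matrix": "test",
            "matcheck": "test",
        },
    )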
Then a working and clean example is a sketch like the following, where None stands in for every argument the dummy ignores and the index range is kept small for a quick check:
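import multiprocessing as mp

if __name__ == "__main__":
    cores = mp.cpu_count()
    pool = mp.Pool(processes=cores)
    # Everything submitted to the workers here is picklable.
    results = [
        pool.apply_async(testMTGP, (None, None, ind, None, None))
        for ind in range(10)
    ]
    parent_list = [res.get() for res in results]
    pool.close()
    pool.join()
    print(parent_list)

This runs cleanly because nothing unpicklable is ever handed to put(task).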
Try this and, little by little, bring back the actual logic you need for testing the model. I suggest you start by passing in the real arguments one at a time, and only at the end replace the dummy testMTGP with the real one.
Once you isolate which argument makes the code crash, and/or post the new stack trace, I can help more.