Multiprocessing with a Keras subclassed model

I was trying to implement cross-validation on a Keras model using multiprocessing, so that the different CV folds are computed in parallel. The Keras model was built using the subclassing API. I first tried multiprocessing.Pool, passing the same model with different datasets through the starmap function, but it seems that models built by subclassing do not pickle well, and apparently Pool uses pickle to send the function and its arguments to the worker processes.
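One possible way around the pickling issue, sketched here under assumptions rather than taken from the real project, is to send each worker a small picklable description of the fold and build the model inside the worker. The names `fold_spec` and `run_fold` and the keys of the dictionary are hypothetical, and the body of `run_fold` is only a stand-in for constructing and training the subclassed model.

```python
import pickle

# Hypothetical, picklable description of one fold: hyperparameters plus
# the row indexes to use, rather than a model or loader object.
fold_spec = {"fold": 0, "hidden_units": 64, "train_idx": [0, 1, 2], "valid_idx": [3, 4]}


def run_fold(spec):
    # In real code the subclassed model would be constructed here, inside
    # the worker, from spec["hidden_units"]. This stand-in only reports
    # the fold number and the sizes of the two index lists.
    return spec["fold"], len(spec["train_idx"]), len(spec["valid_idx"])


# Pool pickles task arguments to send them to its workers, so a round trip
# through pickle is a quick check that a task can be shipped at all.
restored = pickle.loads(pickle.dumps(fold_spec))
print(run_fold(restored))
```

With a spec like this, something along the lines of `pool.map(run_fold, list_of_specs)` would only ever pickle plain Python data, never the model itself.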

So I decided to build independent models for each CV fold, feed them independent datasets, and launch them as independent processes, using a queue to collect all the results (I could also write the results to a file shared by all processes, if the queue is the problem). The code is something like:

def run_cv_fold(fold_number,model,train_loader,validation_loader,validation_loader2,test_loader,callbacks,queue):
    print("starting fold: "+str(fold_number))
    #compile and train the model
    model.compile(optimizer=keras.optimizers.Adam(learning_rate),loss=noisy_rms,metrics=["mae"])
    history=model.fit(train_loader.load(),steps_per_epoch=train_loader.steps_per_epoch,
                      validation_data=validation_loader,validation_steps=validation_loader.steps_per_epoch,
                      callbacks=callbacks, verbose=0, epochs=epochs)
    #save the model
    model.save("./pkapredictor_cv"+str(fold_number)+".keras")
    #get the predictions of the trained model for validation and test set, storing also the real values
    #accumulate over all batches instead of keeping only the last one
    real_valid,predicted_valid,real_test,predicted_test=[],[],[],[]
    for batch in validation_loader2: #for some reason, a different validation_loader is needed here
        inputs,target=batch
        real_valid.extend(target)
        predicted_valid.extend(model.predict(inputs).flatten())
    for batch in test_loader:
        inputs,target=batch
        real_test.extend(target)
        predicted_test.extend(model.predict(inputs).flatten())
    #send the results to the queue as a single tuple; history.history is a plain dict
    queue.put((predicted_valid,real_valid,predicted_test,real_test,history.history))

#create lists of independent datasets, models, and even callbacks
train_loaders,validation_loaders,validation_loaders2,test_loaders,tensorboard_cbs=[],[],[],[],[]
models=[]
fold_number=0
for train_indexes,validation_indexes in cv_splitter:
    fold_number+=1
    train_loaders.append(my_BatchLoader(train_set[train_indexes],batch_size=64))
    validation_loaders.append(my_BatchLoader(train_set[validation_indexes],batch_size=64,shuffle=False))
    validation_loaders2.append(my_BatchLoader(train_set[validation_indexes],batch_size=64,epochs=1,shuffle=False))
    test_loaders.append(my_BatchLoader(test_set,batch_size=64,epochs=1,shuffle=False))
    models.append(keras.models.clone_model(subclassing_model))
    tensorboard_cbs.append(tf.keras.callbacks.TensorBoard(get_run_logdir(cv=fold_number)))

#the multiprocessing stuff
from multiprocessing import Queue
from multiprocessing import Process
queue=Queue()
procs=[]
for i in range(cv):
    procs.append( Process(target=run_cv_fold,
                          args= (i,models[i],train_loaders[i],
                            validation_loaders[i],validation_loaders2[i],test_loaders[i],
                            [tensorboard_cbs[i],early_stopping_cb,lr_scheduler_cb],queue,)      ))
print ("rock'n'roll")
for proc in procs: proc.start()
for proc in procs: proc.join()
 

When I run the jobs, "rock'n'roll" is printed (so execution reaches that line), and I know that fit() is called for each model, since the build() methods of the models and the print statements inside them are executed, but the call() method is never executed. If, instead of using Process, I simply call the run_cv_fold function with the different models as arguments, the models are trained.
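For comparison, here is a minimal sketch of a pattern often suggested for this kind of setup: start the children with the "spawn" method, create everything heavy inside the child, and read the queue before joining. It is not a confirmed diagnosis of the hang above. `build_and_train`, `cv_worker`, `run_folds` and the `spec` dictionary are hypothetical names, and `build_and_train` is only a placeholder for importing Keras, building the model and fitting it.

```python
import multiprocessing as mp


def build_and_train(fold_number, spec):
    # Hypothetical stand-in for the real work of one fold. In the real
    # script this is where Keras would be imported and the subclassed
    # model built from `spec`, compiled and fitted.
    return {"fold": fold_number, "units": spec["units"], "val_loss": None}


def cv_worker(fold_number, spec, queue):
    # Only a small, picklable result dictionary travels back to the parent.
    queue.put(build_and_train(fold_number, spec))


def run_folds(n_folds, spec, start_method="spawn"):
    # "spawn" gives each child a fresh interpreter instead of a forked
    # copy of whatever state the parent has already initialised.
    ctx = mp.get_context(start_method)
    queue = ctx.Queue()
    procs = [ctx.Process(target=cv_worker, args=(i, spec, queue))
             for i in range(n_folds)]
    for proc in procs:
        proc.start()
    # Drain the queue before join(): a child that still has queued data
    # to flush may not exit until the parent has read it.
    results = [queue.get() for _ in procs]
    for proc in procs:
        proc.join()
    return sorted(results, key=lambda r: r["fold"])
```

With "spawn", the call has to sit under an `if __name__ == "__main__":` guard in the launching script, for example `results = run_folds(5, {"units": 64})`.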

Any suggestions?
