How to use sfInit and makeCluster type "MPI" / message passing in R / parallelization on cluster


I'm trying to adapt this R script for a speed test to work on a cluster.

When I use sfInit and makeCluster with type "SOCK", the script runs on the cluster successfully, but without any speed improvement. On my own computer it is different: when I change detectCores() to 1, the script runs substantially slower than with 4 cores.
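One possible reason for a missing speed-up is that all workers end up on a single machine: detectCores() only counts the cores of the machine the master R process runs on, not the cores of the whole cluster. A minimal sketch for checking where the workers actually run, assuming the parallel package; the choice of 2 local workers is arbitrary, and on a real multi-node setup you would pass host names instead of a count.

```r
library(parallel)

# detectCores() reports the cores of the machine this R session runs on.
n <- detectCores()

# Small local socket cluster (2 workers is an arbitrary choice).
cl <- makeCluster(2, type = "PSOCK")

# Ask every worker for its host name and process ID.
where <- clusterEvalQ(cl, c(host = Sys.info()[["nodename"]],
                            pid  = as.character(Sys.getpid())))
stopCluster(cl)

# One row per worker; identical host names mean all workers share one machine.
where_df <- do.call(rbind, where)
print(where_df)
```

If every row shows the same host, extra workers only help as far as that one machine has free cores.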

I'm pretty sure, though, that I need to change the type to "MPI" so that the nodes can exchange data with each other.

BUT: if I do so, the script stops with the following error:

Loading required package: Rmpi
Error: package or namespace load failed for ‘Rmpi’:
 .onLoad failed in loadNamespace() for 'Rmpi', details:
  call: dyn.load(file, DLLpath = DLLpath, ...)
  error: unable to load shared object '/cluster/sfw/R/3.5.1-gcc73-base/lib64/R/library/Rmpi/libs/Rmpi.so':
  libmpi.so.20: cannot open shared object file: No such file or directory
Failed to load required library: Rmpi for parallel mode MPI
Fallback to sequential execution
snowfall 1.84-6.1 initialized: sequential execution, one CPU.
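As the last lines of the output show, snowfall falls back to sequential execution when Rmpi cannot be loaded. If you would rather keep running in parallel, here is a minimal sketch of an explicit check, assuming the parallel package (makeCluster(type = "MPI") additionally relies on the snow package); make_workers() is a hypothetical helper name. A PSOCK fallback created with a plain count only uses the machine R is running on.

```r
library(parallel)

# Hypothetical helper: use MPI if Rmpi (and snow) can actually be loaded,
# otherwise fall back to a local socket cluster.
make_workers <- function(n) {
  # requireNamespace() returns FALSE instead of stopping when loading fails,
  # e.g. when Rmpi.so cannot find libmpi.so.
  mpi_ok <- requireNamespace("Rmpi", quietly = TRUE) &&
            requireNamespace("snow", quietly = TRUE)
  if (mpi_ok) {
    makeCluster(n, type = "MPI")
  } else {
    message("Rmpi not usable, falling back to a PSOCK cluster")
    makeCluster(n, type = "PSOCK")
  }
}

cl <- make_workers(2)
print(cl)
stopCluster(cl)
```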

I thought this would be a piece of cake and added the following lines:

install.packages('Rmpi', repos = "http://cran.us.r-project.org",
                 dependencies = TRUE, lib = '/personalpath')
install.packages('doMPI', repos = "http://cran.us.r-project.org",
                 dependencies = TRUE, lib = '/personalpath')
library(topicmodels, lib.loc = '/personalpath')
library(Rmpi, lib.loc = '/personalpath')

The installation appears to succeed, but loading the package then fails:

Error in library(Rmpi, lib.loc = "/personalpath") :
there is no package called ‘Rmpi’
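One possible explanation is that the Rmpi build actually failed: install.packages() only issues a warning (installation of package had non-zero exit status) rather than an error, and compiling Rmpi needs the MPI headers and libraries. A minimal sketch for making a personal library the default and checking whether the package really landed there; the directory ~/R/library is a hypothetical choice, and the install line is left commented out.

```r
# Hypothetical personal library; any directory you can write to will do.
my_lib <- file.path(Sys.getenv("HOME"), "R", "library")
dir.create(my_lib, recursive = TRUE, showWarnings = FALSE)

# Put the personal library first on the search path, so that
# install.packages() installs there and library() looks there
# without needing lib / lib.loc every time.
.libPaths(c(my_lib, .libPaths()))
print(.libPaths())

# install.packages("Rmpi", repos = "https://cloud.r-project.org")

# After installing, check that the package folder exists; a failed build
# typically leaves no such folder behind.
rmpi_installed <- dir.exists(file.path(my_lib, "Rmpi"))
print(rmpi_installed)
```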

1. How do I install these packages?

2. Do I really need to install them, or is this a completely wrong approach?

Any help is highly appreciated! I know there are a couple of related questions here (see this, this, and this). But I'm not familiar with Linux commands and, more importantly, I do not have any administrator rights on that cluster. So I need to come up with a solution in R...
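The first error says that the system-wide Rmpi under /cluster/sfw/R/3.5.1-gcc73-base/... is installed but cannot find libmpi.so.20, the MPI library it was compiled against. A minimal diagnostic that runs from inside R, assuming a Linux system where the loader consults LD_LIBRARY_PATH; note that the library may also live in a system default directory this check does not cover. On many HPC systems the MPI library is made available by loading an environment module before R starts; whether this cluster uses modules, and which one, is an assumption to confirm with the administrators. Changing LD_LIBRARY_PATH with Sys.setenv() inside a running R session usually does not affect the loader.

```r
# Directories listed in LD_LIBRARY_PATH (empty if the variable is unset).
ld_dirs <- strsplit(Sys.getenv("LD_LIBRARY_PATH"), ":", fixed = TRUE)[[1]]
ld_dirs <- ld_dirs[nzchar(ld_dirs) & dir.exists(ld_dirs)]

# Any file named libmpi.so* in those directories.
mpi_libs <- as.character(unlist(lapply(ld_dirs, list.files,
                                       pattern = "^libmpi\\.so",
                                       full.names = TRUE)))

if (length(mpi_libs) == 0) {
  message("No libmpi.so* found via LD_LIBRARY_PATH ",
          "(it may still be in a system default directory).")
} else {
  print(mpi_libs)
}
```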

This is my code:

library(parallel)     # detectCores()
library(snowfall)     # sfInit(), sfExport(), sfLibrary(), sfLapply(), sfClusterApplyLB()
library(doSNOW)       # registerDoSNOW(); also attaches snow and foreach
library(plyr)         # llply()
library(topicmodels)  # LDA()

# dtm_stripped, control_LDA_Gibbs and seq (the values of k to try) are
# defined earlier in the script (not shown).

# Worker pool managed by snowfall (used by the sf* functions)
sfInit(parallel = TRUE, cpus = detectCores(), type = "MPI")

# Separate worker pool for parLapply() and the foreach/plyr backend
cl <- makeCluster(detectCores(), type = "MPI")
registerDoSNOW(cl)

sfExport('dtm_stripped', 'control_LDA_Gibbs')
sfLibrary(topicmodels)

clusterEvalQ(cl, library(topicmodels))
clusterExport(cl, c("dtm_stripped", "control_LDA_Gibbs"))

# Sequential baselines (system.time() evaluates the assignment in this frame, so <- is enough)
BASE <- system.time(best.model.BASE <- lapply(seq, function(d) {LDA(dtm_stripped, control = control_LDA_Gibbs, method = 'Gibbs', d)}))
PLYR_S <- system.time(best.model.PLYR_S <- llply(seq, function(d) {LDA(dtm_stripped, control = control_LDA_Gibbs, method = 'Gibbs', d)}, .progress = "text"))

# Parallel variants (LDA is exported, so :: is sufficient)
wrapper <- function(d) topicmodels::LDA(dtm_stripped, control = control_LDA_Gibbs, method = 'Gibbs', d)
PARLAP <- system.time(best.model.PARLAP <- parLapply(cl, seq, wrapper))
DOPAR <- system.time(best.model.DOPAR <- foreach(i = seq, .export = c("dtm_stripped", "control_LDA_Gibbs"), .packages = "topicmodels", .verbose = TRUE) %dopar% (LDA(dtm_stripped, control = control_LDA_Gibbs, method = 'Gibbs', k = i)))
SFLAPP <- system.time(best.model.SFLAPP <- sfLapply(seq, function(d) {topicmodels::LDA(dtm_stripped, control = control_LDA_Gibbs, method = 'Gibbs', d)}))
SFCLU <- system.time(best.model.SFCLU <- sfClusterApplyLB(seq, function(d) {topicmodels::LDA(dtm_stripped, control = control_LDA_Gibbs, method = 'Gibbs', d)}))
PLYRP <- system.time(best.model.PLYRP <- llply(seq, function(d) {topicmodels::LDA(dtm_stripped, control = control_LDA_Gibbs, method = 'Gibbs', d)}, .parallel = TRUE))

results_speedtest <- rbind(BASE, PLYR_S, PARLAP, DOPAR, SFLAPP, SFCLU, PLYRP)
print(results_speedtest)

# Shut down both worker pools
sfStop()
stopCluster(cl)
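Since dtm_stripped and control_LDA_Gibbs are not shown, here is a self-contained sketch of the same kind of speed test with a dummy workload, assuming the parallel package; slow_task() is a hypothetical CPU-bound stand-in for one LDA fit, and 4 workers is an arbitrary choice. When comparing the timings, look at the elapsed column: user.self only counts CPU time of the master process, which mostly waits while the workers compute.

```r
library(parallel)

# Hypothetical CPU-bound stand-in for one model fit.
slow_task <- function(k) {
  s <- 0
  for (j in seq_len(1e6)) s <- s + sqrt(j) / k
  s
}

ks <- 2:9
n_workers <- 4  # arbitrary; compare with detectCores()

cl <- makeCluster(n_workers, type = "PSOCK")

t_seq <- system.time(res_seq <- lapply(ks, slow_task))
t_par <- system.time(res_par <- parLapply(cl, ks, slow_task))

stopCluster(cl)

# Wall-clock time is what a speed-up shows up in.
print(rbind(sequential = t_seq, parallel = t_par)[, "elapsed"])
```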

Answer (Hope On):

There are other ways to parallelize in R. This document may help; its second page explains what the different cluster types (socket, MPI and fork) do: https://stat.ethz.ch/R-manual/R-devel/library/parallel/doc/parallel.pdf
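The practical difference between socket and fork clusters shows up in how data reaches the workers. A minimal sketch, assuming the parallel package; big_object is a hypothetical stand-in for data such as dtm_stripped. MPI clusters are left out here because they need a working Rmpi, which is exactly what fails in the question.

```r
library(parallel)

big_object <- 1:10  # hypothetical stand-in for the real data

# PSOCK: fresh R processes, so objects must be exported explicitly.
cl_sock <- makeCluster(2, type = "PSOCK")
clusterExport(cl_sock, "big_object")
res_sock <- parLapply(cl_sock, 1:2, function(i) sum(big_object) * i)
stopCluster(cl_sock)

# FORK (Linux/macOS only): workers are copies of the master process,
# so big_object is already available without clusterExport().
res_fork <- NULL
if (.Platform$OS.type == "unix") {
  cl_fork <- makeCluster(2, type = "FORK")
  res_fork <- parLapply(cl_fork, 1:2, function(i) sum(big_object) * i)
  stopCluster(cl_fork)
}

print(unlist(res_sock))
```

Forked workers only run on the machine where R was started, so they cannot spread work across cluster nodes.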

Otherwise, I can also recommend looking into the foreach package, as its syntax is much closer to a regular for loop. Note that some parallelization packages and cluster types are not available on all operating systems (fork clusters, for example, do not work on Windows).
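A minimal foreach sketch, assuming the foreach and doParallel packages are installed (doParallel is one of several backends; the question's doSNOW would work similarly). Without a registered backend, %dopar% runs sequentially with a warning, and %do% always runs sequentially.

```r
library(foreach)
library(doParallel)  # also attaches parallel, which provides makeCluster()

cl <- makeCluster(2)   # 2 local workers, an arbitrary choice
registerDoParallel(cl)

# Looks like a for loop, but returns the results as a list.
squares <- foreach(i = 1:5) %dopar% {
  i^2
}

# .combine collapses the results, here into a numeric vector.
total <- foreach(i = 1:5, .combine = c) %dopar% i^2

stopCluster(cl)
print(total)
```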