Is this task suitable for rslurm?

I have several experiments; I'd like to run one experiment per node, and each experiment is a sequence of executions that each use several cores. Right now my code looks like this:

run_seeds <- c(1,2,3,4,5,6,7,8,9,10)

write_lines(paste("problem", "run_seed", "num_patients", "method", "name", "type", "error", sep="\t"), file=err_file_name)

# initialize loop
for (j in 1:length(run_seeds)) {
    ...
}

# start loop
for (i in range_pat) {

  print(paste("iteration",i))
  
  for (j in 1:length(run_seeds)) {

    run_seed<-run_seeds[[j]]
    set.seed(run_seed)
    ...
    write_lines(paste(problem, run_seed, i, "dst", ind_name, type, error, sep="\t"), file=err_file_name, append=TRUE)

  }
}

Is this task suitable for rslurm? If so, how should I change the code? Looking at the example at https://cran.r-project.org/web/packages/rslurm/vignettes/rslurm.html, I don't necessarily want to export an RDS file or generate a Slurm script; I'd like to run everything within one Slurm script. Is that doable, or do I need to restructure the code into the format rslurm expects? Also, the results returned by the nodes need to come back in a certain order. Is that still doable?

If not, what package would you recommend instead?

2 Answers

Answer by George Ostrouchov

Slurm is used with different defaults and different best practices for R at various HPC centers, so it is worthwhile to learn some basics of using Slurm directly before attempting to manage it from an R script with a package like rslurm.

For your multinode experiments, you should write two scripts: a Slurm submission script (say script.sh), and an MPI-enabled R script (say script.R).

The submission script requests the nodes and sets up your software environment (R version, BLAS libraries, MPI version, etc.). This differs across HPC centers, so consult your center's documentation on what is available; module avail on a login node will list the available software environments.

For example, script.sh could look like this:

#!/bin/bash
#SBATCH -A <your-account>
#SBATCH -p <submission-queue-name>
#SBATCH --nodes=4
#SBATCH --exclusive
#SBATCH -t 00:02:00

cd <directory-of-your-rscript>

module load r
module load openmpi

time mpirun --map-by ppr:1:node Rscript script.R

Submit the script with sbatch script.sh. This requests exclusive access to all cores on 4 nodes with a 2-minute job limit. Items in <...> should be self-explanatory. The modules to load may differ on your cluster, and other modules may be necessary, depending on how R is deployed. The last line runs one instance of your script.R per node, using OpenMPI.

script.R uses the pbdMPI package, which you can install from CRAN in an interactive R session on a login node (again, after the module load r and module load openmpi appropriate to your cluster).
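
For example, a one-time install from a login-node R session might look like this (a minimal sketch; the repos URL is just the standard CRAN cloud mirror and may differ at your center):

# one-time setup, run in an interactive R session on a login node
install.packages("pbdMPI", repos = "https://cloud.r-project.org")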

pbdMPI provides RNG reproducibility that is independent of the number of nodes and cores used. Under the covers, it manages the independent RNG streams from the parallel package so that streams align with your application (here, one per seed) rather than with the resources used. Your script.R would look like this:

library(pbdMPI)
library(readr)   # provides write_lines(), as in the original code

num_streams <- 10

my_streams <- comm.chunk(num_streams, form = "vector", rng = TRUE, seed = 12345)

# initialize loop
for (j in my_streams) {
    err_file_name <- paste0("err_file_", j, ".txt")
    write_lines(paste("problem", "run_seed", "num_patients", "method", "name", "type", "error", sep="\t"), file=err_file_name)
    ...
}


# start loop
for (i in range_pat) {

  comm.print(paste("iteration",i))
  
  for (j in my_streams) {
    comm.set.stream(j)
    err_file_name <- paste0("err_file_", j, ".txt")
    ...
    write_lines(paste(problem, j, i, "dst", ind_name, type, error, sep="\t"), file=err_file_name, append=TRUE)
  }
}

finalize()

Several things to note here:

  • To avoid parallel instances of this code writing on top of each other, each seed writes to a different file. Every file gets the header line.
  • Only one instance (rank 0) writes the iteration number to standard output.
  • Since one instance runs on each node, all the cores on the node are available for parallel work. If you manage this yourself, I recommend mclapply(); see the sketch after this list. If you don't need all the cores on a node, you can run more than one instance of this code per node by specifying it in the --map-by OpenMPI parameter.
  • Each instance of the code gets its own set of my_streams from comm.chunk(). my_streams is just a set of index values, each of which selects an independent stream of random numbers. The streams continue where they left off with each new i value; it is also possible to reset them to their start for each i with the parameter reset = TRUE.
  • Ultimately this scales to any number of nodes if you have a large number of seeds. It is also reproducible with a single seed, independent of how many nodes or instances per node you use.
  • finalize() provides a graceful exit from MPI.
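
As a sketch of the mclapply() point above (this is not part of the answer's code: run_one() and n_subtasks are hypothetical placeholders for the work elided as ... in the loops), each MPI instance owns a whole node, so inside the j loop it can fan independent sub-tasks across that node's cores:

library(parallel)   # base-R package providing mclapply() and detectCores()

# hypothetical placeholders for the elided per-iteration work
n_subtasks <- 8
run_one    <- function(i, j, k) sqrt(i * j * k)

i <- 1; j <- 1      # in the real script these come from the surrounding loops

# fan the sub-tasks of one (i, j) iteration across all cores of this node
res <- mclapply(seq_len(n_subtasks),
                function(k) run_one(i, j, k),
                mc.cores = detectCores())
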
Answer by A.F.R.S2022

I am trying to reproduce code from a paper that uses the snowfall package. The paper used a socket cluster, but my institution uses MPI. The snowfall documentation says it supports MPI clusters by changing the type to "MPI", which is what I have done. I then wrote a SLURM script, SLURM.sh, to submit the job. The number of CPUs available per node is 40, so I request 2 nodes:


#!/bin/bash
#SBATCH --output=output.job9.log
#SBATCH --error=error.job9.log
#SBATCH --nodes=2
#SBATCH --ntasks-per-node=40
#SBATCH --time=22:30:00

module load openmpi/4.1.4/intel
module load gsl/2.7
module load R/4.2.1

mpirun -n 80 Rscript code.R

code.R

library(xts)
library(rugarch)
library(mvtnorm)
library(copula)
library(Rmpi)


# read the data here

library(snowfall)     # Parallel computing
library(rlecuyer)     # RNG

sfSetMaxCPUs( number = 80 )
sfInit(parallel = TRUE, cpus = 80, type = "MPI")

sfSource("CopulaFunctions.R")
sfSource("MLEstimation.R")
sfSource("RMApplication.R")

sfExport("V.ll")

sfLibrary(rugarch)
sfLibrary(evd)
sfLibrary(zoo)
sfLibrary(copula)
sfLibrary(mvtnorm)

wrapper <- function(i, n, lag, p, q, spec, V.ll){
  print("i = ")
  print(i)
  
  # 1. Estimate rolling-window parameters
  R   <- t(coredata(V.ll[(i-n+1) : (i+lag), ]))
  Est <- GARCH.est(R, spec, lag, v=10)
  if(is.na(Est)){return(NA)}                    # if estimators did not converge
  print("Est$GAS.Gauss = ")
  print(Est$GAS.Gauss)
  print("Est$GAS.t = ")
  print(Est$GAS.t)
  
  
  # 2. Produce rolling window CoVaR forecasts
  R                      <- t(coredata( V.ll[(i+1) : (i+lag), ] ))
  print("t Distr:")
  CoVaR.forecasts.t      <- CoVaR.t.forecast(     R, Est$U, Est$eps, Est$Fn.1, Est$Fn.2, Est$outp1, Est$outp2, Est$GAS.t, lag, p, q)
  print("Gauss distr:")
  CoVaR.forecasts.Gauss  <- CoVaR.Gauss.forecast( R, Est$U, Est$eps, Est$Fn.1, Est$Fn.2, Est$outp1, Est$outp2, Est$GAS.Gauss, lag, p, q)
  
  return( list(CoVaR.forecasts.t     = CoVaR.forecasts.t,
               CoVaR.forecasts.Gauss = CoVaR.forecasts.Gauss) )
}

sfSource("wrapper.R")

start   <- Sys.time()   
loop    <- seq.int(from = n, to = N-lag, by = lag)    # loop over these indices

# GARCH(1,1)
spec = ugarchspec(variance.model = list(model = "fGARCH", garchOrder = c(1, 1), submodel = "GARCH"),
                  mean.model = list(armaOrder = c(0, 0), include.mean = TRUE),
                  distribution.model = "norm")          # simple GARCH(1,1) specification
Forecasts.GARCH <- sfLapply( loop, wrapper, n, lag=lag, p=p, q=q, spec, V.ll)

# GJR-GARCH(1,1)
spec = ugarchspec(variance.model = list(model = "gjrGARCH", garchOrder = c(1, 1)),
                  mean.model = list(armaOrder = c(0, 0), include.mean = TRUE),
                  distribution.model = "norm")          # GJR-GARCH(1,1) specification
Forecasts.GJR  <- sfLapply( loop, wrapper, n, lag=lag, p=p, q=q, spec, V.ll)
print(Sys.time() - start)   # takes roughly 20h

sfStop()

The code throws an error no matter what I try. What is wrong with it? Based on the snowfall documentation this should work. Any advice is highly appreciated.