How to run another Rscript after several R jobs running in parallel are done?


My workflow is to first run 4 R scripts in parallel using the rstudioapi::jobRunScript() function. Each of the scripts running in parallel imports nothing from any environment; instead, each exports the data frames it creates to the global environment. My 5th R script builds on the data frames created by those 4 scripts, and it currently runs in the console. If there's a way to run the 5th script in the background rather than in the console once the first 4 scripts are done, that would be even better. I'm also trying to reduce the total running time of the whole process.
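
Roughly, the launching step looks like this (a minimal sketch; the script file names below are placeholders for my actual files):

library(rstudioapi)

# Launch each script as an RStudio background job; each job exports the
# data frames it creates back to the global environment when it finishes.
scripts <- c("script1.R", "script2.R", "script3.R", "script4.R")
for (s in scripts) {
  jobRunScript(s, exportEnv = "R_GlobalEnv")
}

# script5.R is then run manually in the console once all jobs are done.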

Although I was able to figure out how to run the first 4 R scripts in parallel, my task isn't complete because I can't find a way to trigger the 5th R script once they finish. Hope y'all can help me here.

4 Answers

BEST ANSWER

This is a bit too open-ended for my liking. While rstudioapi can certainly be used for running parallel tasks, it is not very versatile and does not give you very useful output. Parallelism is well implemented in R by several packages that provide a much simpler and better interface for this. Here are 3 options, all of which also allow something to be returned ('output') from the different files.

package = parallel

With the parallel package we can achieve this very simply: create a vector of files to be sourced and execute source() on each one in a worker process. The main process will block while they are running, but if you have to wait for them to finish anyway, this doesn't really matter much.

library(parallel)
ncpu <- detectCores()
cl <- makeCluster(ncpu)
# full paths to the files that should be executed
files <- c(...) 
# use an lapply in parallel.
result <- parLapply(cl, files, source)
# Remember to close the cluster
stopCluster(cl)
# If anything is returned this can now be used.

As a side note, several packages have an interface similar to the parallel package, which was built upon the snow package, so it is a good baseline to be familiar with.

package = foreach

An alternative to the parallel package is the foreach package, which provides a for-loop-like interface, simplifying the code while giving more flexibility and automatically importing necessary libraries and variables (although it is safer to do this explicitly).
The foreach package does, however, depend on the parallel and doParallel packages to set up a cluster.

library(parallel)
library(doParallel)
library(foreach)
ncpu <- detectCores()
cl <- makeCluster(ncpu)
files <- c(...) 
registerDoParallel(cl)
# Run parallel using foreach
# remember %dopar% for parallel. %do% for sequential.
result <- foreach(file = files, .combine = list, .multicombine = TRUE) %dopar% { 
  source(file)
  # Add any code before or after source.
}
# Stop cluster
stopCluster(cl)
# Do more stuff. Result holds any result returned by foreach.

While it does add a few lines of code, the .combine, .packages and .export arguments make for a very simple interface for working with parallel computing in R.
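
As an illustration of those arguments, here is a small self-contained sketch (the my_threshold variable, the worker count and the loop body are made up for the example):

library(parallel)
library(doParallel)
library(foreach)

cl <- makeCluster(2)
registerDoParallel(cl)

my_threshold <- 10   # hypothetical value the workers need

result <- foreach(i = 1:4,
                  .combine  = rbind,          # row-bind the four results
                  .packages = "stats",        # packages attached on each worker
                  .export   = "my_threshold"  # objects copied to each worker
                  ) %dopar% {
  # my_threshold is looked up by name here, so foreach cannot detect it
  # automatically and it has to be listed in .export
  thr <- get("my_threshold")
  data.frame(task = i, value = stats::rnorm(1, mean = thr))
}

stopCluster(cl)
result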

package = future

This is one of the less commonly used packages. future provides a parallel interface that is more flexible than both parallel and foreach, allowing for asynchronous parallel programming. The implementation can, however, seem a bit more daunting, and the example I provide below only scratches the surface of what is possible.
Also worth mentioning: while the future package does provide automatic export of the functions and packages necessary to run code, experience has taught me that this is limited to the first level of depth in any call (sometimes less), so explicit exporting is still sometimes necessary.
While foreach depends on parallel (or similar) to start a cluster, future will start one itself using all the available cores. A simple call to plan(multiprocess) starts a multi-core session (in newer versions of future, plan(multisession) is the recommended equivalent).

library(future)
files <- c(...) 
# Start multiprocess session
plan(multiprocess)
# Simple wrapper function, so we can iterate over the files variable more easily
source_future <- function(file)
  future(source(file))
results <- lapply(files, source_future)
# Do some calculations in the meantime
print('hello world, I am running while waiting for the futures to finish')
# Force waiting for the futures to finish
resolve(results)
# Extract any result from the futures
results <- values(results)
# Clean up the process (close down clusters)
plan(sequential)
# Run some more code.

Now this might seem quite heavy at first, but the general mechanism is:

  1. Call plan(multiprocess)
  2. Execute some function(s) using future (or %<-%, which I won't go into in detail; a short sketch follows this list)
  3. Do something else if you have more code to run, that does not depend on the processes
  4. Wait for the results using resolve, which works on a single future or multiple futures in a list (or environment)
  5. Collect the result using value for single futures or values for multiple futures in a list (or environment)
  6. Clear up any cluster running in the future environment by using plan(sequential)
  7. Continue with code that depended on the result of your futures.
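
For reference, a minimal sketch of the %<-% shortcut mentioned in step 2 (a future assignment; reading the variable blocks until its value is ready):

library(future)
plan(multisession)

# x is computed in a background worker; the assignment itself returns immediately
x %<-% {
  Sys.sleep(2)
  42
}

print("doing other work while x is computed in the background")
print(x)           # reading x blocks until the future has resolved
plan(sequential)   # shut the workers down again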

I believe these 3 packages cover every element of multiprocessing (at least on CPU) that a typical user needs to interface with. Other packages provide alternative interfaces, while for asynchronous programming I am only aware of future and promises. In general I'd advise most users to be very careful when moving into asynchronous programming, as it can cause a whole suite of problems that are less frequent in synchronous parallel programming.

I hope this provides an alternative to the (very limiting) rstudioapi interface, which I am fairly certain was never meant to be used for parallel programming by users themselves, but was more likely intended for tasks such as building a package in the background from the IDE itself.

ANSWER

You could use promises in combination with future: promises::promise_all followed by promises::then allows you to wait for the completion of the previous futures before launching the last one as a background process.
You keep control of the console while the jobs are running in the background.

library(future)
library(promises)

plan(multisession)
# Job1
fJob1<- future({
  # Simulate a short duration job
  Sys.sleep(3)
  cat('Job1 done \n')
})

# Job2
fJob2<- future({
  # Simulate a medium duration job
  Sys.sleep(6)
  cat('Job2 done \n')
})


# Job3
fJob3<- future({
  # Simulate a long duration job
  Sys.sleep(10)
  cat('Job3 done \n')
})

# last Job
runLastJob <- function(res) {
  cat('Last job launched \n')
  # launch here script for last job
}

# Cancel last Job
cancelLastJob <- function(res) {
  cat('Last job not launched \n')
}

#  Wait for all jobs to be completed and launch last job
p_wait_all <- promises::promise_all(fJob1, fJob2, fJob3 )
promises::then(p_wait_all,  onFulfilled = runLastJob, onRejected = cancelLastJob)

Job1 done 
Job2 done 
Job3 done 
Last job launched 
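
To adapt this pattern to the question, each future() would source() one of the four scripts and the fulfilled handler would run the fifth; a hedged sketch, with placeholder file names:

library(future)
library(promises)

plan(multisession)

# Placeholder paths - substitute the four real scripts here
scripts <- c("script1.R", "script2.R", "script3.R", "script4.R")

# Launch each script as a future running in a background R session
jobs <- lapply(scripts, function(s) future(source(s)))

# Once all four have finished, run the fifth script (also a placeholder path)
p_all <- promises::promise_all(.list = jobs)
promises::then(p_all,
               onFulfilled = function(res) source("script5.R"),
               onRejected  = function(err) message("A job failed: ", conditionMessage(err)))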

ANSWER

There is a little-known way to do this: rstudioapi doesn't let you check on job status, but the internal RPC call .rs.invokeRpc("get_jobs") does. Use that in combination with while() to keep checking whether the 4 jobs have finished before executing the final 5th script.

A function that I often reuse (a usage sketch follows the code):

n_jobs_running <- function() {
  current_jobs <- .rs.invokeRpc("get_jobs")
  jobs_running <- c()
  for (id in names(current_jobs)) {
    # TRUE if this job is still in the "running" state
    job_running <- current_jobs[[id]][["state_description"]] == "running"
    jobs_running <- append(jobs_running, job_running)
  }
  return(sum(jobs_running))
}
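
A hedged sketch of the accompanying while() loop that triggers the fifth script (the poll interval and the file name are placeholders):

# Launch the four background jobs first (e.g. with rstudioapi::jobRunScript),
# then poll until no job reports the "running" state
while (n_jobs_running() > 0) {
  Sys.sleep(5)          # placeholder poll interval
}

# All four jobs have finished; run the fifth script
source("script5.R")     # placeholder path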

ANSWER

I don't know how adaptable this is to your current scenario, but here's a way to have four things running in parallel, get their return value, and then trigger a fifth expression/function.

The premise is using callr::r_bg for running the individual files. This actually runs a function, not a file, so I'll modify the expectation of what these files look like a little.

I'll write an ancillary script intended to mimic one of your four scripts. I'm guessing you also want to be able to source it normally (run it directly instead of as a function), so I'll write the script file so that it "knows" whether it's being sourced or run directly (based on "Rscript detect if R script is being called/sourced from another script"). (If you know Python, this is analogous to Python's if __name__ == "__main__" trick.)

The ancillary script, named somescript.R:

somefunc <- function(seconds) {
  # put the contents of a script file in this function, and have
  # it return() the data you need back in the calling environment
  Sys.sleep(seconds)
  return(mtcars[sample(nrow(mtcars),2),1:3])
}

if (sys.nframe() == 0L) {
  # if we're here, the script is being Rscript'd, not source'd
  somefunc(3)
}

As a demo, if source'd in the console, this just defines the function (or several, if you desire); it does not execute the code inside the final if block:

system.time(source("~/StackOverflow/14182669/somescript.R"))
#                                   # <--- no output, it did not return a sample from mtcars
#    user  system elapsed 
#       0       0       0           # <--- no time passed

but if I run this with Rscript in a terminal,

$ time /c/R/R-4.0.2/bin/x64/Rscript somescript.R
               mpg cyl  disp
Merc 280C     17.8   6 167.6
Mazda RX4 Wag 21.0   6 160.0

real    0m3.394s                    # <--- 3 second sleep
user    0m0.000s
sys     0m0.015s

Back to the premise. Instead of four "scripts", rewrite your script files like my somescript.R above. If done correctly, they can be run with Rscript as well as source'd, with different intentions.

I'm going to use this one script four times instead of four scripts. Here's a manual run-through of what we want to automate:

# library(callr)
tasks <- list(
  callr::r_bg(somefunc, args = list(5)),
  callr::r_bg(somefunc, args = list(1)),
  callr::r_bg(somefunc, args = list(10)),
  callr::r_bg(somefunc, args = list(3))
)
sapply(tasks, function(tk) tk$is_alive())
# [1]  TRUE FALSE  TRUE FALSE
### time passes
sapply(tasks, function(tk) tk$is_alive())
# [1] FALSE FALSE  TRUE FALSE
sapply(tasks, function(tk) tk$is_alive())
# [1] FALSE FALSE FALSE FALSE

tasks[[1]]$get_result()
#                    mpg cyl  disp
# Merc 280          19.2   6 167.6
# Chrysler Imperial 14.7   8 440.0

We can automate that with

source("somescript.R")
message(Sys.time(), " starting")
# 2020-08-28 07:45:31 starting
tasks <- list(
  callr::r_bg(somefunc, args = list(5)),
  callr::r_bg(somefunc, args = list(1)),
  callr::r_bg(somefunc, args = list(10)),
  callr::r_bg(somefunc, args = list(3))
)
# some reasonable time-between-checks
while (any(sapply(tasks, function(tk) tk$is_alive()))) {
  message(Sys.time(), " still waiting")
  Sys.sleep(1)                      # <-- over to you for a reasonable poll interval
}
# 2020-08-28 07:45:32 still waiting
# 2020-08-28 07:45:33 still waiting
# 2020-08-28 07:45:34 still waiting
# 2020-08-28 07:45:35 still waiting
# 2020-08-28 07:45:36 still waiting
# 2020-08-28 07:45:37 still waiting
# 2020-08-28 07:45:38 still waiting
# 2020-08-28 07:45:39 still waiting
# 2020-08-28 07:45:40 still waiting
# 2020-08-28 07:45:41 still waiting
message(Sys.time(), " done!")
# 2020-08-28 07:45:43 done!
results <- lapply(tasks, function(tk) tk$get_result())
str(results)
# List of 4
#  $ :'data.frame': 2 obs. of  3 variables:
#   ..$ mpg : num [1:2] 24.4 32.4
#   ..$ cyl : num [1:2] 4 4
#   ..$ disp: num [1:2] 146.7 78.7
#  $ :'data.frame': 2 obs. of  3 variables:
#   ..$ mpg : num [1:2] 30.4 14.3
#   ..$ cyl : num [1:2] 4 8
#   ..$ disp: num [1:2] 95.1 360
#  $ :'data.frame': 2 obs. of  3 variables:
#   ..$ mpg : num [1:2] 15.2 15.8
#   ..$ cyl : num [1:2] 8 8
#   ..$ disp: num [1:2] 276 351
#  $ :'data.frame': 2 obs. of  3 variables:
#   ..$ mpg : num [1:2] 14.3 15.2
#   ..$ cyl : num [1:2] 8 8
#   ..$ disp: num [1:2] 360 304

And now run your fifth function/script.
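
For instance, a hedged sketch of that final step, assuming the fifth script has also been rewritten as a function (fifth_func here is a made-up name) that takes the four data frames as input:

# Hypothetical fifth step: combine the four results into one data frame
fifth_func <- function(dfs) {
  do.call(rbind, dfs)
}

final <- fifth_func(results)

# Or, to keep the console free, run the fifth step as a background process too
task5 <- callr::r_bg(fifth_func, args = list(results))
while (task5$is_alive()) Sys.sleep(1)
final <- task5$get_result()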