RcppParallel equivalent of .combine=rbind

172 Views Asked by At

I would like to parallelize a for loop using parallelReduce of RcppParallel but struggle with joining the results because the dimension of the parallel output for each iteration is not known a priori. With the foreach package in R, this can be done with foreach using .combine=rbind:

library(foreach)
library(doParallel)

# some dummy function which, as a function of the iteration index, returns a Boolean
lengthy_test <- function(row_of_data){
  if(runif(1)<0.5){
    return(TRUE)
  }else{
    return(FALSE)
  }
}

input <- matrix(NA,1000,1)

output = foreach(i=1:1000,.combine=rbind)%dopar%{
  if(lengthy_test(input[i,])){
    n_rows_i = 1            # n_rows_i is the number of rows that we would like to add
  }else{
    n_rows_i = 2
  }
  matrix(rnorm(n_rows_i*3),nrow =n_rows_i, ncol = 3)
}

How can one achieve the same with parallelReduce? The key problem is to write the Split constructor and the join operator since a priori it is unknown which row of the input matrix is associated with which row(s) of the output matrix.

In my use case, the dimensions of output is known a priori so that the preallocation of an output matrix (or construction of a Worker without Split) is no problem.

A serial implementation in C++ is not a problem since one can simply carry around a counter of how many rows of the output matrix one has filled already. How this can be done in parallel is not obvious to me.

Edit

Since F. Privé asked: Here is how I'd try to adapt the code from the RcppParallel Documentation

// [[Rcpp::depends(RcppParallel)]]
#include <RcppParallel.h>
using namespace RcppParallel;

struct Test : public Worker
{   
   // source matrix
   const RMatrix<double> input;

   // number of rows of output matrix
   int nrows;   

   // output matrix

   RMatrix<double> output;
   
   // constructors
   Test(const NumericMatrix input) : input(input), nrows(input.nrow()*2) output(nrows, 3) {}
   Test(const Test& test, Split) : input(test.input) {
     nrows = 0;
     for (int i = 0; i < input.nrow(); i++) {
       if(Rcpp::runif(1)<0.5){
         nrows = nrows + 1;
       }else{
         nrows = nrows + 2;
       }
       // now I know the dimension I have to assign to output
       // i.e. nrows rows and 3 columns 
       // but I don't know _which_ rows I have to assign
       // this is why I thought that parallelReduce might be better than parallelFor 
       // since I think I can just do this and worry about the joining later    

       output = Rmatrix(Rcpp::rnorm(nrows*3),nrows,3);
   }
   
   // accumulate just the element of the range I've been asked to
   void operator()(std::size_t begin, std::size_t end) {
      // perhaps I can leave this empty 
      // since all the work is done in the split constructor
      // but I'm not sure that the split constructor is always called
   }
     
   // join my output with that of another Test
   void join(const Test& rhs) { 
      //  here the rbind equivalent is needed
      // the order of binding is not imporant for me (maybe for others, it is)

      output = rbind(output,rhs.output); 
   }
};

And then, one could call it like this

// [[Rcpp::export]]
double Testcombine(NumericMatrix x){
   
   // declare the Test instance 
   Test test(x);
   
   // call parallel_reduce to start the work
   parallelReduce(0, x.nrow(), test);
   
   // return the computed test
   return test.output;
}
0

There are 0 best solutions below