I would like to parallelize a for loop using parallelReduce
of RcppParallel
but struggle with join
ing the results because the dimension of the parallel output for each iteration is not known a priori. With the foreach
package in R, this can be done with foreach
using .combine=rbind
:
library(foreach)
library(doParallel)
# some dummy function which, as a function of the iteration index, returns a Boolean
lengthy_test <- function(row_of_data){
if(runif(1)<0.5){
return(TRUE)
}else{
return(FALSE)
}
}
input <- matrix(NA,1000,1)
output = foreach(i=1:1000,.combine=rbind)%dopar%{
if(lengthy_test(input[i,])){
n_rows_i = 1 # n_rows_i is the number of rows that we would like to add
}else{
n_rows_i = 2
}
matrix(rnorm(n_rows_i*3),nrow =n_rows_i, ncol = 3)
}
How can one achieve the same with parallelReduce
? The key problem is to write the Split
constructor and the join
operator since a priori it is unknown which row of the input
matrix is associated with which row(s) of the output
matrix.
In my use case, the dimensions of output
is known a priori so that the preallocation of an output matrix (or construction of a Worker
without Split
) is no problem.
A serial implementation in C++ is not a problem since one can simply carry around a counter of how many rows of the output
matrix one has filled already. How this can be done in parallel is not obvious to me.
Edit
Since F. Privé asked: Here is how I'd try to adapt the code from the RcppParallel Documentation
// [[Rcpp::depends(RcppParallel)]]
#include <RcppParallel.h>
using namespace RcppParallel;
struct Test : public Worker
{
// source matrix
const RMatrix<double> input;
// number of rows of output matrix
int nrows;
// output matrix
RMatrix<double> output;
// constructors
Test(const NumericMatrix input) : input(input), nrows(input.nrow()*2) output(nrows, 3) {}
Test(const Test& test, Split) : input(test.input) {
nrows = 0;
for (int i = 0; i < input.nrow(); i++) {
if(Rcpp::runif(1)<0.5){
nrows = nrows + 1;
}else{
nrows = nrows + 2;
}
// now I know the dimension I have to assign to output
// i.e. nrows rows and 3 columns
// but I don't know _which_ rows I have to assign
// this is why I thought that parallelReduce might be better than parallelFor
// since I think I can just do this and worry about the joining later
output = Rmatrix(Rcpp::rnorm(nrows*3),nrows,3);
}
// accumulate just the element of the range I've been asked to
void operator()(std::size_t begin, std::size_t end) {
// perhaps I can leave this empty
// since all the work is done in the split constructor
// but I'm not sure that the split constructor is always called
}
// join my output with that of another Test
void join(const Test& rhs) {
// here the rbind equivalent is needed
// the order of binding is not imporant for me (maybe for others, it is)
output = rbind(output,rhs.output);
}
};
And then, one could call it like this
// [[Rcpp::export]]
double Testcombine(NumericMatrix x){
// declare the Test instance
Test test(x);
// call parallel_reduce to start the work
parallelReduce(0, x.nrow(), test);
// return the computed test
return test.output;
}