How to iterate a List in parallel with RcppParallel?

104 Views Asked by At

One of my inputs is a List of NumericVector. If I declare this as I would with regular (non paralllel) Rcpp code, I get a correct result at the end, but with the stack imbalance warnings.

Warning: stack imbalance in '.Call', 18 then 35
Warning: stack imbalance in 'invisible', 17 then 34
Warning: stack imbalance in '=', 12 then 29
Warning: stack imbalance in 'withVisible', 6 then 23

I understand this is because I cannot use the regular NumericVector in parallel code, so I changed it to RVector<double> but now I get a couple of errors in RVector.h like

request for member 'begin' in '(SEXPREC*&)(& source)', which is of pointer type 'SEXPREC*' (maybe you meant to use '->' ?)

Here is a demo of the structure of my code, hopefully it is enough to see what I am doing

// [[Rcpp::plugins("cpp11")]]
// [[Rcpp::depends(RcppParallel)]]
#include <RcppParallel.h>
#include <Rcpp.h>
using namespace Rcpp;
using namespace RcppParallel;

struct MyWorker :  public Worker 
{
  const List knownPrices;
  RMatrix<double> estReturns;
  const int maxRows;
  
  MyWorker(const List knownPrices, NumericMatrix estReturns, int maxRows)
    : knownPrices(knownPrices), estReturns(estReturns), maxRows(maxRows) {}
  
  void operator()(std::size_t begin, std::size_t end){
    for (auto iCol = begin; iCol < end; iCol++){
      NumericVector priceColumn = knownPrices[iCol];
      int nPrices = priceColumn.size();
      
      if (nPrices != maxRows + 1){
        stop("knownPrices length does match size of estReturns");
      }
      
      //find the first available price
      int iRow = 0;
      while (iRow < nPrices && NumericVector::is_na(priceColumn[iRow])) {
        iRow++;
      }
      
      //do something with estReturns for the demo
      estReturns(iRow, iCol) = 0.0;
      estReturns(maxRows - 1, iCol) = 0.0;
    }
  }
};

// [[Rcpp::export]]
void demoFunction(List knownPrices, NumericMatrix estReturns) {
  auto maxRows = estReturns.nrow();
  auto maxCols = estReturns.ncol();
  MyWorker workit(knownPrices, estReturns, maxRows);
  parallelFor(0, maxCols, workit);
}


/*** R
prices = runif(10000) |> matrix(nrow = 100)
priceList = lapply(1:100, \(i)prices[,i])
returns = apply(prices, 2, \(x)diff(x)/x[-100])
demoFunction(priceList, returns)
all(c(returns[1,], returns[99,]) == 0)
*/

The above generates a stack imbalance error message if you run it enough times.

I don't think I can iterate of over the contents of knownPrices (as in the gallery examples) because I need to use iCol as an index into estReturns later on.

0

There are 0 best solutions below