Applying paired Euclidean Distance between all columns between two matrices for Big Data

142 Views Asked by At

My problem is applying Euclidean Distance between paired columns of two very large matrices i.e. (only x[,1] with y[,1],..., x[,n] with y[,n]). Thats pretty simple but I need to do it for very large files. Setting it up for parallelization the normal way with parLapply takes a very long time for the x,y to be fed into clusterExport. I tried using the bigmemory package but I keep getting error reports (The file PRED.desc could not be found). To further increase speed, I tried processing them in chunks.

library(parallel)
library(doParallel)
library(bigmemory) 

### Fake Random Data ##
PRED<-matrix(rnorm(10**9,0,1),nrow=1000)
ACTUAL<-matrix(rnorm(10**9,0,1),nrow=1000)
Names<-paste0("Stock_Number_",1:10**6)
###

big_PRED<-bigmemory::as.big.matrix(PRED,type="double", descriptorfile="PRED.desc")
big_ACTUAL<-bigmemory::as.big.matrix(ACTUAL,type="double", descriptorfile="ACTUAL.desc")

NUMCores<-parallel::detectCores()-1
cl <- parallel::makePSOCKcluster(NUMCores); doParallel::registerDoParallel(cl)
L<- ncol(PRED); inds <- split(seq_len(L), sort(rep_len(seq_len(NUMCores), L)))
DistFunction<-function(xi,yi){PRED=attach.big.matrix("PRED.desc")
                              ACTUAL=attach.big.matrix("ACTUAL.desc")
                              VEC<-Vectorize(function(xi,yi){dist(rbind(PRED[,xi],Actual[,yi]))}) 
                              return(VEC(xi,yi))
}

clusterExport(cl, varlist=c("DistFunction","inds","attach.big.matrix"))
clusterEvalQ(cl, library(bigmemory))
parEucDist<-function(clVAR){parallel::parSapply(clVAR,seq_along(inds), function(UU){Index=inds[[UU]];return(DistFunction(Index,Index))})}
full_EDist<-parEucDist(clVAR=cl)
parallel::stopCluster(cl)

I also tried the following but they both gave me Error in serialize(data, node$con) : error writing to connection

#### bigmemory ###
NUMCores<-parallel::detectCores()-1
cl <- parallel::makePSOCKcluster(NUMCores); 
doParallel::registerDoParallel(cl)
L<- ncol(PRED); inds <- split(seq_len(L), sort(rep_len(seq_len(NUMCores), L)))
DistFunction<-function(xi,yi){VEC<-Vectorize(function(xi,yi){dist(rbind(PRED[,xi],Actual[,yi]))}); return(VEC(xi,yi))}
distVEC=matrix(NA,ncol=length(seq_along(inds)),nrow=length(inds[[1]]))
clusterExport(cl, varlist=c("DistFunction","inds","attach.big.matrix"))
clusterEvalQ(cl, library(bigmemory))
full_EDist<-foreach(i=seq_along(inds), .combine=c) 
                %dopar% { 
                     Index=inds[[i]]
                     PRED=attach.big.matrix("PRED.desc")
                     ACTUAL=attach.big.matrix("ACTUAL.desc")
                     return(DistFunction(Index,Index))}
parallel::stopCluster(cl)

#### bigstatsr ###
require(bigstatsr)
FBM_PRED<-bigstatsr::as_FBM(PRED,type="double")
FBM_ACTUAL<-bigstatsr::as_FBM(ACTUAL,type="double")

NUMCores<-parallel::detectCores()-1
cl <- parallel::makePSOCKcluster(NUMCores)
doParallel::registerDoParallel(cl)
L<- ncol(PRED); inds <- split(seq_len(L), sort(rep_len(seq_len(NUMCores), L)))
DistFunction<-function(xi,yi){VEC<-Vectorize(function(xi,yi) {dist(rbind(PRED[,xi],Actual[,yi]))}); return(VEC(xi,yi))}
distVEC=matrix(NA,ncol=length(seq_along(inds)),nrow=length(inds[[1]]))
clusterExport(cl, varlist=c("DistFunction","inds"))
clusterEvalQ(cl, library(bigstatsr))
full_EDist<-foreach(i=seq_along(inds), .combine=c) 
             %dopar% { Index=inds[[i]]
                       PRED=FBM_PRED[,Index]
                       ACTUAL=FBM_ACTUAL[,Index]
                       distVEC[,i]<-DistFunction(Index,Index)}
parallel::stopCluster(cl)
1

There are 1 best solutions below

0
EM1144 On

Nevermind. I got it to work.

### Fake Random Data ##
PRED<-matrix(rnorm(10**9,0,1),nrow=1000)
ACTUAL<-matrix(rnorm(10**9,0,1),nrow=1000)
Names<-paste0("Stock_Number_",1:10**6)
###


require(bigstatsr)
FBM_PRED<-bigstatsr::as_FBM(PRED,type="double")
FBM_ACTUAL<-bigstatsr::as_FBM(ACTUAL,type="double")

NUMCores<-parallel::detectCores()-1
cl <- parallel::makePSOCKcluster(NUMCores); doParallel::registerDoParallel(cl)
L<- ncol(PRED); inds <- split(seq_len(L), sort(rep_len(seq_len(NUMCores), L)))
DistFunction<-function(xi,yi,PRED,ACTUAL){VEC<-Vectorize(function(xi,yi){dist(rbind(PRED[,xi],ACTUAL[,yi]))}); return(VEC(xi,yi))}
clusterExport(cl, varlist=c("DistFunction","inds","FBM_PRED","FBM_ACTUAL"))
clusterEvalQ(cl, library(bigstatsr))
parEucDist<-function(clVAR){parallel::parSapply(clVAR,seq_along(inds), 
                               function(UU){Index=inds[[UU]]
                                            return(DistFunction(Index,Index,FBM_PRED,FBM_ACTUAL))})}
full_EDist<-parEucDist(clVAR=cl)
full_EDist<-do.call(c,full_EDist)
parallel::stopCluster(cl)