Calculate square root of covariance matrix in Scala

43 Views Asked by At

I have a training dataset with some feature columns. I'm converting this dataset into a StandardScaler dataset and using that to generate a covariance matrix, and then calculating the square root of this matrix. I'm using breeze.linalg for generating the cov matrix. Here's my code

def generateMetrics(trainingData: DataFrame, featureColumnNames: Array[String]): (DataFrame, List[Array[Double]]) = {
    
    val scaledFeatureVecCol = "scaledFeatures"
    println("Training data total count - " + trainingData.count())
    val standardScalerDf = calculateStandardScalar(trainingData, featureColumnNames, scaledFeatureVecCol)
    println("Standard scaler output - " + standardScalerDf.count())
    
    val covarianceMatrixSqrt = calculateCovarianceMatrixSqrt(standardScalerDf, scaledFeatureVecCol)

    (standardScalerDf, covarianceMatrixSqrt)
}

def calculateStandardScalar(
    featuresDf: DataFrame,
    featureColumnNames: Array[String],
    outputColumn: String): DataFrame = {

  val vectorColumnName = "featureVectors"
  val assembler = new VectorAssembler()
    .setInputCols(featureColumnNames)
    .setOutputCol(vectorColumnName)

  val assembledData = assembler.transform(featuresDf)

  // calculate standard scalar
  val scaler = new StandardScaler()
    .setInputCol(vectorColumnName)
    .setOutputCol(outputColumn)
    .setWithMean(true)
    .setWithStd(true)

  val scalerModel = scaler.fit(assembledData)
  scalerModel.transform(assembledData)
}

def calculateCovarianceMatrixSqrt(featuresDf: DataFrame, featureVecColumn: String): List[Array[Double]] = {

  // Extract standardized features
  val scaledFeatures = featuresDf.select(featureVecColumn).rdd.map(row => row.getAs[Vector](0).toArray).collect()
  
  // Convert scaled features to Breeze DenseMatrix
  val breezeScaledFeatures =
    new DenseMatrix(rows = scaledFeatures.length, cols = scaledFeatures.head.size, data = scaledFeatures.flatten)

  // Calculate covariance matrix
  println("Calculate covariance matrix")
  val covMatrixDense: DenseMatrix[Double] = cov(breezeScaledFeatures.t)

  val svd.SVD(u, s, vt) = svd(covMatrixDense)

  // Calculate the square root of the singular values
  val sqrtSingularValues = diag(s.map(value => math.sqrt(math.abs(value))))

  // Reconstruct the square root matrix using U, S, and Vt
  val sqrtMatrix: DenseMatrix[Double] = u * sqrtSingularValues * vt

  // Convert Breeze DenseMatrix back to Spark Matrix
  val sqrtCovMatrix: Matrix = Matrices.dense(sqrtMatrix.rows, sqrtMatrix.cols, sqrtMatrix.toArray)

  // convert to list of array for easy IO
  sqrtCovMatrix.transpose // Transpose since .toArray is column major
    .toArray
    .grouped(sqrtCovMatrix.numCols)
    .toList
}

I'm getting a weird error while trying to convert the 2D matrix to breeze DenseMatrix.

ERROR  Client: Application diagnostics message: User class threw exception: java.lang.IndexOutOfBoundsException: Storage array has size 694391433 but indices can grow as large as 761101526

I have verified that the 2D array has arround 300k rows and 251 columns. Can someone please help me figure out what I'm doing wrong?

0

There are 0 best solutions below