How to add one to every element of a SparseVector in Breeze?

Given a Breeze SparseVector object:

scala>  val sv = new SparseVector[Double](Array(0, 4, 5), Array(1.5, 3.6, 0.4), 8)
sv: breeze.linalg.SparseVector[Double] = SparseVector(8)((0,1.5), (4,3.6), (5,0.4))

What is the best way to compute log(x + 1) for each of its values?

Here is one way that works:

scala>  new SparseVector(sv.index, log(sv.data.map(_ + 1)), sv.length)
res11: breeze.linalg.SparseVector[Double] = SparseVector(8)((0,0.9162907318741551), (4,1.5260563034950492), (5,0.3364722366212129))

I don't like this because it doesn't really make use of Breeze to do the addition. It uses a Breeze UFunc to take the log of an Array[Double], but that isn't much. I am concerned that in a distributed application with large SparseVectors, this will be slow.
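
One observation that may help with the performance concern: log(1 + 0) = 0, so the implicit zeros of a sparse vector stay zero under this transform and only the stored values need to be touched. The sketch below checks this on the example data using plain Scala arrays in place of sv.index and sv.data, so it runs without Breeze on the classpath; log1pStored and log1pDense are hypothetical helper names, not Breeze functions.

```scala
// Hypothetical helper (not part of Breeze): applies log(1 + x) to the stored
// values of a sparse vector given as parallel (index, data) arrays.
// Implicit zeros need no work because log(1 + 0) == 0.
def log1pStored(index: Array[Int], data: Array[Double]): (Array[Int], Array[Double]) =
  (index.clone(), data.map(x => math.log1p(x)))

// Dense reference: expand to full length, then apply log(1 + x) to every slot.
def log1pDense(index: Array[Int], data: Array[Double], length: Int): Array[Double] = {
  val dense = new Array[Double](length) // every slot starts at 0.0
  index.indices.foreach(i => dense(index(i)) = data(i))
  dense.map(x => math.log1p(x))
}

// Same data as the SparseVector in the question.
val index = Array(0, 4, 5)
val data = Array(1.5, 3.6, 0.4)

val (outIndex, outData) = log1pStored(index, data)
val reference = log1pDense(index, data, 8)
```

The sparse result agrees with the dense reference at the stored positions, and the reference is 0.0 everywhere else. The same reasoning does not hold for a plain x + 1, which turns every implicit zero into 1 and makes the result dense.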

There is 1 solution below

Spark 1.6.3

You can define UDFs to do arbitrary vectorized addition in Spark. First, you need a way to convert Spark vectors to Breeze vectors; an example of doing that is here. Once the implicit conversions are in place, you have a few options.

To add any two columns you can use:

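import org.apache.spark.sql.DataFrame
import org.apache.spark.sql.functions.{col, udf}

// Returns a DataFrame transformation that adds two vector columns element-wise.
// SparkVector, asBreeze and fromBreeze come from the implicit conversions mentioned above.
def addVectors(v1Col: String, v2Col: String, outputCol: String): DataFrame => DataFrame = {
  // Error checking column names here
  df: DataFrame => {
    def add(v1: SparkVector, v2: SparkVector): SparkVector =
      (v1.asBreeze + v2.asBreeze).fromBreeze
    val func = udf((v1: SparkVector, v2: SparkVector) => add(v1, v2))
    df.withColumn(outputCol, func(col(v1Col), col(v2Col)))
  }
}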

Note that asBreeze and fromBreeze (as well as the SparkVector alias) are defined in the question linked above. One possible solution is to create a literal integer column with

df.withColumn(colName, lit(1))

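and then add the columns. Note that addVectors as written takes two vector columns, so the constant may need to be supplied as a vector of ones rather than a scalar literal.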

The alternative for more complex mathematical functions is:

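// Returns a DataFrame transformation that applies func to each vector in inColName.
// BreezeVector is presumably an alias for breeze.linalg.Vector, alongside SparkVector.
def applyMath(func: BreezeVector[Double] => BreezeVector[Double],
              inColName: String, outColName: String): DataFrame => DataFrame = {
  df: DataFrame => df.withColumn(outColName,
    udf((v1: SparkVector) => func(v1.asBreeze).fromBreeze).apply(col(inColName)))
}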

You could also make this generic in the Breeze vector parameter.