How to add one to every element of a SparseVector in Breeze?

Given a Breeze SparseVector object:

scala>  val sv = new SparseVector[Double](Array(0, 4, 5), Array(1.5, 3.6, 0.4), 8)
sv: breeze.linalg.SparseVector[Double] = SparseVector(8)((0,1.5), (4,3.6), (5,0.4))

What is the best way to add one to each value and then take the log, i.e. to compute log(x + 1)?

Here is one way that works:

scala>  new SparseVector(sv.index, log(sv.data.map(_ + 1)), sv.length)
res11: breeze.linalg.SparseVector[Double] = SparseVector(8)((0,0.9162907318741551), (4,1.5260563034950492), (5,0.3364722366212129))

I don't like this because it doesn't really make use of breeze to do the addition. We are using a breeze UFunc to take the log of an Array[Double], but that isn't much. I am concerned that in a distributed application with large SparseVectors, this will be slow.

Answered by kingledion

Spark 1.6.3

You can define UDFs to do arbitrary vectorized arithmetic in Spark. First, you need to be able to convert Spark vectors to Breeze vectors; an example of doing that is here. Once you have the implicit conversions in place, you have a few options.
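For reference, the conversions could look roughly like the sketch below. This is not the code from the linked question: the object name VectorConversions and the wrapper class names are made up here, and only asBreeze, fromBreeze and the SparkVector/BreezeVector aliases are taken from the snippets in this answer. It assumes the Spark 1.6 mllib vector types.

```scala
import breeze.linalg.{DenseVector => BDV, SparseVector => BSV, Vector => BreezeVector}
import org.apache.spark.mllib.linalg.{DenseVector => SDV, SparseVector => SSV, Vector => SparkVector, Vectors}

// Hypothetical helper object; import VectorConversions._ to bring the conversions into scope.
object VectorConversions {

  // Adds asBreeze to Spark (mllib) vectors.
  implicit class SparkToBreeze(val v: SparkVector) extends AnyVal {
    def asBreeze: BreezeVector[Double] = v match {
      case sv: SSV => new BSV[Double](sv.indices, sv.values, sv.size)
      case dv: SDV => new BDV[Double](dv.values)
    }
  }

  // Adds fromBreeze to Breeze vectors of doubles.
  implicit class BreezeToSpark(val v: BreezeVector[Double]) extends AnyVal {
    def fromBreeze: SparkVector = v match {
      case sv: BSV[Double] =>
        // Breeze may over-allocate its backing arrays, so keep only the used part.
        val n = sv.activeSize
        Vectors.sparse(sv.length, sv.index.take(n), sv.data.take(n))
      case other =>
        // Dense vectors and any other Breeze vector type are copied into a dense Spark vector.
        Vectors.dense(other.toArray)
    }
  }
}
```

With import VectorConversions._ in scope, calls such as v1.asBreeze and (a + b).fromBreeze in the functions below should resolve to these wrappers.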

To add any two columns you can use:

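import org.apache.spark.sql.DataFrame
import org.apache.spark.sql.functions.{col, udf}

// SparkVector, asBreeze and fromBreeze come from the implicit conversions mentioned above.
def addVectors(v1Col: String, v2Col: String, outputCol: String): DataFrame => DataFrame = {
  // Error checking of column names goes here
  df: DataFrame => {
    // Convert both vectors to Breeze, add them element-wise, and convert the result back
    def add(v1: SparkVector, v2: SparkVector): SparkVector =
      (v1.asBreeze + v2.asBreeze).fromBreeze
    val func = udf((v1: SparkVector, v2: SparkVector) => add(v1, v2))
    df.withColumn(outputCol, func(col(v1Col), col(v2Col)))
  }
}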

Note that asBreeze and fromBreeze (as well as the SparkVector alias) come from the question linked above. A possible solution is to make a literal integer column with

df.withColumn(colName, lit(1))

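and then add the columns. Note that addVectors as written expects both columns to hold vectors, so the integer literal would first have to be expanded into a vector of ones of matching length.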
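To see what adding ones amounts to on the Breeze side, here is a small Breeze-only sketch (no Spark involved; the object and method names are made up for illustration). Adding one to every element also fills in the implicit zeros, so the result is necessarily dense.

```scala
import breeze.linalg.{DenseVector, SparseVector}

object AddOneSketch {
  // Hypothetical helper: add 1.0 to every element, including the implicit zeros.
  // The sparse vector is densified first, then Breeze's scalar addition is applied.
  def addOne(sv: SparseVector[Double]): DenseVector[Double] =
    sv.toDenseVector + 1.0

  def main(args: Array[String]): Unit = {
    // Same vector as in the question.
    val sv = new SparseVector[Double](Array(0, 4, 5), Array(1.5, 3.6, 0.4), 8)
    println(addOne(sv))
  }
}
```

For large, very sparse vectors this trades the sparse representation for a full-length array, which is worth keeping in mind if memory is a concern.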

The alternative for more complex mathematical functions is:

def applyMath(func: BreezeVector[Double] => BreezeVector[Double],
              inColName: String, outColName: String): DataFrame => DataFrame = {
  df: DataFrame => df.withColumn(outColName,
    udf((v1: SparkVector) => func(v1.asBreeze).fromBreeze).apply(col(inColName)))
}

You could also make this generic in the Breeze vector parameter.
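As an illustration of a function that could be passed to applyMath for the log(x + 1) case in the question, here is a minimal sketch. The names Log1pSketch and log1pVector are made up, and BreezeVector is assumed to be an alias for breeze.linalg.Vector. Since log(0 + 1) = 0, only the stored values of a sparse vector need to be touched and the result can stay sparse.

```scala
import breeze.linalg.{DenseVector, SparseVector, Vector => BreezeVector}

object Log1pSketch {
  // Hypothetical helper: element-wise log(x + 1).
  def log1pVector(v: BreezeVector[Double]): BreezeVector[Double] = v match {
    case sv: SparseVector[Double] =>
      // Implicit zeros map to log(1) = 0, so transforming the stored values is enough.
      val n = sv.activeSize
      new SparseVector[Double](
        sv.index.take(n),
        sv.data.take(n).map(x => java.lang.Math.log1p(x)),
        sv.length)
    case other =>
      // Dense (or any other) vectors: transform every element.
      new DenseVector[Double](other.toArray.map(x => java.lang.Math.log1p(x)))
  }
}
```

It could then be used along the lines of applyMath(Log1pSketch.log1pVector, "features", "logFeatures")(df), where the column names are placeholders.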