I am trying to compute the dot product between two columns of a given DataFrame. SparseVector already supports this in Spark, so I am trying to do it in an easy and scalable way, without converting to RDDs or to DenseVectors, but I am stuck. I have spent the past three days trying approaches that all fail: the computation returns nothing for the two vector columns passed from the DataFrame. I am looking for guidance, because I am missing something here and cannot tell what the root cause is.

The approach below works for standalone vectors and for RDD vectors, but fails when passing DataFrame vector columns. To replicate the flow and the issue, see the code below (a quick standalone check comes first). Ideally this computation should happen in parallel, since the real data has billions of rows (DataFrame observations):
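
Standalone check, confirming that SparseVector.dot itself works outside a DataFrame (the vectors are copied from the second row of the example further down):

from pyspark.ml.linalg import SparseVector

# Only index 8 is non-zero in both vectors, so the dot product is
# simply the product of the two stored values (about 12.9125).
a = SparseVector(4527, {8: 2.729945780576634})
b = SparseVector(4527, {8: 4.729945780576634})
print(a.dot(b))

Replication of the failing DataFrame flow: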

from pyspark.ml.linalg import Vectors, SparseVector
from pyspark.sql import Row
from pyspark.sql.functions import udf
from pyspark.sql.types import ArrayType, FloatType

df = spark.createDataFrame(
    [
     [["a","b","c"], SparseVector(4527, {0:0.6363067860791387, 1:1.0888040725098247, 31:4.371858972705023}), SparseVector(4527, {0:0.6363067860791387, 1:2.0888040725098247, 31:4.371858972705023})],
     [["d"], SparseVector(4527, {8: 2.729945780576634}), SparseVector(4527, {8: 4.729945780576634})],
    ], ["word", "i", "j"])

# # dataframe content
df.show()
+---------+--------------------+--------------------+
|     word|                   i|                   j|
+---------+--------------------+--------------------+
|[a, b, c]|(4527,[0,1,31],[0...|(4527,[0,1,31],[0...|
|      [d]|(4527,[8],[2.7299...|(4527,[8],[4.7299...|
+---------+--------------------+--------------------+


@udf(returnType=ArrayType(FloatType()))
def sim_cos(v1, v2):
    if v1 is not None and v2 is not None:
        return float(v1.dot(v2))

# # calling udf
df = df.withColumn("dotP", sim_cos(df.i, df.j))

# # output after udf 
df.show()
+---------+--------------------+--------------------+----------+
|     word|                   i|                   j|      dotP|
+---------+--------------------+--------------------+----------+
|[a, b, c]|(4527,[0,1,31],[0...|(4527,[0,1,31],[0...|      null|
|      [d]|(4527,[8],[2.7299...|(4527,[8],[4.7299...|      null|
+---------+--------------------+--------------------+----------+

1 Answer


Rewriting the UDF as a lambda with FloatType() as the return type works on Spark 2.4.5. The root cause of the nulls above is the declared return type: the decorator used ArrayType(FloatType()) while the function returns a plain float, and when a Python UDF's result does not match its declared return type, Spark silently produces null. Posting in case anyone is interested in this approach for PySpark DataFrames:

# # rewrite udf as lambda function:
from pyspark.sql import functions as F
from pyspark.sql.functions import col
from pyspark.sql.types import FloatType

sim_cos = F.udf(lambda x, y: float(x.dot(y)), FloatType())

# # executing udf on dataframe
df = df.withColumn("similarity", sim_cos(col("i"), col("j")))

# # end result
df.show()

+---------+--------------------+--------------------+----------+
|     word|                   i|                   j|similarity|
+---------+--------------------+--------------------+----------+
|[a, b, c]|(4527,[0,1,31],[0...|(4527,[0,1,31],[0...| 21.792336|
|      [d]|(4527,[8],[2.7299...|(4527,[8],[4.7299...| 12.912496|
+---------+--------------------+--------------------+----------+
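
Equivalently, the original decorator-based UDF works once the declared return type matches the scalar the function returns; a minimal sketch of that variant (same logic as the question, with FloatType() in place of ArrayType(FloatType())):

from pyspark.sql.functions import udf
from pyspark.sql.types import FloatType

# FloatType() matches the Python float returned below; with
# ArrayType(FloatType()) Spark cannot coerce the result and emits null.
@udf(returnType=FloatType())
def sim_cos(v1, v2):
    if v1 is not None and v2 is not None:
        return float(v1.dot(v2))

df = df.withColumn("dotP", sim_cos(df.i, df.j))

Either way, the UDF runs row by row inside the executors, so the computation parallelizes across partitions without converting to RDDs or DenseVectors.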