Convert Spark DataFrame to spark.rdd.RDD[(Array[Integer], Array[Integer])] to calculate mean average precision


I have a Spark DataFrame:

[screenshot of the DataFrame, showing two array-of-integer columns: predicted and actual]

I have to use Spark with Scala to calculate mean average precision using RankingMetrics. As far as I can tell from the documentation, it expects an RDD rather than a DataFrame. I tried the following:

var llist = df.select("predicted", "actual").rdd.map(x => (x.get(0), x.get(1))).collect()
// It gave Array[(Any, Any)]

var df_rdd = sc.parallelize(llist)
// df_rdd is org.apache.spark.rdd.RDD[(Any, Any)]

val metrics = new RankingMetrics(df_rdd)
// This gave me an error

Error :

error: type mismatch;
found : org.apache.spark.rdd.RDD[(Any, Any)]
required: org.apache.spark.rdd.RDD[(Array[?], Array[?])]
Note: (Any, Any) >: (Array[?], Array[?]), but class RDD is invariant in type T.
You may wish to define T as -T instead. (SLS 4.5)

I am using Spark version 2.4.3.

How can I convert this DataFrame to that format so I can calculate mean average precision? Thanks.

2 Answers

Answer 1

A DataFrame is essentially an RDD under the covers, and going by the types you report, yours holds rows of (Array[Int], Array[Int]). So you should be able to pass it directly as new RankingMetrics(df.rdd). Not tested.
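
If the direct call does not compile (df.rdd on its own is an RDD[Row], not an RDD of tuples), one way to get the required element type is to go through a typed Dataset first. A minimal sketch, assuming the two columns really hold arrays of integers and that spark is the active SparkSession (as in spark-shell):

import org.apache.spark.mllib.evaluation.RankingMetrics
import spark.implicits._

// Sketch: turn the two array columns into a typed Dataset, then take its RDD.
val rdd = df.select("predicted", "actual")
  .as[(Array[Int], Array[Int])]
  .rdd

val metrics = new RankingMetrics(rdd)
println(metrics.meanAveragePrecision)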

Answer 2

As the error says, the argument to RankingMetrics must be an RDD whose elements are of type

(Array[?], Array[?])

But if you inspect your RDD, which you can do by simply typing df.rdd, you will see it is an

org.apache.spark.rdd.RDD[org.apache.spark.sql.Row]

So you have to convert each Row into a pair of arrays. Since the two columns already hold arrays, read them out with getSeq and convert, for example:

df.rdd.map(r => (r.getSeq[Int](0).toArray, r.getSeq[Int](1).toArray))

That said, for a use case like this it may be better not to start from a DataFrame at all, but to build an RDD with the required element type, (Array[Int], Array[Int]), from the beginning. And whenever you do need an RDD from a DataFrame, use:

df.rdd
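
Putting it together, a minimal end-to-end sketch, again assuming the predicted and actual columns hold arrays of integers:

import org.apache.spark.mllib.evaluation.RankingMetrics
import org.apache.spark.rdd.RDD

// Sketch: map each Row to a pair of Int arrays, then feed the RDD to RankingMetrics.
val predictionAndLabels: RDD[(Array[Int], Array[Int])] =
  df.select("predicted", "actual").rdd.map { r =>
    (r.getSeq[Int](0).toArray, r.getSeq[Int](1).toArray)
  }

val metrics = new RankingMetrics(predictionAndLabels)
println(s"Mean average precision: ${metrics.meanAveragePrecision}")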