Spark sequences [Int] comparison with [String] sequence output

I am trying to compare the integer arrays (wrapped arrays) held in two different columns, element by element, and produce the ratings as an array of strings:

import org.apache.spark.sql.Row
import org.apache.spark.sql.functions._
import scala.collection.mutable.WrappedArray

The DataFrame data has columns A and B, each holding a wrapped array, which I would like to compare:

val data = Seq(
    (Seq(1,2,3),Seq(4,5,6),Seq(7,8,9)),
    (Seq(1,1,3),Seq(6,5,7),Seq(11,9,8))
    ).toDF("A","B","C")

Here is what it looks like:

data: org.apache.spark.sql.DataFrame = [A: array<int>, B: array<int> ... 1 more field]
+---------+---------+----------+
|        A|        B|         C|
+---------+---------+----------+
|[1, 2, 3]|[4, 5, 6]| [7, 8, 9]|
|[1, 1, 3]|[6, 5, 7]|[11, 9, 8]|
+---------+---------+----------+

Here is the user-defined function. For each row, it should compare the paired elements of the arrays in columns A and B and assign a rating with simple logic. For example, if A(1) > B(1) then D(1) is "Top". In the first row every element of A is smaller than its counterpart in B, so I expect column D to hold ["Well", "Well", "Well"].

def myToChar(num1: Seq[Int], num2: Seq[Int]): Seq[String] = {
        val twozipped = num1.zip(num2)
        for ((x,y) <- num1.zip(num2)) {
          if (x > y)  "Top"
          if (x < y)  "Well"
          if (x == y)  "Good"
    }}


val udfToChar = udf(myToChar(_: Seq[Int], _: Seq[Int]))

val ouput = data.withColumn("D",udfToChar($"A",$"B"))

However, I keep getting a type mismatch error (<console>:45: error: type mismatch;). I am not sure whether my udf() type definition is wrong, and I would appreciate any guidance on correcting my mistake.

There is 1 best solution below

Your myToChar definition is declared to return a Seq[String], but its implementation returns Unit, because a for expression without a yield clause has type Unit.
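
To illustrate the difference, here is a minimal sketch in plain Scala (no Spark needed). The object and value names (ForYieldSketch, pairs, withoutYield, withYield) are hypothetical and only serve the example:

```scala
object ForYieldSketch {
  // Pair up two small sequences, as zip does in the question.
  val pairs: Seq[(Int, Int)] = Seq(1, 2, 3).zip(Seq(4, 2, 1))

  // Without yield, the for expression runs only for its side effects,
  // so its type is Unit and the computed sums are thrown away.
  val withoutYield: Unit = for ((x, y) <- pairs) { x + y }

  // With yield, the for expression builds a new sequence, much like map.
  val withYield: Seq[Int] = for ((x, y) <- pairs) yield x + y
}
```

Assigning the first form to a value declared as Seq[Int] would produce the same kind of type mismatch reported in the question.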

You can fix this by changing the implementation of the function:

  • Replace the for with a map operation
  • Chain the conditions with else if and finish with a plain else. A standalone if without an else evaluates to Unit when its condition is false, and only the last expression in a block becomes its result, so the values of earlier standalone ifs are discarded (unlike with pattern matching, the compiler can't conclude that your if conditions are exhaustive; it must assume that none of them might hold)

So a correct implementation would be:

def myToChar(num1: Seq[Int], num2: Seq[Int]): Seq[String] = {
  num1.zip(num2).map { case (x, y) =>
    if (x > y) "Top"
    else if (x < y) "Well"
    else "Good"
  }
}

Or alternatively using pattern matching with guards:

def myToChar(num1: Seq[Int], num2: Seq[Int]): Seq[String] = {
  num1.zip(num2).map {
    case (x, y) if x > y => "Top"
    case (x, y) if x < y => "Well"
    case _ => "Good"
  }
}
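
As a quick sanity check, the function can be run without Spark against columns A and B of the two sample rows from the question. This is only a sketch: the object name RatingCheck and the third, mixed input are hypothetical, and the function is repeated so that the snippet is self-contained:

```scala
object RatingCheck {
  // Same logic as the pattern-matching version above.
  def myToChar(num1: Seq[Int], num2: Seq[Int]): Seq[String] =
    num1.zip(num2).map {
      case (x, y) if x > y => "Top"
      case (x, y) if x < y => "Well"
      case _               => "Good"
    }

  def main(args: Array[String]): Unit = {
    // Columns A and B of the two sample rows: every element of A is smaller.
    println(myToChar(Seq(1, 2, 3), Seq(4, 5, 6))) // List(Well, Well, Well)
    println(myToChar(Seq(1, 1, 3), Seq(6, 5, 7))) // List(Well, Well, Well)
    // A made-up input that exercises all three branches.
    println(myToChar(Seq(5, 1, 2), Seq(3, 4, 2))) // List(Top, Well, Good)
  }
}
```

With either corrected definition in place, the udf(...) and withColumn lines from the question should compile as written, since the function now really returns a Seq[String].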