Build a 2D lookup table from a Spark DataFrame

I would like to convert a small dataframe into a broadcast lookup table to be used inside a UDF applied to another, larger dataframe. This smaller dataframe (myLookupDf) may look like this:

+---+---+---+---+
| x | 90|100|101|
+---+---+---+---+
| 90|  1|  0|  0|
|100|  0|  1|  1|
|101|  0|  1|  1|
+---+---+---+---+

I want to use the first column as the first key, say x1, and the first row (the column headers) as the second key, say x2. x1 and x2 have the same elements. Ideally, the lookup table (myLookupMap) will be a Scala Map (or similar) and work like this:

myLookupMap(90)(90) returns 1
myLookupMap(90)(101) returns 0
myLookupMap(100)(90) returns 0
myLookupMap(101)(100) returns 1
etc.

So far, I have managed to get:

val myLookupMap = myLookupDf.collect().map(r => Map(myLookupDf.columns.zip(r.toSeq):_*))
myLookupMap: Array[scala.collection.Map[String,Any]] = Array(Map(x -> 90, 90 -> 1, 100 -> 0, 101 -> 0), Map(x -> 100, 90 -> 0, 100 -> 1, 101 -> 1), Map(x -> 101, 90 -> 0, 100 -> 1, 101 -> 1))

which is an Array of Maps, not exactly what I need. Any suggestions are much appreciated.

Best answer:

collect() always returns an Array (of Rows) to the driver, so mapping over its result gives you another Array, in this case an Array of Maps. You need to collect the rows as key/value pairs into a Map instead.
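The difference can be seen without Spark: mapping over an Array always yields another Array, while calling toMap on an Array of (key, value) pairs yields a Map. A minimal plain-Scala sketch, assuming the question's Array of Maps is already on the driver; the names rowMaps and nested are illustrative only:

```scala
// The result of the question's collect().map(...) step, written out by hand
// (values copied from the REPL output in the question).
val rowMaps: Array[Map[String, Any]] = Array(
  Map("x" -> 90, "90" -> 1, "100" -> 0, "101" -> 0),
  Map("x" -> 100, "90" -> 0, "100" -> 1, "101" -> 1),
  Map("x" -> 101, "90" -> 0, "100" -> 1, "101" -> 1)
)

// Pair each row's "x" value with the rest of that row's map, then turn the
// Array of pairs into a Map with toMap (instead of producing another Array).
val nested: Map[Any, Map[String, Any]] =
  rowMaps.map(r => r("x") -> (r - "x")).toMap
```

This produces the same shape as the collectAsMap() approach shown below, just built on the driver from already-collected rows.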

Given the dataframe:

scala> myLookupDf.show(false)
+---+---+---+---+
|x  |90 |100|101|
+---+---+---+---+
|90 |1  |0  |0  |
|100|0  |1  |1  |
|101|0  |1  |1  |
+---+---+---+---+

All you need are the header names other than x, so you can do something like this:

scala>     val header = myLookupDf.schema.fieldNames.tail
header: Array[String] = Array(90, 100, 101)

Then modify your map function so that each row becomes a (key, Map) pair, and collect those pairs into a Map with collectAsMap():

scala>     val myLookupMap = myLookupDf.rdd.map(r => {
     |       val row = r.toSeq
     |       (row.head, Map(header.zip(row.tail):_*))
     |     }).collectAsMap()
myLookupMap: scala.collection.Map[Any,scala.collection.immutable.Map[String,Any]] = Map(101 -> Map(90 -> 0, 100 -> 1, 101 -> 1), 100 -> Map(90 -> 0, 100 -> 1, 101 -> 1), 90 -> Map(90 -> 1, 100 -> 0, 101 -> 0))
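Because the outer keys are Any and the inner keys are column-name Strings, every lookup needs .toString and returns Any. A minimal sketch of a typed alternative, assuming every cell holds an integer (a numeric type or a numeric string); toIntValue and buildLookup are hypothetical helper names, not Spark APIs. It is plain Scala, so it can be checked without a Spark session:

```scala
// Hypothetical helper: converts one cell to Int. Assumes the cell is a boxed
// number (Int, Long, ...) or a numeric string such as the header "90".
def toIntValue(v: Any): Int = v match {
  case n: java.lang.Number => n.intValue()
  case s: String           => s.trim.toInt
  case other =>
    throw new IllegalArgumentException(s"Not an integer cell: $other")
}

// Builds Map[Int, Map[Int, Int]] from the header (column names without "x")
// and the rows (first cell = row key). scala.collection.Seq is used so that
// Arrays, such as the result of collect(), can be passed in directly.
def buildLookup(
    header: scala.collection.Seq[String],
    rows: scala.collection.Seq[scala.collection.Seq[Any]]
): Map[Int, Map[Int, Int]] = {
  val colKeys = header.map(toIntValue)
  rows.map { row =>
    toIntValue(row.head) -> colKeys.zip(row.tail.map(toIntValue)).toMap
  }.toMap
}

// Sample data mirroring myLookupDf.
val header = Seq("90", "100", "101")
val rows: Seq[Seq[Any]] = Seq(
  Seq(90, 1, 0, 0),
  Seq(100, 0, 1, 1),
  Seq(101, 0, 1, 1)
)
val typedLookup: Map[Int, Map[Int, Int]] = buildLookup(header, rows)
```

With Spark, the inputs would presumably be myLookupDf.columns.tail and myLookupDf.collect().map(_.toSeq); lookups then read typedLookup(90)(101), with no .toString and an Int result.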

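This gives the desired results. Note that the inner keys are the column names, which are Strings, so the second key has to be converted with .toString: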

scala> myLookupMap(90)(90.toString)
res1: Any = 1

scala> myLookupMap(90)(101.toString)
res2: Any = 0

scala> myLookupMap(100)(90.toString)
res3: Any = 0

scala> myLookupMap(101)(100.toString)
res4: Any = 1

Now you can pass myLookupMap to your UDF, for example by broadcasting it first.
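Map.apply throws NoSuchElementException when a key is missing, so inside a UDF a single unexpected value would fail the task. A minimal sketch of a defensive lookup over a Map[Int, Map[Int, Int]] (an assumed typed form of myLookupMap); the helper name lookupOrDefault and the default value 0 are assumptions:

```scala
// Hypothetical helper: looks up (rowKey, colKey) and falls back to a default
// instead of throwing NoSuchElementException when either key is absent.
def lookupOrDefault(
    table: Map[Int, Map[Int, Int]],
    rowKey: Int,
    colKey: Int,
    default: Int = 0
): Int =
  table.get(rowKey).flatMap(_.get(colKey)).getOrElse(default)

// Sample table with the same values as myLookupDf.
val table: Map[Int, Map[Int, Int]] = Map(
  90  -> Map(90 -> 1, 100 -> 0, 101 -> 0),
  100 -> Map(90 -> 0, 100 -> 1, 101 -> 1),
  101 -> Map(90 -> 0, 100 -> 1, 101 -> 1)
)
```

In Spark, the table would typically be broadcast once with val bc = spark.sparkContext.broadcast(table) and read inside the UDF through bc.value, e.g. udf((a: Int, b: Int) => lookupOrDefault(bc.value, a, b)) using org.apache.spark.sql.functions.udf. The Int column types and the default are assumptions; returning an Option or a different sentinel may suit your data better.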