Optimized way to apply transformation on several columns of a Spark DataFrame


In my Spark jobs, I have to apply transformations to multiple columns, for two use cases:

  • Casting columns

In my personal use case, I apply this to a DataFrame of 150 columns:

  def castColumns(inputDf: DataFrame, columnsDefs: Array[(String, DataType)]): DataFrame = {
    columnsDefs.foldLeft(inputDf) {
      case (acc, (name, dataType)) => acc.withColumn(name, acc(name).cast(dataType))
    }
  }
  • Transformation

In my personal use case, I use this to apply a calculation to n columns in order to create n new columns (one input column for one output column, n times):

    ListOfCol.foldLeft(dataFrame) {
      (tmpDf, m) => tmpDf.withColumn(addSuffixToCol(m), UDF(m))
    }

As you can see, I use foldLeft together with withColumn. But I recently found in the documentation that calling withColumn multiple times is discouraged:

this method introduces a projection internally. Therefore, calling it multiple times, for instance, via loops in order to add multiple columns can generate big plans which can cause performance issues and even StackOverflowException. To avoid this, use select with the multiple columns at once.

I also found that foldLeft slows down the Spark application, because a full plan analysis is performed on every iteration. I think this is true, because since I added foldLeft to my code, Spark takes more time to start a job than before.
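For illustration, the plan growth can be seen by printing the extended plan on a toy DataFrame (a sketch, assuming df has columns a and b and org.apache.spark.sql.functions.col is imported):

    import org.apache.spark.sql.functions.col

    // Sketch: each withColumn adds one Project node to the parsed logical plan,
    // and the whole plan is re-analyzed at every foldLeft step.
    val step1 = df.withColumn("a_cast", col("a").cast("long"))
    val step2 = step1.withColumn("b_cast", col("b").cast("long"))
    step2.explain(extended = true) // parsed plan: one nested Project per withColumn

With 150 columns this means 150 nested projections, which is what the documentation warns about.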

Is there a good practice for applying transformations to multiple columns?

Spark version: 2.2, language: Scala


There is 1 answer below.

Answered by Jarrod Baker:

In the case of casting you can achieve what you're looking for with something like the following:

import org.apache.spark.sql.functions.col
import org.apache.spark.sql.types.{BooleanType, LongType, StringType}

val df: DataFrame = ???
val cols = Array(("a", StringType), ("b", BooleanType), ("c", LongType))
  .map { case (name, dataType) => col(name).cast(dataType) }
val renamed = df.select(cols: _*)

It uses the method select(cols: Column*): DataFrame (Spark 2.2 docs), which takes a collection of Columns and produces a single projection. The map over the tuples builds the column expressions.
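Note that, unlike your original castColumns, this select keeps only the listed columns. If you want the same single-projection approach while keeping the remaining columns unchanged, one possible sketch (assuming every name in columnsDefs exists in the DataFrame):

import org.apache.spark.sql.DataFrame
import org.apache.spark.sql.functions.col
import org.apache.spark.sql.types.DataType

def castColumns(inputDf: DataFrame, columnsDefs: Array[(String, DataType)]): DataFrame = {
  val castMap = columnsDefs.toMap
  // One projection over all columns: cast where a target type is given,
  // pass the column through unchanged otherwise.
  val projection = inputDf.columns.map { name =>
    castMap.get(name).map(col(name).cast(_).as(name)).getOrElse(col(name))
  }
  inputDf.select(projection: _*)
}

This replaces the 150 withColumn calls with a single select, so the plan contains one Project node instead of 150.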

In the case of the transformations, it's not entirely clear to me what you're doing, but a similar logic can be applied. I've made some best guesses regarding type signatures from your example:

def addSuffixToCol(c: Column): String = ???
def UDF(c: Column): Column = ???
val ListOfCol: List[Column] = ???
val dataFrame: DataFrame = ???
dataFrame.select(ListOfCol.map(c => UDF(c).as(addSuffixToCol(c))):_*)

As above, we apply the transformations on the columns in ListOfCol which are then used to select from dataFrame.

If you want to include other columns, prepend them to the same varargs sequence (Scala doesn't allow mixing fixed arguments with a : _* expansion in one call), e.g.:

dataFrame.select(col("foo") +: col("bar") +: ListOfCol.map(c => UDF(c).as(addSuffixToCol(c))): _*)
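And if you need every existing column plus the n new ones, you can build the whole projection from dataFrame.columns, e.g.:

val allCols = dataFrame.columns.map(col) ++ ListOfCol.map(c => UDF(c).as(addSuffixToCol(c)))
dataFrame.select(allCols: _*)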