How to improve spark filter() performance on an array of struct?

I am working on a Spark project and have a performance issue that I am struggling with; any help will be appreciated.

I have a column Collection that is an array of struct:

root

|-- Collection: array (nullable = true)
|    |-- element: struct (containsNull = true)
|    |    |-- Key: string (nullable = true)
|    |    |-- Value: string (nullable = true)

The goal is to transform this column into a struct type, based on a list of available keys:

|-- Collection: struct (nullable = true)
|    |-- Key1: string (nullable = true)
|    |-- Key2: array[string](nullable = true)
|    |-- Key3: string (nullable = true)

I have the list of available keys as below:

{
  "type" : "struct",
  "fields" : [ {
    "name" : "Key1",
    "type" : "string",
    "nullable" : true,
    "metadata" : { }
  }, {
    "name" : "Key2",
    "type" : {
      "type" : "array",
      "elementType" : "string",
      "containsNull" : true
    },
    "nullable" : true,
    "metadata" : { }
  }, {
    "name" : "Key3",
    "type" : "string",
    "nullable" : true,
    "metadata" : { }
  }]
}
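
For reference, this is the JSON shape produced by StructType.json, so it can be parsed back into a StructType with DataType.fromJson. A minimal sketch using the keys above:

import org.apache.spark.sql.types.{DataType, StructType}

// The schema JSON shown above, as a string
val schemaJson =
  """{"type":"struct","fields":[
    |{"name":"Key1","type":"string","nullable":true,"metadata":{}},
    |{"name":"Key2","type":{"type":"array","elementType":"string","containsNull":true},"nullable":true,"metadata":{}},
    |{"name":"Key3","type":"string","nullable":true,"metadata":{}}]}""".stripMargin

val columnSchema = DataType.fromJson(schemaJson).asInstanceOf[StructType]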

The reason the value type can be string or array[string] is that in the original column the array can contain multiple structs with the same key, in which case we infer the value as array[string]. For example, we want to transform an array of structs like this:

[{key1, value1}, {key2, value2}, {key2, value3}, {key3, value4}]

to this:

{value1, [value2, value3], value4}

I am currently able to do the job by using this function:

import org.apache.spark.sql.Column
import org.apache.spark.sql.functions._
import org.apache.spark.sql.types._

def function(column: Column, columnSchema: StructType): Column = {
  var newColumn = struct()
  for (x <- columnSchema.fields.toList) {
    x.dataType match {
      // Key may repeat: keep every matching Value as an array
      case ArrayType(StringType, true) =>
        newColumn = newColumn.withField(x.name, transform(filter(column, e => e("Key") === x.name), e => e("Value")))
      // Key appears at most once: take the first matching Value as a string
      case StringType =>
        newColumn = newColumn.withField(x.name, element_at(filter(column, e => e("Key") === x.name), 1).getField("Value"))
    }
  }
  newColumn
}
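
Applied to the DataFrame, it looks roughly like this (a sketch, assuming the DataFrame is named df and columnSchema is the schema parsed above):

val result = df.withColumn("Collection", function(col("Collection"), columnSchema))
result.printSchema()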

In reality I have 30 available keys, which means the new struct column needs 30 fields. I find this very slow because filter() has to scan the array 30 times per row to retrieve all the key-value pairs.

Is there a better way to improve this? Many thanks!

1 Answer

Not a complete answer, but hopefully helpful. Instead of filtering the array once per key, I'd suggest exploding the array, splitting the struct into key and value columns, grouping by key to collect values that share a key, and then building your new struct. I constructed an example like this:

import org.apache.spark.sql.Row
import org.apache.spark.sql.functions._
import org.apache.spark.sql.types._

val data = Seq(
    Row(List(Row("k1", "v1"), Row("k2", "v2"), Row("k2", "v3"), Row("k4", "v4"))),
    Row(List(Row("k1", "v1"), Row("k1", "v2")))
)

val schema = new StructType()
  .add("arr", ArrayType(new StructType()
    .add("key", StringType)
    .add("value", StringType)))

val df = spark.createDataFrame(spark.sparkContext.parallelize(data), schema)

df.show(false)

// output

+----------------------------------------+
|arr                                     |
+----------------------------------------+
|[{k1, v1}, {k2, v2}, {k2, v3}, {k4, v4}]|
|[{k1, v1}, {k1, v2}]                    |
+----------------------------------------+

Explode the array to produce one key-value pair per row and break key and value into their own columns:

val exploded = df.withColumn("kv", explode(col("arr"))).select("arr", "kv.key", "kv.value")

exploded.show(false)

// output

+----------------------------------------+---+-----+
|arr                                     |key|value|
+----------------------------------------+---+-----+
|[{k1, v1}, {k2, v2}, {k2, v3}, {k4, v4}]|k1 |v1   |
|[{k1, v1}, {k2, v2}, {k2, v3}, {k4, v4}]|k2 |v2   |
|[{k1, v1}, {k2, v2}, {k2, v3}, {k4, v4}]|k2 |v3   |
|[{k1, v1}, {k2, v2}, {k2, v3}, {k4, v4}]|k4 |v4   |
|[{k1, v1}, {k1, v2}]                    |k1 |v1   |
|[{k1, v1}, {k1, v2}]                    |k1 |v2   |
+----------------------------------------+---+-----+

Group by the key and collect the values:

val collected = exploded.groupBy("arr", "key").agg(collect_list("value").as("values"))
collected.show(false)

// output

+----------------------------------------+---+--------+
|arr                                     |key|values  |
+----------------------------------------+---+--------+
|[{k1, v1}, {k2, v2}, {k2, v3}, {k4, v4}]|k1 |[v1]    |
|[{k1, v1}, {k2, v2}, {k2, v3}, {k4, v4}]|k2 |[v2, v3]|
|[{k1, v1}, {k2, v2}, {k2, v3}, {k4, v4}]|k4 |[v4]    |
|[{k1, v1}, {k1, v2}]                    |k1 |[v1, v2]|
+----------------------------------------+---+--------+

You might also want to include first("value") in the agg so that you'd have access to singular values as a string instead of an array. At this point, you can group by the original array and construct the final struct.
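
A rough sketch of that last step, continuing from collected above and assuming the keys are known up front (hard-coded here; the string-vs-array distinction from your schema is left out):

// Collapse the per-key rows back into one row per original array by building
// a key -> values map, then pick each known key out of the map into a struct.
val keys = Seq("k1", "k2", "k4") // in the real case, the 30 available keys

val result = collected
  .groupBy("arr")
  .agg(map_from_entries(collect_list(struct(col("key"), col("values")))).as("kv"))
  .withColumn("Collection", struct(keys.map(k => col("kv")(k).as(k)): _*))
  .drop("kv")

result.show(false)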