Exploding an array column into a fixed number of rows (say 5) in a PySpark DataFrame, creating blank values for items not in the array


I have an array column in a PySpark DataFrame. The array can contain at most 14 elements, each of which is a struct with 7 attributes. If a row has fewer than 14 elements in the array column, we need to create blank values for the missing elements and their 7 attributes. How can this be achieved?

**Taking the maximum number of array elements as 5, for simplicity:**

Dataframe:

id|as_of_date|attributes
123|2023| [["version3","approved","2.6"],["version2","not approved","5.6"] ]
456|2022| [["version4","approved","6.7"],["version1","approved","8.5"],["version3","not approved","7.5"] ]

Here attributes is an array column containing structs (version/approval_status/interest_rate). The array can hold data for at most 5 versions. Row 1 has only 2 versions, so we create 2 rows plus 3 rows with blank data for the remaining versions. Row 2 has only 3 versions, so we create 3 rows plus 2 rows with blank data for the remaining versions.

Resultant Dataframe:

id|as_of_date|attributes_version|attributes_approval_status|attributes_interest_rate
123|2023|version3|approved|2.6
123|2023|version2|not approved|5.6
123|2023|version1| | 
123|2023|version4| |
123|2023|version5| |
456|2022|version4|approved|6.7
456|2022|version1|approved|8.5
456|2022|version3|not approved|7.5
456|2022|version2| | 
456|2022|version5| |

I am able to create rows using explode, but I cannot figure out how to create exactly 5 rows per id, one for each version, with blank values for the versions missing from the array.

samkart On

you can create a new array column with the required number of array elements, and then explode it.

here's an example

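from pyspark.sql import functions as func

# data_sdf holds the question's data; here the array column is named `attrs`
# and each element is an array of 3 strings (hence the [0]/[1]/[2] indexing below)
max_size = 5

# pad short arrays with all-null entries, truncate long ones, then explode
data_sdf. \
    withColumn('new_attrs', 
               func.when(func.size('attrs') < max_size, 
                         func.concat('attrs', func.expr('array_repeat(array(null, null, null), {0}-size(attrs))'.format(max_size)))).
               otherwise(func.slice('attrs', 1, max_size))
               ). \
    withColumn('new_attrs_explode', func.explode('new_attrs')). \
    selectExpr('id', 'as_of_date', 
               'new_attrs_explode[0] as attributes_version',
               'new_attrs_explode[1] as attributes_approval_status',
               'new_attrs_explode[2] as attributes_interest_rate'
               ). \
    show(truncate=False)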

# +---+----------+------------------+--------------------------+------------------------+
# |id |as_of_date|attributes_version|attributes_approval_status|attributes_interest_rate|
# +---+----------+------------------+--------------------------+------------------------+
# |123|2023      |version3          |approved                  |2.6                     |
# |123|2023      |version2          |not approved              |5.6                     |
# |123|2023      |null              |null                      |null                    |
# |123|2023      |null              |null                      |null                    |
# |123|2023      |null              |null                      |null                    |
# |456|2022      |version4          |approved                  |6.7                     |
# |456|2022      |version1          |approved                  |8.5                     |
# |456|2022      |version3          |not approved              |7.5                     |
# |456|2022      |null              |null                      |null                    |
# |456|2022      |null              |null                      |null                    |
# +---+----------+------------------+--------------------------+------------------------+

new_attrs is the new array column, which always has exactly 5 elements (the value passed in max_size). if a row has fewer attributes than that, null attributes are created and appended to the existing ones; if it has more, slice keeps only the first max_size. note that the merge is achieved using concat, not array_union. that's because array_union removes duplicate elements (attributes), so the repeated null entries would collapse into one. this new column is then exploded and used to create the output columns.
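to make the pad-or-truncate logic and the concat vs. array_union point concrete, here is a rough plain-Python model of it. it is only a sketch of the semantics, not Spark code: the function names `pad_or_truncate` and `union_dedup` are made up for illustration, and Python lists stand in for Spark arrays.

```python
def pad_or_truncate(attrs, max_size, width=3):
    """Plain-Python model of the when/concat/array_repeat/slice expression."""
    if len(attrs) < max_size:
        # like concat + array_repeat: append all-null entries, keeping duplicates
        return attrs + [[None] * width for _ in range(max_size - len(attrs))]
    # like slice(attrs, 1, max_size): keep only the first max_size entries
    return attrs[:max_size]


def union_dedup(left, right):
    """Rough model of array_union: concatenate, then drop duplicates (order kept)."""
    out = []
    for item in left + right:
        if item not in out:
            out.append(item)
    return out


row = [["version3", "approved", "2.6"], ["version2", "not approved", "5.6"]]
filler = [[None, None, None]] * 3

padded = pad_or_truncate(row, 5)
deduped = union_dedup(row, filler)

print(len(padded))   # 5 entries: 2 real + 3 null
print(len(deduped))  # 3 entries: the three identical null entries collapse into one
```

with a union-style merge the row would end up with 3 entries instead of 5, which is why the padding has to be appended with concat.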

# same expression without the explode, to show the intermediate new_attrs column
data_sdf. \
    withColumn('new_attrs', 
               func.when(func.size('attrs') < max_size, 
                         func.concat('attrs', func.expr('array_repeat(array(null, null, null), {0}-size(attrs))'.format(max_size)))).
               otherwise(func.slice('attrs', 1, max_size))
               ). \
    show(truncate=False)

# +---+----------+-------------------------------------------------------------------------------------+-----------------------------------------------------------------------------------------------------------------------------+
# |id |as_of_date|attrs                                                                                |new_attrs                                                                                                                    |
# +---+----------+-------------------------------------------------------------------------------------+-----------------------------------------------------------------------------------------------------------------------------+
# |123|2023      |[[version3, approved, 2.6], [version2, not approved, 5.6]]                           |[[version3, approved, 2.6], [version2, not approved, 5.6], [null, null, null], [null, null, null], [null, null, null]]       |
# |456|2022      |[[version4, approved, 6.7], [version1, approved, 8.5], [version3, not approved, 7.5]]|[[version4, approved, 6.7], [version1, approved, 8.5], [version3, not approved, 7.5], [null, null, null], [null, null, null]]|
# +---+----------+-------------------------------------------------------------------------------------+-----------------------------------------------------------------------------------------------------------------------------+
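the output above pads with nulls, whereas the expected result in the question also names the missing versions (version1, version4, ...). a plain-Python sketch of that variation follows. it assumes the full set of versions is known up front as version1..version5 (an assumption, the question does not say where that list comes from), and `fill_missing_versions` is a hypothetical helper name. in Spark the same idea could probably be built from array functions such as array_except, but that is not shown or tested here.

```python
def fill_missing_versions(attrs, all_versions):
    """Keep existing entries, then add one blank entry per version not present."""
    present = {entry[0] for entry in attrs}
    missing = [v for v in all_versions if v not in present]
    return attrs + [[v, "", ""] for v in missing]


# assumed: the complete list of versions is fixed and known in advance
ALL_VERSIONS = ["version{}".format(i) for i in range(1, 6)]

rows = [
    (123, 2023, [["version3", "approved", "2.6"],
                 ["version2", "not approved", "5.6"]]),
    (456, 2022, [["version4", "approved", "6.7"],
                 ["version1", "approved", "8.5"],
                 ["version3", "not approved", "7.5"]]),
]

# model of explode: one output row per (padded) array entry
exploded = [
    (id_, as_of_date, *entry)
    for id_, as_of_date, attrs in rows
    for entry in fill_missing_versions(attrs, ALL_VERSIONS)
]

for r in exploded:
    print("|".join(str(x) for x in r))
```

each id ends up with exactly 5 rows: the existing versions in their original order, followed by the missing ones with blank approval status and interest rate.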