PySpark Deciling UDF Not Giving Output & Taking a Lot of Time to Run


I have created a PySpark function named segmentation that performs a cumulative sum calculation, handles outliers, finds the maximum cumulative sum, calculates a decile, and updates a column based on specific conditions. Here is how it should work:

1. Create window specification: Define a window specification that orders the rows by the specified columns in descending order.

2. Calculate cumulative sum: Add a new column (CUM_SUM) to the DataFrame, representing the cumulative sum of a specified column over the defined window.

3. Handle outliers (conditional update): If the provided outlier value is not equal to 1000000, subtract the sum of col_name values greater than or equal to the specified outlier from the CUM_SUM column.

4. Calculate maximum cumulative sum: Find the maximum value of the CUM_SUM column across the entire DataFrame.

5. Calculate decile: Add a new column (col_name2) to the DataFrame, calculating the decile as 11 - ceil(10 * CUM_SUM / max_cum_sum) (see the small worked example after this list).

6. Update values: Modify values in the col_name2 column, setting values equal to 11 to 10.

7. Return the modified DataFrame: Return the DataFrame with all the calculated and updated columns.
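
To make step 5 concrete, here is a tiny plain-Python sketch of the decile formula the function computes (the numbers are made up, not from my data):

import math

# Toy illustration of the decile formula (made-up numbers, not my real data)
max_cum_sum = 100
for cum_sum in (5, 23, 100):
    decile = 11 - math.ceil(10 * cum_sum / max_cum_sum)
    print(cum_sum, "->", decile)   # 5 -> 10, 23 -> 8, 100 -> 1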

Here is my code:

from pyspark.sql import Window
from pyspark.sql import functions as F
from pyspark.sql.functions import col, sum, when, ceil

def segmentation(df, col_name, col_name2, row1, row2, row3, row4, row5, row6, outlier):
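    # Global window: no partitionBy, so every row falls into a single ordered window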
    window_spec = Window.orderBy(
        col(row1).desc(),
        col(row2).desc(),
        col(row3).desc(),
        col(row4).desc(),
        col(row5).desc(),
        col(row6).desc()
    )

    # Calculate cumulative sum using a Window function
    df = df.withColumn("CUM_SUM", sum(col(col_name)).over(window_spec))

    # Check if outlier is not equal to 1000000 and perform the conditional update
    if outlier != 1000000:
        df = df.withColumn(
            "CUM_SUM",
            col("CUM_SUM") - sum(
                when(col(col_name) >= outlier, col(col_name))
            ).over(window_spec) 
        )

    # Find the maximum cumulative sum
    max_cum_sum = df.agg({"CUM_SUM": "max"}).collect()[0][0]

    # Calculate the decile and set the values in the column col_name2
    df = df.withColumn(
        col_name2,
        when(col(col_name) > 0, (11 - ceil(10 * col("CUM_SUM") / max_cum_sum))).otherwise(0)
    )
    
    # Set col_name2 to 10 where it's equal to 11
    df = df.withColumn(col_name2, when(col(col_name2) == 11, 10).otherwise(col(col_name2)))

    return df
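
For reference, this is roughly how I call the function (the DataFrame and column names here are placeholders, not my actual ones):

result = segmentation(
    sales_df,                 # placeholder DataFrame
    "SALES",                  # col_name: column being cumulated
    "SALES_DECILE",           # col_name2: decile column to create
    "SALES", "UNITS", "VISITS", "ORDERS", "RETURNS", "CUST_ID",  # ordering columns
    outlier=1000000           # 1000000 means "skip the outlier adjustment"
)
result.show()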

Since I am running this code in Jupyter, it repeatedly gives me this warning: WARN WindowExec: No Partition Defined for Window operation! Moving all data to a single partition, this can cause serious performance degradation. Because of this, one function call works, but the next call either fails with a Connection Refused error or just keeps running forever.

I understand this has to do with partitions, but since I am new to Spark I am not able to figure out how to tweak the code for error-free and faster execution. I would really appreciate everyone's help on this.
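
From what I have read, the warning goes away if the Window has a partitionBy, but then the cumulative sum would only be per partition instead of over the whole DataFrame, which is what I need. This is the kind of change I found suggested for the window_spec inside segmentation (just a sketch; "some_group_col" is a made-up column name, and I do not think this gives me the global sum I want):

window_spec = Window.partitionBy("some_group_col").orderBy(
    col(row1).desc(), col(row2).desc(), col(row3).desc(),
    col(row4).desc(), col(row5).desc(), col(row6).desc()
)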

Thank you!
