Exception when using a UDF on a PySpark DataFrame: Could not serialize object


I'm trying to use a UDF that takes certain fields from my PySpark DataFrame and returns a StructType value. The idea is to store this struct in a separate column for further processing.

The Function:

from pyspark.sql.functions import col, collect_list, struct

def get_category_details_udf(id, category_list, sub_type):
    selected_categories = ((df_registry_checklist.filter(col("registry_id") == id).select("checklist_id")).agg(collect_list("checklist_id")).head()[0])
    total_subcategory_items = category_list + selected_categories
    if sub_type == "BABY":
        completed_category = baby_checklist_template.filter(baby_checklist_template["subcategory_child_ids"].isin(total_subcategory_items))
        completed_percentage = round((completed_category.select("checklist_id").distinct().count() / total_baby_checklist_count) * 100)
        incomplete_checklist_df = baby_checklist_template.join(completed_category,baby_checklist_template["checklist_id"] == completed_category["checklist_id"], "left_anti")
        checklist_suggestion = incomplete_checklist_df.select("category_id").distinct().limit(5).agg(collect_list("category_id")).head()[0]
    else:
        completed_category = wedding_checklist_template.filter(wedding_checklist_template["subcategory_child_ids"].isin(total_subcategory_items))
        completed_percentage = round((completed_category.select("checklist_id").distinct().count() / total_wedding_checklist_count) * 100)
        incomplete_checklist_df = wedding_checklist_template.join(completed_category,wedding_checklist_template["checklist_id"] == completed_category["checklist_id"], "left_anti")
        checklist_suggestion = incomplete_checklist_df.select("category_id").distinct().limit(5).agg(collect_list("category_id")).head()[0]
    return struct(completed_percentage, checklist_suggestion)

Some of the DataFrames used here are defined in the main function, and I expect them to be available to the UDF as global variables.
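As written, the function refers to DataFrames (`df_registry_checklist`, `baby_checklist_template`, `wedding_checklist_template`) from inside a UDF. A Python UDF is pickled by PySpark (with cloudpickle) together with the globals it references and is then called on the executors once per row with plain Python values. A DataFrame is only a driver-side handle to a JVM object (the `o89` in the trace is most likely such a Py4J reference), so it can be neither serialized nor queried there. A minimal sketch of the same per-row logic over plain Python data, assuming the checklist templates and registry checklists are small enough to be collected to the driver first; the name `category_details`, the `templates` dictionary and the `(checklist_id, category_id, subcategory_child_id)` row layout are hypothetical:

```python
from typing import Dict, List, Optional, Sequence, Tuple

# Hypothetical layout of one collected template row:
# (checklist_id, category_id, subcategory_child_id)
TemplateRow = Tuple[str, str, str]


def category_details(
    registry_id: str,
    category_list: Optional[Sequence[str]],
    sub_type: str,
    registry_checklists: Dict[str, List[str]],
    templates: Dict[str, List[TemplateRow]],
) -> Tuple[int, List[str]]:
    """Per-row logic of get_category_details_udf using plain Python data only."""
    # A null array column arrives in a Python UDF as None.
    items = set(category_list or []) | set(registry_checklists.get(registry_id, []))

    # Same branching as the original: "BABY" vs. everything else (wedding).
    template = templates["BABY" if sub_type == "BABY" else "WEDDING"]

    all_ids = {checklist_id for checklist_id, _, _ in template}
    completed_ids = {cid for cid, _, child_id in template if child_id in items}
    # Assumption: the total count is the number of distinct checklist ids in the template.
    percentage = round(len(completed_ids) / len(all_ids) * 100) if all_ids else 0

    # Equivalent of the left_anti join: categories of checklists not yet completed,
    # de-duplicated and capped at 5 (first-seen order; Spark's limit() has no fixed order).
    suggestions: List[str] = []
    for checklist_id, category_id, _ in template:
        if len(suggestions) == 5:
            break
        if checklist_id not in completed_ids and category_id not in suggestions:
            suggestions.append(category_id)
    # Tuple in schema field order: (completed_percentage, checklist_suggestion).
    return percentage, suggestions
```

This sketch relies on Python's built-in `round`; if the real script uses `from pyspark.sql.functions import *`, that star import shadows `round` with Spark's column function.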

Defining the UDF and invoking it:

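from pyspark.sql.functions import udf
from pyspark.sql.types import ArrayType, IntegerType, StringType, StructField, StructType

get_category_details_udf_schema = StructType([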
        StructField("completed_percentage", IntegerType()),
        StructField("checklist_suggestion", ArrayType(StringType()))
    ])
get_category_details = udf(get_category_details_udf, returnType=get_category_details_udf_schema)
dataframe = dataframe.withColumn("udf_result", get_category_details(dataframe["id"], dataframe["category_id_list"], dataframe["subtype"]))
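Because the UDF is invoked per row with plain Python values (the `category_id_list` array column arrives as a Python list), any lookup data it needs should be plain, picklable Python data rather than a DataFrame. A minimal sketch of the driver-side preparation step, assuming the registry table is small enough to `collect()`: collected PySpark `Row` objects are tuples, so `(registry_id, checklist_id)` pairs can be grouped into a dictionary. The helper `group_by_key` and the sample rows are hypothetical.

```python
from typing import Dict, Iterable, List, Tuple


def group_by_key(rows: Iterable[Tuple[str, str]]) -> Dict[str, List[str]]:
    """Group (key, value) pairs, e.g. collected (registry_id, checklist_id) rows."""
    grouped: Dict[str, List[str]] = {}
    for key, value in rows:
        grouped.setdefault(key, []).append(value)
    return grouped


# Hypothetical stand-in for something like
# df_registry_checklist.select("registry_id", "checklist_id").collect()
collected = [("r1", "s3"), ("r1", "s7"), ("r2", "s1")]
registry_checklists = group_by_key(collected)
```

A function passed to `udf(...)` can then close over such a dictionary (or over `spark.sparkContext.broadcast(registry_checklists)`, reading it via `.value`) and return a plain tuple in schema field order. Note that `pyspark.sql.functions.struct` builds a Column expression for the DataFrame API; it is not a value a UDF can return.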

The Error:

_pickle.PicklingError: Could not serialize object: Py4JError: An error occurred while calling o89.__getstate__. Trace:
py4j.Py4JException: Method __getstate__([]) does not exist
    at py4j.reflection.ReflectionEngine.getMethod(ReflectionEngine.java:318)
    at py4j.reflection.ReflectionEngine.getMethod(ReflectionEngine.java:326)
    at py4j.Gateway.invoke(Gateway.java:274)
    at py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:132)
    at py4j.commands.CallCommand.execute(CallCommand.java:79)
    at py4j.GatewayConnection.run(GatewayConnection.java:238)
    at java.lang.Thread.run(Thread.java:748)

I don't know what's going wrong. The only thing I haven't tried yet is upgrading to Python 3.8 or higher; I'm currently on 3.6.x.

Things I tried to debug:

  • I tried using only pyspark.sql functions within the UDF.
  • I removed the flatMap and RDD operations I previously had inside the UDF.
  • I tried passing the global variables to the UDF as parameters instead.
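One way to narrow down a `Could not serialize object` error without running Spark at all is to try pickling each global or closure variable the function refers to. This is a rough, stdlib-only heuristic, assuming the UDF is an ordinary function: PySpark actually uses cloudpickle, which can serialize more than the standard `pickle` module (nested functions and lambdas, for example), so a reported name is a strong suspect rather than proof. `find_unpicklable_refs` is a hypothetical helper, not a PySpark API.

```python
import inspect
import pickle
import types
from typing import Callable, Dict, Iterator, List


def _referenced_names(code: types.CodeType) -> Iterator[str]:
    """Yield global/attribute names used by `code`, including nested code objects."""
    yield from code.co_names
    for const in code.co_consts:
        if isinstance(const, types.CodeType):  # lambdas, comprehensions, inner defs
            yield from _referenced_names(const)


def find_unpicklable_refs(func: Callable) -> List[str]:
    """Return names of globals/closure variables of `func` that `pickle` rejects."""
    candidates: Dict[str, object] = {}
    for name in _referenced_names(func.__code__):
        if name in func.__globals__:
            candidates[name] = func.__globals__[name]
    for name, cell in zip(func.__code__.co_freevars, func.__closure__ or ()):
        candidates[name] = cell.cell_contents

    bad = []
    for name in sorted(candidates):
        value = candidates[name]
        if inspect.ismodule(value):
            continue  # cloudpickle pickles modules by reference, so skip them
        try:
            pickle.dumps(value)
        except Exception:  # e.g. TypeError, PicklingError, or a Py4J error
            bad.append(name)
    return bad
```

If the DataFrames referenced by `get_category_details_udf` are module-level globals, calling `find_unpicklable_refs(get_category_details_udf)` on the driver should list them.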