ModuleNotFoundError when using a static-method-based PySpark UDF


Our codebase includes a structure like the following:

  • a UDF defined as a static method referencing another static method
  • another method in the same class that references the UDF:
from datetime import timedelta
from typing import Optional

from pyspark.sql.functions import lead, udf
from pyspark.sql.window import Window


class DataFrameUtils:

    @staticmethod
    def subtract_microsecond(datetime_to_convert) -> Optional[str]:
        if datetime_to_convert is None:
            return datetime_to_convert
        return (datetime_to_convert - timedelta(microseconds=1)).strftime(
            "%Y-%m-%d %H:%M:%S.%f%Z"
        )

    @staticmethod
    @udf
    def subtract_microsecond_udf(datetime_to_convert) -> Optional[str]:
        return DataFrameUtils.subtract_microsecond(datetime_to_convert)

    @staticmethod
    def compute_something(data_frame, primary_key_list, order_by_col):
        window_group = Window.partitionBy(*primary_key_list).orderBy(order_by_col)
        return data_frame.withColumn(
            "computed_column",
            DataFrameUtils.subtract_microsecond_udf(lead(order_by_col).over(window_group)),
        )

This structure had been working for over a year, but after some seemingly unrelated changes it stopped working, and cloudpickle now raises a ModuleNotFoundError for the package/module that contains this class.

converted = PythonException('\n  An exception was thrown from the Python worker. Please see the stack trace below.\nTraceback (mos...1128)\n\tat java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:628)\n\t... 1 more\n')

    def deco(*a: Any, **kw: Any) -> Any:
        try:
            return f(*a, **kw)
        except Py4JJavaError as e:
            converted = convert_exception(e.java_exception)
            if not isinstance(converted, UnknownException):
                # Hide where the exception came from that shows a non-Pythonic
                # JVM exception message.
>               raise converted from None
E               pyspark.sql.utils.PythonException: 
E                 An exception was thrown from the Python worker. Please see the stack trace below.
E               Traceback (most recent call last):
E                 File "/usr/local/Cellar/apache-spark/3.3.1/python/lib/pyspark.zip/pyspark/worker.py", line 670, in main
E                   func, profiler, deserializer, serializer = read_udfs(pickleSer, infile, eval_type)
E                 File "/usr/local/Cellar/apache-spark/3.3.1/python/lib/pyspark.zip/pyspark/worker.py", line 507, in read_udfs
E                   udfs.append(read_single_udf(pickleSer, infile, eval_type, runner_conf, udf_index=i))
E                 File "/usr/local/Cellar/apache-spark/3.3.1/python/lib/pyspark.zip/pyspark/worker.py", line 289, in read_single_udf
E                   f, return_type = read_command(pickleSer, infile)
E                 File "/usr/local/Cellar/apache-spark/3.3.1/python/lib/pyspark.zip/pyspark/worker.py", line 85, in read_command
E                   command = serializer._read_with_length(file)
E                 File "/usr/local/Cellar/apache-spark/3.3.1/python/lib/pyspark.zip/pyspark/serializers.py", line 173, in _read_with_length
E                   return self.loads(obj)
E                 File "/usr/local/Cellar/apache-spark/3.3.1/python/lib/pyspark.zip/pyspark/serializers.py", line 471, in loads
E                   return cloudpickle.loads(obj, encoding=encoding)
E               ModuleNotFoundError: No module named 'ourproject.utils.dataframe_utils'
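For what it's worth, a quick check along these lines should show whether the module is importable on the executors at all (a sketch only, assuming spark is the active SparkSession):

import importlib.util

def can_import_module(_):
    # Runs on the executors; returns False when the package cannot be imported there.
    try:
        return importlib.util.find_spec("ourproject.utils.dataframe_utils") is not None
    except ModuleNotFoundError:
        return False

# An all-False result would confirm the executors cannot see the package.
spark.range(4).rdd.map(can_import_module).collect()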

I've seen cloudpickle/serialization errors in the past, for example when accessing Accumulators within a UDF, but here I'm not sure what the root problem is or what a clean fix would look like. As a short-term workaround I have made a copy of the helper and the UDF inside the client method. This works, but it means a copy is needed in every place the UDF is used, which is the opposite of DRY.

    def compute_something(data_frame, primary_key_list, order_by_col):
        # Local copies of the helper and the UDF, re-declared in the client
        # method instead of being shared via DataFrameUtils.
        def subtract_microsecond(datetime_to_convert) -> Optional[str]:
            if datetime_to_convert is None:
                return datetime_to_convert
            return (datetime_to_convert - timedelta(microseconds=1)).strftime(
                "%Y-%m-%d %H:%M:%S.%f%Z"
            )

        @udf
        def subtract_microsecond_udf(datetime_to_convert) -> Optional[str]:
            return subtract_microsecond(datetime_to_convert)

        window_group = Window.partitionBy(*primary_key_list).orderBy(order_by_col)
        # Reference the local copy of `subtract_microsecond_udf`
        return data_frame.withColumn(
            "computed_column",
            subtract_microsecond_udf(lead(order_by_col).over(window_group)),
        )

Any pointers on how to reliably share UDFs across different methods would be appreciated.
