I am currently learning about pandas_udf in PySpark. Here is my Spark DataFrame:
import pyspark
import pandas as pd
from pyspark.sql.functions import pandas_udf, col
from pyspark.sql.types import StringType, IntegerType, ArrayType
spark = pyspark.sql.SparkSession.builder.appName('Tmp').master('local').getOrCreate()
df = spark.createDataFrame([(1, 'Europe', 'France'),
                            (2, 'Asia', 'China'),
                            (3, 'Africa', 'Egypt'),
                            (4, 'Asia', 'India')], ['id', 'Continent', 'Country'])
df.show(truncate=False)
+---+---------+-------+
|id |Continent|Country|
+---+---------+-------+
|1 |Europe |France |
|2 |Asia |China |
|3 |Africa |Egypt |
|4 |Asia |India |
+---+---------+-------+
I created a function named dummy_func_string that returns a basic pandas Series of string values, and used it to build a pandas_udf that adds a new column to the dataframe:
def dummy_func_string(x):
    return pd.Series(['1', '2', '3', '4'])
string_udf = pandas_udf(dummy_func_string, ArrayType(StringType()))
df.withColumn('UDF_String', string_udf(col('Country'))).show()
+---+---------+-------+----------+
| id|Continent|Country|UDF_String|
+---+---------+-------+----------+
| 1| Europe| France| [1]|
| 2| Asia| China| [2]|
| 3| Africa| Egypt| [3]|
| 4| Asia| India| [4]|
+---+---------+-------+----------+
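As a side note, the same string function also runs for me when I declare the return type as a plain StringType() (no ArrayType wrapper), and the new column then holds flat strings instead of single-element lists. A minimal sketch of what I mean (the column name is just illustrative):

# Same function, but declared with the scalar StringType instead of
# ArrayType(StringType()); each row then gets a plain string value.
# Note: the fixed-length Series only lines up with the input because
# all four rows arrive in a single batch on this tiny local example.
scalar_string_udf = pandas_udf(dummy_func_string, StringType())
df.withColumn('UDF_String_Scalar', scalar_string_udf(col('Country'))).show()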
The part above works fine, but when I create a pandas_udf from a function that returns a pandas Series of integer values instead of strings, I get an error:
def dummy_func_integer(x):
    return pd.Series([1, 2, 3, 4])
integer_udf = pandas_udf(dummy_func_integer, ArrayType(IntegerType()))
df.withColumn('UDF_Integer', integer_udf(col('Country'))).show()
This is the error:
24/02/11 22:29:20 ERROR Executor: Exception in task 0.0 in stage 9.0 (TID 9)
org.apache.spark.api.python.PythonException: Traceback (most recent call last):
  File "/home/skapil/virtualenv/lib/python3.10/site-packages/pandas/core/series.py", line 4904, in apply
    ).apply()
  File "/home/skapil/virtualenv/lib/python3.10/site-packages/pandas/core/apply.py", line 1427, in apply
    return self.apply_standard()
  File "/home/skapil/virtualenv/lib/python3.10/site-packages/pandas/core/apply.py", line 1507, in apply_standard
    mapped = obj._map_values(
  File "/home/skapil/virtualenv/lib/python3.10/site-packages/pandas/core/base.py", line 921, in _map_values
    return algorithms.map_array(arr, mapper, na_action=na_action, convert=convert)
  File "/home/skapil/virtualenv/lib/python3.10/site-packages/pandas/core/algorithms.py", line 1743, in map_array
    return lib.map_infer(values, mapper, convert=convert)
  File "lib.pyx", line 2972, in pandas._libs.lib.map_infer
  File "/home/skapil/virtualenv/lib/python3.10/site-packages/pyspark/python/lib/pyspark.zip/pyspark/sql/pandas/types.py", line 814, in convert_array
    assert isinstance(value, Iterable)
AssertionError

    at org.apache.spark.api.python.BasePythonRunner$ReaderIterator.handlePythonException(PythonRunner.scala:572)
    at org.apache.spark.sql.execution.python.PythonArrowOutput$$anon$1.read(PythonArrowOutput.scala:118)
    at org.apache.spark.api.python.BasePythonRunner$ReaderIterator.hasNext(PythonRunner.scala:525)
    at org.apache.spark.InterruptibleIterator.hasNext(InterruptibleIterator.scala:37)
    at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:491)
    at scala.collection.Iterator$$anon$10.hasNext(Iterator.scala:460)
    at org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIteratorForCodegenStage2.processNext(Unknown Source)
    at org.apache.spark.sql.execution.BufferedRowIterator.hasNext(BufferedRowIterator.java:43)
    at org.apache.spark.sql.execution.WholeStageCodegenEvaluatorFactory$WholeStageCodegenPartitionEvaluator$$anon$1.hasNext(WholeStageCodegenEvaluatorFactory.scala:43)
    at org.apache.spark.sql.execution.SparkPlan.$anonfun$getByteArrayRdd$1(SparkPlan.scala:388)
    at org.apache.spark.rdd.RDD.$anonfun$mapPartitionsInternal$2(RDD.scala:890)
    at org.apache.spark.rdd.RDD.$anonfun$mapPartitionsInternal$2$adapted(RDD.scala:890)
    at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:52)
    at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:364)
    at org.apache.spark.rdd.RDD.iterator(RDD.scala:328)
    at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:93)
    at org.apache.spark.TaskContext.runTaskWithListeners(TaskContext.scala:161)
    at org.apache.spark.scheduler.Task.run(Task.scala:141)
    at org.apache.spark.executor.Executor$TaskRunner.$anonfun$run$4(Executor.scala:620)
    at org.apache.spark.util.SparkErrorUtils.tryWithSafeFinally(SparkErrorUtils.scala:64)
    at org.apache.spark.util.SparkErrorUtils.tryWithSafeFinally$(SparkErrorUtils.scala:61)
    at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:94)
    at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:623)
    at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1128)
    at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:628)
    at java.base/java.lang.Thread.run(Thread.java:829)
24/02/11 22:29:20 WARN TaskSetManager: Lost task 0.0 in stage 9.0 (TID 9) (172.31.5.103 executor driver): org.apache.spark.api.python.PythonException: (same Python traceback and Java stack trace as above)
24/02/11 22:29:20 ERROR TaskSetManager: Task 0 in stage 9.0 failed 1 times; aborting job
Traceback (most recent call last):
  File "<stdin>", line 1, in <module>
  File "/home/skapil/virtualenv/lib/python3.10/site-packages/pyspark/sql/dataframe.py", line 959, in show
    print(self._jdf.showString(n, 20, vertical))
  File "/home/skapil/virtualenv/lib/python3.10/site-packages/py4j/java_gateway.py", line 1322, in __call__
    return_value = get_return_value(
  File "/home/skapil/virtualenv/lib/python3.10/site-packages/pyspark/errors/exceptions/captured.py", line 185, in deco
    raise converted from None
pyspark.errors.exceptions.captured.PythonException:
  An exception was thrown from the Python worker. Please see the stack trace below.
Traceback (most recent call last):
  ... (same worker traceback as above) ...
  File "/home/skapil/virtualenv/lib/python3.10/site-packages/pyspark/python/lib/pyspark.zip/pyspark/sql/pandas/types.py", line 814, in convert_array
    assert isinstance(value, Iterable)
AssertionError
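For what it's worth, the integer-returning function does seem to work when I declare the matching scalar type instead of ArrayType(IntegerType()). A minimal sketch (LongType is my choice here, since pd.Series([1, 2, 3, 4]) has dtype int64, which corresponds to Spark's LongType):

from pyspark.sql.types import LongType

# Same integer function, declared with a scalar LongType instead of
# ArrayType(IntegerType()); this runs without the AssertionError above.
long_udf = pandas_udf(dummy_func_integer, LongType())
df.withColumn('UDF_Long', long_udf(col('Country'))).show()

So the failure appears specific to the ArrayType declaration rather than to integers in general.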
Can someone please help me understand why the code works fine when the function returns string values but fails when it returns integers? I notice that in the working string case each value comes back wrapped in a single-element list ([1], [2], ...), so the ArrayType(StringType()) return type seems to be involved, but I don't understand why strings pass the isinstance(value, Iterable) check in the traceback while integers do not.