Spark Connect fails when using UDFs


I'm having problems using UDFs with Spark Connect.
Spark Connect basically works fine (I'm using a Jupyter Notebook), but it fails when using UDFs. Judging by the stack trace, the executors try to start a local Python worker to run the UDF over the gRPC connection, but this fails.

import pyspark
from pyspark.sql import SparkSession
from pyspark.sql.functions import udf

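# Stop any pre-existing local session before connecting to the remote Spark Connect server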
SparkSession.builder.master("local[*]").getOrCreate().stop()

spark = SparkSession.builder.remote("sc://spark-connect-server:15002").getOrCreate()

# Failing on next line..
spark.range(2).withColumn('plus_one', udf(lambda x: x + 1)('id')).show()

There is a Java exception: "java.io.IOException: Cannot run program "/root/miniconda3/bin/python": error=13, Permission denied"

I checked the permissions of /root/miniconda3/bin/python and it is accessible, so the problem must be something else...

Here is the stack trace:

---------------------------------------------------------------------------
SparkConnectGrpcException                 Traceback (most recent call last)
Cell In[2], line 2
      1 from pyspark.sql.functions import udf
----> 2 spark.range(2).withColumn('plus_one', udf(lambda x: x + 1)('id')).show()

File ~/miniconda3/lib/python3.8/site-packages/pyspark/sql/connect/dataframe.py:817, in DataFrame.show(self, n, truncate, vertical)
    816 def show(self, n: int = 20, truncate: Union[bool, int] = True, vertical: bool = False) -> None:
--> 817     print(self._show_string(n, truncate, vertical))

File ~/miniconda3/lib/python3.8/site-packages/pyspark/sql/connect/dataframe.py:631, in DataFrame._show_string(self, n, truncate, vertical)
    622     except ValueError:
    623         raise PySparkTypeError(
    624             error_class="NOT_BOOL",
    625             message_parameters={
   (...)
    628             },
    629         )
--> 631 pdf = DataFrame.withPlan(
    632     plan.ShowString(child=self._plan, num_rows=n, truncate=_truncate, vertical=vertical),
    633     session=self._session,
    634 ).toPandas()
    635 assert pdf is not None
    636 return pdf["show_string"][0]

File ~/miniconda3/lib/python3.8/site-packages/pyspark/sql/connect/dataframe.py:1372, in DataFrame.toPandas(self)
   1370     raise Exception("Cannot collect on empty session.")
   1371 query = self._plan.to_proto(self._session.client)
-> 1372 return self._session.client.to_pandas(query)

File ~/miniconda3/lib/python3.8/site-packages/pyspark/sql/connect/client.py:679, in SparkConnectClient.to_pandas(self, plan)
    677 req = self._execute_plan_request_with_metadata()
    678 req.plan.CopyFrom(plan)
--> 679 table, schema, metrics, observed_metrics, _ = self._execute_and_fetch(req)
    680 assert table is not None
    681 pdf = table.rename_columns([f"col_{i}" for i in range(len(table.column_names))]).to_pandas()

File ~/miniconda3/lib/python3.8/site-packages/pyspark/sql/connect/client.py:982, in SparkConnectClient._execute_and_fetch(self, req)
    979 schema: Optional[StructType] = None
    980 properties: Dict[str, Any] = {}
--> 982 for response in self._execute_and_fetch_as_iterator(req):
    983     if isinstance(response, StructType):
    984         schema = response

File ~/miniconda3/lib/python3.8/site-packages/pyspark/sql/connect/client.py:963, in SparkConnectClient._execute_and_fetch_as_iterator(self, req)
    961                             yield batch
    962 except Exception as error:
--> 963     self._handle_error(error)

File ~/miniconda3/lib/python3.8/site-packages/pyspark/sql/connect/client.py:1055, in SparkConnectClient._handle_error(self, error)
   1042 """
   1043 Handle errors that occur during RPC calls.
   1044 
   (...)
   1052 Throws the appropriate internal Python exception.
   1053 """
   1054 if isinstance(error, grpc.RpcError):
-> 1055     self._handle_rpc_error(error)
   1056 elif isinstance(error, ValueError):
   1057     if "Cannot invoke RPC" in str(error) and "closed" in str(error):

File ~/miniconda3/lib/python3.8/site-packages/pyspark/sql/connect/client.py:1091, in SparkConnectClient._handle_rpc_error(self, rpc_error)
   1089             info = error_details_pb2.ErrorInfo()
   1090             d.Unpack(info)
-> 1091             raise convert_exception(info, status.message) from None
   1093     raise SparkConnectGrpcException(status.message) from None
   1094 else:

SparkConnectGrpcException: (org.apache.spark.SparkException) Job aborted due to stage failure: Task 24 in stage 110.0 failed 4 times, most recent failure: Lost task 24.3 in stage 110.0 (TID 2421) (10.248.66.39 executor 6): java.io.IOException: Cannot run program "/root/miniconda3/bin/python": error=13, Permission denied
    at java.base/java.lang.ProcessBuilder.start(ProcessBuilder.java:1128)
    at java.base/java.lang.ProcessBuilder.start(ProcessBuilder.java:1071)
    at org.apache.spark.api.python.PythonWorkerFactory.startDaemon(PythonWorkerFactory.scala:222)
    at org.apache.spark.api.python.PythonWorkerFactory.createThroughDaemon(PythonWorkerFactory.scala:134)
    at org.apache.spark.api.python.PythonWorkerFactory.create(PythonWorkerFactory.scala:107)
    at org.apache.spark.SparkEnv.createPythonWorker(SparkEnv.scala:124)
    at org.apache.spark.api.python.BasePythonRunner.compute(PythonRunner.scala:166)
    at org.apache.spark.sql.execution.python.BatchEvalPythonExec.evaluate(BatchEvalPythonExec.scala:82)
    at org.apache.spark.sql.execution.python.EvalPythonExec.$anonfun$doExecute$2(EvalPythonExec.scala:131)
    at org.apache.spark.rdd.RDD.$anonfun$mapPartitions$2(RDD.scala:853)
    at org.apache.spark.rdd.RDD.$anonfun$mapPartitions$2$adapted(RDD.scala:853)
    at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:52)
    at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:364)
    at org.apache.spark.rdd.RDD.iterator(RDD.scala:328)
    at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:52)
    at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:364)
    at org.apache.spark.rdd.RDD.iterator(RDD.scala:328)
    at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:52)
    at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:364)
    at org.apache.spark.rdd.RDD.iterator(RDD.scala:328)
    at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:92)
    at org.apache.spark.TaskContext.runTaskWithListeners(TaskContext.scala:161)
    at org.apache.spark.scheduler.Task.run(Task.scala:139)
    at org.apache.spark.executor.Executor$TaskRunner.$anon...

1 Answer

Solved the issue.

I forgot to set the PYSPARK_PYTHON env variable...

After setting it to the Python location the executors expect, Spark Connect with UDFs now works fine.
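
For reference, a minimal sketch of the fix, assuming the Spark Connect server is launched from an environment you control; the interpreter path below is a placeholder and must point at a Python that actually exists and is executable for the user the executors run as:

# set before starting the Spark Connect server, so the executors inherit it
export PYSPARK_PYTHON=/opt/conda/bin/python   # placeholder path, adjust to your cluster

Equivalently, the interpreter can be pinned with the spark.pyspark.python configuration on the server side. Either way, it has to take effect where the executors run, not just in the notebook client.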