I'm having problems using UDFs with Spark Connect.
Spark Connect basically works fine (I'm using a Jupyter Notebook), but it fails as soon as a UDF is involved.
It seems that it is trying to use the local Python interpreter to handle the gRPC request, but fails.
import pyspark
from pyspark.sql import SparkSession
from pyspark.sql.functions import udf
# Stop any existing local session before connecting to the remote server
SparkSession.builder.master("local[*]").getOrCreate().stop()
spark = SparkSession.builder.remote("sc://spark-connect-server:15002").getOrCreate()
# Fails on the next line:
spark.range(2).withColumn('plus_one', udf(lambda x: x + 1)('id')).show()
There is a Java exception: java.io.IOException: Cannot run program "/root/miniconda3/bin/python": error=13, Permission denied.
I checked the permissions of /root/miniconda3/bin/python; it is accessible, so the problem must be something else.
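For reference, a check along these lines (note that it only reflects permissions on the machine it runs on, so the Jupyter host may tell a different story than the executor hosts):

import os

path = "/root/miniconda3/bin/python"
# True only if the file exists and the current user can execute it on this machine
print(os.path.exists(path), os.access(path, os.X_OK))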
Here is the stack trace:
---------------------------------------------------------------------------
SparkConnectGrpcException Traceback (most recent call last)
Cell In[2], line 2
1 from pyspark.sql.functions import udf
----> 2 spark.range(2).withColumn('plus_one', udf(lambda x: x + 1)('id')).show()
File ~/miniconda3/lib/python3.8/site-packages/pyspark/sql/connect/dataframe.py:817, in DataFrame.show(self, n, truncate, vertical)
816 def show(self, n: int = 20, truncate: Union[bool, int] = True, vertical: bool = False) -> None:
--> 817 print(self._show_string(n, truncate, vertical))
File ~/miniconda3/lib/python3.8/site-packages/pyspark/sql/connect/dataframe.py:631, in DataFrame._show_string(self, n, truncate, vertical)
622 except ValueError:
623 raise PySparkTypeError(
624 error_class="NOT_BOOL",
625 message_parameters={
(...)
628 },
629 )
--> 631 pdf = DataFrame.withPlan(
632 plan.ShowString(child=self._plan, num_rows=n, truncate=_truncate, vertical=vertical),
633 session=self._session,
634 ).toPandas()
635 assert pdf is not None
636 return pdf["show_string"][0]
File ~/miniconda3/lib/python3.8/site-packages/pyspark/sql/connect/dataframe.py:1372, in DataFrame.toPandas(self)
1370 raise Exception("Cannot collect on empty session.")
1371 query = self._plan.to_proto(self._session.client)
-> 1372 return self._session.client.to_pandas(query)
File ~/miniconda3/lib/python3.8/site-packages/pyspark/sql/connect/client.py:679, in SparkConnectClient.to_pandas(self, plan)
677 req = self._execute_plan_request_with_metadata()
678 req.plan.CopyFrom(plan)
--> 679 table, schema, metrics, observed_metrics, _ = self._execute_and_fetch(req)
680 assert table is not None
681 pdf = table.rename_columns([f"col_{i}" for i in range(len(table.column_names))]).to_pandas()
File ~/miniconda3/lib/python3.8/site-packages/pyspark/sql/connect/client.py:982, in SparkConnectClient._execute_and_fetch(self, req)
979 schema: Optional[StructType] = None
980 properties: Dict[str, Any] = {}
--> 982 for response in self._execute_and_fetch_as_iterator(req):
983 if isinstance(response, StructType):
984 schema = response
File ~/miniconda3/lib/python3.8/site-packages/pyspark/sql/connect/client.py:963, in SparkConnectClient._execute_and_fetch_as_iterator(self, req)
961 yield batch
962 except Exception as error:
--> 963 self._handle_error(error)
File ~/miniconda3/lib/python3.8/site-packages/pyspark/sql/connect/client.py:1055, in SparkConnectClient._handle_error(self, error)
1042 """
1043 Handle errors that occur during RPC calls.
1044
(...)
1052 Throws the appropriate internal Python exception.
1053 """
1054 if isinstance(error, grpc.RpcError):
-> 1055 self._handle_rpc_error(error)
1056 elif isinstance(error, ValueError):
1057 if "Cannot invoke RPC" in str(error) and "closed" in str(error):
File ~/miniconda3/lib/python3.8/site-packages/pyspark/sql/connect/client.py:1091, in SparkConnectClient._handle_rpc_error(self, rpc_error)
1089 info = error_details_pb2.ErrorInfo()
1090 d.Unpack(info)
-> 1091 raise convert_exception(info, status.message) from None
1093 raise SparkConnectGrpcException(status.message) from None
1094 else:
SparkConnectGrpcException: (org.apache.spark.SparkException) Job aborted due to stage failure: Task 24 in stage 110.0 failed 4 times, most recent failure: Lost task 24.3 in stage 110.0 (TID 2421) (10.248.66.39 executor 6): java.io.IOException: Cannot run program "/root/miniconda3/bin/python": error=13, Permission denied
at java.base/java.lang.ProcessBuilder.start(ProcessBuilder.java:1128)
at java.base/java.lang.ProcessBuilder.start(ProcessBuilder.java:1071)
at org.apache.spark.api.python.PythonWorkerFactory.startDaemon(PythonWorkerFactory.scala:222)
at org.apache.spark.api.python.PythonWorkerFactory.createThroughDaemon(PythonWorkerFactory.scala:134)
at org.apache.spark.api.python.PythonWorkerFactory.create(PythonWorkerFactory.scala:107)
at org.apache.spark.SparkEnv.createPythonWorker(SparkEnv.scala:124)
at org.apache.spark.api.python.BasePythonRunner.compute(PythonRunner.scala:166)
at org.apache.spark.sql.execution.python.BatchEvalPythonExec.evaluate(BatchEvalPythonExec.scala:82)
at org.apache.spark.sql.execution.python.EvalPythonExec.$anonfun$doExecute$2(EvalPythonExec.scala:131)
at org.apache.spark.rdd.RDD.$anonfun$mapPartitions$2(RDD.scala:853)
at org.apache.spark.rdd.RDD.$anonfun$mapPartitions$2$adapted(RDD.scala:853)
at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:52)
at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:364)
at org.apache.spark.rdd.RDD.iterator(RDD.scala:328)
at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:52)
at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:364)
at org.apache.spark.rdd.RDD.iterator(RDD.scala:328)
at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:52)
at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:364)
at org.apache.spark.rdd.RDD.iterator(RDD.scala:328)
at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:92)
at org.apache.spark.TaskContext.runTaskWithListeners(TaskContext.scala:161)
at org.apache.spark.scheduler.Task.run(Task.scala:139)
at org.apache.spark.executor.Executor$TaskRunner.$anon...
Solved the issue.
I forgot to set the PYSPARK_PYTHON env variable.
After setting it to the location where the executors expect to find Python, Spark Connect with UDFs works fine.
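For reference, a minimal sketch of the fix. The interpreter path below is an assumption; point it at a Python binary that actually exists and is executable on the executor nodes, and make sure the variable is set in the environment the executors inherit (e.g. where the Spark Connect server / cluster is started), not only in the Jupyter client process:

import os

# Assumed path: must be a Python interpreter that exists and is executable
# on every executor node.
os.environ["PYSPARK_PYTHON"] = "/opt/conda/bin/python"
# Optional: pin the driver-side interpreter as well.
os.environ["PYSPARK_DRIVER_PYTHON"] = "/opt/conda/bin/python"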