I am using cld3 to detect the language of text, and I want to apply it to a column of a PySpark DataFrame. I am using Databricks, and this is the code I have right now:

import cld3
from pyspark.sql import functions as F
from pyspark.sql.functions import udf

def get_language(text):
    return cld3.get_language(text).language

get_language_udf = udf(get_language)

When I apply the get_language function to the column of a pandas DataFrame, it runs smoothly, like so:

pandas_dataframe['language'] = pandas_dataframe['text'].apply(get_language)

However, when I use the UDF to create a new column in PySpark, I get an error when I run it. Here is the code:

pyspark_dataframe = pyspark_dataframe.withColumn("language", get_language_udf(F.col("text")))
display(pyspark_dataframe)

I have already installed and imported the cld3 library.

Any ideas what I can do to fix this? Thank you in advance!

PythonException: 'pyspark.serializers.SerializationError: Caused by Traceback (most recent call last):
org.apache.spark.SparkException: Job aborted due to stage failure: Task 0 in stage 8.0 failed 4 times, most recent failure: Lost task 0.3 in stage 8.0 (TID 425) (100.64.38.7 executor 0): org.apache.spark.api.python.PythonException: 'pyspark.serializers.SerializationError: Caused by Traceback (most recent call last):
  File "/databricks/spark/python/pyspark/serializers.py", line 165, in _read_with_length
    return self.loads(obj)
  File "/databricks/spark/python/pyspark/serializers.py", line 469, in loads
    return pickle.loads(obj, encoding=encoding)
  File "/databricks/spark/python/pyspark/cloudpickle/cloudpickle.py", line 562, in subimport
    __import__(name)
ModuleNotFoundError: No module named 'cld3''. Full traceback below:
Traceback (most recent call last):
  File "/databricks/spark/python/pyspark/serializers.py", line 165, in _read_with_length
    return self.loads(obj)
  File "/databricks/spark/python/pyspark/serializers.py", line 469, in loads
    return pickle.loads(obj, encoding=encoding)
  File "/databricks/spark/python/pyspark/cloudpickle/cloudpickle.py", line 562, in subimport
    __import__(name)
ModuleNotFoundError: No module named 'cld3'

During handling of the above exception, another exception occurred:

pyspark.serializers.SerializationError: Caused by Traceback (most recent call last):
  File "/databricks/spark/python/pyspark/serializers.py", line 165, in _read_with_length
    return self.loads(obj)
  File "/databricks/spark/python/pyspark/serializers.py", line 469, in loads
    return pickle.loads(obj, encoding=encoding)
  File "/databricks/spark/python/pyspark/cloudpickle/cloudpickle.py", line 562, in subimport
    __import__(name)
ModuleNotFoundError: No module named 'cld3'


    at org.apache.spark.api.python.BasePythonRunner$ReaderIterator.handlePythonException(PythonRunner.scala:689)
    at org.apache.spark.sql.execution.python.PythonUDFRunner$$anon$2.read(PythonUDFRunner.scala:86)
    at org.apache.spark.sql.execution.python.PythonUDFRunner$$anon$2.read(PythonUDFRunner.scala:68)
    at org.apache.spark.api.python.BasePythonRunner$ReaderIterator.hasNext(PythonRunner.scala:642)
    at org.apache.spark.InterruptibleIterator.hasNext(InterruptibleIterator.scala:37)
    at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:491)
    at scala.collection.Iterator$$anon$10.hasNext(Iterator.scala:460)
    at scala.collection.Iterator$$anon$10.hasNext(Iterator.scala:460)
    at org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIteratorForCodegenStage7.processNext(Unknown Source)
    at org.apache.spark.sql.execution.BufferedRowIterator.hasNext(BufferedRowIterator.java:43)
    at org.apache.spark.sql.execution.WholeStageCodegenExec$$anon$1.hasNext(WholeStageCodegenExec.scala:758)
    at org.apache.spark.sql.execution.collect.UnsafeRowBatchUtils$.encodeUnsafeRows(UnsafeRowBatchUtils.scala:80)
    at org.apache.spark.sql.execution.collect.Collector.$anonfun$processFunc$1(Collector.scala:155)
    at org.apache.spark.scheduler.ResultTask.$anonfun$runTask$3(ResultTask.scala:75)
    at com.databricks.spark.util.ExecutorFrameProfiler$.record(ExecutorFrameProfiler.scala:110)
    at org.apache.spark.scheduler.ResultTask.$anonfun$runTask$1(ResultTask.scala:75)
    at com.databricks.spark.util.ExecutorFrameProfiler$.record(ExecutorFrameProfiler.scala:110)
    at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:55)
    at org.apache.spark.scheduler.Task.doRunTask(Task.scala:153)
    at org.apache.spark.scheduler.Task.$anonfun$run$1(Task.scala:122)
    at com.databricks.spark.util.ExecutorFrameProfiler$.record(ExecutorFrameProfiler.scala:110)
    at org.apache.spark.scheduler.Task.run(Task.scala:93)
    at org.apache.spark.executor.Executor$TaskRunner.$anonfun$run$13(Executor.scala:824)
    at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:1621)
    at org.apache.spark.executor.Executor$TaskRunner.$anonfun$run$4(Executor.scala:827)
    at scala.runtime.java8.JFunction0$mcV$sp.apply(JFunction0$mcV$sp.java:23)
    at com.databricks.spark.util.ExecutorFrameProfiler$.record(ExecutorFrameProfiler.scala:110)
    at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:683)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
    at java.lang.Thread.run(Thread.java:748)

Driver stacktrace:
    at org.apache.spark.scheduler.DAGScheduler.failJobAndIndependentStages(DAGScheduler.scala:2973)
    at org.apache.spark.scheduler.DAGScheduler.$anonfun$abortStage$2(DAGScheduler.scala:2920)
    at org.apache.spark.scheduler.DAGScheduler.$anonfun$abortStage$2$adapted(DAGScheduler.scala:2914)
    at scala.collection.mutable.ResizableArray.foreach(ResizableArray.scala:62)
    at scala.collection.mutable.ResizableArray.foreach$(ResizableArray.scala:55)
    at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:49)
    at org.apache.spark.scheduler.DAGScheduler.abortStage(DAGScheduler.scala:2914)
    at org.apache.spark.scheduler.DAGScheduler.$anonfun$handleTaskSetFailed$1(DAGScheduler.scala:1334)
    at org.apache.spark.scheduler.DAGScheduler.$anonfun$handleTaskSetFailed$1$adapted(DAGScheduler.scala:1334)
    at scala.Option.foreach(Option.scala:407)
    at org.apache.spark.scheduler.DAGScheduler.handleTaskSetFailed(DAGScheduler.scala:1334)
    at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.doOnReceive(DAGScheduler.scala:3182)
    at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:3123)
    at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:3111)
    at org.apache.spark.util.EventLoop$$anon$1.run(EventLoop.scala:49)
    at org.apache.spark.scheduler.DAGScheduler.runJob(DAGScheduler.scala:1096)
    at org.apache.spark.SparkContext.runJobInternal(SparkContext.scala:2494)
    at org.apache.spark.sql.execution.collect.Collector.runSparkJobs(Collector.scala:266)
    at org.apache.spark.sql.execution.collect.Collector.collect(Collector.scala:276)
    at org.apache.spark.sql.execution.collect.Collector$.collect(Collector.scala:81)
    at org.apache.spark.sql.execution.collect.Collector$.collect(Collector.scala:87)
    at org.apache.spark.sql.execution.collect.InternalRowFormat$.collect(cachedSparkResults.scala:75)
    at org.apache.spark.sql.execution.collect.InternalRowFormat$.collect(cachedSparkResults.scala:62)
    at org.apache.spark.sql.execution.ResultCacheManager.collectResult$1(ResultCacheManager.scala:543)
    at org.apache.spark.sql.execution.ResultCacheManager.$anonfun$computeResult$1(ResultCacheManager.scala:550)
    at org.apache.spark.sql.execution.adaptive.AdaptiveSparkPlanExec.withFinalPlanUpdate(AdaptiveSparkPlanExec.scala:486)
    at org.apache.spark.sql.execution.ResultCacheManager.computeResult(ResultCacheManager.scala:550)
    at org.apache.spark.sql.execution.ResultCacheManager.$anonfun$getOrComputeResultInternal$1(ResultCacheManager.scala:498)
    at scala.Option.getOrElse(Option.scala:189)
    at org.apache.spark.sql.execution.ResultCacheManager.getOrComputeResultInternal(ResultCacheManager.scala:497)
    at org.apache.spark.sql.execution.ResultCacheManager.getOrComputeResult(ResultCacheManager.scala:394)
    at org.apache.spark.sql.execution.ResultCacheManager.getOrComputeResult(ResultCacheManager.scala:373)
    at org.apache.spark.sql.execution.SparkPlan.executeCollectResult(SparkPlan.scala:408)
    at org.apache.spark.sql.Dataset.collectResult(Dataset.scala:3102)
    at org.apache.spark.sql.Dataset.$anonfun$collectResult$1(Dataset.scala:3093)
    at org.apache.spark.sql.Dataset.$anonfun$withAction$1(Dataset.scala:3900)
    at org.apache.spark.sql.execution.SQLExecution$.$anonfun$withCustomExecutionEnv$5(SQLExecution.scala:172)
    at org.apache.spark.sql.execution.SQLExecution$.withSQLConfPropagated(SQLExecution.scala:319)
    at org.apache.spark.sql.execution.SQLExecution$.$anonfun$withCustomExecutionEnv$1(SQLExecution.scala:146)
    at org.apache.spark.sql.SparkSession.withActive(SparkSession.scala:854)
    at org.apache.spark.sql.execution.SQLExecution$.withCustomExecutionEnv(SQLExecution.scala:113)
    at org.apache.spark.sql.execution.SQLExecution$.withNewExecutionId(SQLExecution.scala:269)
    at org.apache.spark.sql.Dataset.withAction(Dataset.scala:3898)
    at org.apache.spark.sql.Dataset.collectResult(Dataset.scala:3092)
    at com.databricks.backend.daemon.driver.OutputAggregator$.withOutputAggregation0(OutputAggregator.scala:268)
    at com.databricks.backend.daemon.driver.OutputAggregator$.withOutputAggregation(OutputAggregator.scala:102)
    at com.databricks.backend.daemon.driver.PythonDriverLocalBase.generateTableResult(PythonDriverLocalBase.scala:586)
    at com.databricks.backend.daemon.driver.PythonDriverLocal.computeListResultsItem(PythonDriverLocal.scala:619)
    at com.databricks.backend.daemon.driver.PythonDriverLocalBase.genListResults(PythonDriverLocalBase.scala:493)
    at com.databricks.backend.daemon.driver.PythonDriverLocal.$anonfun$getResultBufferInternal$1(PythonDriverLocal.scala:674)
    at com.databricks.backend.daemon.driver.PythonDriverLocal.withInterpLock(PythonDriverLocal.scala:555)
    at com.databricks.backend.daemon.driver.PythonDriverLocal.getResultBufferInternal(PythonDriverLocal.scala:634)
    at com.databricks.backend.daemon.driver.DriverLocal.getResultBuffer(DriverLocal.scala:658)
    at com.databricks.backend.daemon.driver.PythonDriverLocal.outputSuccess(PythonDriverLocal.scala:597)
    at com.databricks.backend.daemon.driver.PythonDriverLocal.$anonfun$repl$6(PythonDriverLocal.scala:222)
    at com.databricks.backend.daemon.driver.PythonDriverLocal.withInterpLock(PythonDriverLocal.scala:555)
    at com.databricks.backend.daemon.driver.PythonDriverLocal.repl(PythonDriverLocal.scala:209)
    at com.databricks.backend.daemon.driver.DriverLocal.$anonfun$execute$11(DriverLocal.scala:549)
    at com.databricks.logging.UsageLogging.$anonfun$withAttributionContext$1(UsageLogging.scala:215)
    at scala.util.DynamicVariable.withValue(DynamicVariable.scala:62)
    at com.databricks.logging.UsageLogging.withAttributionContext(UsageLogging.scala:213)
    at com.databricks.logging.UsageLogging.withAttributionContext$(UsageLogging.scala:210)
    at com.databricks.backend.daemon.driver.DriverLocal.withAttributionContext(DriverLocal.scala:50)
    at com.databricks.logging.UsageLogging.withAttributionTags(UsageLogging.scala:251)
    at com.databricks.logging.UsageLogging.withAttributionTags$(UsageLogging.scala:243)
    at com.databricks.backend.daemon.driver.DriverLocal.withAttributionTags(DriverLocal.scala:50)
    at com.databricks.backend.daemon.driver.DriverLocal.execute(DriverLocal.scala:526)
    at com.databricks.backend.daemon.driver.DriverWrapper.$anonfun$tryExecutingCommand$1(DriverWrapper.scala:693)
    at scala.util.Try$.apply(Try.scala:213)
    at com.databricks.backend.daemon.driver.DriverWrapper.tryExecutingCommand(DriverWrapper.scala:685)
    at com.databricks.backend.daemon.driver.DriverWrapper.getCommandOutputAndError(DriverWrapper.scala:526)
    at com.databricks.backend.daemon.driver.DriverWrapper.executeCommand(DriverWrapper.scala:638)
    at com.databricks.backend.daemon.driver.DriverWrapper.runInnerLoop(DriverWrapper.scala:431)
    at com.databricks.backend.daemon.driver.DriverWrapper.runInner(DriverWrapper.scala:374)
    at com.databricks.backend.daemon.driver.DriverWrapper.run(DriverWrapper.scala:225)
    at java.lang.Thread.run(Thread.java:748)
1 Answer

Since you're using Databricks, the ModuleNotFoundError is coming from the workers that execute the UDF: cld3 is only available where you installed it, not on the executors. Go to the cluster configuration, open the Libraries tab, and install the cld3 package there. That installs it on every worker in the cluster, and you won't get that error anymore.
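Alternatively, on newer Databricks runtimes a notebook-scoped install works too, since %pip installs the package on the driver and on every executor attached to the notebook. A minimal sketch, assuming the PyPI package you need is pycld3 (the bindings that expose the cld3 module) — run it in its own cell at the top of the notebook, and re-run the cells that define the UDF afterwards, since the install resets the Python state:

%pip install pycld3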

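Independently of the install, the UDF itself is worth hardening. Null rows in the text column reach a Python UDF as None, and cld3.get_language may return no prediction for empty input (an assumption worth guarding against), either of which would make .language blow up. A minimal sketch with an explicit return type:

from pyspark.sql import functions as F
from pyspark.sql.functions import udf
from pyspark.sql.types import StringType

import cld3

def get_language(text):
    # Null rows arrive as Python None; skip empty strings as well.
    if not text:
        return None
    prediction = cld3.get_language(text)
    # Guard against a missing prediction before touching .language.
    return prediction.language if prediction else None

# StringType is the udf default, but declaring it keeps the intent explicit.
get_language_udf = udf(get_language, StringType())

pyspark_dataframe = pyspark_dataframe.withColumn("language", get_language_udf(F.col("text")))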