Pyspark "An error occurred while calling o255.showString"


I am trying to measure the similarity between two texts by comparing them. For this, I compute the TF-IDF values of both texts, and they come out correctly. However, when I calculate the cosine similarity between them and try to print the resulting DataFrame, I get an error. Here is my code:
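For reference, the cosine similarity computed below is

    cos_sim(u, v) = (u · v) / (‖u‖₂ · ‖v‖₂)

that is, the dot product of the two TF-IDF vectors divided by the product of their Euclidean norms.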

# Imports implied by the code below
import pyspark as ps
from pyspark.sql import SQLContext
from pyspark.sql.functions import col
import pyspark.sql.functions as F
from pyspark.sql.types import FloatType
from pyspark.ml import Pipeline
from pyspark.ml.feature import RegexTokenizer, CountVectorizer, IDF

conf = ps.SparkConf().setAll([("spark.memory.offHeap.size", "10g"),
                              ("spark.memory.offHeap.enabled", "true"),
                              ("spark.driver.maxResultSize", "10g")])

sc = ps.SparkContext("local[8]", conf=conf)
sqlContext = SQLContext(sc)
print("Just created a SparkContext")

# Load the two news corpora
df1 = sqlContext.read.format('com.databricks.spark.csv').options(header='true', inferschema='true').load('bbcclear.csv')
df2 = sqlContext.read.format('com.databricks.spark.csv').options(header='true', inferschema='true').load('yenisafakcategorypredict.csv')

df1 = df1.select(col("_c0"), col("News Data"))

# TF-IDF pipeline: tokenize, count term frequencies, weight by IDF
regex_tokenizer = RegexTokenizer(gaps=False, pattern=r"\w+", inputCol="News Data", outputCol="tokens")
count_vectorizer = CountVectorizer(inputCol="tokens", outputCol="tf")
idf = IDF(inputCol="tf", outputCol="idf")
tf_idf_pipeline = Pipeline(stages=[regex_tokenizer, count_vectorizer, idf])

# Fit on one corpus, transform the other, then pair every row with every row
df1 = tf_idf_pipeline.fit(df2).transform(df1).drop("News Data", "tokens", "tf")
df1 = df1.crossJoin(df1.withColumnRenamed("idf", "idf2"))

columns = ["YS_ID","YS_IDF","BBC_ID","BBC_IDF"]
df1 = df1.toDF(*columns)

print(df1.show())

from pyspark.sql.functions import monotonically_increasing_id
df1 = df1.withColumn("ID", monotonically_increasing_id())
df1 = df1.select(col("ID"),col("YS_ID"),col("YS_IDF"),col("BBC_IDF"))
df1 = df1.dropna()
print(df1.show())

# Cosine similarity between two sparse TF-IDF vectors
@F.udf(returnType=FloatType())
def cos_sim(u, v):
    return float(u.dot(v) / (u.norm(2) * v.norm(2)))

df1 = df1.withColumn("cos_sim", cos_sim(F.col("YS_IDF"), F.col("BBC_IDF")))
df1.printSchema()
df1 = df1.dropna()
print(df1.show())
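As an aside, here is a minimal local sketch of what the cos_sim UDF computes, with made-up vectors (only pyspark itself is assumed):

from pyspark.ml.linalg import Vectors

# Hypothetical sparse vectors, shaped like the IDF output above
u = Vectors.sparse(5, [0, 2], [1.0, 3.0])
v = Vectors.sparse(5, [0, 3], [2.0, 4.0])

# Same arithmetic as the UDF: (u . v) / (||u||_2 * ||v||_2)
print(float(u.dot(v) / (u.norm(2) * v.norm(2))))  # 2.0 / sqrt(200) ≈ 0.1414

Note that if either vector is all zeros, its norm is 0 and this division raises ZeroDivisionError inside the Python worker.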

For example, here is the final DataFrame just before the error:

+---+-----+--------------------+--------------------+
| ID|YS_ID|              YS_IDF|             BBC_IDF|
+---+-----+--------------------+--------------------+
|  0|    0|(8278,[0,1,2,3,4,...|(8278,[0,1,2,3,4,...|
|  1|    0|(8278,[0,1,2,3,4,...|(8278,[0,1,2,3,4,...|
|  2|    0|(8278,[0,1,2,3,4,...|(8278,[0,1,2,3,4,...|
|  3|    0|(8278,[0,1,2,3,4,...|(8278,[0,1,2,3,4,...|
|  4|    0|(8278,[0,1,2,3,4,...|(8278,[0,1,2,3,4,...|
|  5|    0|(8278,[0,1,2,3,4,...|(8278,[0,1,2,3,4,...|

I can see the output of printSchema correctly (printSchema only inspects the schema, so nothing is actually executed at this point):

root
 |-- ID: long (nullable = false)
 |-- YS_ID: integer (nullable = true)
 |-- YS_IDF: vector (nullable = true)
 |-- BBC_IDF: vector (nullable = true)
 |-- cos_sim: float (nullable = true)

When I try to view the actual rows with show, which is the first action that really runs the UDF, I get the error I mentioned.

The full traceback:

Traceback (most recent call last):
  File "C:\Users\Onee-Sama\PycharmProjects\pythonProject23\main.py", line 50, in <module>
    print(df1.show())
  File "C:\Users\Onee-Sama\Desktop\pythonProject23\lib\site-packages\pyspark\sql\dataframe.py", line 606, in show
    print(self._jdf.showString(n, 20, vertical))
  File "C:\Users\Onee-Sama\Desktop\pythonProject23\lib\site-packages\py4j\java_gateway.py", line 1321, in __call__
    return_value = get_return_value(
  File "C:\Users\Onee-Sama\Desktop\pythonProject23\lib\site-packages\pyspark\sql\utils.py", line 190, in deco
    return f(*a, **kw)
  File "C:\Users\Onee-Sama\Desktop\pythonProject23\lib\site-packages\py4j\protocol.py", line 326, in get_return_value
    raise Py4JJavaError(
py4j.protocol.Py4JJavaError: An error occurred while calling o256.showString.
: org.apache.spark.SparkException: Job aborted due to stage failure: Task 0 in stage 30.0 failed 1 times, most recent failure: Lost task 0.0 in stage 30.0 (TID 20) (DESKTOP-PBJ7RJA executor driver): java.net.SocketException: Connection reset
    at java.net.SocketInputStream.read(Unknown Source)
    at java.net.SocketInputStream.read(Unknown Source)
    at java.io.BufferedInputStream.fill(Unknown Source)
    at java.io.BufferedInputStream.read(Unknown Source)
    at java.io.DataInputStream.readInt(Unknown Source)
    at org.apache.spark.sql.execution.python.PythonUDFRunner$$anon$2.read(PythonUDFRunner.scala:76)
    at org.apache.spark.sql.execution.python.PythonUDFRunner$$anon$2.read(PythonUDFRunner.scala:68)
    at org.apache.spark.api.python.BasePythonRunner$ReaderIterator.hasNext(PythonRunner.scala:512)
    at org.apache.spark.InterruptibleIterator.hasNext(InterruptibleIterator.scala:37)
    at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:491)
    at scala.collection.Iterator$$anon$10.hasNext(Iterator.scala:460)
    at scala.collection.Iterator$$anon$10.hasNext(Iterator.scala:460)
    at org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIteratorForCodegenStage5.processNext(Unknown Source)
    at org.apache.spark.sql.execution.BufferedRowIterator.hasNext(BufferedRowIterator.java:43)
    at org.apache.spark.sql.execution.WholeStageCodegenExec$$anon$1.hasNext(WholeStageCodegenExec.scala:760)
    at scala.collection.Iterator$$anon$10.hasNext(Iterator.scala:460)
    at org.apache.spark.ContextAwareIterator.hasNext(ContextAwareIterator.scala:39)
    at scala.collection.Iterator$$anon$10.hasNext(Iterator.scala:460)
    at scala.collection.Iterator$$anon$10.hasNext(Iterator.scala:460)
    at scala.collection.Iterator$GroupedIterator.fill(Iterator.scala:1211)
    at scala.collection.Iterator$GroupedIterator.hasNext(Iterator.scala:1217)
    at scala.collection.Iterator$$anon$10.hasNext(Iterator.scala:460)
    at scala.collection.Iterator.foreach(Iterator.scala:943)
    at scala.collection.Iterator.foreach$(Iterator.scala:943)
    at scala.collection.AbstractIterator.foreach(Iterator.scala:1431)
    at org.apache.spark.api.python.PythonRDD$.writeIteratorToStream(PythonRDD.scala:307)
    at org.apache.spark.sql.execution.python.PythonUDFRunner$$anon$1.writeIteratorToStream(PythonUDFRunner.scala:53)
    at org.apache.spark.api.python.BasePythonRunner$WriterThread.$anonfun$run$1(PythonRunner.scala:438)
    at org.apache.spark.util.Utils$.logUncaughtExceptions(Utils.scala:2066)
    at org.apache.spark.api.python.BasePythonRunner$WriterThread.run(PythonRunner.scala:272)

Driver stacktrace:
    at org.apache.spark.scheduler.DAGScheduler.failJobAndIndependentStages(DAGScheduler.scala:2672)
    at org.apache.spark.scheduler.DAGScheduler.$anonfun$abortStage$2(DAGScheduler.scala:2608)
    at org.apache.spark.scheduler.DAGScheduler.$anonfun$abortStage$2$adapted(DAGScheduler.scala:2607)
    at scala.collection.mutable.ResizableArray.foreach(ResizableArray.scala:62)
    at scala.collection.mutable.ResizableArray.foreach$(ResizableArray.scala:55)
    at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:49)
    at org.apache.spark.scheduler.DAGScheduler.abortStage(DAGScheduler.scala:2607)
    at org.apache.spark.scheduler.DAGScheduler.$anonfun$handleTaskSetFailed$1(DAGScheduler.scala:1182)
    at org.apache.spark.scheduler.DAGScheduler.$anonfun$handleTaskSetFailed$1$adapted(DAGScheduler.scala:1182)
    at scala.Option.foreach(Option.scala:407)
    at org.apache.spark.scheduler.DAGScheduler.handleTaskSetFailed(DAGScheduler.scala:1182)
    at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.doOnReceive(DAGScheduler.scala:2860)
    at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:2802)
    at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:2791)
    at org.apache.spark.util.EventLoop$$anon$1.run(EventLoop.scala:49)
    at org.apache.spark.scheduler.DAGScheduler.runJob(DAGScheduler.scala:952)
    at org.apache.spark.SparkContext.runJob(SparkContext.scala:2228)
    at org.apache.spark.SparkContext.runJob(SparkContext.scala:2249)
    at org.apache.spark.SparkContext.runJob(SparkContext.scala:2268)
    at org.apache.spark.sql.execution.SparkPlan.executeTake(SparkPlan.scala:506)
    at org.apache.spark.sql.execution.SparkPlan.executeTake(SparkPlan.scala:459)
    at org.apache.spark.sql.execution.CollectLimitExec.executeCollect(limit.scala:48)
    at org.apache.spark.sql.execution.adaptive.AdaptiveSparkPlanExec.$anonfun$executeCollect$1(AdaptiveSparkPlanExec.scala:340)
    at org.apache.spark.sql.execution.adaptive.AdaptiveSparkPlanExec.withFinalPlanUpdate(AdaptiveSparkPlanExec.scala:368)
    at org.apache.spark.sql.execution.adaptive.AdaptiveSparkPlanExec.executeCollect(AdaptiveSparkPlanExec.scala:340)
    at org.apache.spark.sql.Dataset.collectFromPlan(Dataset.scala:3868)
    at org.apache.spark.sql.Dataset.$anonfun$head$1(Dataset.scala:2863)
    at org.apache.spark.sql.Dataset.$anonfun$withAction$2(Dataset.scala:3858)
    at org.apache.spark.sql.execution.QueryExecution$.withInternalError(QueryExecution.scala:510)
    at org.apache.spark.sql.Dataset.$anonfun$withAction$1(Dataset.scala:3856)
    at org.apache.spark.sql.execution.SQLExecution$.$anonfun$withNewExecutionId$6(SQLExecution.scala:109)
    at org.apache.spark.sql.execution.SQLExecution$.withSQLConfPropagated(SQLExecution.scala:169)
    at org.apache.spark.sql.execution.SQLExecution$.$anonfun$withNewExecutionId$1(SQLExecution.scala:95)
    at org.apache.spark.sql.SparkSession.withActive(SparkSession.scala:779)
    at org.apache.spark.sql.execution.SQLExecution$.withNewExecutionId(SQLExecution.scala:64)
    at org.apache.spark.sql.Dataset.withAction(Dataset.scala:3856)
    at org.apache.spark.sql.Dataset.head(Dataset.scala:2863)
    at org.apache.spark.sql.Dataset.take(Dataset.scala:3084)
    at org.apache.spark.sql.Dataset.getRows(Dataset.scala:288)
    at org.apache.spark.sql.Dataset.showString(Dataset.scala:327)
    at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
    at sun.reflect.NativeMethodAccessorImpl.invoke(Unknown Source)
    at sun.reflect.DelegatingMethodAccessorImpl.invoke(Unknown Source)
    at java.lang.reflect.Method.invoke(Unknown Source)
    at py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:244)
    at py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:357)
    at py4j.Gateway.invoke(Gateway.java:282)
    at py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:132)
    at py4j.commands.CallCommand.execute(CallCommand.java:79)
    at py4j.ClientServerConnection.waitForCommands(ClientServerConnection.java:182)
    at py4j.ClientServerConnection.run(ClientServerConnection.java:106)
    at java.lang.Thread.run(Unknown Source)
Caused by: java.net.SocketException: Connection reset
    at java.net.SocketInputStream.read(Unknown Source)
    at java.net.SocketInputStream.read(Unknown Source)
    at java.io.BufferedInputStream.fill(Unknown Source)
    at java.io.BufferedInputStream.read(Unknown Source)
    at java.io.DataInputStream.readInt(Unknown Source)
    at org.apache.spark.sql.execution.python.PythonUDFRunner$$anon$2.read(PythonUDFRunner.scala:76)
    at org.apache.spark.sql.execution.python.PythonUDFRunner$$anon$2.read(PythonUDFRunner.scala:68)
    at org.apache.spark.api.python.BasePythonRunner$ReaderIterator.hasNext(PythonRunner.scala:512)
    at org.apache.spark.InterruptibleIterator.hasNext(InterruptibleIterator.scala:37)
    at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:491)
    at scala.collection.Iterator$$anon$10.hasNext(Iterator.scala:460)
    at scala.collection.Iterator$$anon$10.hasNext(Iterator.scala:460)
    at org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIteratorForCodegenStage5.processNext(Unknown Source)
    at org.apache.spark.sql.execution.BufferedRowIterator.hasNext(BufferedRowIterator.java:43)
    at org.apache.spark.sql.execution.WholeStageCodegenExec$$anon$1.hasNext(WholeStageCodegenExec.scala:760)
    at scala.collection.Iterator$$anon$10.hasNext(Iterator.scala:460)
    at org.apache.spark.ContextAwareIterator.hasNext(ContextAwareIterator.scala:39)
    at scala.collection.Iterator$$anon$10.hasNext(Iterator.scala:460)
    at scala.collection.Iterator$$anon$10.hasNext(Iterator.scala:460)
    at scala.collection.Iterator$GroupedIterator.fill(Iterator.scala:1211)
    at scala.collection.Iterator$GroupedIterator.hasNext(Iterator.scala:1217)
    at scala.collection.Iterator$$anon$10.hasNext(Iterator.scala:460)
    at scala.collection.Iterator.foreach(Iterator.scala:943)
    at scala.collection.Iterator.foreach$(Iterator.scala:943)
    at scala.collection.AbstractIterator.foreach(Iterator.scala:1431)
    at org.apache.spark.api.python.PythonRDD$.writeIteratorToStream(PythonRDD.scala:307)
    at org.apache.spark.sql.execution.python.PythonUDFRunner$$anon$1.writeIteratorToStream(PythonUDFRunner.scala:53)
    at org.apache.spark.api.python.BasePythonRunner$WriterThread.$anonfun$run$1(PythonRunner.scala:438)
    at org.apache.spark.util.Utils$.logUncaughtExceptions(Utils.scala:2066)
    at org.apache.spark.api.python.BasePythonRunner$WriterThread.run(PythonRunner.scala:272)