EMR - Spark version upgrade from 2.4.0 to 3.1.2 causes write issues in AWS OpenSearch (Elasticsearch 6.7)


I'm currently using EMR release emr-5.23.0 (Spark 2.4.0) with the Spark Elasticsearch connector elasticsearch-spark-30_2.12-7.12.0.jar to write data into AWS OpenSearch (Elasticsearch 6.7):

# Write the dataframe to OpenSearch; the {timestamp} placeholder in es.resource
# routes each row to an index named after the value of its "timestamp" field.
(
    dataframe.coalesce(6).write
    .mode("append")
    .format("org.elasticsearch.spark.sql")
    .option("es.nodes.wan.only", "true")
    .option("es.nodes", es_endpoint)
    .option("es.port", es_port)
    .option("es.resource", "%s-{%s}/%s" % (log_index_name, "timestamp", "log"))
    .save()
)

This used to work fine. But when I upgraded the EMR release from emr-5.23.0 (Spark 2.4.0) to emr-6.4.0 (Spark 3.1.2), I see the following error when trying to write to AWS OpenSearch (Elasticsearch 6.7) using elasticsearch-spark-30_2.12-7.12.0.jar:

Failure Reason - An error occurred while calling o331.save.
: java.lang.NoClassDefFoundError: scala/Product$class
    at org.elasticsearch.spark.sql.ElasticsearchRelation.<init>(DefaultSource.scala:220)
    at org.elasticsearch.spark.sql.DefaultSource.createRelation(DefaultSource.scala:105)
    at org.apache.spark.sql.execution.datasources.SaveIntoDataSourceCommand.run(SaveIntoDataSourceCommand.scala:46)
    at org.apache.spark.sql.execution.command.ExecutedCommandExec.sideEffectResult$lzycompute(commands.scala:70)
    at org.apache.spark.sql.execution.command.ExecutedCommandExec.sideEffectResult(commands.scala:68)
    at org.apache.spark.sql.execution.command.ExecutedCommandExec.doExecute(commands.scala:90)
    at org.apache.spark.sql.execution.SparkPlan.$anonfun$execute$1(SparkPlan.scala:194)
    at org.apache.spark.sql.execution.SparkPlan.$anonfun$executeQuery$1(SparkPlan.scala:232)
    at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)
    at org.apache.spark.sql.execution.SparkPlan.executeQuery(SparkPlan.scala:229)
    at org.apache.spark.sql.execution.SparkPlan.execute(SparkPlan.scala:190)
    at org.apache.spark.sql.execution.QueryExecution.toRdd$lzycompute(QueryExecution.scala:134)
    at org.apache.spark.sql.execution.QueryExecution.toRdd(QueryExecution.scala:133)
    at org.apache.spark.sql.DataFrameWriter.$anonfun$runCommand$1(DataFrameWriter.scala:989)
    at org.apache.spark.sql.catalyst.QueryPlanningTracker$.withTracker(QueryPlanningTracker.scala:107)
    at org.apache.spark.sql.execution.SQLExecution$.withTracker(SQLExecution.scala:232)
    at org.apache.spark.sql.execution.SQLExecution$.executeQuery$1(SQLExecution.scala:110)
    at org.apache.spark.sql.execution.SQLExecution$.$anonfun$withNewExecutionId$6(SQLExecution.scala:135)
    at org.apache.spark.sql.catalyst.QueryPlanningTracker$.withTracker(QueryPlanningTracker.scala:107)
    at org.apache.spark.sql.execution.SQLExecution$.withTracker(SQLExecution.scala:232)
    at org.apache.spark.sql.execution.SQLExecution$.$anonfun$withNewExecutionId$5(SQLExecution.scala:135)
    at org.apache.spark.sql.execution.SQLExecution$.withSQLConfPropagated(SQLExecution.scala:253)
    at org.apache.spark.sql.execution.SQLExecution$.$anonfun$withNewExecutionId$1(SQLExecution.scala:134)
    at org.apache.spark.sql.SparkSession.withActive(SparkSession.scala:775)
    at org.apache.spark.sql.execution.SQLExecution$.withNewExecutionId(SQLExecution.scala:68)
    at org.apache.spark.sql.DataFrameWriter.runCommand(DataFrameWriter.scala:989)
    at org.apache.spark.sql.DataFrameWriter.saveToV1Source(DataFrameWriter.scala:438)
    at org.apache.spark.sql.DataFrameWriter.saveInternal(DataFrameWriter.scala:415)
    at org.apache.spark.sql.DataFrameWriter.save(DataFrameWriter.scala:301)
    at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
    at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
    at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
    at java.lang.reflect.Method.invoke(Method.java:498)
    at py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:244)
    at py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:357)
    at py4j.Gateway.invoke(Gateway.java:282)
    at py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:132)
    at py4j.commands.CallCommand.execute(CallCommand.java:79)
    at py4j.GatewayConnection.run(GatewayConnection.java:238)
    at java.lang.Thread.run(Thread.java:750)
Caused by: java.lang.ClassNotFoundException: scala.Product$class
    at java.net.URLClassLoader.findClass(URLClassLoader.java:387)
    at java.lang.ClassLoader.loadClass(ClassLoader.java:418)
    at java.lang.ClassLoader.loadClass(ClassLoader.java:351)
    ... 40 more
Traceback (most recent call last):
  File "usr/copy_data_to_open_search.py", line 141, in process_log_data_to_es
    "es.resource", "%s-{%s}/%s" % (log_index_name, "timestamp", "log")
  File "/usr/lib/spark/python/lib/pyspark.zip/pyspark/sql/readwriter.py", line 1107, in save
    self._jwrite.save()
  File "/usr/lib/spark/python/lib/py4j-0.10.9-src.zip/py4j/java_gateway.py", line 1305, in __call__
    answer, self.gateway_client, self.target_id, self.name)
  File "/usr/lib/spark/python/lib/pyspark.zip/pyspark/sql/utils.py", line 111, in deco
    return f(*a, **kw)
  File "/usr/lib/spark/python/lib/py4j-0.10.9-src.zip/py4j/protocol.py", line 328, in get_return_value
    format(target_id, ".", name), value)
py4j.protocol.Py4JJavaError: An error occurred while calling o331.save.
: java.lang.NoClassDefFoundError: scala/Product$class
    ... (same Java stack trace as above)

It seems the error is caused by a missing Scala class during execution of the Spark job that writes to Elasticsearch 6.7 (AWS OpenSearch in this case): the class scala.Product$class cannot be found.

I tried updating to the latest Spark Elasticsearch connector jar:

elasticsearch-spark-30_2.12-8.11.2.jar (replacing elasticsearch-spark-30_2.12-7.12.0.jar)

But the job still fails with the same error as above.

How can I solve this issue? Thanks in advance!

1 Answer

Robin:

First, make sure you are passing the elasticsearch-spark jar:

export PYSPARK_SUBMIT_ARGS="--packages org.elasticsearch:elasticsearch-spark-30_2.12:8.11.2 pyspark-shell"

(or)

from pyspark.sql import SparkSession

# "spark.jars" (not "jars") is the config key Spark reads for extra jars.
spark = SparkSession.builder.appName("Load Data into AWS OpenSearch").config("spark.jars", "/spark/jars/elasticsearch-spark-30_2.12-8.11.2.jar").getOrCreate()
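
Alternatively, you can let Spark resolve the connector from Maven Central instead of pointing at a local jar. A minimal sketch, assuming the Spark 3.x / Scala 2.12 build of the 8.11.2 connector:

from pyspark.sql import SparkSession

# Sketch: resolve the connector from Maven Central at session start-up.
# The coordinates below assume the Spark 3.x / Scala 2.12 build of the connector.
spark = (
    SparkSession.builder
    .appName("Load Data into AWS OpenSearch")
    .config("spark.jars.packages", "org.elasticsearch:elasticsearch-spark-30_2.12:8.11.2")
    .getOrCreate()
)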

These pages helped me a lot:

  1. https://discuss.elastic.co/t/issue-using-the-connector-from-pyspark-in-7-17-3/304084/2
  2. https://discuss.elastic.co/t/python-elasticsearch-and-apache-spark-simple-data-reading/307715

Actual Issue:

The error you're encountering, java.lang.NoClassDefFoundError: scala/Product$class, usually indicates that a jar built for an incompatible Scala version is being loaded. scala.Product$class is a trait implementation class that exists only in Scala 2.11; it was removed in Scala 2.12, so this error is the classic symptom of running code compiled against Scala 2.11 on a Scala 2.12 (or later) runtime. In other words, there is a mismatch between the Scala version used by Spark and the one the Elasticsearch Spark connector was built for.

Here are a few steps you can take to resolve the issue:

Check Scala Versions: Ensure that Spark and the Elasticsearch Spark connector are built against the same Scala version. Spark 3.x typically uses Scala 2.12, so check which Scala version your Spark distribution ships with and pick the connector artifact with the matching suffix (e.g. _2.12).
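
For example, a quick way to check the Scala version of the running Spark distribution from PySpark (a small sketch; it assumes an active SparkSession named spark and reads Scala's version string through the py4j gateway):

# Print the Scala version the Spark JVM ships with; it should match the
# "_2.12" suffix in the connector's artifact name.
print(spark.sparkContext._jvm.scala.util.Properties.versionString())
# `spark-submit --version` also prints a "Using Scala version 2.12.x" line.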

Update Elasticsearch Spark Connector: It seems you've tried updating the Elasticsearch Spark Connector, but make sure you are using a version that is compatible with Spark 3.1.2 and Scala 2.12. You might want to try the latest version of the connector that matches your Spark version.

Solution that worked for me:

I upgraded the EMR release from emr-6.4.0 to emr-6.14.0 (Spark 3.4.1 and Hadoop 3.3.3), which is compatible with elasticsearch-spark-30_2.12-8.11.2.jar (Scala 2.12). That resolved the java.lang.NoClassDefFoundError: scala/Product$class error.

You can find these jars here: https://mvnrepository.com/artifact/org.elasticsearch/elasticsearch-spark-30_2.12
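
Putting it together, here is a minimal sketch of the setup that worked for me; the endpoint, port, index name and sample data below are placeholders, and the jar path assumes you copied the connector onto the cluster:

from pyspark.sql import SparkSession

# Hypothetical values for illustration only; replace with your own.
es_endpoint = "vpc-my-domain.us-east-1.es.amazonaws.com"
es_port = "443"
log_index_name = "app-logs"

spark = (
    SparkSession.builder
    .appName("Load Data into AWS OpenSearch")
    .config("spark.jars", "/spark/jars/elasticsearch-spark-30_2.12-8.11.2.jar")
    .getOrCreate()
)

# Tiny sample dataframe standing in for the real log data.
df = spark.createDataFrame(
    [("2024-01-01", "service started")],
    ["timestamp", "message"],
)

(
    df.coalesce(1).write
    .mode("append")
    .format("org.elasticsearch.spark.sql")
    .option("es.nodes.wan.only", "true")
    .option("es.nodes", es_endpoint)
    .option("es.port", es_port)
    .option("es.resource", "%s-{%s}/%s" % (log_index_name, "timestamp", "log"))
    .save()
)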