I'm currently using EMR release version [emr.5.23.0] i.e. Spark 2.4.0 & Spark Elastic Search Connector [elasticsearch-spark-30_2.12-7.12.0.jar] to write data into AWS Open Search (Elasticsearch 6.7) :
dataframe.coalesce(6).write.mode("append").format(
"org.elasticsearch.spark.sql"
).option(
"es.nodes.wan.only", "true"
).option(
"es.nodes", es_endpoint
).option(
"es.port", es_port
).option(
"es.resource", "%s-{%s}/%s" % (log_index_name, "timestamp", "log")
).save()
This used to work fine. But when i upgraded the EMR release version from emr.5.23.0 (Spark 2.4.0) to emr-6.4.0 (Spark 3.1.2), I see the following error when trying to write to AWS Open Search (Elasticsearch 6.7) using elasticsearch-spark-30_2.12-7.12.0.jar :-
Failure Reason - An error occurred while calling o331.save.
: java.lang.NoClassDefFoundError: scala/Product$class
at org.elasticsearch.spark.sql.ElasticsearchRelation.<init>(DefaultSource.scala:220)
at org.elasticsearch.spark.sql.DefaultSource.createRelation(DefaultSource.scala:105)
at org.apache.spark.sql.execution.datasources.SaveIntoDataSourceCommand.run(SaveIntoDataSourceCommand.scala:46)
at org.apache.spark.sql.execution.command.ExecutedCommandExec.sideEffectResult$lzycompute(commands.scala:70)
at org.apache.spark.sql.execution.command.ExecutedCommandExec.sideEffectResult(commands.scala:68)
at org.apache.spark.sql.execution.command.ExecutedCommandExec.doExecute(commands.scala:90)
at org.apache.spark.sql.execution.SparkPlan.$anonfun$execute$1(SparkPlan.scala:194)
at org.apache.spark.sql.execution.SparkPlan.$anonfun$executeQuery$1(SparkPlan.scala:232)
at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)
at org.apache.spark.sql.execution.SparkPlan.executeQuery(SparkPlan.scala:229)
at org.apache.spark.sql.execution.SparkPlan.execute(SparkPlan.scala:190)
at org.apache.spark.sql.execution.QueryExecution.toRdd$lzycompute(QueryExecution.scala:134)
at org.apache.spark.sql.execution.QueryExecution.toRdd(QueryExecution.scala:133)
at org.apache.spark.sql.DataFrameWriter.$anonfun$runCommand$1(DataFrameWriter.scala:989)
at org.apache.spark.sql.catalyst.QueryPlanningTracker$.withTracker(QueryPlanningTracker.scala:107)
at org.apache.spark.sql.execution.SQLExecution$.withTracker(SQLExecution.scala:232)
at org.apache.spark.sql.execution.SQLExecution$.executeQuery$1(SQLExecution.scala:110)
at org.apache.spark.sql.execution.SQLExecution$.$anonfun$withNewExecutionId$6(SQLExecution.scala:135)
at org.apache.spark.sql.catalyst.QueryPlanningTracker$.withTracker(QueryPlanningTracker.scala:107)
at org.apache.spark.sql.execution.SQLExecution$.withTracker(SQLExecution.scala:232)
at org.apache.spark.sql.execution.SQLExecution$.$anonfun$withNewExecutionId$5(SQLExecution.scala:135)
at org.apache.spark.sql.execution.SQLExecution$.withSQLConfPropagated(SQLExecution.scala:253)
at org.apache.spark.sql.execution.SQLExecution$.$anonfun$withNewExecutionId$1(SQLExecution.scala:134)
at org.apache.spark.sql.SparkSession.withActive(SparkSession.scala:775)
at org.apache.spark.sql.execution.SQLExecution$.withNewExecutionId(SQLExecution.scala:68)
at org.apache.spark.sql.DataFrameWriter.runCommand(DataFrameWriter.scala:989)
at org.apache.spark.sql.DataFrameWriter.saveToV1Source(DataFrameWriter.scala:438)
at org.apache.spark.sql.DataFrameWriter.saveInternal(DataFrameWriter.scala:415)
at org.apache.spark.sql.DataFrameWriter.save(DataFrameWriter.scala:301)
at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
at java.lang.reflect.Method.invoke(Method.java:498)
at py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:244)
at py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:357)
at py4j.Gateway.invoke(Gateway.java:282)
at py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:132)
at py4j.commands.CallCommand.execute(CallCommand.java:79)
at py4j.GatewayConnection.run(GatewayConnection.java:238)
at java.lang.Thread.run(Thread.java:750)
Caused by: java.lang.ClassNotFoundException: scala.Product$class
at java.net.URLClassLoader.findClass(URLClassLoader.java:387)
at java.lang.ClassLoader.loadClass(ClassLoader.java:418)
at java.lang.ClassLoader.loadClass(ClassLoader.java:351)
... 40 more
Traceback (most recent call last):
File "usr/copy_data_to_open_search.py", line 141, in process_log_data_to_es
"es.resource", "%s-{%s}/%s" % (log_index_name, "timestamp", "log")
File "/usr/lib/spark/python/lib/pyspark.zip/pyspark/sql/readwriter.py", line 1107, in save
self._jwrite.save()
File "/usr/lib/spark/python/lib/py4j-0.10.9-src.zip/py4j/java_gateway.py", line 1305, in __call__
answer, self.gateway_client, self.target_id, self.name)
File "/usr/lib/spark/python/lib/pyspark.zip/pyspark/sql/utils.py", line 111, in deco
return f(*a, **kw)
File "/usr/lib/spark/python/lib/py4j-0.10.9-src.zip/py4j/protocol.py", line 328, in get_return_value
format(target_id, ".", name), value)
py4j.protocol.Py4JJavaError: An error occurred while calling o331.save.
: java.lang.NoClassDefFoundError: scala/Product$class
at org.elasticsearch.spark.sql.ElasticsearchRelation.<init>(DefaultSource.scala:220)
at org.elasticsearch.spark.sql.DefaultSource.createRelation(DefaultSource.scala:105)
at org.apache.spark.sql.execution.datasources.SaveIntoDataSourceCommand.run(SaveIntoDataSourceCommand.scala:46)
at org.apache.spark.sql.execution.command.ExecutedCommandExec.sideEffectResult$lzycompute(commands.scala:70)
at org.apache.spark.sql.execution.command.ExecutedCommandExec.sideEffectResult(commands.scala:68)
at org.apache.spark.sql.execution.command.ExecutedCommandExec.doExecute(commands.scala:90)
at org.apache.spark.sql.execution.SparkPlan.$anonfun$execute$1(SparkPlan.scala:194)
at org.apache.spark.sql.execution.SparkPlan.$anonfun$executeQuery$1(SparkPlan.scala:232)
at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)
at org.apache.spark.sql.execution.SparkPlan.executeQuery(SparkPlan.scala:229)
at org.apache.spark.sql.execution.SparkPlan.execute(SparkPlan.scala:190)
at org.apache.spark.sql.execution.QueryExecution.toRdd$lzycompute(QueryExecution.scala:134)
at org.apache.spark.sql.execution.QueryExecution.toRdd(QueryExecution.scala:133)
at org.apache.spark.sql.DataFrameWriter.$anonfun$runCommand$1(DataFrameWriter.scala:989)
at org.apache.spark.sql.catalyst.QueryPlanningTracker$.withTracker(QueryPlanningTracker.scala:107)
at org.apache.spark.sql.execution.SQLExecution$.withTracker(SQLExecution.scala:232)
at org.apache.spark.sql.execution.SQLExecution$.executeQuery$1(SQLExecution.scala:110)
at org.apache.spark.sql.execution.SQLExecution$.$anonfun$withNewExecutionId$6(SQLExecution.scala:135)
at org.apache.spark.sql.catalyst.QueryPlanningTracker$.withTracker(QueryPlanningTracker.scala:107)
at org.apache.spark.sql.execution.SQLExecution$.withTracker(SQLExecution.scala:232)
at org.apache.spark.sql.execution.SQLExecution$.$anonfun$withNewExecutionId$5(SQLExecution.scala:135)
at org.apache.spark.sql.execution.SQLExecution$.withSQLConfPropagated(SQLExecution.scala:253)
at org.apache.spark.sql.execution.SQLExecution$.$anonfun$withNewExecutionId$1(SQLExecution.scala:134)
at org.apache.spark.sql.SparkSession.withActive(SparkSession.scala:775)
at org.apache.spark.sql.execution.SQLExecution$.withNewExecutionId(SQLExecution.scala:68)
at org.apache.spark.sql.DataFrameWriter.runCommand(DataFrameWriter.scala:989)
at org.apache.spark.sql.DataFrameWriter.saveToV1Source(DataFrameWriter.scala:438)
at org.apache.spark.sql.DataFrameWriter.saveInternal(DataFrameWriter.scala:415)
at org.apache.spark.sql.DataFrameWriter.save(DataFrameWriter.scala:301)
at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
at java.lang.reflect.Method.invoke(Method.java:498)
at py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:244)
at py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:357)
at py4j.Gateway.invoke(Gateway.java:282)
at py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:132)
at py4j.commands.CallCommand.execute(CallCommand.java:79)
at py4j.GatewayConnection.run(GatewayConnection.java:238)
at java.lang.Thread.run(Thread.java:750)
Caused by: java.lang.ClassNotFoundException: scala.Product$class
at java.net.URLClassLoader.findClass(URLClassLoader.java:387)
at java.lang.ClassLoader.loadClass(ClassLoader.java:418)
at java.lang.ClassLoader.loadClass(ClassLoader.java:351)
... 40 more
It seems like the error is related to a missing Scala class during the execution of the Spark job that writes to Elasticsearch 6.7 i.e. AWS Open Search in this case. The error indicates that the class scala.Product$class cannot be found.
I tried to update the code with latest Spark Elastic Search Connector jar:
elasticsearch-spark-30_2.12-8.11.2.jar replacing elasticsearch-spark-30_2.12-7.12.0.jar
But the code still fails with the same error mentioned above.
How can I solve this issue ? Thanks in advance!
First make sure, you are passing elasticsearch-spark jar :
(or)
These pages helped me a lot:
Actual Issue :
The error (java.lang.NoClassDefFoundError: scala/Product$class) usually indicates that its trying to use a package built for an incompatible version of Scala. The error you're encountering, java.lang.NoClassDefFoundError: scala/Product$class, suggests that there's a compatibility issue between the Spark version and the Elasticsearch Spark Connector version you are using. This error often occurs when there is a mismatch between the Scala versions used by Spark and the Elasticsearch Spark Connector.
Here are a few steps you can take to resolve the issue:
Check Scala Versions: Ensure that the Scala versions are compatible between Spark and the Elasticsearch Spark Connector. Spark 3.x typically uses Scala 2.12. Check the version of Scala used by your Spark distribution and make sure the Elasticsearch Spark Connector version you are using is compiled with the same Scala version.
Update Elasticsearch Spark Connector: It seems you've tried updating the Elasticsearch Spark Connector, but make sure you are using a version that is compatible with Spark 3.1.2 and Scala 2.12. You might want to try the latest version of the connector that matches your Spark version.
Solution that worked for me :
I upgraded EMR Release Version from emr-6.4.0 to emr-6.14.0 i.e. Spark 3.4.1 & Hadoop 3.3.3 which was compatible with elasticsearch-spark-30_2.12-8.11.2.jar i.e. Scala 2.12 which solved the issue - (java.lang.NoClassDefFoundError: scala/Product$class)
You can find these jars here : https://mvnrepository.com/artifact/org.elasticsearch/elasticsearch-spark-30_2.12