I use Structured Streaming to read Avro records from a Kafka topic A, apply some transformations, and write the results as Avro to another Kafka topic B. I use the from_avro/to_avro functions from pyspark.sql.avro.functions for deserializing and serializing the Avro records.
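For context, the pipeline looks roughly like this (a minimal sketch: broker address, topic names, and checkpoint location are placeholders, and avro_schema is the schema string shown further down):

from pyspark.sql import functions as func
from pyspark.sql.avro.functions import from_avro, to_avro

# Read Avro-encoded values from topic A and unpack them into columns
stream = (
    spark.readStream.format("kafka")
    .option("kafka.bootstrap.servers", "localhost:9092")  # placeholder
    .option("subscribe", "topic_a")                       # placeholder
    .load()
    .select(from_avro(func.col("value"), avro_schema).alias("record"))
    .select("record.*")
)

# ... some transformations on the columns here ...

# Re-serialize all columns into a single Avro "value" column and write to topic B
query = (
    stream.select(to_avro(func.struct(stream.columns), avro_schema).alias("value"))
    .writeStream.format("kafka")
    .option("kafka.bootstrap.servers", "localhost:9092")  # placeholder
    .option("topic", "topic_b")                           # placeholder
    .option("checkpointLocation", "/tmp/checkpoint")      # placeholder
    .start()
)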
I originally faced a different exception (a parsing error with mode "FAILFAST") while reading from the second topic B with the specified schema in a separate job. I wondered how this could happen, because the schemas were exactly the same.
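The separate reading job deserializes roughly like this (again a sketch with placeholder names; the "mode": "FAILFAST" option is what surfaced the parsing error):

from pyspark.sql import functions as func
from pyspark.sql.avro.functions import from_avro

df_b = (
    spark.read.format("kafka")
    .option("kafka.bootstrap.servers", "localhost:9092")  # placeholder
    .option("subscribe", "topic_b")                       # placeholder
    .load()
    # FAILFAST makes from_avro throw on malformed records instead of returning null
    .select(from_avro(func.col("value"), avro_schema, {"mode": "FAILFAST"}).alias("record"))
    .select("record.*")
)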
To dig into the details, I came up with the following experimental setup to reproduce the exception. It turns out that the transformed records (exact same schema, just some simple transformations of the values) can no longer be serialized with to_avro:
from pyspark.sql import Row
from pyspark.sql import functions as func
from pyspark.sql.avro import functions as afunc
from pyspark.sql.types import ArrayType, LongType, StringType, StructField, StructType
avro_schema = """
{
"fields": [
{
"name": "field_1",
"type": [
"string",
"null"
]
},
{
"name": "field_2",
"type": [
"long",
"null"
]
},
{
"name": "field_3",
"type": [
{
"items": [
"string",
"null"
],
"type": "array"
},
"null"
]
}
],
"name": "bla",
"namespace": "bla",
"type": "record"
}
"""
# Create example DF
df = spark.createDataFrame(
    [Row("bla", 42, ["bla", "blubb"])],
    StructType(
        [
            StructField("field_1", StringType(), True),
            StructField("field_2", LongType(), True),
            StructField("field_3", ArrayType(StringType()), True),
        ]
    ),
)
# Serialize columns to Avro record
df_avro = df.select(afunc.to_avro(func.struct(df.columns), avro_schema))
# Now, change values in DF to some arbitrary value
df_changed_1 = df.withColumn("field_2", func.lit(998877665544332211))
df_changed_2 = df.withColumn("field_2", func.lit(None)) # To enforce nullability in schema for field_2
df_changed = df_changed_1.unionByName(df_changed_2)
# Again, serialize columns to Avro record with same schema as before
df_changed_avro = df_changed.select(afunc.to_avro(func.struct(df_changed.columns), avro_schema))
# Schemas are exactly identical, which makes sense: nothing changed in the schema itself, only the values
print(df.schema)
print(df_changed.schema)
# Have a look at the DFs values
df.show()
df_changed.show()
# Enforce DF evaluation
df_avro.show()  # This works as expected
df_changed_avro.show()  # This produces the exception below
The last line produces the following exception:
org.apache.spark.SparkException: Job aborted due to stage failure: Task 2 in stage 35.0 failed 1 times, most recent failure: Lost task 2.0 in stage 35.0 (TID 109, 192.168.2.117, executor driver): org.apache.spark.sql.avro.IncompatibleSchemaException: Cannot convert Catalyst type LongType to Avro type ["long","null"].
at org.apache.spark.sql.avro.AvroSerializer.newConverter(AvroSerializer.scala:219)
at org.apache.spark.sql.avro.AvroSerializer.$anonfun$newStructConverter$1(AvroSerializer.scala:239)
at scala.collection.TraversableLike.$anonfun$map$1(TraversableLike.scala:238)
at scala.collection.Iterator.foreach(Iterator.scala:941)
at scala.collection.Iterator.foreach$(Iterator.scala:941)
at scala.collection.AbstractIterator.foreach(Iterator.scala:1429)
at scala.collection.IterableLike.foreach(IterableLike.scala:74)
at scala.collection.IterableLike.foreach$(IterableLike.scala:73)
at org.apache.spark.sql.types.StructType.foreach(StructType.scala:102)
at scala.collection.TraversableLike.map(TraversableLike.scala:238)
at scala.collection.TraversableLike.map$(TraversableLike.scala:231)
at org.apache.spark.sql.types.StructType.map(StructType.scala:102)
at org.apache.spark.sql.avro.AvroSerializer.newStructConverter(AvroSerializer.scala:232)
at org.apache.spark.sql.avro.AvroSerializer.<init>(AvroSerializer.scala:72)
at org.apache.spark.sql.avro.AvroSerializer.<init>(AvroSerializer.scala:55)
at org.apache.spark.sql.avro.CatalystDataToAvro.serializer$lzycompute(CatalystDataToAvro.scala:42)
at org.apache.spark.sql.avro.CatalystDataToAvro.serializer(CatalystDataToAvro.scala:41)
at org.apache.spark.sql.avro.CatalystDataToAvro.nullSafeEval(CatalystDataToAvro.scala:54)
at org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIteratorForCodegenStage1.processNext(Unknown Source)
at org.apache.spark.sql.execution.BufferedRowIterator.hasNext(BufferedRowIterator.java:43)
at org.apache.spark.sql.execution.WholeStageCodegenExec$$anon$1.hasNext(WholeStageCodegenExec.scala:729)
at org.apache.spark.sql.execution.SparkPlan.$anonfun$getByteArrayRdd$1(SparkPlan.scala:345)
at org.apache.spark.rdd.RDD.$anonfun$mapPartitionsInternal$2(RDD.scala:872)
at org.apache.spark.rdd.RDD.$anonfun$mapPartitionsInternal$2$adapted(RDD.scala:872)
at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:52)
at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:349)
at org.apache.spark.rdd.RDD.iterator(RDD.scala:313)
at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:90)
at org.apache.spark.scheduler.Task.run(Task.scala:127)
at org.apache.spark.executor.Executor$TaskRunner.$anonfun$run$3(Executor.scala:463)
at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:1377)
at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:466)
at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1128)
at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:628)
at java.base/java.lang.Thread.run(Thread.java:829)
Driver stacktrace:
at org.apache.spark.scheduler.DAGScheduler.failJobAndIndependentStages(DAGScheduler.scala:2059)
at org.apache.spark.scheduler.DAGScheduler.$anonfun$abortStage$2(DAGScheduler.scala:2008)
at org.apache.spark.scheduler.DAGScheduler.$anonfun$abortStage$2$adapted(DAGScheduler.scala:2007)
at scala.collection.mutable.ResizableArray.foreach(ResizableArray.scala:62)
at scala.collection.mutable.ResizableArray.foreach$(ResizableArray.scala:55)
at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:49)
at org.apache.spark.scheduler.DAGScheduler.abortStage(DAGScheduler.scala:2007)
at org.apache.spark.scheduler.DAGScheduler.$anonfun$handleTaskSetFailed$1(DAGScheduler.scala:973)
at org.apache.spark.scheduler.DAGScheduler.$anonfun$handleTaskSetFailed$1$adapted(DAGScheduler.scala:973)
at scala.Option.foreach(Option.scala:407)
at org.apache.spark.scheduler.DAGScheduler.handleTaskSetFailed(DAGScheduler.scala:973)
at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.doOnReceive(DAGScheduler.scala:2239)
at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:2188)
at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:2177)
at org.apache.spark.util.EventLoop$$anon$1.run(EventLoop.scala:49)
at org.apache.spark.scheduler.DAGScheduler.runJob(DAGScheduler.scala:775)
at org.apache.spark.SparkContext.runJob(SparkContext.scala:2114)
at org.apache.spark.SparkContext.runJob(SparkContext.scala:2135)
at org.apache.spark.SparkContext.runJob(SparkContext.scala:2154)
at org.apache.spark.sql.execution.SparkPlan.executeTake(SparkPlan.scala:472)
at org.apache.spark.sql.execution.SparkPlan.executeTake(SparkPlan.scala:425)
at org.apache.spark.sql.execution.CollectLimitExec.executeCollect(limit.scala:47)
at org.apache.spark.sql.Dataset.collectFromPlan(Dataset.scala:3627)
at org.apache.spark.sql.Dataset.$anonfun$head$1(Dataset.scala:2697)
at org.apache.spark.sql.Dataset.$anonfun$withAction$1(Dataset.scala:3618)
at org.apache.spark.sql.execution.SQLExecution$.$anonfun$withNewExecutionId$5(SQLExecution.scala:100)
at org.apache.spark.sql.execution.SQLExecution$.withSQLConfPropagated(SQLExecution.scala:160)
at org.apache.spark.sql.execution.SQLExecution$.$anonfun$withNewExecutionId$1(SQLExecution.scala:87)
at org.apache.spark.sql.SparkSession.withActive(SparkSession.scala:767)
at org.apache.spark.sql.execution.SQLExecution$.withNewExecutionId(SQLExecution.scala:64)
at org.apache.spark.sql.Dataset.withAction(Dataset.scala:3616)
at org.apache.spark.sql.Dataset.head(Dataset.scala:2697)
at org.apache.spark.sql.Dataset.take(Dataset.scala:2904)
at org.apache.spark.sql.Dataset.getRows(Dataset.scala:300)
at org.apache.spark.sql.Dataset.showString(Dataset.scala:337)
at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
at java.base/jdk.internal.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
at java.base/java.lang.reflect.Method.invoke(Method.java:566)
at py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:244)
at py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:357)
at py4j.Gateway.invoke(Gateway.java:282)
at py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:132)
at py4j.commands.CallCommand.execute(CallCommand.java:79)
at py4j.GatewayConnection.run(GatewayConnection.java:238)
at java.base/java.lang.Thread.run(Thread.java:829)
Caused by: org.apache.spark.sql.avro.IncompatibleSchemaException: Cannot convert Catalyst type LongType to Avro type ["long","null"].
at org.apache.spark.sql.avro.AvroSerializer.newConverter(AvroSerializer.scala:219)
at org.apache.spark.sql.avro.AvroSerializer.$anonfun$newStructConverter$1(AvroSerializer.scala:239)
at scala.collection.TraversableLike.$anonfun$map$1(TraversableLike.scala:238)
at scala.collection.Iterator.foreach(Iterator.scala:941)
at scala.collection.Iterator.foreach$(Iterator.scala:941)
at scala.collection.AbstractIterator.foreach(Iterator.scala:1429)
at scala.collection.IterableLike.foreach(IterableLike.scala:74)
at scala.collection.IterableLike.foreach$(IterableLike.scala:73)
at org.apache.spark.sql.types.StructType.foreach(StructType.scala:102)
at scala.collection.TraversableLike.map(TraversableLike.scala:238)
at scala.collection.TraversableLike.map$(TraversableLike.scala:231)
at org.apache.spark.sql.types.StructType.map(StructType.scala:102)
at org.apache.spark.sql.avro.AvroSerializer.newStructConverter(AvroSerializer.scala:232)
at org.apache.spark.sql.avro.AvroSerializer.<init>(AvroSerializer.scala:72)
at org.apache.spark.sql.avro.AvroSerializer.<init>(AvroSerializer.scala:55)
at org.apache.spark.sql.avro.CatalystDataToAvro.serializer$lzycompute(CatalystDataToAvro.scala:42)
at org.apache.spark.sql.avro.CatalystDataToAvro.serializer(CatalystDataToAvro.scala:41)
at org.apache.spark.sql.avro.CatalystDataToAvro.nullSafeEval(CatalystDataToAvro.scala:54)
at org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIteratorForCodegenStage1.processNext(Unknown Source)
at org.apache.spark.sql.execution.BufferedRowIterator.hasNext(BufferedRowIterator.java:43)
at org.apache.spark.sql.execution.WholeStageCodegenExec$$anon$1.hasNext(WholeStageCodegenExec.scala:729)
at org.apache.spark.sql.execution.SparkPlan.$anonfun$getByteArrayRdd$1(SparkPlan.scala:345)
at org.apache.spark.rdd.RDD.$anonfun$mapPartitionsInternal$2(RDD.scala:872)
at org.apache.spark.rdd.RDD.$anonfun$mapPartitionsInternal$2$adapted(RDD.scala:872)
at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:52)
at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:349)
at org.apache.spark.rdd.RDD.iterator(RDD.scala:313)
at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:90)
at org.apache.spark.scheduler.Task.run(Task.scala:127)
at org.apache.spark.executor.Executor$TaskRunner.$anonfun$run$3(Executor.scala:463)
at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:1377)
at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:466)
at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1128)
at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:628)
... 1 more
According to the documentation, Spark SQL types cannot be converted to Avro union types (at least such a conversion is not explicitly listed there), but I wonder why it seems to work for the original DataFrame.
Am I missing something, or is this a bug? The same thing also happens when I, for example, add a new field to the original DataFrame (df) and try to serialize it with an extended schema that includes the new field, as sketched below.
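For completeness, this is the kind of new-field variant I mean (a sketch; field_4, its literal value, and the exact extended schema are just illustrative):

# Add a new column and extend the Avro schema accordingly
df_new = df.withColumn("field_4", func.lit("bla"))  # example value
extended_schema = """
{
    "fields": [
        {"name": "field_1", "type": ["string", "null"]},
        {"name": "field_2", "type": ["long", "null"]},
        {"name": "field_3", "type": [{"items": ["string", "null"], "type": "array"}, "null"]},
        {"name": "field_4", "type": ["string", "null"]}
    ],
    "name": "bla",
    "namespace": "bla",
    "type": "record"
}
"""
df_new_avro = df_new.select(afunc.to_avro(func.struct(df_new.columns), extended_schema))
df_new_avro.show()  # Fails with the same IncompatibleSchemaException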