Flink throwing com.esotericsoftware.kryo.KryoException: java.lang.UnsupportedOperationException


I am trying to deserialize Kafka events in my Flink streaming job. This is my code:

...
case class URLResponse (status: Int, domain: String, url: String, queue: String, html: String)
...
val schema: Schema = AvroSchema[URLResponse]
...
val stream = env.addSource(new FlinkKafkaConsumer[GenericRecord](kafkaTopic, ConfluentRegistryAvroDeserializationSchema.forGeneric(schema, schemaRegistryURL), properties))

The job throws this exception at runtime:

...
Caused by: com.esotericsoftware.kryo.KryoException: java.lang.UnsupportedOperationException
Serialization trace:
reserved (org.apache.avro.Schema$Field)
fieldMap (org.apache.avro.Schema$RecordSchema)
schema (org.apache.avro.generic.GenericData$Record)
    at com.esotericsoftware.kryo.serializers.ObjectField.read(ObjectField.java:125)
    at com.esotericsoftware.kryo.serializers.FieldSerializer.read(FieldSerializer.java:528)
    at com.esotericsoftware.kryo.Kryo.readClassAndObject(Kryo.java:761)
    at com.esotericsoftware.kryo.serializers.MapSerializer.read(MapSerializer.java:143)
    at com.esotericsoftware.kryo.serializers.MapSerializer.read(MapSerializer.java:21)
    at com.esotericsoftware.kryo.Kryo.readObject(Kryo.java:679)
    at com.esotericsoftware.kryo.serializers.ObjectField.read(ObjectField.java:106)
    at com.esotericsoftware.kryo.serializers.FieldSerializer.read(FieldSerializer.java:528)
    at com.esotericsoftware.kryo.Kryo.readObject(Kryo.java:679)
    at com.esotericsoftware.kryo.serializers.ObjectField.read(ObjectField.java:106)
    at com.esotericsoftware.kryo.serializers.FieldSerializer.read(FieldSerializer.java:528)
    at com.esotericsoftware.kryo.Kryo.readObject(Kryo.java:657)
    at org.apache.flink.api.java.typeutils.runtime.kryo.KryoSerializer.copy(KryoSerializer.java:262)
    at org.apache.flink.streaming.runtime.tasks.CopyingChainingOutput.pushToOperator(CopyingChainingOutput.java:69)
    at org.apache.flink.streaming.runtime.tasks.CopyingChainingOutput.collect(CopyingChainingOutput.java:46)
    at org.apache.flink.streaming.runtime.tasks.CopyingChainingOutput.collect(CopyingChainingOutput.java:26)
    at org.apache.flink.streaming.api.operators.CountingOutput.collect(CountingOutput.java:52)
    at org.apache.flink.streaming.api.operators.CountingOutput.collect(CountingOutput.java:30)
    at org.apache.flink.streaming.api.operators.StreamSourceContexts$ManualWatermarkContext.processAndCollectWithTimestamp(StreamSourceContexts.java:310)
    at org.apache.flink.streaming.api.operators.StreamSourceContexts$WatermarkContext.collectWithTimestamp(StreamSourceContexts.java:409)
    at org.apache.flink.streaming.connectors.kafka.internals.AbstractFetcher.emitRecordsWithTimestamps(AbstractFetcher.java:352)
    at org.apache.flink.streaming.connectors.kafka.internals.KafkaFetcher.partitionConsumerRecordsHandler(KafkaFetcher.java:181)
    at org.apache.flink.streaming.connectors.kafka.internals.KafkaFetcher.runFetchLoop(KafkaFetcher.java:137)
    at org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumerBase.run(FlinkKafkaConsumerBase.java:761)
    at org.apache.flink.streaming.api.operators.StreamSource.run(StreamSource.java:100)
    at org.apache.flink.streaming.api.operators.StreamSource.run(StreamSource.java:63)
    at org.apache.flink.streaming.runtime.tasks.SourceStreamTask$LegacySourceFunctionThread.run(SourceStreamTask.java:215)
Caused by: java.lang.UnsupportedOperationException
    at java.util.Collections$UnmodifiableCollection.add(Collections.java:1057)
    at com.esotericsoftware.kryo.serializers.CollectionSerializer.read(CollectionSerializer.java:109)
    at com.esotericsoftware.kryo.serializers.CollectionSerializer.read(CollectionSerializer.java:22)
    at com.esotericsoftware.kryo.Kryo.readObject(Kryo.java:679)
    at com.esotericsoftware.kryo.serializers.ObjectField.read(ObjectField.java:106)
    ... 26 more

Process finished with exit code 1

I read that I should not use Kryo, but I have no idea how to avoid it. I tried:

executionConfig.enableForceAvro()
executionConfig.disableForceKryo()

but it doesn't help.


There are 4 best solutions below


The exception comes from the Scala API: without explicit type information it treats GenericRecord as a generic type and falls back to Kryo. It works fine if I go through the Java API and pass the Avro type information explicitly (https://flink.apache.org/news/2020/04/15/flink-serialization-tuning-vol-1.html#avro). My solution:

val javaStream = env.getJavaEnv.addSource(new FlinkKafkaConsumer[GenericRecord](
    kafkaTopic, ConfluentRegistryAvroDeserializationSchema.forGeneric(schema, schemaRegistryURL), properties),
    new GenericRecordAvroTypeInfo(schema))
val stream = new DataStream[GenericRecord](javaStream)

I came across the same issue in Java. The code snippet below resolved it for me:

import de.javakaffee.kryoserializers.UnmodifiableCollectionsSerializer;

StreamExecutionEnvironment environment = StreamExecutionEnvironment.getExecutionEnvironment();

// The class is not public, so it has to be looked up by name.
Class<?> unmodColl = Class.forName("java.util.Collections$UnmodifiableCollection");
environment.getConfig().addDefaultKryoSerializer(unmodColl, UnmodifiableCollectionsSerializer.class);

You also need to add a Maven dependency that provides UnmodifiableCollectionsSerializer:

    <dependency>
        <groupId>de.javakaffee</groupId>
        <artifactId>kryo-serializers</artifactId>
        <version>0.45</version>
    </dependency>
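
For context on why registering a dedicated serializer helps: the trace in the question ends in Collections$UnmodifiableCollection.add, called from Kryo's CollectionSerializer.read. As far as the trace shows, the generic serializer fills the target collection by calling add on it, which an unmodifiable wrapper rejects. The sketch below reproduces only that JDK behaviour. It does not use Kryo, and the class and method names (UnmodifiableAddDemo, fillByAdding) are made up for illustration.

```java
import java.util.Arrays;
import java.util.Collection;
import java.util.Collections;
import java.util.HashSet;
import java.util.Set;

public class UnmodifiableAddDemo {

    // Simplified stand-in for a generic collection deserializer:
    // it receives a target collection and adds the decoded elements one by one.
    // Returns false if the target refuses modification.
    public static boolean fillByAdding(Collection<String> target, Collection<String> elements) {
        try {
            for (String element : elements) {
                target.add(element);
            }
            return true;
        } catch (UnsupportedOperationException ex) {
            return false;
        }
    }

    public static void main(String[] args) {
        Set<String> decoded = new HashSet<String>(Arrays.asList("doc", "fields"));

        // A plain HashSet accepts the elements.
        System.out.println(fillByAdding(new HashSet<String>(), decoded)); // prints "true"

        // An unmodifiable wrapper throws UnsupportedOperationException on add.
        Set<String> readOnly = Collections.unmodifiableSet(new HashSet<String>());
        System.out.println(fillByAdding(readOnly, decoded)); // prints "false"
    }
}
```

A serializer written for these wrappers can avoid the problem by rebuilding the collection from a mutable copy and wrapping it again, instead of adding to the wrapper directly.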

I faced the same issue with Avro GenericRecord over a Kinesis data stream, using Scala 2.12 and Flink 1.11.4.

My solution was to add an implicit TypeInformation:

implicit val typeInfo: TypeInformation[GenericRecord] = new GenericRecordAvroTypeInfo(avroSchema)

Below is a full code example focusing on the serialization problem:

@Test def `test avro generic record serializer`(): Unit = {
  val env = StreamExecutionEnvironment.getExecutionEnvironment
  val schema: String =
    """
      |{
      | "namespace": "com.mberchon.monitor.dto.avro",
      | "type": "record",
      | "name": "TestAvro",
      | "fields": [
      |  {"name": "strVal", "type": ["null", "string"]},
      |  {"name": "longVal",  "type": ["null", "long"]}
      |  ]
      |}
""".stripMargin

  val avroSchema = new Schema.Parser().parse(schema)
  val rec:GenericRecord = new GenericRecordBuilder(avroSchema)
    .set("strVal","foo")
    .set("longVal",1234L)
    .build()

  implicit val typeInfo: TypeInformation[GenericRecord] = new GenericRecordAvroTypeInfo(avroSchema)
  val _ = env.fromElements(rec,rec).addSink(new PrintSinkFunction[GenericRecord]())

  env.execute("Test serializer")
}

Coming back to your context, the following code should work:

...
case class URLResponse (status: Int, domain: String, url: String, queue: String, html: String)
...
val schema: Schema = AvroSchema[URLResponse]
...
implicit val typeInfo: TypeInformation[GenericRecord] = new GenericRecordAvroTypeInfo(schema)
val stream = env.addSource(new FlinkKafkaConsumer[GenericRecord](kafkaTopic, ConfluentRegistryAvroDeserializationSchema.forGeneric(schema, schemaRegistryURL), properties))

If you can't use the Java environment to add a source (maybe you're using the StreamExecutionEnvironment.readFile method), there is another solution shared here: https://stackoverflow.com/a/32453031/899937. Essentially:

val unmodifiableCollectionClass = Class.forName("java.util.Collections$UnmodifiableCollection")
env.getConfig.addDefaultKryoSerializer(unmodifiableCollectionClass, classOf[UnmodifiableCollectionsSerializer])

kryo-serializers is not included in Flink anymore, so you have to add it as a dependency.
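
Two details of the snippet above may be worth spelling out. The class is looked up with Class.forName because java.util.Collections$UnmodifiableCollection is not a public class, so it cannot be written as a class literal. And, assuming Kryo picks default serializers by assignability (that is my understanding of addDefaultSerializer, not something verified here), registering the base class also covers the set and list wrappers, but not unmodifiable maps. The JDK-only sketch below checks just the class hierarchy part. UnmodifiableBaseClassDemo and its methods are hypothetical names.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;

public class UnmodifiableBaseClassDemo {

    // Loads the non-public wrapper base class by its binary name.
    public static Class<?> loadBase() {
        try {
            return Class.forName("java.util.Collections$UnmodifiableCollection");
        } catch (ClassNotFoundException e) {
            throw new IllegalStateException("Unexpected JDK layout", e);
        }
    }

    // True if the object's class is the base class or one of its subclasses.
    public static boolean coveredByBase(Object candidate) {
        return loadBase().isInstance(candidate);
    }

    public static void main(String[] args) {
        // Set and list wrappers extend the base class.
        System.out.println(coveredByBase(Collections.unmodifiableSet(new HashSet<String>())));    // prints "true"
        System.out.println(coveredByBase(Collections.unmodifiableList(new ArrayList<String>()))); // prints "true"

        // The map wrapper is not a Collection, so it is not covered.
        System.out.println(coveredByBase(Collections.unmodifiableMap(new HashMap<String, String>()))); // prints "false"

        // Ordinary mutable collections are unaffected.
        System.out.println(coveredByBase(new ArrayList<String>())); // prints "false"
    }
}
```

If a schema object also held an unmodifiable map, a separate registration for that wrapper class would presumably be needed.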