Spark-avro Cannot grow BufferHolder because the size is negative - where to look for the cause?


Environment:

  • Scala 2.11
  • Spark 2.4
  • Hortonworks SchemaRegistry
  • Kafka messages with embedded schema information.

Context

As stated above, I am aware of how the Hortonworks SchemaRegistry information is embedded in the Kafka message. The first 13 bytes (1 magic byte + 8 bytes of schemaId + 4 bytes of schemaVersion) are cut out of the message after it is downloaded from Kafka, so that only the "clean avro" payload is passed forward, in a column named valueWithoutEmbeddedInfo.

I use com.hortonworks.registries.schemaregistry.client.SchemaRegistryClient to download the schema inside Spark, and then pass SchemaVersionInfo.getSchemaText as the jsonFormatedSchema.
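
For reference, this is roughly how the schema text is obtained. A minimal sketch, assuming the standard registries client API; the registry URL and schema name below are placeholders:

    import com.hortonworks.registries.schemaregistry.client.SchemaRegistryClient
    import scala.collection.JavaConverters._

    // Placeholder registry URL; substitute the real endpoint.
    val clientConf = Map[String, AnyRef](
      SchemaRegistryClient.Configuration.SCHEMA_REGISTRY_URL.name() ->
        "http://schema-registry-host:7788/api/v1"
    ).asJava

    val client = new SchemaRegistryClient(clientConf)

    // Latest version of the (placeholder) schema; its text is the
    // JSON-formatted Avro schema passed on as jsonFormatedSchema.
    val jsonFormatedSchema: String =
      client.getLatestSchemaVersionInfo("my-schema-name").getSchemaText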

The schema itself is gigantic: a single-line JSON of over 5 MB. However, the messages are properly deserialized when viewing them in a Kafka UI.

The script is executed on YARN using spark-submit.

EDIT: How did I get clean avro data

I used .withColumn("valueWithoutEmbeddedInfo", expr("substring(value, 14, length(value)-13)")) on the incoming Kafka message.

Problem:

After downloading messages from Kafka with Avro values, when trying to deserialize them using from_avro(col("valueWithoutEmbeddedInfo"), jsonFormatedSchema), an error occurs saying Cannot grow BufferHolder by size -556231 because the size is negative.
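
For context, the relevant part of the pipeline looks roughly like this (a sketch: df stands for the DataFrame read from Kafka, and jsonFormatedSchema for the schema text described above):

    import org.apache.spark.sql.avro.from_avro
    import org.apache.spark.sql.functions.{col, expr}

    // Strip the 13-byte Hortonworks header; Spark SQL substring is 1-based,
    // so position 14 is the first byte of the actual Avro payload.
    val cleaned = df.withColumn(
      "valueWithoutEmbeddedInfo",
      expr("substring(value, 14, length(value)-13)")
    )

    // This is the step that fails with "Cannot grow BufferHolder ...".
    val decoded = cleaned.select(
      from_avro(col("valueWithoutEmbeddedInfo"), jsonFormatedSchema).as("data")
    )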

Question:

What may be causing this problem, and how should one resolve it?

I would appreciate any advice, thank you in advance.
