Environment:
- Scala 2.11
- Spark 2.4
- Hortonworks SchemaRegistry
- Kafka messages with embedded schema information.
Context:
As stated above, I am aware of how Hortonworks SchemaRegistry information is embedded in a Kafka message. The first 13 bytes (1 magic byte + 8 bytes of schemaId + 4 bytes of schemaVersion) are stripped from each message after it is downloaded from Kafka, so only the "clean avro" payload is passed forward in a column named valueWithoutEmbeddedInfo.
I use com.hortonworks.registries.schemaregistry.client.SchemaRegistryClient to download the schema inside Spark, then pass SchemaVersionInfo.getSchemaText as the jsonFormatedSchema.
The schema itself is gigantic: a single-line JSON of over 5 MB. However, the messages are deserialized properly when viewed in Kafka UI.
The script is executed on YARN using spark-submit.
EDIT: How I got the clean avro data
I used
.withColumn("valueWithoutEmbeddedInfo", expr("substring(value, 14, length(value)-13)"))
on the incoming Kafka message.
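For reference, the same 13-byte header layout can be checked outside Spark with plain JVM code. This is only a sketch based on the layout described above; the big-endian field order and the EmbeddedInfo/stripEmbeddedInfo names are my own assumptions, not part of the Hortonworks API:

```scala
import java.nio.ByteBuffer

// Fields decoded from the 13-byte header described above:
// 1 magic byte + 8-byte schemaId + 4-byte schemaVersion (big-endian assumed),
// followed by the "clean avro" payload.
case class EmbeddedInfo(magic: Byte, schemaId: Long, schemaVersion: Int, cleanAvro: Array[Byte])

def stripEmbeddedInfo(value: Array[Byte]): EmbeddedInfo = {
  require(value.length >= 13, s"message shorter than header: ${value.length} bytes")
  val buf = ByteBuffer.wrap(value) // big-endian by default, matching network byte order
  val magic = buf.get()
  val schemaId = buf.getLong()
  val schemaVersion = buf.getInt()
  EmbeddedInfo(magic, schemaId, schemaVersion, value.drop(13))
}
```

This mirrors the substring(value, 14, length(value)-13) expression: SQL substring is 1-based, so position 14 corresponds to byte index 13, i.e. the first byte after the header.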
Problem:
After downloading the messages from Kafka, attempting to deserialize their Avro values with from_avro(col("valueWithoutEmbeddedInfo"), jsonFormatedSchema) fails with:
Cannot grow BufferHolder by size -556231 because the size is negative.
Question:
What may be causing this problem, and how should one resolve it?
I would appreciate any advice, thank you in advance.