How to Deserialize an Avro Response from a DataStream in Scala + Apache Flink

I am getting Avro responses from a Kafka topic in Confluent, and I am having trouble deserializing them. I don't understand the syntax for defining the Avro deserializer and using it in my Kafka source when reading.

Here is my current approach.

I have a topic in Confluent named employee that produces a message every 10 seconds. Each message is serialized with Avro using the Confluent Schema Registry.

I am trying to read those messages in my Scala program. I was able to print the serialized messages, but I am not able to deserialize them.

import org.apache.flink.streaming.api.scala._
import org.apache.flink.api.common.eventtime.WatermarkStrategy
import org.apache.flink.api.common.serialization.SimpleStringSchema
import org.apache.flink.connector.kafka.source.KafkaSource
import org.apache.flink.connector.kafka.source.enumerator.initializer.OffsetsInitializer
import org.apache.flink.formats.avro.AvroDeserializationSchema
import org.apache.avro.generic.GenericData
import org.apache.avro.generic.GenericRecord

import java.time.Duration


case class emp(
  name: String,
  age: Int
)

object Main {

  def main(args: Array[String]): Unit = {
    val env = StreamExecutionEnvironment.getExecutionEnvironment
    val schemaRegistryUrl = "http://localhost:8081"
    val source = KafkaSource.builder[String]
      .setBootstrapServers("localhost:9092")
      .setTopics("employee")
      .setGroupId("my-group")
      .setStartingOffsets(OffsetsInitializer.earliest)
      .setValueOnlyDeserializer(new SimpleStringSchema())
      .build


    val streamEnv : DataStream[String] =
      env.fromSource(source, WatermarkStrategy.forBoundedOutOfOrderness(Duration.ofSeconds(20)), "Kafka Source")
    
    

    streamEnv.print()
    env.execute("Example")


  }

}

I also tried defining the Avro deserializer in the Kafka source when reading:

.setValueOnlyDeserializer(new AvroDeserializationSchema[emp](classOf[emp]))

I had no luck with that approach either.

There is 1 best solution below

Rather than an AvroDeserializationSchema, you need to use a ConfluentRegistryAvroDeserializationSchema. The standard Avro deserializer doesn't know what to do with the header that the Confluent serializer prepends to each message: a magic byte followed by a 4-byte schema ID.
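
To make the magic-byte point concrete, here is a minimal sketch of how a Confluent-framed message is laid out: one magic byte (0), a 4-byte big-endian schema ID, then the Avro binary payload. It uses only the JDK. The names ConfluentWireFormat, Framed and parse are hypothetical and exist only for illustration; they are not part of Flink or Confluent.

```scala
import java.nio.ByteBuffer

// Illustration only: splits a Confluent-framed Kafka value into its parts.
// Layout: [magic byte = 0][4-byte big-endian schema ID][Avro binary payload]
// All names in this object are hypothetical.
object ConfluentWireFormat {
  val MagicByte: Byte = 0
  val HeaderSize: Int = 5 // 1 magic byte + 4 bytes of schema ID

  final case class Framed(schemaId: Int, payload: Array[Byte])

  // Returns None when the bytes are too short or do not start with the magic byte.
  def parse(message: Array[Byte]): Option[Framed] =
    if (message == null || message.length < HeaderSize || message(0) != MagicByte) None
    else {
      val buffer = ByteBuffer.wrap(message) // ByteBuffer reads big-endian by default
      buffer.get()                          // skip the magic byte
      val schemaId = buffer.getInt()        // ID of the writer schema in the registry
      val payload = new Array[Byte](buffer.remaining())
      buffer.get(payload)                   // the rest is plain Avro binary
      Some(Framed(schemaId, payload))
    }
}
```

A plain Avro decoder would try to read those first five bytes as record data, which is why it fails. In the question's code, the fix would presumably be to type the builder as KafkaSource.builder[GenericRecord] and pass ConfluentRegistryAvroDeserializationSchema.forGeneric(schema, schemaRegistryUrl) to setValueOnlyDeserializer, where schema is the reader schema parsed with new Schema.Parser().parse(...). This assumes the flink-avro-confluent-registry dependency is on the classpath. Note that the emp case class is not an Avro-generated SpecificRecord, so forGeneric is likely the simpler route here.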