Spark Structured Streaming: how to write to Kafka in Protobuf format

  • Spark: 3.0.0
  • Scala: 2.12
  • Confluent Schema Registry

I have a Spark Structured Streaming job and am looking for an example of writing a DataFrame to Kafka in Protobuf format.

I read records from PostgreSQL and, after all the transformations, end up with a DataFrame with a key and a value column (a rough sketch of the source read follows the schema):

root
 |-- key: string (nullable = true)
 |-- value: binary (nullable = false)
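For context, the source read is a plain JDBC load along these lines; the connection details and table name here are placeholders, not my real ones, and the transformations that produce the key/value frame above are omitted:

val sourceDF = spark.read
  .format("jdbc")
  .option("url", "jdbc:postgresql://localhost:5432/app_db")   // placeholder URL
  .option("dbtable", "public.users")                          // placeholder table
  .option("user", "app_user")
  .option("password", "app_password")
  .load()

// ... transformations, ending in the key/value frame (tDF) shown above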

Code to push the messages to Kafka:

val kafkaOptions = Seq(
  KAFKA_BOOTSTRAP_SERVERS_CONFIG -> "localhost:9092",
  ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG -> "org.apache.kafka.common.serialization.StringSerializer",
  ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG -> "io.confluent.kafka.serializers.protobuf.KafkaProtobufSerializer",
  "schema.registry.url" -> "http://localhost:8081",
  "topic" -> "test_users"
)

tDF
  .write
  .format(KAFKA)
  .options(kafkaOptions.toMap)
  .save()

The message is posted to Kafka in binary format, but I am not able to deserialize it because no schema gets registered in the Confluent Schema Registry. As far as I can tell, Spark's Kafka sink ignores the serializer options and writes the bytes of the value column as-is, so the KafkaProtobufSerializer is never invoked and the schema is never registered.
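One workaround I am considering (not yet verified) is to invoke Confluent's KafkaProtobufSerializer myself inside mapPartitions, so every value is produced in the Confluent wire format (magic byte + schema id) and the schema gets registered, and then hand the ready-made key/value columns to the plain Kafka sink. A rough sketch, where UserProtos.User stands for the class generated from my .proto file, transformedDF for the frame before the final select, and the id/name columns are placeholders:

import java.util.Collections

import io.confluent.kafka.serializers.protobuf.KafkaProtobufSerializer

import spark.implicits._

val topic = "test_users"
val schemaRegistryUrl = "http://localhost:8081"

val tDF = transformedDF.mapPartitions { rows =>
  // One serializer per partition; it registers the schema on first use and
  // prepends the Confluent wire-format header to every payload.
  val serializer = new KafkaProtobufSerializer[UserProtos.User]()
  serializer.configure(Collections.singletonMap("schema.registry.url", schemaRegistryUrl), false)

  rows.map { row =>
    val user = UserProtos.User.newBuilder()
      .setId(row.getAs[Long]("id"))
      .setName(row.getAs[String]("name"))
      .build()
    (user.getId.toString, serializer.serialize(topic, user))
  }
}.toDF("key", "value")

tDF.write
  .format("kafka")
  .option("kafka.bootstrap.servers", "localhost:9092")
  .option("topic", topic)
  .save()

With this approach the serializer and schema.registry.url entries in kafkaOptions would no longer be needed, since the bytes already carry the Confluent framing before they reach the sink.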

Is there a library that can simplify things for me, or some sample code that I can refer to?
