- Spark: 3.0.0
- Scala: 2.12
- Confluent (Schema Registry)
I have a Spark Structured Streaming job and I'm looking for an example of writing DataFrames to Kafka in Protobuf format.
I read messages from PostgreSQL and, after all the transformations, end up with a DataFrame with a key and a value:
```
root
 |-- key: string (nullable = true)
 |-- value: binary (nullable = false)
```
Code to push the messages to Kafka:
```scala
import org.apache.kafka.clients.producer.ProducerConfig

// Options for the Kafka sink; tDF has the key/value schema shown above
val kafkaOptions = Seq(
  "kafka.bootstrap.servers" -> "localhost:9092",
  ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG -> "org.apache.kafka.common.serialization.StringSerializer",
  ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG -> "io.confluent.kafka.serializers.protobuf.KafkaProtobufSerializer",
  "schema.registry.url" -> "http://localhost:8081",
  "topic" -> "test_users"
)

// Batch write of the DataFrame to the test_users topic
tDF
  .write
  .format("kafka")
  .options(kafkaOptions.toMap)
  .save()
```
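For reference, the Spark Structured Streaming + Kafka integration guide states that producer options are only passed to Kafka when they carry a `kafka.` prefix, and that the Kafka sink does not support `key.serializer`/`value.serializer` at all (values are always written as raw bytes). So the `KafkaProtobufSerializer` and `schema.registry.url` settings in the code above never reach the producer. A Confluent consumer, however, expects the Confluent wire format: a zero magic byte, the 4-byte big-endian schema ID, the Protobuf message indexes, then the Protobuf payload. Below is a minimal sketch of building that frame in plain Scala, assuming the schema is already registered and its ID is known, and that the record type is the first message defined in the .proto file (whose message-index list `[0]` is encoded as a single 0 byte). `ConfluentProtobufFraming` and `frame` are hypothetical names, not a Confluent API.

```scala
import java.nio.ByteBuffer

// Hypothetical helper: wraps already-serialized Protobuf bytes in the
// Confluent Schema Registry wire format.
object ConfluentProtobufFraming {
  private val MagicByte: Byte = 0x0

  // schemaId: ID returned by Schema Registry when the .proto schema was
  //           registered (assumed to be known in advance).
  // payload:  Protobuf bytes, e.g. the DataFrame's `value` column.
  // Assumes the record type is the FIRST message in the .proto file.
  def frame(schemaId: Int, payload: Array[Byte]): Array[Byte] = {
    // ByteBuffer is big-endian by default, matching the wire format
    val buf = ByteBuffer.allocate(1 + 4 + 1 + payload.length)
    buf.put(MagicByte)   // byte 0: magic byte
    buf.putInt(schemaId) // bytes 1-4: schema ID
    buf.put(0.toByte)    // message indexes: shorthand encoding of [0]
    buf.put(payload)     // Protobuf-encoded message
    buf.array()
  }
}
```

In the job, the helper could be applied to the `value` column through a UDF such as `udf((v: Array[Byte]) => ConfluentProtobufFraming.frame(schemaId, v))`, where `schemaId` is a hypothetical value looked up from Schema Registry beforehand. Framing the bytes by hand keeps the Spark sink unchanged; the alternative of calling Confluent's serializer inside the job would also handle schema registration, at the cost of an extra dependency on the executors.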
The message is published in binary form, but I can't deserialize it because no schema has been registered in Confluent Schema Registry.
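One way to see why deserialization fails: Confluent's deserializers read a 5-byte header (magic byte 0 followed by a 4-byte big-endian schema ID) before looking at the payload. The hypothetical helper below, a sketch rather than a Confluent API, checks a record value for that header. Plain Protobuf bytes written directly by Spark do not start with a 0 byte when the message is non-empty, since 0 is not a valid Protobuf field tag, so the check returns `None` for them. It does not verify that the ID actually exists in Schema Registry.

```scala
import java.nio.ByteBuffer

// Hypothetical inspection helper: returns the schema ID if the bytes start
// with the Confluent wire-format header, None otherwise.
object ConfluentHeaderCheck {
  def schemaIdOf(value: Array[Byte]): Option[Int] =
    if (value.length >= 5 && value(0) == 0)
      // Read bytes 1-4 as a big-endian Int (ByteBuffer's default order)
      Some(ByteBuffer.wrap(value, 1, 4).getInt)
    else
      None
}
```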
Is there a library that can simplify this for me, or sample code I can refer to?