I am using Avro through avro4s. This is my configuration for the consumer and producer:
def producerSettings(system: ActorSystem): ProducerSettings[String, Array[Byte]] =
  ProducerSettings(
    system,
    new StringSerializer,
    new ByteArraySerializer)
    .withBootstrapServers("localhost:9092")
    .withProperty("key.serializer", "io.confluent.kafka.serializers.KafkaAvroSerializer")
    .withProperty("value.serializer", "io.confluent.kafka.serializers.KafkaAvroSerializer")
    .withProperty("key.converter.schema.registry.url", "http://localhost:8081")
    .withProperty("value.converter.schema.registry.url", "http://localhost:8081")
    .withProperty("schema.registry.url", "http://localhost:8081")
    .withProperty("auto.create.topics.enable", "true")
def consumerSettings(system: ActorSystem): ConsumerSettings[String, Array[Byte]] =
  ConsumerSettings(
    system,
    new StringDeserializer,
    new ByteArrayDeserializer)
    .withBootstrapServers("localhost:9092")
    .withProperty("key.deserializer", "io.confluent.kafka.serializers.KafkaAvroDeserializer")
    .withProperty("value.deserializer", "io.confluent.kafka.serializers.KafkaAvroDeserializer")
    .withProperty("key.converter.schema.registry.url", "http://localhost:8081")
    .withProperty("value.converter.schema.registry.url", "http://localhost:8081")
    .withProperty("schema.registry.url", "http://localhost:8081")
    .withGroupId("test")
    .withProperty(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest")
I doubt that the schema registry is actually being used. While my application is running, nothing shows up in the schema registry logs.
How can I check whether my application is using the registry?
And if it is not, how do I fix it?
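You're using the wrong classes. The StringSerializer/ByteArraySerializer and StringDeserializer/ByteArrayDeserializer instances you pass to ProducerSettings and ConsumerSettings are most likely the ones that actually get used, so the key.serializer, value.serializer and schema.registry.url properties you set probably have no effect.
You actually need to use KafkaAvroSerializer for the producer and KafkaAvroDeserializer for the consumer, in place of those instances.
Also try changing the type parameters from String, Array[Byte] to GenericRecord, or to some case class you made with avro4s.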
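A minimal sketch of what that could look like, assuming you keep String keys and use GenericRecord values. The object name AvroKafkaSettings is made up for illustration, and the URLs are the ones from the question. Serializer instances handed to the settings are not configured from the properties, so the sketch calls configure on them explicitly:

```scala
import java.util.Collections

import akka.actor.ActorSystem
import akka.kafka.{ConsumerSettings, ProducerSettings}
import io.confluent.kafka.serializers.{KafkaAvroDeserializer, KafkaAvroSerializer}
import org.apache.avro.generic.GenericRecord
import org.apache.kafka.clients.consumer.ConsumerConfig
import org.apache.kafka.common.serialization.{Deserializer, Serializer, StringDeserializer, StringSerializer}

// Hypothetical holder object; the name is not from the question.
object AvroKafkaSettings {

  // Assumption: the registry runs at the URL used in the question.
  private val serdeConfig =
    Collections.singletonMap("schema.registry.url", "http://localhost:8081")

  def producerSettings(system: ActorSystem): ProducerSettings[String, GenericRecord] = {
    val avroSerializer = new KafkaAvroSerializer()
    // isKey = false: this instance serializes record values.
    avroSerializer.configure(serdeConfig, false)

    ProducerSettings(
      system,
      new StringSerializer,
      // KafkaAvroSerializer is a Serializer[Object], hence the cast.
      avroSerializer.asInstanceOf[Serializer[GenericRecord]])
      .withBootstrapServers("localhost:9092")
  }

  def consumerSettings(system: ActorSystem): ConsumerSettings[String, GenericRecord] = {
    val avroDeserializer = new KafkaAvroDeserializer()
    avroDeserializer.configure(serdeConfig, false)

    ConsumerSettings(
      system,
      new StringDeserializer,
      // Only safe if every value on the topic is an Avro record.
      avroDeserializer.asInstanceOf[Deserializer[GenericRecord]])
      .withBootstrapServers("localhost:9092")
      .withGroupId("test")
      .withProperty(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest")
  }
}
```

To check that the registry is being used, produce one message and then request http://localhost:8081/subjects. With the default naming strategy, a subject named after your topic with a -value suffix should be listed, and the schema registry logs should show the registration request. If you would rather work with case classes, avro4s's RecordFormat can convert between a case class and a GenericRecord before producing and after consuming.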