How to check Avro Schema Registry usage

I am using Avro through avro4s. This is my configuration for the consumer and producer:

  def producerSettings(system: ActorSystem): ProducerSettings[String, Array[Byte]] = ProducerSettings(
    system,
    new StringSerializer,
    new ByteArraySerializer)
    .withBootstrapServers("localhost:9092")
    .withProperty("key.serializer", "io.confluent.kafka.serializers.KafkaAvroSerializer")
    .withProperty("value.serializer", "io.confluent.kafka.serializers.KafkaAvroSerializer")
    .withProperty("key.converter.schema.registry.url", "http://localhost:8081")
    .withProperty("value.converter.schema.registry.url", "http://localhost:8081")
    .withProperty("schema.registry.url", "http://localhost:8081")
    .withProperty("auto.create.topics.enable", "true")

  def consumerSettings(system: ActorSystem): ConsumerSettings[String, Array[Byte]] =
    ConsumerSettings(
      system,
      new StringDeserializer,
      new ByteArrayDeserializer)
      .withBootstrapServers("localhost:9092")
      .withProperty("key.deserializer", "io.confluent.kafka.serializers.KafkaAvroDeserializer")
      .withProperty("value.deserializer", "io.confluent.kafka.serializers.KafkaAvroDeserializer")
      .withProperty("key.converter.schema.registry.url", "http://localhost:8081")
      .withProperty("value.converter.schema.registry.url", "http://localhost:8081")
      .withProperty("schema.registry.url", "http://localhost:8081")
      .withGroupId("test")
      .withProperty(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest")

I doubt that the registry is being used. While my application is running, the Schema Registry logs stay silent.

How can I check that my application is using the registry?

And if it is not, how do I fix it?

Best answer

You're passing the wrong serializer classes to the settings, so the serializer properties you set are likely not taking effect.

You actually need to use KafkaAvroSerializer for the producer here:

new StringSerializer,
new ByteArraySerializer)

And KafkaAvroDeserializer for the consumer here:

new StringDeserializer,
new ByteArrayDeserializer)

Also try changing the type parameters String, Array[Byte] to GenericRecord or to a case class you generated with avro4s.
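
A minimal sketch of what the corrected settings might look like, assuming Scala 2.13+, Alpakka Kafka (akka-stream-kafka) and the Confluent kafka-avro-serializer on the classpath, String keys, and GenericRecord values. The object name AvroSettings, the shared serdeConfig map, and the registeredSubjects helper are illustrative and not part of the question. Because the serializer instances are handed to the settings directly, the sketch configures them by hand with schema.registry.url instead of relying on withProperty.

```scala
import java.net.URI
import java.net.http.{HttpClient, HttpRequest, HttpResponse}

import akka.actor.ActorSystem
import akka.kafka.{ConsumerSettings, ProducerSettings}
import io.confluent.kafka.serializers.{KafkaAvroDeserializer, KafkaAvroSerializer}
import org.apache.avro.generic.GenericRecord
import org.apache.kafka.clients.consumer.ConsumerConfig
import org.apache.kafka.common.serialization.{Deserializer, Serializer, StringDeserializer, StringSerializer}

import scala.jdk.CollectionConverters._

object AvroSettings {
  // Assumption: the registry runs locally on port 8081, as in the question.
  private val serdeConfig =
    Map[String, AnyRef]("schema.registry.url" -> "http://localhost:8081").asJava

  def producerSettings(system: ActorSystem): ProducerSettings[String, GenericRecord] = {
    val avroSerializer = new KafkaAvroSerializer()
    // Second argument is isKey; false because this instance serializes values.
    avroSerializer.configure(serdeConfig, false)
    // KafkaAvroSerializer is a Serializer[Object], so narrow it to the record type.
    val valueSerializer = avroSerializer.asInstanceOf[Serializer[GenericRecord]]

    ProducerSettings(system, new StringSerializer, valueSerializer)
      .withBootstrapServers("localhost:9092")
  }

  def consumerSettings(system: ActorSystem): ConsumerSettings[String, GenericRecord] = {
    val avroDeserializer = new KafkaAvroDeserializer()
    avroDeserializer.configure(serdeConfig, false)
    val valueDeserializer = avroDeserializer.asInstanceOf[Deserializer[GenericRecord]]

    ConsumerSettings(system, new StringDeserializer, valueDeserializer)
      .withBootstrapServers("localhost:9092")
      .withGroupId("test")
      .withProperty(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest")
  }

  // Hypothetical helper: asks the registry's REST API which subjects exist
  // and returns the raw JSON response body.
  def registeredSubjects(registryUrl: String): String = {
    val request = HttpRequest.newBuilder(URI.create(s"$registryUrl/subjects")).GET().build()
    HttpClient.newHttpClient().send(request, HttpResponse.BodyHandlers.ofString()).body()
  }
}
```

Once the producer has sent at least one record, the Schema Registry log should show incoming requests, and AvroSettings.registeredSubjects("http://localhost:8081") should return a JSON array that, with the default subject naming strategy, includes <topic>-value for the topic you wrote to. If the array stays empty, the Avro serializer is probably still not the one in use.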