Spring Boot - Kafka - JSON object deserialization

There is already a similar question here; however, it doesn't entirely solve my problem.

In my consumer I have a Product class. To read from the products topic I use this:

public class KafkaMessagingService implements MessagingService {

    @Override
    @KafkaListener(id = "inventory_service_consumer", topics = "products")
    public void processProductAdded(Product product) {
        System.out.println(product);
    }
}

I am able to get the desired behavior by using this in the application.yml:

spring:
  kafka:
    bootstrap-servers: localhost:9094
    consumer:
      key-deserializer: org.apache.kafka.common.serialization.StringDeserializer
      value-deserializer: org.springframework.kafka.support.serializer.JsonDeserializer
      properties:
        spring:
          json:
            value:
              default:
                type: com.example.inventoryservice.core.entity.Product
            use:
              type:
                headers: false

I have to explicitly specify com.example.inventoryservice.core.entity.Product as the class to deserialize into; otherwise I get this exception:

Caused by: java.lang.IllegalStateException: No type information in headers and no default type provided
    at org.springframework.util.Assert.state(Assert.java:76) ~[spring-core-6.1.2.jar:6.1.2]
    at org.springframework.kafka.support.serializer.JsonDeserializer.deserialize(JsonDeserializer.java:583) ~[spring-kafka-3.1.1.jar:3.1.1]
    at org.apache.kafka.common.serialization.Deserializer.deserialize(Deserializer.java:73) ~[kafka-clients-3.6.1.jar:na]
    at org.apache.kafka.clients.consumer.internals.CompletedFetch.parseRecord(CompletedFetch.java:300) ~[kafka-clients-3.6.1.jar:na]
    ... 12 common frames omitted
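
To make the exception easier to read, here is a minimal sketch of how the target type ends up being chosen. It is a simplified model written for illustration, not the Spring Kafka source: the class name TypeResolutionSketch and the method resolveTargetType are hypothetical, and headers are modelled as plain strings. The header name __TypeId__ is the one Spring Kafka's default type mapper writes on the producer side.

```java
import java.util.Map;

// Simplified, hypothetical model of the type lookup; NOT the Spring Kafka implementation.
final class TypeResolutionSketch {

    // Header in which Spring Kafka's default type mapper stores the class name.
    static final String TYPE_ID_HEADER = "__TypeId__";

    static String resolveTargetType(Map<String, String> headers,
                                    boolean useTypeHeaders,
                                    String defaultType) {
        // 1. A type header wins when headers are enabled and one is present.
        if (useTypeHeaders && headers.containsKey(TYPE_ID_HEADER)) {
            return headers.get(TYPE_ID_HEADER);
        }
        // 2. Otherwise fall back to the configured default type
        //    (spring.json.value.default.type).
        if (defaultType != null) {
            return defaultType;
        }
        // 3. Nothing to go on: this is the situation reported in the stack trace.
        throw new IllegalStateException(
                "No type information in headers and no default type provided");
    }

    public static void main(String[] args) {
        String product = "com.example.inventoryservice.core.entity.Product";

        // Headers disabled, default type configured: the default type is used.
        System.out.println(resolveTargetType(Map.of(), false, product));

        // No header and no default type: the lookup fails.
        try {
            resolveTargetType(Map.of(), true, null);
        } catch (IllegalStateException e) {
            System.out.println(e.getMessage());
        }
    }
}
```

In this model the configuration above corresponds to useTypeHeaders = false with a default type set, which is why removing the default type leaves the deserializer with nothing to resolve.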

Now let's imagine that I want to read from multiple topics. Ideally, I would want to do this:

public class KafkaMessagingService implements MessagingService {

    @Override
    @KafkaListener(id = "inventory_service_consumer", topics = "products")
    public void processProductAdded(Product product) {
        System.out.println(product);
    }

    @Override
    @KafkaListener(id = "inventory_service_consumer", topics = "users")
    public void processUserAdded(User user) {
        System.out.println(user);
    }
}

User is in the same package as Product, so its fully qualified name is com.example.inventoryservice.core.entity.User.

How can I make Spring Kafka deserialize automatically into the parameter type of the @KafkaListener method? Or is there any other way to achieve this without specifying the class to deserialize into in the global configuration?

There is 1 best solution below

Djole Pi On

Found the solution...

It was to add properties = {"spring.json.value.default.type=com.example.inventoryservice.core.entity.Product"} to the @KafkaListener annotation. This sets the default type for that listener only, instead of globally in application.yml.

public class KafkaMessagingService implements MessagingService {

    @Override
    @KafkaListener(id = "inventory_service_consumer", topics = "products", properties = {"spring.json.value.default.type=com.example.inventoryservice.core.entity.Product"})
    public void processProductAdded(Product product) {
        System.out.println(product);
    }

}

I found the solution here. It is the second answer.
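
Applied to the two-topic case from the question, the same approach might look like the sketch below. It assumes the application.yml from the question is kept as is, apart from dropping the global spring.json.value.default.type entry. The two listener ids are hypothetical names: each @KafkaListener needs a unique id, and by default the id is also used as the consumer group id. The @Service annotation is likewise an assumption about how the class is registered as a bean.

```java
import org.springframework.kafka.annotation.KafkaListener;
import org.springframework.stereotype.Service;

import com.example.inventoryservice.core.entity.Product;
import com.example.inventoryservice.core.entity.User;

// Assumption: the class is picked up as a Spring bean via @Service.
@Service
public class KafkaMessagingService implements MessagingService {

    // Default type for this listener only: records from "products" become Product.
    @Override
    @KafkaListener(
            id = "inventory_service_product_consumer", // hypothetical id, must be unique
            topics = "products",
            properties = {"spring.json.value.default.type=com.example.inventoryservice.core.entity.Product"})
    public void processProductAdded(Product product) {
        System.out.println(product);
    }

    // Default type for this listener only: records from "users" become User.
    @Override
    @KafkaListener(
            id = "inventory_service_user_consumer", // hypothetical id, must be unique
            topics = "users",
            properties = {"spring.json.value.default.type=com.example.inventoryservice.core.entity.User"})
    public void processUserAdded(User user) {
        System.out.println(user);
    }
}
```

Because each listener carries its own default type, neither class has to be named in the shared consumer configuration.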