There is already a similar question here; however, it doesn't entirely solve my problem.
My consumer has a Product class. To read from the products topic, I use this:
public class KafkaMessagingService implements MessagingService {

    @Override
    @KafkaListener(id = "inventory_service_consumer", topics = "products")
    public void processProductAdded(Product product) {
        System.out.println(product);
    }
}
I am able to get the desired behavior by using this in the application.yml:
spring:
  kafka:
    bootstrap-servers: localhost:9094
    consumer:
      key-deserializer: org.apache.kafka.common.serialization.StringDeserializer
      value-deserializer: org.springframework.kafka.support.serializer.JsonDeserializer
      properties:
        spring:
          json:
            value:
              default:
                type: com.example.inventoryservice.core.entity.Product
            use:
              type:
                headers: false
I have to explicitly name com.example.inventoryservice.core.entity.Product as the class to deserialize into; otherwise I get this exception:
Caused by: java.lang.IllegalStateException: No type information in headers and no default type provided
    at org.springframework.util.Assert.state(Assert.java:76) ~[spring-core-6.1.2.jar:6.1.2]
    at org.springframework.kafka.support.serializer.JsonDeserializer.deserialize(JsonDeserializer.java:583) ~[spring-kafka-3.1.1.jar:3.1.1]
    at org.apache.kafka.common.serialization.Deserializer.deserialize(Deserializer.java:73) ~[kafka-clients-3.6.1.jar:na]
    at org.apache.kafka.clients.consumer.internals.CompletedFetch.parseRecord(CompletedFetch.java:300) ~[kafka-clients-3.6.1.jar:na]
    ... 12 common frames omitted
Now let's imagine that I want to read from multiple topics. Ideally, I would want to do this:
public class KafkaMessagingService implements MessagingService {

    @Override
    @KafkaListener(id = "inventory_service_consumer", topics = "products")
    public void processProductAdded(Product product) {
        System.out.println(product);
    }

    @Override
    @KafkaListener(id = "inventory_service_consumer", topics = "users")
    public void processUserAdded(User user) {
        System.out.println(user);
    }
}
The fully qualified name of User is com.example.inventoryservice.core.entity.User.
How can I make spring kafka deserialize automatically into the type provided in the @KafkaListener method?
Or is there any other way to achieve this without specifying the class to deserialize into?
Found the solution...
It was to add
properties = {"spring.json.value.default.type=com.example.inventoryservice.core.entity.Product"}
to the @KafkaListener annotation (with the matching class for each listener). I found the solution here. It is the second answer.
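For the two-topic case from the question, a minimal sketch of how this could look is below. It assumes the application.yml above still sets JsonDeserializer as the value deserializer and keeps spring.json.use.type.headers set to false; the default type entry in the YAML should then no longer be needed, since each listener supplies its own. The @Service annotation and the two listener ids are my own assumptions, not part of the original code. I gave each listener a distinct id because listener ids have to be unique within the application context, and the id is also used as the consumer group id unless a groupId is set.

```java
import org.springframework.kafka.annotation.KafkaListener;
import org.springframework.stereotype.Service;

import com.example.inventoryservice.core.entity.Product;
import com.example.inventoryservice.core.entity.User;

// Assumption: the class is registered as a Spring bean via @Service.
// MessagingService, Product and User are the types from the question.
@Service
public class KafkaMessagingService implements MessagingService {

    // Overrides the JSON default type for this listener's consumer only.
    // The id "inventory_service_products_consumer" is a hypothetical name.
    @Override
    @KafkaListener(
            id = "inventory_service_products_consumer",
            topics = "products",
            properties = {
                "spring.json.value.default.type=com.example.inventoryservice.core.entity.Product"
            })
    public void processProductAdded(Product product) {
        System.out.println(product);
    }

    // Same idea for the users topic, with User as the default type.
    // The id "inventory_service_users_consumer" is a hypothetical name.
    @Override
    @KafkaListener(
            id = "inventory_service_users_consumer",
            topics = "users",
            properties = {
                "spring.json.value.default.type=com.example.inventoryservice.core.entity.User"
            })
    public void processUserAdded(User user) {
        System.out.println(user);
    }
}
```

The properties attribute takes plain Kafka consumer properties in key=value form, and they take precedence over same-named properties from the consumer factory, which is why each listener can end up with a different target class.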