I am upgrading my spring-boot applications from spring-boot 2.7.x to 3.2.x. With spring-boot 2.7.x I was using @StreamListener to listen to messages from kafka, but before giving the control of the msg to the method annotated with @StreamListener I used to intercept the messages using
@Around("@annotation(org.springframework.cloud.stream.annotation.StreamListener)")
public void msgProcessor(ProceedingJoinPoint jp){
//process msg
jp.proceed()
//process msg again after the processing at method annotated with @StreamListener
}
but now with spring-boot 3.2.x StreamListener(along with other annotation like EnableBinding, Output, Input) got deprecated, so I moved to @KafkaListener and updated the above code like this:
@Around("@annotation(org.springframework.kafka.annotation.KafkaListener)")
public void msgProcessor(ProceedingJoinPoint jp){
//process msg
jp.proceed()
//process msg again after the processing at method annotated with @KafkaListener
}
but with this change the flow itself is not getting intercepted by @Around annotation. Any suggestion, fixes or alternative would be really helpful.
I tried implementing ConsumerInterceptor<String, Message<?>> in the class and with that when I configure spring.kafka.consumer.properties.interceptor.class=, I can intercept the messages, but I don't have the jp to do jp.proceed() with this to attain the back and forth processing of msgs.
As
@StreamListeneris deprecated and no direct equivalent in Spring Kafka, I would go forConsumerInterceptoras it allows message interception at the consumer level. To maintain a similar flow, consider using Aspect-Oriented Programming with Spring Kafka. Intercept method execution instead of directly targeting@KafkaListenerannotation like should be the way