How to intercept kafka messages using AOP(@Around annotation) with @KafkaListener annotation used for listening

99 Views Asked by At

I am upgrading my spring-boot applications from spring-boot 2.7.x to 3.2.x. With spring-boot 2.7.x I was using @StreamListener to listen to messages from kafka, but before giving the control of the msg to the method annotated with @StreamListener I used to intercept the messages using


@Around("@annotation(org.springframework.cloud.stream.annotation.StreamListener)")
public void msgProcessor(ProceedingJoinPoint jp){
//process msg
jp.proceed()
//process msg again after the processing at method annotated with @StreamListener 
}

but now with spring-boot 3.2.x StreamListener(along with other annotation like EnableBinding, Output, Input) got deprecated, so I moved to @KafkaListener and updated the above code like this:

@Around("@annotation(org.springframework.kafka.annotation.KafkaListener)")
public void msgProcessor(ProceedingJoinPoint jp){
//process msg
jp.proceed()
//process msg again after the processing at method annotated with @KafkaListener
}

but with this change the flow itself is not getting intercepted by @Around annotation. Any suggestion, fixes or alternative would be really helpful.

I tried implementing ConsumerInterceptor<String, Message<?>> in the class and with that when I configure spring.kafka.consumer.properties.interceptor.class=, I can intercept the messages, but I don't have the jp to do jp.proceed() with this to attain the back and forth processing of msgs.

1

There are 1 best solutions below

0
Ricardo Gellman On

As @StreamListener is deprecated and no direct equivalent in Spring Kafka, I would go for ConsumerInterceptor as it allows message interception at the consumer level. To maintain a similar flow, consider using Aspect-Oriented Programming with Spring Kafka. Intercept method execution instead of directly targeting @KafkaListener annotation like should be the way

@Aspect
@Component
public class KafkaListenerInterceptor {
    
    @Around("@annotation(org.springframework.kafka.annotation.KafkaListener)")
    public Object interceptKafkaListener(ProceedingJoinPoint joinPoint) throws Throwable {

        //Process message before invoking Kafka listener method
        
        //Proceed with the invocation of the Kafka listener method
        Object result = joinPoint.proceed();
        
        //Process message after Kafka listener method invocation
        //You can access the result or modify it here if needed
        
        return result;
    }
}