I am trying to implement RecordFilterStrategy interface with my Kafka message listener , but it is not filtering the data as expected. Not going in overridden filter method. Here's the code snippet -
//KafkaConfig
ConcurrentMessageListenerContainer<String,Object> kafkaListnerContainer(){
ContainerProperties prop = new ContainerProperties("topicName");
prop.setMessageListner(myMessageListener);
ConcurrentMessageListenerContainer<String,Object> lContainer = new ConcurrentMessageListenerContainer(consumerFactory(),prop);
return lContainer ;
}
//KafkaListner
public class KafkaMessageListener implements MessageListner<String,Object> ,RecordFilterStrategy<String,Object>{
@Override
public void onMessage(ConsumerRecord<String,Object> record){
//business logic
}
@Override
public boolean filter(ConsumerRecord<String,Object> record){
return someCondition?true:false;
}
}
What leads you to believe that implementing that interface on your listener is all you need to do?
The
RecordFilterStrategy
needs to be passed into aFilteringMessageListenerAdapter
(along with your listener) and then add the adapter to the container.