RecordFilterStrategy not working with Kafka message listener


I am trying to use the RecordFilterStrategy interface with my Kafka message listener, but records are not being filtered as expected: the overridden filter method is never invoked. Here is the code:

// KafkaConfig
ConcurrentMessageListenerContainer<String, Object> kafkaListenerContainer() {
    ContainerProperties prop = new ContainerProperties("topicName");
    prop.setMessageListener(myMessageListener);
    ConcurrentMessageListenerContainer<String, Object> lContainer =
            new ConcurrentMessageListenerContainer<>(consumerFactory(), prop);
    return lContainer;
}

// KafkaListener
public class KafkaMessageListener implements MessageListener<String, Object>, RecordFilterStrategy<String, Object> {

   @Override
   public void onMessage(ConsumerRecord<String, Object> record) {
      // business logic
   }

   @Override
   public boolean filter(ConsumerRecord<String, Object> record) {
       return someCondition; // true means the record is discarded
   }
}

There are 2 best solutions below

What leads you to believe that implementing that interface on your listener is all you need to do?

The RecordFilterStrategy needs to be passed into a FilteringMessageListenerAdapter (along with your listener), and the adapter, not the bare listener, must then be set as the container's message listener. The relevant part of the adapter's source:

/**
 * A {@link MessageListener} adapter that implements filter logic
 * via a {@link RecordFilterStrategy}.
 *
 * @param <K> the key type.
 * @param <V> the value type.
 *
 * @author Gary Russell
 *
 */
public class FilteringMessageListenerAdapter<K, V>
        extends AbstractFilteringMessageListener<K, V, MessageListener<K, V>>
        implements AcknowledgingConsumerAwareMessageListener<K, V> {

    private final boolean ackDiscarded;

    /**
     * Create an instance with the supplied strategy and delegate listener.
     * @param delegate the delegate.
     * @param recordFilterStrategy the filter.
     */
    public FilteringMessageListenerAdapter(MessageListener<K, V> delegate,
            RecordFilterStrategy<K, V> recordFilterStrategy) {
        super(delegate, recordFilterStrategy);
        this.ackDiscarded = false;
    }

    /**
     * Create an instance with the supplied strategy and delegate listener.
     * @param delegate the delegate.
     * @param recordFilterStrategy the filter.
     * @param ackDiscarded true to ack (commit offset for) discarded messages when the
     * listener is configured for manual acks.
     */
    public FilteringMessageListenerAdapter(MessageListener<K, V> delegate,
            RecordFilterStrategy<K, V> recordFilterStrategy, boolean ackDiscarded) {
        super(delegate, recordFilterStrategy);
        this.ackDiscarded = ackDiscarded;
    }

...
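
Applied to the question's config, this would presumably mean calling prop.setMessageListener(new FilteringMessageListenerAdapter<>(myMessageListener, myMessageListener)), using the two-argument constructor shown above. To illustrate why the wrapping matters, here is a minimal, dependency-free sketch of the same decorator idea. The Listener, FilterStrategy, FilteringAdapter, MyListener and deliver names are hypothetical stand-ins invented for this example (records are simplified to String); they are not spring-kafka types, and the real adapter does more, such as handling acknowledgments.

```java
import java.util.ArrayList;
import java.util.List;

public class FilterAdapterSketch {

    // Hypothetical stand-in for spring-kafka's MessageListener (records simplified to String).
    interface Listener {
        void onMessage(String record);
    }

    // Hypothetical stand-in for RecordFilterStrategy: returning true means "discard".
    interface FilterStrategy {
        boolean filter(String record);
    }

    // Plays the role of FilteringMessageListenerAdapter: consults the filter, then delegates.
    static final class FilteringAdapter implements Listener {
        private final Listener delegate;
        private final FilterStrategy strategy;

        FilteringAdapter(Listener delegate, FilterStrategy strategy) {
            this.delegate = delegate;
            this.strategy = strategy;
        }

        @Override
        public void onMessage(String record) {
            // Only records that the strategy does not discard reach the delegate.
            if (!strategy.filter(record)) {
                delegate.onMessage(record);
            }
        }
    }

    // Mirrors the question's class: one object implementing both interfaces.
    static final class MyListener implements Listener, FilterStrategy {
        final List<String> handled = new ArrayList<>();

        @Override
        public void onMessage(String record) {
            handled.add(record);
        }

        @Override
        public boolean filter(String record) {
            return record.startsWith("drop");
        }
    }

    // Simulates a container: it only ever calls onMessage on the listener it was given.
    static List<String> deliver(List<String> records, boolean wrap) {
        MyListener mine = new MyListener();
        Listener registered = wrap ? new FilteringAdapter(mine, mine) : mine;
        for (String r : records) {
            registered.onMessage(r);
        }
        return mine.handled;
    }

    public static void main(String[] args) {
        List<String> records = List.of("keep-1", "drop-2", "keep-3");
        System.out.println(deliver(records, false)); // [keep-1, drop-2, keep-3]
        System.out.println(deliver(records, true));  // [keep-1, keep-3]
    }
}
```

Without the wrapper, the simulated container has no reason to call filter at all, which matches the behaviour described in the question; with the wrapper, the same object can serve as both delegate and strategy.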

I believe changing the listener container initialization method to something like the following would make the filter work:

ConcurrentMessageListenerContainer<String, Object> kafkaListenerContainer() {
    ConcurrentKafkaListenerContainerFactory<String, Object> containerFactory = new ConcurrentKafkaListenerContainerFactory<>();
    containerFactory.setConsumerFactory(consumerFactory());
    containerFactory.setRecordFilterStrategy(myMessageListener);
    containerFactory.getContainerProperties().setMessageListener(myMessageListener);
        
    ConcurrentMessageListenerContainer<String, Object> lContainer = containerFactory.createContainer("topicName");
    return lContainer;    
}