Listener not consuming message on Test with EmbeddedKafka Spring

286 Views Asked by At

I was trying to see if a service is invoked when the consumer receives a message from kafka topic but the test is not passing and the consumer is not even receiving the message.

My test:

@SpringBootTest
@DirtiesContext
@EmbeddedKafka(partitions = 1, brokerProperties = { "listeners=PLAINTEXT://localhost:9092", "port=9092" })
class ConsumerTest {

    @Autowired
    Consumer consumer;

    @Mock
    private Service service;

    @Autowired
    private KafkaTemplate<String, String> kafkaTemplate;
    
    @Test
    public void givenEmbeddedKafkaBroker_whenSendingtoSimpleProducer_thenMessageReceived() 
      throws Exception {

        String message = "hi";

        kafkaTemplate.send("topic", message);

        consumer.getLatch().await(10000, TimeUnit.MILLISECONDS);

       
        verify(service, times(1)).addMessage();
    }
}

The consumer, in main package, is a normal consumer with @KafkaListener(topics = "topic"). Then I have a configuration file:

@EnableKafka
@Configuration
public class KafkaConfig {
  
  @Bean
  public Map<String, Object> consumerConfigs() {
    Map<String, Object> props = new HashMap<>();
    props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
    props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
    props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
    props.put(ConsumerConfig.GROUP_ID_CONFIG, "group");
    return props;
  }

  @Bean
  public ConsumerFactory<String, String> consumerFactory() {
    return new DefaultKafkaConsumerFactory<>(consumerConfigs(), new StringDeserializer(),
        new StringDeserializer());
  }

  @Bean
  public ConcurrentKafkaListenerContainerFactory<String, String> kafkaListenerContainerFactory() {
    ConcurrentKafkaListenerContainerFactory<String, String> factory =
        new ConcurrentKafkaListenerContainerFactory<>();
    factory.setConsumerFactory(consumerFactory());

    return factory;
  }


}

And also in application.properties (inside test package) i put this:

spring: kafka: consumer: auto-offset-reset: earliest group-id: group

0

There are 0 best solutions below