Integration test using EmbeddedKafka: ContainerTestUtils.waitForAssignment throws "Expected 1 but got 0 partitions"


I have written an integration test for my Kafka consumer using Spring Boot and the spring-kafka libraries. The test uses EmbeddedKafka with a topic that has one partition, and the consumer runs in a KafkaMessageListenerContainer. I am getting an error on this line:

ContainerTestUtils.waitForAssignment(container, embeddedKafkaBroker.getPartitionsPerTopic())

The error that I am getting is:

java.lang.IllegalStateException: Expected 1 but got 0 partitions.

The code which I referred to is: https://blog.mimacom.com/testing-apache-kafka-with-spring-boot-junit5/

@EmbeddedKafka(partitions = 1, ports = 9092)
@SpringBootTest(properties = "spring.kafka.bootstrap-servers=${spring.embedded.kafka.brokers}")
@RunWith(SpringRunner.class)
@DirtiesContext
@Profile("test")
@TestPropertySource({"classpath:application.yaml"})
public class OnboardingConsumerListenerTest {

    // Records received by the listener end up here
    BlockingQueue<ConsumerRecord<String, String>> records;

    KafkaMessageListenerContainer<String, String> container;

    @Autowired
    protected EmbeddedKafkaBroker embeddedKafkaBroker;

    public ConsumerFactory<String, String> consumerFactory;

    @Value("${spring.kafka.client_topic}")
    private String topicName;

    @Value("${spring.kafka.group_id}")
    private String groupId;

    @Before
    public void setUp() {
        consumerFactory = getKafkaConsumer(embeddedKafkaBroker, groupId, topicName);
        ContainerProperties containerProperties = new ContainerProperties(topicName);
        container = new KafkaMessageListenerContainer<>(consumerFactory, containerProperties);
        records = new LinkedBlockingQueue<>();
        container.setupMessageListener(new MessageListener<String, String>() {
            @Override
            public void onMessage(ConsumerRecord<String, String> record) {
                records.add(record);
            }
        });

        container.start();
        // This is the call that throws the IllegalStateException
        ContainerTestUtils.waitForAssignment(container, embeddedKafkaBroker.getPartitionsPerTopic());
    }
}

And the getKafkaConsumer() function is:

public ConsumerFactory<String, String> getKafkaConsumer(EmbeddedKafkaBroker embeddedKafkaBroker,
                                                        String group,
                                                        String topic) {
    Map<String, Object> consumerProps =
            KafkaTestUtils.consumerProps(group, "false", embeddedKafkaBroker);
    consumerProps.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG,
            embeddedKafkaBroker.getBrokersAsString());
    consumerProps.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "latest");
    consumerProps.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
    consumerProps.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, KafkaAvroDeserializer.class);
    consumerProps.put("schema.registry.url", "bogus");
    consumerProps.put("specific.avro.reader", true);
    ConsumerFactory<String, String> consumerFactory = new DefaultKafkaConsumerFactory<String, String>(consumerProps);
    return consumerFactory;
}

There are 2 best solutions below

1
Gary Russell (BEST ANSWER)

It is hard to say what is happening without more information.

An alternative is to manually assign the partitions instead of waiting for group management to assign them:

ContainerProperties containerProperties = 
    new ContainerProperties(new TopicPartitionOffset(topicName, 0), 
                            new TopicPartitionOffset(topicName, 1));
0
Java_Alert

This can happen when a consumer/listener is configured with autoStartup=false, i.e. its container is never started, so it is never assigned any partitions.

Code that fails:

 kafkaListenerEndpointRegistry.getAllListenerContainers().forEach(
  c -> ContainerTestUtils.waitForAssignment(c, 1)); 

Working code, which only waits on containers that are actually started:

 kafkaListenerEndpointRegistry.getAllListenerContainers().stream()
  .filter(c -> c.isAutoStartup())
  .forEach(c -> ContainerTestUtils.waitForAssignment(c, 1));

A consumer with autoStartup set to false looks like this:

  @KafkaListener(groupId = "${testgroupid}", topics = "${testtopic}", containerFactory = "testContainerFactory", autoStartup = "${configValue:false}")

Note: the default value of autoStartup is true.
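To illustrate why a container that never starts produces this message, here is a rough, plain-Java sketch of the idea. It does not use spring-kafka at all: FakeContainer, startIfAutoStartup and this waitForAssignment are hypothetical stand-ins written only for this example, and the real ContainerTestUtils is assumed (not shown) to poll in a broadly similar way before giving up.

```java
import java.util.Arrays;
import java.util.List;

// Illustration only: hypothetical stand-ins, NOT the spring-kafka classes.
final class AssignmentWaitSketch {

    // Minimal fake of a listener container with an autoStartup flag.
    static final class FakeContainer {
        private final boolean autoStartup;
        private int assignedPartitions = 0;

        FakeContainer(boolean autoStartup) {
            this.autoStartup = autoStartup;
        }

        boolean isAutoStartup() {
            return autoStartup;
        }

        // Simulates framework startup: only auto-start containers get partitions.
        void startIfAutoStartup(int partitions) {
            if (autoStartup) {
                assignedPartitions = partitions;
            }
        }

        int getAssignedPartitions() {
            return assignedPartitions;
        }
    }

    // Polls a few times, then fails with the same kind of message as in the question.
    static void waitForAssignment(FakeContainer c, int expected, int attempts, long sleepMillis) {
        int count = 0;
        for (int i = 0; i < attempts; i++) {
            count = c.getAssignedPartitions();
            if (count == expected) {
                return;
            }
            try {
                Thread.sleep(sleepMillis);
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
                throw new IllegalStateException("Interrupted while waiting", e);
            }
        }
        throw new IllegalStateException("Expected " + expected + " but got " + count + " partitions");
    }

    public static void main(String[] args) {
        List<FakeContainer> containers =
                Arrays.asList(new FakeContainer(true), new FakeContainer(false));
        containers.forEach(c -> c.startIfAutoStartup(1));

        // Waiting only on auto-started containers succeeds.
        containers.stream()
                .filter(FakeContainer::isAutoStartup)
                .forEach(c -> waitForAssignment(c, 1, 3, 10));

        // Waiting on the container that was never started fails.
        try {
            waitForAssignment(containers.get(1), 1, 3, 10);
        } catch (IllegalStateException e) {
            System.out.println(e.getMessage()); // prints "Expected 1 but got 0 partitions"
        }
    }
}
```

The filter in the working code above plays the same role as the filter in main here: it skips containers that will never be assigned anything, so the wait cannot time out on them.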