Cannot subscribe multiple consumers to Spring Kafka Test embedded Kafka broker

I tried to subscribe two consumers to one EmbeddedKafkaBroker. The first one succeeded, but the second failed. This happened with both the @EmbeddedKafka broker and the @ClassRule broker.

@RunWith(SpringRunner.class)
@SpringBootTest
@EmbeddedKafka(topics = { "topic" })
public class AnnotationEmbeddedKafkaTest {

    @Autowired
    private EmbeddedKafkaBroker broker;

    @Test
    public void annotationEmbeddedKafkaTest() {
        Map<String, Object> consumerProps1 = KafkaTestUtils.consumerProps("testEmbedded", "false", broker);
        Consumer<String, String> consumer1 = new KafkaConsumer<>(consumerProps1);
        broker.consumeFromAnEmbeddedTopic(consumer1, "topic");
        System.out.println("consumer1 assignments=" + consumer1.assignment());

        Map<String, Object> consumerProps2 = KafkaTestUtils.consumerProps("testEmbedded", "false", broker);
        Consumer<String, String> consumer2 = new KafkaConsumer<>(consumerProps2);
        broker.consumeFromAnEmbeddedTopic(consumer2, "topic");
        System.out.println("consumer2 assignments=" + consumer2.assignment());
    }
}

@RunWith(SpringRunner.class)
@SpringBootTest
public class ClassRuleEmbeddedKafkaTest {

    @ClassRule
    public static EmbeddedKafkaRule embeddedKafkaRule = new EmbeddedKafkaRule(1, false, "topic");

    private EmbeddedKafkaBroker broker = embeddedKafkaRule.getEmbeddedKafka();

    @Test
    public void classRuleEmbeddedKafkaTest() {
        Map<String, Object> consumerProps1 = KafkaTestUtils.consumerProps("testEmbedded", "false", broker);
        Consumer<String, String> consumer1 = new KafkaConsumer<>(consumerProps1);
        broker.consumeFromAnEmbeddedTopic(consumer1, "topic");
        System.out.println("consumer1 assignments=" + consumer1.assignment());

        Map<String, Object> consumerProps2 = KafkaTestUtils.consumerProps("testEmbedded", "false", broker);
        Consumer<String, String> consumer2 = new KafkaConsumer<>(consumerProps2);
        broker.consumeFromAnEmbeddedTopic(consumer2, "topic");
        System.out.println("consumer2 assignments=" + consumer2.assignment());
    }
}

I expected both consumers to be able to subscribe to one EmbeddedKafkaBroker. Is this possible in Spring Kafka Test?

I replicated this here: https://github.com/yraydhitya/spring-kafka-test-multiple-consumers

There is 1 solution below.

If you want both consumers to receive all messages from the topic, they need to be in different consumer groups, for example:

Map<String, Object> consumerProps1 = KafkaTestUtils.consumerProps("testEmbedded1", "false", broker);

and

Map<String, Object> consumerProps2 = KafkaTestUtils.consumerProps("testEmbedded2", "false", broker);

Otherwise, both consumers belong to the same group, and Kafka assigns each partition of the topic to only one consumer within that group. Since your embedded Kafka topic has (by default) only one partition, only one consumer will be assigned to it.
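
To make the group behaviour concrete, here is a minimal sketch in plain Java with no Kafka dependency. It is a toy model, not Kafka's actual assignor: the class `GroupAssignmentSketch` and its `assign` method are hypothetical names made up for illustration, and the round-robin spread is an assumption chosen for simplicity. It only shows that partitions are divided among the members of one group, while each separate group gets its own full set.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

// Hypothetical toy model of partition assignment; NOT Kafka's real assignor.
public class GroupAssignmentSketch {

    // Spreads partitions 0..partitionCount-1 round-robin over the members of ONE group.
    static Map<String, List<Integer>> assign(List<String> members, int partitionCount) {
        Map<String, List<Integer>> result = new LinkedHashMap<>();
        for (String member : members) {
            result.put(member, new ArrayList<>());
        }
        if (members.isEmpty()) {
            return result; // nobody to assign to
        }
        for (int p = 0; p < partitionCount; p++) {
            String owner = members.get(p % members.size());
            result.get(owner).add(p);
        }
        return result;
    }

    public static void main(String[] args) {
        // Same group, one partition: the partition goes to a single member.
        System.out.println(assign(Arrays.asList("consumer1", "consumer2"), 1));

        // Two separate groups, one member each: every group is assigned independently.
        System.out.println(assign(Arrays.asList("consumer1"), 1));
        System.out.println(assign(Arrays.asList("consumer2"), 1));
    }
}
```

In this model, the first call leaves `consumer2` with an empty list, which mirrors the second consumer in the question ending up without an assignment. With separate groups (`testEmbedded1` and `testEmbedded2` in the answer above), each consumer is the only member of its group and so owns the partition within that group.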