Spring Kafka multi-member group unit-test

2.2k Views Asked by At

Cribbing from the helpful info here, I am trying to get a unit-test running that demonstrates the distribution of messages across multiple members of the same group listening on the same topic. I just started with Kafka so I may be missing something fundamental.

I don't get a full partition assignment in setup(). If I comment out these three listeners/members of the same group, and change the test to look like the referenced question (distinct groups/broadcast scenario), then everything works. Any pointers would be appreciated. Thanks!

My setup:

Test class:

@RunWith(SpringRunner.class)
@SpringBootTest
@DirtiesContext
public class SpringKafkaApplicationTest {

    private static final Logger LOGGER = LoggerFactory.getLogger(SpringKafkaApplicationTest.class);

    private final static String BAR_TOPIC = "bar.t";
    private final static String FOO_TOPIC = "foo.t";

    @Autowired
    private Sender sender;

    @Autowired
    private Receiver receiver;
    //    @Autowired
    //    private Receiver receiver2_G1;
    //    @Autowired
    //    private Receiver2 receiver1_G2;

    @Autowired
    private KafkaListenerEndpointRegistry kafkaListenerEndpointRegistry;

    // create 3 partitions per topic - to support up to 3 consumers in a group
    @ClassRule
    public static KafkaEmbedded embeddedKafka = new KafkaEmbedded(1, true, 3, BAR_TOPIC, FOO_TOPIC);

    @Before
    public void setUp() throws Exception {
        // wait until the partitions are assigned
        for (MessageListenerContainer messageListenerContainer : kafkaListenerEndpointRegistry
                .getListenerContainers()) {
            LOGGER.info("calling waitForAssignment on messageListenerContainer.getClass(): " + messageListenerContainer.getClass());
            LOGGER.info("embeddedKafka.getPartitionsPerTopic(): " + embeddedKafka.getPartitionsPerTopic());

            ContainerTestUtils.waitForAssignment(messageListenerContainer,
                    embeddedKafka.getPartitionsPerTopic());
        }
    }

    @Test
    public void testReceive() throws Exception {
        for (int i = 0; i < 10; i++) {
            sender.send(BAR_TOPIC, "testkey_" + i, new Bar("bar"));
            sender.send(FOO_TOPIC, "testkey_" + i, new Foo("foo"));
        }
...

Receiver config class:

@Configuration
@EnableKafka
public class ReceiverConfig {

    @Value("${kafka.bootstrap-servers}")
    private String bootstrapServers;

    @Bean
    public Map<String, Object> consumerConfigs() {
        Map<String, Object> props = new HashMap<>();
        props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);
        props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
        props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, JsonDeserializer.class);
        props.put(ConsumerConfig.GROUP_ID_CONFIG, "group1"); // not really used since groupId annotation overrides

        return props;
    }

    @Bean
    public ConsumerFactory<String, String> consumerFactory() {
        return new DefaultKafkaConsumerFactory<>(consumerConfigs(), new StringDeserializer(),
                new StringDeserializer());
    }

    @Bean
    public ConcurrentKafkaListenerContainerFactory<String, String> kafkaListenerContainerFactory() {
        ConcurrentKafkaListenerContainerFactory<String, String> factory = new ConcurrentKafkaListenerContainerFactory<>();
        factory.setConsumerFactory(consumerFactory());
        factory.setMessageConverter(new StringJsonMessageConverter());

        return factory;
    }

    @Bean
    public Receiver receiver() {
        return new Receiver();
    }
}

Receiver class with listeners:

public class Receiver {

    // test 3 listeners in same group - each should get 1/3 of transmitted objects
    @KafkaListener(id = "barListener_sharedGroup1", topics = "${kafka.topic.bar}", groupId = "bar_sharedGroup")
    public void receiveBarSharedGroup1(@Payload Bar bar,
            @Header(KafkaHeaders.RECEIVED_MESSAGE_KEY) String key,
            @Header(KafkaHeaders.RECEIVED_PARTITION_ID) int partition,
            @Header(KafkaHeaders.RECEIVED_TOPIC) String topic) {

        LOGGER.info("receiveBarSharedGroup1(): received {} with key {}, from partition {}, on topic {}", bar, key, partition, topic);
        multigroup1latch.countDown();
    }

    @KafkaListener(id = "barListener_sharedGroup2", topics = "${kafka.topic.bar}", groupId = "bar_sharedGroup")
    public void receiveBarSharedGroup2(@Payload Bar bar,
            @Header(KafkaHeaders.RECEIVED_MESSAGE_KEY) String key,
            @Header(KafkaHeaders.RECEIVED_PARTITION_ID) int partition,
            @Header(KafkaHeaders.RECEIVED_TOPIC) String topic) {

        LOGGER.info("receiveBarSharedGroup2(): received {} with key {}, from partition {}, on topic {}", bar, key, partition, topic);
        multigroup2latch.countDown();
    }

    @KafkaListener(id = "barListener_sharedGroup3", topics = "${kafka.topic.bar}", groupId = "bar_sharedGroup")
    public void receiveBarSharedGroup3(@Payload Bar bar,
            @Header(KafkaHeaders.RECEIVED_MESSAGE_KEY) String key,
            @Header(KafkaHeaders.RECEIVED_PARTITION_ID) int partition,
            @Header(KafkaHeaders.RECEIVED_TOPIC) String topic) {

        LOGGER.info("receiveBarSharedGroup3(): received {} with key {}, from partition {}, on topic {}", bar, key, partition, topic);
        multigroup3latch.countDown();
    }

Replacing these listeners with distinct groups, this works as expected:

//works - bar_group1 listens on topic 'bar' and should receive all that are sent
@KafkaListener(id = "barListener_group1", topics = "${kafka.topic.bar}", groupId = "bar_group1")
public void receiveBarGroup1(@Payload Bar bar,
        @Header(KafkaHeaders.RECEIVED_MESSAGE_KEY) String key,
        @Header(KafkaHeaders.RECEIVED_PARTITION_ID) int partition,
        @Header(KafkaHeaders.RECEIVED_TOPIC) String topic) {

    LOGGER.info("receiveBarGroup1(): received {} with key {}, from partition {}, on topic {}", bar, key, partition, topic);
    eventProcessor.insertLocationData(bar.toString());
    group1latch.countDown();
}

// same as above for bar_group2
@KafkaListener(id = "barListener_group2", topics = "${kafka.topic.bar}", groupId = "bar_group2")
public void receiveBarGroup2(@Payload Bar bar,
        @Header(KafkaHeaders.RECEIVED_MESSAGE_KEY) String key,
        @Header(KafkaHeaders.RECEIVED_PARTITION_ID) int partition,
        @Header(KafkaHeaders.RECEIVED_TOPIC) String topic) {

    LOGGER.info("receiveBarGroup2(): received {} with key {}, from partition {}, on topic {}", bar, key, partition, topic);
    eventProcessor.processNotificationMessage(bar.toString());
    group2latch.countDown();
}

// this is using default group - again a different group, so should receive all transmitted 'bar' objs
@KafkaListener(id = "barListener_groupD", topics = "${kafka.topic.bar}")
public void receiveBar(@Payload Bar bar,
        @Header(KafkaHeaders.RECEIVED_MESSAGE_KEY) String key,
        @Header(KafkaHeaders.RECEIVED_PARTITION_ID) int partition,
        @Header(KafkaHeaders.RECEIVED_TOPIC) String topic) {

    LOGGER.info("receiveBar(): received {} with key {}, from partition {}, on topic {}", bar, key, partition, topic);
    latch.countDown();
}

// also using default group - listening for 'foo' objects, should receive all transmitted 'foo' objs
@KafkaListener(id = "fooListener_groupD", topics = "${kafka.topic.foo}")
public void receiveFoo(@Payload Foo foo,
        @Header(KafkaHeaders.RECEIVED_MESSAGE_KEY) String key,
        @Header(KafkaHeaders.RECEIVED_PARTITION_ID) int partition,
        @Header(KafkaHeaders.RECEIVED_TOPIC) String topic) {

    LOGGER.info("receiveFoo(): received {} with key {}, from partition {}, on topic {}", foo.toString(), key, partition, topic);
    latch.countDown();
}

Debug/application logs:

14:11:18.940 [main] INFO  c.c.kafka.SpringKafkaApplicationTest - Starting SpringKafkaApplicationTest on WB-ATingley-i5 with PID 3140 (started by atingley in C:\dev\tools\kafka\spring-kafka-master\apt-spring-kafka-multiple-groups)
14:11:18.941 [main] INFO  c.c.kafka.SpringKafkaApplicationTest - No active profile set, falling back to default profiles: default
14:11:21.089 [main] INFO  c.c.kafka.SpringKafkaApplicationTest - Started SpringKafkaApplicationTest in 2.477 seconds (JVM running for 4.09)
14:11:21.132 [main] INFO  c.c.kafka.SpringKafkaApplicationTest - calling waitForAssignment on messageListenerContainer.getClass(): class org.springframework.kafka.listener.ConcurrentMessageListenerContainer
14:11:21.132 [main] INFO  c.c.kafka.SpringKafkaApplicationTest - embeddedKafka.getPartitionsPerTopic(): 3
14:11:21.353 [barListener_sharedGroup3-0-C-1] INFO  o.s.k.l.KafkaMessageListenerContainer - partitions revoked:[]
14:11:21.353 [barListener_sharedGroup3-0-C-1] DEBUG o.s.k.l.KafkaMessageListenerContainer$ListenerConsumer - Commit list: {}
14:11:21.403 [barListener_sharedGroup2-0-C-1] INFO  o.s.k.l.KafkaMessageListenerContainer - partitions revoked:[]
14:11:21.404 [barListener_sharedGroup2-0-C-1] DEBUG o.s.k.l.KafkaMessageListenerContainer$ListenerConsumer - Commit list: {}
14:11:21.411 [barListener_sharedGroup1-0-C-1] INFO  o.s.k.l.KafkaMessageListenerContainer - partitions revoked:[]
14:11:21.411 [barListener_sharedGroup1-0-C-1] DEBUG o.s.k.l.KafkaMessageListenerContainer$ListenerConsumer - Commit list: {}
14:11:21.434 [barListener_sharedGroup3-0-C-1] INFO  o.s.k.l.KafkaMessageListenerContainer - partitions assigned:[bar.t-0, bar.t-2, bar.t-1]
14:11:22.076 [barListener_sharedGroup3-0-C-1] DEBUG o.s.k.l.KafkaMessageListenerContainer$ListenerConsumer - Received: 0 records
14:11:23.077 [barListener_sharedGroup3-0-C-1] DEBUG o.s.k.l.KafkaMessageListenerContainer$ListenerConsumer - Received: 0 records
14:11:24.077 [barListener_sharedGroup3-0-C-1] DEBUG o.s.k.l.KafkaMessageListenerContainer$ListenerConsumer - Received: 0 records
14:11:24.438 [barListener_sharedGroup3-0-C-1] INFO  o.s.k.l.KafkaMessageListenerContainer - partitions revoked:[bar.t-0, bar.t-2, bar.t-1]
14:11:24.438 [barListener_sharedGroup3-0-C-1] DEBUG o.s.k.l.KafkaMessageListenerContainer$ListenerConsumer - Commit list: {}
14:11:24.445 [barListener_sharedGroup2-0-C-1] INFO  o.s.k.l.KafkaMessageListenerContainer - partitions assigned:[bar.t-0]
14:11:24.446 [barListener_sharedGroup2-0-C-1] DEBUG o.s.k.l.KafkaMessageListenerContainer$ListenerConsumer - Received: 0 records
14:11:24.446 [barListener_sharedGroup3-0-C-1] INFO  o.s.k.l.KafkaMessageListenerContainer - partitions assigned:[bar.t-1]
14:11:24.446 [barListener_sharedGroup1-0-C-1] INFO  o.s.k.l.KafkaMessageListenerContainer - partitions assigned:[bar.t-2]

<<< at this point, partition assignment looks as I would expect it - one per listener - but we are waiting on an assignment of three per listener? >>>

14:11:24.446 [barListener_sharedGroup1-0-C-1] DEBUG o.s.k.l.KafkaMessageListenerContainer$ListenerConsumer - Received: 0 records
14:11:25.077 [barListener_sharedGroup3-0-C-1] DEBUG o.s.k.l.KafkaMessageListenerContainer$ListenerConsumer - Received: 0 records
14:11:25.446 [barListener_sharedGroup1-0-C-1] DEBUG o.s.k.l.KafkaMessageListenerContainer$ListenerConsumer - Received: 0 records
...
14:12:20.447 [barListener_sharedGroup2-0-C-1] DEBUG o.s.k.l.KafkaMessageListenerContainer$ListenerConsumer - Received: 0 records
14:12:20.449 [barListener_sharedGroup1-0-C-1] DEBUG o.s.k.l.KafkaMessageListenerContainer$ListenerConsumer - Received: 0 records
14:12:21.079 [barListener_sharedGroup3-0-C-1] DEBUG o.s.k.l.KafkaMessageListenerContainer$ListenerConsumer - Received: 0 records
14:12:21.199 [barListener_sharedGroup2-0-C-1] DEBUG o.s.k.l.KafkaMessageListenerContainer$ListenerConsumer - Commit list: {}
14:12:21.199 [barListener_sharedGroup1-0-C-1] DEBUG o.s.k.l.KafkaMessageListenerContainer$ListenerConsumer - Commit list: {}
14:12:21.199 [barListener_sharedGroup3-0-C-1] DEBUG o.s.k.l.KafkaMessageListenerContainer$ListenerConsumer - Commit list: {}
14:12:21.203 [barListener_sharedGroup1-0-C-1] INFO  o.s.k.l.KafkaMessageListenerContainer$ListenerConsumer - Consumer stopped
14:12:21.204 [barListener_sharedGroup1-0-C-1] DEBUG o.s.k.l.KafkaMessageListenerContainer - KafkaMessageListenerContainer [id=barListener_sharedGroup1-0, clientIndex=-0, topicPartitions=[bar.t-2]] stopped normally
14:12:21.208 [barListener_sharedGroup3-0-C-1] INFO  o.s.k.l.KafkaMessageListenerContainer$ListenerConsumer - Consumer stopped
14:12:21.208 [barListener_sharedGroup3-0-C-1] DEBUG o.s.k.l.KafkaMessageListenerContainer - KafkaMessageListenerContainer [id=barListener_sharedGroup3-0, clientIndex=-0, topicPartitions=[bar.t-1]] stopped normally
14:12:21.212 [barListener_sharedGroup2-0-C-1] INFO  o.s.k.l.KafkaMessageListenerContainer$ListenerConsumer - Consumer stopped
14:12:21.212 [barListener_sharedGroup2-0-C-1] DEBUG o.s.k.l.KafkaMessageListenerContainer - KafkaMessageListenerContainer [id=barListener_sharedGroup2-0, clientIndex=-0, topicPartitions=[bar.t-0]] stopped normally

Thanks!

Oh - and the unit-test assertion failure (getting warmer...!). Thanks!

SpringKafkaApplicationTest (3)
com.codenotfound.kafka.SpringKafkaApplicationTest
testReceive(com.codenotfound.kafka.SpringKafkaApplicationTest)
org.junit.ComparisonFailure: expected:<[3]> but was:<[1]>
    at sun.reflect.NativeConstructorAccessorImpl.newInstance0(Native Method)
    at sun.reflect.NativeConstructorAccessorImpl.newInstance(Unknown Source)
    at sun.reflect.DelegatingConstructorAccessorImpl.newInstance(Unknown Source)
    at org.springframework.kafka.test.utils.ContainerTestUtils.waitForAssignment(ContainerTestUtils.java:74)
    at com.codenotfound.kafka.SpringKafkaApplicationTest.setUp(SpringKafkaApplicationTest.java:62)
1

There are 1 best solutions below

0
On

Here's my problem, a copy/paste error from a different unit-test - though I need 3 partitions, 1 per listener, each listener container only gets assigned one partition (as intended):

ContainerTestUtils.waitForAssignment(messageListenerContainer,
    embeddedKafka.getPartitionsPerTopic());

... should be:

ContainerTestUtils.waitForAssignment(messageListenerContainer, 1);