Apache Beam KafkaIO consumers in consumer group getting assigned unique group id

I am running multiple instances of an Apache Beam KafkaIO pipeline with the DirectRunner, all reading from the same topic. However, every message is delivered to all running instances. Looking at the Kafka consumer configuration, I found that the group name gets a unique prefix, so each instance ends up with a different group.id:

  1. group.id = Reader-0_offset_consumer_559337182_my_group
  2. group.id = Reader-0_offset_consumer_559337345_my_group

So each instance is assigned a unique group.id, and that's why every message is delivered to all instances.

pipeline.apply("ReadFromKafka", KafkaIO.<String, String>read()
            .withReadCommitted()
            .withConsumerConfigUpdates(new ImmutableMap.Builder<String, Object>()
                    .put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, true)
                    .put(ConsumerConfig.GROUP_ID_CONFIG, "my_group")
                    .put(ConsumerConfig.AUTO_COMMIT_INTERVAL_MS_CONFIG, 5)
                    .build())
            .withKeyDeserializer(StringDeserializer.class)
            .withValueDeserializer(StringDeserializer.class)
            .withBootstrapServers(servers)
            .withTopics(Collections.singletonList(topicName))
            .withoutMetadata());

What configuration do I need so that consumers in the same group don't all read the same message?

Yes, this happens because the group name gets a unique prefix, so each instance has its own group name. As a result, Kafka does not treat a newly started instance as another member of the same group, and the same messages are delivered to all consumers.

One workaround is to stop passing just the topic and letting Beam decide how to read all of its partitions. Instead, explicitly assign topic partitions to each instance of the Beam KafkaIO pipeline running on the DirectRunner.

To do this, pass a List<TopicPartition> to withTopicPartitions():

KafkaIO.<String, String>read()
                .withCreateTime(Duration.standardMinutes(1))
                .withReadCommitted()
                .withBootstrapServers(endPoint)
                .withConsumerConfigUpdates(new ImmutableMap.Builder<String, Object>()
                        .put(ConsumerConfig.GROUP_ID_CONFIG, groupName)
                        .put(ConsumerConfig.AUTO_COMMIT_INTERVAL_MS_CONFIG, 5)
                        .put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "latest")
                        .build())
                // Read only partition 0 of the topic
                .withTopicPartitions(Arrays.asList(new TopicPartition(topicName, 0)))
                .withKeyDeserializer(StringDeserializer.class)
                .withValueDeserializer(StringDeserializer.class)
                .withoutMetadata();

The code above reads messages only from partition 0. If you give each instance a different, non-overlapping set of partitions, you can run multiple instances of the same program without every message being delivered to all of them.
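To run more than one instance this way, each instance needs a partition list that does not overlap with the others. Below is one possible sketch for computing that list. It assumes the topic's partition count is known in advance and that each instance is started with its own 0-based index (for example from a command-line argument). The class and method names (PartitionSplit, partitionsFor) are hypothetical. It spreads partitions round-robin and returns plain partition numbers. In the Beam pipeline, each number p would be wrapped as new TopicPartition(topicName, p) and the resulting list passed to withTopicPartitions().

```java
import java.util.ArrayList;
import java.util.List;

public class PartitionSplit {

    /**
     * Returns the partition numbers that instance {@code instanceIndex} (0-based)
     * should read when {@code numPartitions} partitions are spread round-robin
     * across {@code instanceCount} instances. Partition p goes to instance
     * p % instanceCount, so no two instances share a partition.
     */
    public static List<Integer> partitionsFor(int numPartitions, int instanceIndex, int instanceCount) {
        if (instanceCount <= 0 || instanceIndex < 0 || instanceIndex >= instanceCount) {
            throw new IllegalArgumentException("instanceIndex must be in [0, instanceCount)");
        }
        List<Integer> result = new ArrayList<>();
        // Start at this instance's index and step by the number of instances.
        for (int p = instanceIndex; p < numPartitions; p += instanceCount) {
            result.add(p);
        }
        return result;
    }

    public static void main(String[] args) {
        // Hypothetical setup: a topic with 6 partitions shared by 3 instances.
        int numPartitions = 6;
        int instanceCount = 3;
        for (int i = 0; i < instanceCount; i++) {
            // Prints: instance 0 -> [0, 3], instance 1 -> [1, 4], instance 2 -> [2, 5]
            System.out.println("instance " + i + " -> partitions "
                    + partitionsFor(numPartitions, i, instanceCount));
        }
    }
}
```

Because the partitions are assigned by hand, nothing rebalances them automatically. Adding or removing an instance means restarting all instances with a recomputed split.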