I am running multiple instances of an Apache Beam KafkaIO pipeline with the DirectRunner, all reading from the same topic. However, every message is delivered to all running instances. Looking at the Kafka configuration, I found that the group name gets a unique prefix prepended, so each instance ends up with a different group name:
- group.id = Reader-0_offset_consumer_559337182_my_group
- group.id = Reader-0_offset_consumer_559337345_my_group
So each instance is assigned a unique `group.id`, and that's why every message is delivered to all instances. This is how I configure the read:
```java
pipeline.apply("ReadFromKafka", KafkaIO.<String, String>read()
        .withReadCommitted()
        // Consumer properties passed through to the underlying Kafka consumer
        .withConsumerConfigUpdates(new ImmutableMap.Builder<String, Object>()
                .put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, true)
                .put(ConsumerConfig.GROUP_ID_CONFIG, "my_group")
                .put(ConsumerConfig.AUTO_COMMIT_INTERVAL_MS_CONFIG, 5)
                .build())
        .withKeyDeserializer(StringDeserializer.class)
        .withValueDeserializer(StringDeserializer.class)
        .withBootstrapServers(servers)
        .withTopics(Collections.singletonList(topicName))
        .withoutMetadata());
```
What configuration do I need so that consumers in the same group don't all read the same messages?
Yes, this happens because KafkaIO adds a unique prefix to the group name, so each instance ends up with a different `group.id`. As a result, Kafka does not treat a newly started instance as another member of the same consumer group, and the same messages are delivered to every consumer.
One workaround is to stop passing the topic and letting Beam work out which partitions to read. Instead, explicitly assign topic partitions to each instance of the KafkaIO pipeline running on the DirectRunner.
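For this to work, each instance needs a disjoint set of partitions. A minimal sketch, assuming you know the topic's partition count and start each instance with its own 0-based index, computes a round-robin split. `PartitionSplit` and `partitionsFor` are hypothetical names, not part of Beam or Kafka:

```java
import java.util.ArrayList;
import java.util.List;

public class PartitionSplit {

    // Hypothetical helper: returns the partition numbers that instance
    // `instanceIndex` (0-based) out of `instanceCount` instances should read.
    // Round-robin assignment gives every partition to exactly one instance.
    static List<Integer> partitionsFor(int numPartitions, int instanceIndex, int instanceCount) {
        if (instanceCount <= 0 || instanceIndex < 0 || instanceIndex >= instanceCount) {
            throw new IllegalArgumentException("invalid instance index/count");
        }
        List<Integer> result = new ArrayList<>();
        for (int p = instanceIndex; p < numPartitions; p += instanceCount) {
            result.add(p);
        }
        return result;
    }

    public static void main(String[] args) {
        // A topic with 6 partitions split across 2 instances.
        System.out.println(partitionsFor(6, 0, 2)); // [0, 2, 4]
        System.out.println(partitionsFor(6, 1, 2)); // [1, 3, 5]
    }
}
```

If you start more instances than there are partitions, some instances get an empty list. Keep the instance count at or below the partition count.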
To do this, pass a `List` of `TopicPartition` objects to the `withTopicPartitions` method. For example, a list containing only `new TopicPartition(topicName, 0)` makes that instance read messages only from partition 0. This way you can start multiple instances of the same program, each with a different set of partitions, without the same messages being delivered to every consumer.
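Using `withTopicPartitions` in place of `withTopics` can be sketched as follows. This assumes `beam-sdks-java-io-kafka` and `kafka-clients` are on the classpath and Java 9+ for `Map.of`. The class and method names (`PartitionPinnedRead`, `topicPartitions`, `readPartitions`) are hypothetical helpers. You choose the partition numbers for each instance when you start it.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;

import org.apache.beam.sdk.Pipeline;
import org.apache.beam.sdk.io.kafka.KafkaIO;
import org.apache.beam.sdk.values.KV;
import org.apache.beam.sdk.values.PCollection;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.serialization.StringDeserializer;

public class PartitionPinnedRead {

    // Hypothetical helper: builds the TopicPartition list for the given
    // partition numbers of a single topic.
    static List<TopicPartition> topicPartitions(String topic, List<Integer> partitions) {
        List<TopicPartition> result = new ArrayList<>();
        for (int p : partitions) {
            result.add(new TopicPartition(topic, p));
        }
        return result;
    }

    // Hypothetical helper: adds a KafkaIO source that reads only the listed
    // partitions instead of the whole topic, so instances started with
    // disjoint partition lists do not receive the same messages.
    static PCollection<KV<String, String>> readPartitions(
            Pipeline pipeline, String servers, String topic, List<Integer> partitions) {
        return pipeline.apply("ReadFromKafka", KafkaIO.<String, String>read()
                .withBootstrapServers(servers)
                .withTopicPartitions(topicPartitions(topic, partitions))
                .withKeyDeserializer(StringDeserializer.class)
                .withValueDeserializer(StringDeserializer.class)
                .withReadCommitted()
                .withConsumerConfigUpdates(Map.<String, Object>of(
                        ConsumerConfig.GROUP_ID_CONFIG, "my_group",
                        ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, true))
                .withoutMetadata());
    }
}
```

For example, one instance could call `readPartitions(pipeline, servers, topicName, List.of(0))` and a second could call `readPartitions(pipeline, servers, topicName, List.of(1))`. The partitions are fixed when the pipeline is built, so they are not rebalanced if an instance stops or if partitions are added to the topic. That trade-off is inherent to this workaround.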