Cannot scale Kafka Consumer Groups


Setup: 1 Topic (test-topic) with 4 partitions

  • Vert.x: 4.0.3
  • RxJava2 : 2.2.12
  • Vert.x internally uses kafka-client: 2.6.0
  • Kafka broker: 2.8.1

Scenario 1: When 1 consumer group with 1 consumer is used, the system takes 60 seconds to process 100 messages.

Scenario 2: When 1 consumer group with 4 consumers is used, the system takes around 15 seconds to process the same 100 messages.

Scenario 3: When 4 consumer groups with the same group id and 1 consumer per group are used, the system takes 60 seconds to process the messages.

My assumption was that in Scenario 3 the time taken to process all the messages would be around 15 seconds, but that is not the case. I verified from the logs that the partitions are distributed and that each partition receives 25 messages. Can anyone please help me understand what I might be doing wrong, such that adding consumer groups does not scale? Is this behavior normal? (I do not think it is.)

Note:

  • The consumer and producer settings are the defaults, and message delivery is "exactly-once".
  • The code is tested on local setup and on AWS too.

Pseudo Code:

    // Pseudo code: `consumer`, `producer`, `topics` and `producerRecord` are created elsewhere.
    Observable<KafkaConsumerRecord<String, String>> observable = consumer.toObservable();
    consumer.subscribe(topics);
    observable
            // One transaction per consumed record: begin, send, attach offsets, commit.
            .flatMapCompletable(record -> producer.rxBeginTransaction()
                    .andThen(producer.rxSend(producerRecord))
                    .flatMapCompletable(recordMetadata -> Completable.fromAction(() -> {
                        Map<TopicPartition, OffsetAndMetadata> consumerOffsetsMap = new HashMap<>();
                        // The committed offset is the next record to read, hence offset + 1.
                        consumerOffsetsMap.put(
                                new TopicPartition(record.topic(), record.partition()),
                                new OffsetAndMetadata(record.offset() + 1));
                        // "grp1" must match the consumer's group.id.
                        producer.getDelegate().unwrap().sendOffsetsToTransaction(consumerOffsetsMap, "grp1");
                    }))
                    .andThen(producer.rxCommitTransaction())
                    .andThen(consumer.rxCommit())
            ).subscribe(() -> {
                System.out.println("Processed successfully");
            });

There is 1 solution below


One group will have (approximately) the same processing time as multiple, independent groups.

With a perfectly balanced topic and immediate consumer group rebalances, one group with distributed consumers will take a fraction of the time, as you've found:

(time of one consumer reading all partitions) / min(number of partitions, number of group members)

In your case, 60 / min(4, 4) = 15 seconds.
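
As a rough illustration of that formula, here is a minimal Java sketch. The class and method names are made up for this example, and it assumes a perfectly balanced topic with no rebalance delays, so treat the result as an estimate rather than a measurement.

```java
// Hypothetical helper, not part of Kafka or Vert.x.
final class GroupScalingEstimate {

    // Estimated wall-clock time for ONE consumer group to drain the topic.
    // Members beyond the partition count stay idle, so parallelism is capped.
    static double estimateSeconds(double singleConsumerSeconds, int partitions, int groupMembers) {
        if (partitions < 1 || groupMembers < 1) {
            throw new IllegalArgumentException("partitions and groupMembers must be >= 1");
        }
        int parallelism = Math.min(partitions, groupMembers);
        return singleConsumerSeconds / parallelism;
    }

    public static void main(String[] args) {
        System.out.println(estimateSeconds(60, 4, 1)); // Scenario 1: prints 60.0
        System.out.println(estimateSeconds(60, 4, 4)); // Scenario 2: prints 15.0
        System.out.println(estimateSeconds(60, 4, 8)); // 4 of the 8 members idle: prints 15.0
    }
}
```

The last call shows why adding members beyond the number of partitions does not help: the extra consumers receive no partitions.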


"Scenario 3: When 4 consumer groups having same group id"

That's literally not possible: consumers that share a group id are members of one group. I assume you meant different group ids, which would be the only reason why Scenario 3 takes the same time as Scenario 1. Each group independently reads all 100 messages, so you are repeating Scenario 1 four times. The fact that it takes the same amount of time to run 4 groups means it does scale (horizontally). A reduced time would imply vertical scaling within the group, which you saw in Scenario 2.
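
To make the difference concrete, here is a small simulation in plain Java. It is a simplified sketch, not Kafka's real partition assignor: the class name, the round-robin assignment, and the 25 messages per partition (taken from your description) are assumptions for illustration only.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

// Hypothetical model of how work is split; it does not talk to a broker.
final class GroupAssignmentSketch {

    // Returns how many messages each consumer has to process.
    static Map<String, Integer> messagesPerConsumer(
            List<String> consumerGroupIds, int partitions, int messagesPerPartition) {
        // Consumers sharing a group id are members of one and the same group.
        Map<String, List<String>> membersByGroup = new LinkedHashMap<>();
        for (int i = 0; i < consumerGroupIds.size(); i++) {
            String groupId = consumerGroupIds.get(i);
            membersByGroup.computeIfAbsent(groupId, id -> new ArrayList<>())
                    .add(groupId + "/consumer-" + i);
        }
        Map<String, Integer> load = new LinkedHashMap<>();
        for (List<String> members : membersByGroup.values()) {
            for (String member : members) {
                load.put(member, 0);
            }
            // Every group reads every partition; inside a group each partition has one owner.
            for (int p = 0; p < partitions; p++) {
                String owner = members.get(p % members.size());
                load.merge(owner, messagesPerPartition, Integer::sum);
            }
        }
        return load;
    }

    public static void main(String[] args) {
        // Same group id: one group of four members, one partition each.
        System.out.println(messagesPerConsumer(Arrays.asList("grp1", "grp1", "grp1", "grp1"), 4, 25));
        // Different group ids: four groups, each consumer reads all four partitions.
        System.out.println(messagesPerConsumer(Arrays.asList("grp1", "grp2", "grp3", "grp4"), 4, 25));
    }
}
```

In the first call every consumer handles 25 messages, which matches Scenario 2. In the second call every consumer handles all 100 messages, which matches the 60 seconds you measured.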

"app running on Kubernetes. Just change the replicas to increase the number of instances"

This alone will not create unique consumer groups. You need to inject a different group.id value into each app to create new consumer groups.
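
One way to do that is to read the group id from the environment when building the consumer configuration. The sketch below is only an assumption about how your app could be wired: the environment variable name KAFKA_GROUP_ID, the fallback "grp1", and the bootstrap address are placeholders, while the property keys are the standard Kafka consumer settings.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical configuration helper for illustration.
final class ConsumerConfigSketch {

    static Map<String, String> consumerConfig(String groupId) {
        Map<String, String> config = new HashMap<>();
        config.put("bootstrap.servers", "localhost:9092"); // placeholder address
        config.put("group.id", groupId);                   // decides which group this instance joins
        config.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        config.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        return config;
    }

    public static void main(String[] args) {
        // KAFKA_GROUP_ID is an assumed variable name; set it per deployment.
        String groupId = System.getenv().getOrDefault("KAFKA_GROUP_ID", "grp1");
        System.out.println(consumerConfig(groupId));
    }
}
```

The resulting map can be passed to KafkaConsumer.create(vertx, config). Replicas of a single deployment share the same environment, so they all join the same group and split the partitions between them; a separate group needs its own value.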