How can we control polling of messages from a Kafka queue using the Akka Kafka connector?


We have a use case where we need to control the polling of messages from a Kafka queue, using the Akka Kafka connector, based on the availability of actors to process the messages. We have 2 consumers with 5 actors each to process the messages, and each consumer should poll only when one of its actors is available.

We have observed that consumer1 polls messages from Kafka continuously and buffers them in the Akka stream while consumer2 sits idle. We need to control this behavior so that a consumer polls messages only when it has a processing actor available.

Following the Akka documentation on backpressure, we tried adding Sink.actorRefWithBackpressure() to control the polling, but the backpressure is applied to the actor stream and not to the consumer poll.

How do we control the consumer polling based on the availability of processing actors in the pool?

Below is my code for creating and registering a consumer for specific topics:

public void createAndRegisterConsumer(String groupId, ActorRef actor, SourceType sourceType, String... topics) {
    RestartSource.onFailuresWithBackoff(Duration.ofSeconds(3), Duration.ofSeconds(20), 0.2,
            () -> createRawConsumer(groupId, sourceType, topics).mapMaterializedValue(c -> {
                // Store the Consumer.Control of this consumer, keyed by its topics
                topicMapping.put(topicKey(topics), c);
                return c;
            }))
            .map(ConsumerRecord::value)
            .runWith(Sink.actorRefWithBackpressure(actor, new StreamInit(), Ack.INSTANCE, new StreamComplete(), (e) -> {
                LOGGER.error("Stream has failed", e);
                return new StreamFailed(e);
            }), materializer);
}

public Source<ConsumerRecord<byte[], String>, Consumer.Control> createRawConsumer(String groupId, SourceType sourceType, String... topics) {
    if (topics == null || topics.length == 0) {
        throw new RuntimeException("Must provide at least one topic to consume");
    }
    final ConsumerSettings<byte[], String> consumerSettings = getConsumerSettings(groupId, this.kafkaConfig, sourceType == SourceType.PLAIN);
    final Subscription subscription = Subscriptions.topics(topics);
    switch (sourceType) {
        case PLAIN:
            return Consumer.plainSource(consumerSettings, subscription);
        case AT_MOST_ONCE:
            return Consumer.atMostOnceSource(consumerSettings, subscription);
        default:
            throw new UnsupportedOperationException("The source type " + sourceType + " is not supported");
    }
}

There is 1 best solution below


It's not possible to do this completely: the consumer actor polls periodically even in the absence of demand. This is partly a legacy of older Kafka versions, where not polling could result in a consumer rebalance.

It is possible to set the interval between scheduled polls to be arbitrarily long via the ConsumerSettings:

consumerSettings.withPollInterval(Duration.ofHours(1))
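
The same interval can also be set in configuration instead of in code. This is a minimal sketch, assuming the settings are built from the actor system's default akka.kafka.consumer section (for example via ConsumerSettings.create(system, ...)). The question does not show what its getConsumerSettings helper does, so whether this applies there is an assumption:

```
# application.conf (HOCON)
akka.kafka.consumer {
  # Interval between scheduled polls; the library default is 50ms
  poll-interval = 1h
}
```

When setting it in code, keep in mind that ConsumerSettings is immutable: withPollInterval returns a new instance, and that returned instance is the one that has to be passed to Consumer.plainSource or Consumer.atMostOnceSource.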