How to handle backpressure when using Apache Camel and Kafka?

I am trying to write an application that integrates with Kafka using Camel (version 3.4.2).

I have an approach borrowed from the answer to this question.

I have a route that listens for messages from a Kafka topic. Processing of these messages is decoupled from consumption by using a simple executor: each message is submitted as a task to the executor. Message ordering is not important; the only concern is how quickly and efficiently a message can be processed. I have disabled auto-commit and manually commit the offsets once the tasks are submitted to the executor. Losing the messages that are currently being processed (due to a crash/shutdown) is okay, but messages in Kafka that have never been submitted for processing should not be lost because their offsets were committed. Now to the questions:

  1. How can I efficiently handle the load? For example, there are 1000 messages but I can only process 100 in parallel at a time.

Right now my solution is to block the consumer polling thread and keep retrying the job submission. Suspending the polling would be a much better approach, but I cannot find any way to achieve that in Camel.

  2. Is there a better way (a Camel way) to decouple processing from consumption and handle backpressure?

public static void main(String[] args) throws Exception {
    String consumerId = System.getProperty("consumerId", "1");
    // Fixed pool of 100 workers over a SynchronousQueue: there is no task buffer,
    // so submit() is rejected as soon as all workers are busy. That rejection is
    // what drives the backpressure handling in processTask() below.
    ExecutorService executor = new ThreadPoolExecutor(100, 100, 0L, TimeUnit.MILLISECONDS,
            new SynchronousQueue<>());
    LOGGER.info("Consumer {} starting....", consumerId);

    Main main = new Main();
    main.init();

    CamelContext context = main.getCamelContext();
    ComponentsBuilderFactory.kafka().brokers("localhost:9092").metadataMaxAgeMs(120000).groupId("consumer")
            .autoOffsetReset("earliest").autoCommitEnable(false).allowManualCommit(true).maxPollRecords(100)
            .register(context, "kafka");

    ConsumerBean bean = new ConsumerBean();
    context.addRoutes(new RouteBuilder() {
        @Override
        public void configure() {
            from("kafka:test").process(exchange -> {
                LOGGER.info("Consumer {} - Exchange is {}", consumerId, exchange.getIn().getHeaders());
                processTask(exchange);
                commitOffset(exchange);
            });
        }

        private void processTask(Exchange exchange) throws InterruptedException {
            // Retry in a loop rather than recursively, so a long burst of rejections
            // cannot grow the call stack. Blocking here deliberately stalls the
            // Kafka consumer thread until a worker becomes free.
            while (true) {
                try {
                    executor.submit(() -> bean.execute(exchange.getIn().getBody(String.class)));
                    return;
                } catch (RejectedExecutionException e) {
                    LOGGER.error("Exception occurred {}", e.getMessage());
                    Thread.sleep(1000);
                }
            }
        }

        private void commitOffset(Exchange exchange) {
            // The header can be absent, so avoid unboxing a null Boolean.
            boolean lastOne = Boolean.TRUE.equals(
                    exchange.getIn().getHeader(KafkaConstants.LAST_RECORD_BEFORE_COMMIT, Boolean.class));
            if (lastOne) {
                KafkaManualCommit manual = exchange.getIn().getHeader(KafkaConstants.MANUAL_COMMIT,
                        KafkaManualCommit.class);
                if (manual != null) {
                    LOGGER.info("manually committing the offset for batch");
                    manual.commitSync();
                }
            } else {
                LOGGER.info("NOT time to commit the offset yet");
            }
        }
    });

    main.run();
}

1 Answer

You can use the Throttle EIP for this purpose.

from("your uri here")
.throttle(maxRequestCount)
.timePeriodMillis(inTimePeriodMs)
.to(yourProcessorUri)
.end()

Please take a look at the Camel documentation for the Throttle EIP.