The objective is to consume records from Kafka and forward them to Azure Service Bus. The Kafka consumer can be configured (and was confirmed while debugging) to consume thousands of messages on each poll of the topic. The Azure Service Bus producer, on the other hand, always publishes a single message. So when this integration runs, the Kafka logs show thousands of messages received, and then the Service Bus logs iterate over each one, sending them to the queue one at a time. This takes several minutes per poll and slows the process down significantly.
The component documentation states that sending in batches is the default, but it is less than clear about precisely how to achieve that.
import org.apache.camel.builder.RouteBuilder;

public class SampleKafkaConsumer extends RouteBuilder {

    @Override
    public void configure() throws Exception {
        log.info("About to start route: Kafka Server -> Log ");

        from("kafka:{{consumer.topic}}?brokers={{kafka.host}}:{{kafka.port}}"
                + "&maxPollRecords={{consumer.maxPollRecords}}"
                + "&consumersCount={{consumer.consumersCount}}"
                + "&seekTo={{consumer.seekTo}}"
                + "&groupId={{consumer.group}}"
                + "&lingerMs={{consumer.lingerMs}}"
                + "&producerBatchSize={{consumer.producerBatchSize}}"
                + "&saslJaasConfig={{consumer.saslJaasConfig}}"
                + "&saslMechanism={{consumer.saslMechanism}}"
                + "&securityProtocol={{consumer.securityProtocol}}")
            .routeId("Kafka")
            .to("azure-servicebus:topic?connectionString={{producer.connectionString}}&producerOperation=sendMessages");
    }
}
Any insight on how to approach this?
The solution the Azure Service Bus docs are alluding to is to batch the results into a single list of messages and pass that list to the Azure producer in one exchange.

The key to doing this in Apache Camel is the Aggregator EIP. Coupled with a completion size and/or completion interval, it lets the route bunch messages together and send them to Service Bus in batches.
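A minimal sketch of that approach is below. It is not a drop-in replacement for the route above: the completion values (500 messages, 5 seconds) are illustrative assumptions you should tune, and the SASL/security options from the original endpoint are omitted for brevity. `GroupedBodyAggregationStrategy` is Camel's built-in strategy that collects incoming bodies into a `java.util.List`, which the `sendMessages` producer operation can then send as one batch.

```java
import org.apache.camel.builder.RouteBuilder;
import org.apache.camel.processor.aggregate.GroupedBodyAggregationStrategy;

public class BatchedKafkaToServiceBusRoute extends RouteBuilder {

    @Override
    public void configure() throws Exception {
        from("kafka:{{consumer.topic}}?brokers={{kafka.host}}:{{kafka.port}}"
                + "&maxPollRecords={{consumer.maxPollRecords}}"
                + "&groupId={{consumer.group}}")
            .routeId("KafkaBatched")
            // Correlate every exchange into the same group so they all
            // land in one aggregated batch.
            .aggregate(constant(true), new GroupedBodyAggregationStrategy())
            // Flush the batch once 500 messages have accumulated...
            .completionSize(500)
            // ...or after 5 seconds, whichever comes first, so a slow
            // topic does not stall the batch indefinitely.
            .completionInterval(5000)
            // The aggregated exchange body is now a List, which
            // sendMessages publishes to Service Bus as a batch.
            .to("azure-servicebus:topic?connectionString={{producer.connectionString}}"
                + "&producerOperation=sendMessages");
    }
}
```

With this in place, the Service Bus logs should show one send per batch rather than one per Kafka record, which is what eliminates the per-message round-trip cost described above.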