I have the following code that is used to create a bunch of separate consumers for separate topics. Once these consumers are created, i iterate through every single one of them and try to read message. I follow two principles here:
- Either get 5 max messages and then stop.
- If you get less than 5 messages but have been waiting for 10 seconds, exit with whatever you get.
Here is is caller class:
async processMessages() {
let allMessages = [];
this.logger.info(`Begin processing messages for all topics.`);
for (let kafkaConsumer of this.consumerPool) {
if (this.maxMessages && allMessages.length >= this.maxMessages) {
this.logger.info(`Reached the maximum message count of ${this.maxMessages}, exiting consumption loop.`);
break;
}
let messages = await kafkaConsumer.consume();
this.logger.info(`Adding messages to allMessages array. Current total messages count: ${allMessages.length}`);
allMessages.push(...messages);
this.logger.info(`Finished consuming messages from topic: ${kafkaConsumer.topic}. Number of messages received: ${messages.length}`);
}
this.logger.info(`Completed processing all topics. Total messages processed: ${allMessages.length}`);
return allMessages;
}
And the actual consumer logic is here:
consume() {
if (this.consumer.paused().find(topicPartition => topicPartition.topic === this.topic)) {
this.consumer.resume([{topic: this.topic}])
}
return new Promise( async(resolve, reject) => {
let messagesBuffer = [];
let paused = this.consumer.paused().find(topicPartition => topicPartition.topic === this.topic);
setTimeout(async () => {
if (!paused) {
this.logger.info(`Setting timeout for consumer on topic: ${this.topic}. Timeout: ${this.consumerTimeout}ms`);
try{
this.logger.info(`Timeout occurred for consumer on topic: ${this.topic}. Current buffer size: ${messagesBuffer.length}`);
this.consumer.pause([{topic: this.topic}])
resolve(messagesBuffer);
}catch(err){
this.logger.info(`Consumer paused failed: ${err}`);
reject(err)
}
}
}, this.consumerTimeout);
await this.consumer.run({
eachMessage: async ({message, partition }) => {
this.logger.info(`Received message on topic: ${this.topic}, partition: ${partition}, offset: ${message.offset}`);
if (messagesBuffer.length < this.maxMessages) {
this.logger.info(`Adding message to buffer. Current buffer size: ${messagesBuffer.length}`);
messagesBuffer.push(message.value.toString());
}
this.logger.info(`Message: ${message.value.toString()}`)
this.logger.info(`Committing new Message: ${message.offset}`)
await this.consumer.commitOffsets([
{topic: this.topic, partition, offset: message.offset},
]);
if (messagesBuffer.length === this.maxMessages) {
this.logger.info(`Message buffer limit reached for topic: ${this.topic}. Buffer size: ${messagesBuffer.length}`);
try{
this.consumer.pause([{topic: this.topic}])
}catch(err){
this.logger.info(`Consumer paused failed: ${err}`);
reject(err)
}
this.logger.info(`Consumer returning since 5 messages read: ${JSON.stringify(messagesBuffer)}`);
resolve(messagesBuffer);
}
}
});
this.logger.info(`Consumer returning since 5 messages read out side await: ${JSON.stringify(messagesBuffer)}`);
});
}
There are a few problems i am seeing:
- the first iteration goes through successfully and I am able to get 5 pending messages in the queue
- Second iteration onwards i dont get any messages at all. I do see the messages about receiving messages and committing offsets for them in the log but the
allMessages
field never gets the data and the array is always printed as blank.
I am very beginner at Nodejs and have recently started working on it. Can someone guide me into what i am doing wrong?