Need help understanding promise architecture in Node.js


I have the following code, which creates a separate consumer for each topic. Once the consumers are created, I iterate through each of them and try to read messages. I follow two principles here:

  1. Read at most 5 messages, then stop.
  2. If fewer than 5 messages have arrived after waiting 10 seconds, return whatever has been received so far.
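The two stopping conditions above can be sketched independently of Kafka. This is only a minimal illustration, assuming a plain `EventEmitter` that emits `'message'` events as the source; `collectBatch`, `maxMessages`, and `timeoutMs` are hypothetical names, not part of any Kafka client.

```javascript
const { EventEmitter } = require('node:events');

// Hypothetical helper: resolves with at most `maxMessages` items from `source`,
// or with whatever has arrived once `timeoutMs` has elapsed.
function collectBatch(source, { maxMessages = 5, timeoutMs = 10000 } = {}) {
  return new Promise((resolve) => {
    const buffer = [];

    // Single exit path: stop the timer, stop listening, hand back the buffer.
    const finish = () => {
      clearTimeout(timer);
      source.off('message', onMessage);
      resolve(buffer);
    };

    const onMessage = (value) => {
      buffer.push(value);
      if (buffer.length >= maxMessages) finish();
    };

    const timer = setTimeout(finish, timeoutMs);
    source.on('message', onMessage);
  });
}
```

Each call gets its own buffer, timer, and listener, and all three are released on the single exit path, so a later call does not share state with an earlier one.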

Here is the caller method:

async processMessages() {
        let allMessages = [];
        this.logger.info(`Begin processing messages for all topics.`);
        for (let kafkaConsumer of this.consumerPool) {
            if (this.maxMessages && allMessages.length >= this.maxMessages) {
                this.logger.info(`Reached the maximum message count of ${this.maxMessages}, exiting consumption loop.`);
                break;
            }
            let messages = await kafkaConsumer.consume();
            this.logger.info(`Adding messages to allMessages array. Current total messages count: ${allMessages.length}`);
            allMessages.push(...messages);
            this.logger.info(`Finished consuming messages from topic: ${kafkaConsumer.topic}. Number of messages received: ${messages.length}`);
        }
        this.logger.info(`Completed processing all topics. Total messages processed: ${allMessages.length}`);
        return allMessages;
    }

And the actual consumer logic is here:

consume() {
        if (this.consumer.paused().find(topicPartition => topicPartition.topic === this.topic)) {
            this.consumer.resume([{topic: this.topic}])
        }
        return new Promise( async(resolve, reject) => {
            let messagesBuffer = [];
            let paused = this.consumer.paused().find(topicPartition => topicPartition.topic === this.topic);
            setTimeout(async () => {
                if (!paused) {
                    this.logger.info(`Setting timeout for consumer on topic: ${this.topic}. Timeout: ${this.consumerTimeout}ms`);
                    try{
                        this.logger.info(`Timeout occurred for consumer on topic: ${this.topic}. Current buffer size: ${messagesBuffer.length}`);
                        this.consumer.pause([{topic: this.topic}])
                        resolve(messagesBuffer);
                    }catch(err){
                        this.logger.info(`Consumer paused failed: ${err}`);
                        reject(err)
                    }
                }
            }, this.consumerTimeout);
            await this.consumer.run({
                eachMessage: async ({message, partition }) => {
                    this.logger.info(`Received message on topic: ${this.topic}, partition: ${partition}, offset: ${message.offset}`);
                    if (messagesBuffer.length < this.maxMessages) {
                        this.logger.info(`Adding message to buffer. Current buffer size: ${messagesBuffer.length}`);
                        messagesBuffer.push(message.value.toString());
                    }
                    this.logger.info(`Message: ${message.value.toString()}`)
                    this.logger.info(`Committing new Message: ${message.offset}`)
                    await this.consumer.commitOffsets([
                        {topic: this.topic, partition, offset: message.offset},
                    ]);
                    if (messagesBuffer.length === this.maxMessages) {
                        this.logger.info(`Message buffer limit reached for topic: ${this.topic}. Buffer size: ${messagesBuffer.length}`);
                        try{
                            this.consumer.pause([{topic: this.topic}])
                        }catch(err){
                            this.logger.info(`Consumer paused failed: ${err}`);
                            reject(err)
                        }
                        this.logger.info(`Consumer returning since 5 messages read: ${JSON.stringify(messagesBuffer)}`);
                        resolve(messagesBuffer);
                    }
                }
            });
            this.logger.info(`Consumer returning since 5 messages read out side await: ${JSON.stringify(messagesBuffer)}`);
        });
    }

There are a couple of problems I am seeing:

  1. The first iteration completes successfully, and I get the 5 pending messages from the queue.
  2. From the second iteration onwards, I don't get any messages at all. The log does show messages being received and their offsets being committed, but allMessages never receives the data and the array is always printed as empty.

I am a beginner with Node.js and have only recently started working with it. Can someone point out what I am doing wrong?
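One way the second symptom can arise, offered only as a hypothesis: if the client keeps the handler from the first `run` call and ignores later ones, every message continues to land in the first call's buffer. The sketch below reproduces that with a hypothetical `FakeConsumer`; whether the real consumer behaves this way is an assumption that has not been checked against the library here.

```javascript
// Hypothetical stand-in for a consumer whose run() keeps only the first handler.
class FakeConsumer {
  constructor() {
    this.handler = null;
  }

  run({ eachMessage }) {
    if (this.handler) return; // assumed behaviour: later run() calls are ignored
    this.handler = eachMessage;
  }

  // Simulates the broker delivering one message.
  deliver(value) {
    if (this.handler) this.handler(value);
  }
}

// Mirrors the shape of consume(): a new buffer and a new promise on every call.
function consumeOnce(consumer, maxMessages, timeoutMs) {
  return new Promise((resolve) => {
    const buffer = [];
    setTimeout(() => resolve(buffer), timeoutMs);
    consumer.run({
      eachMessage: (value) => {
        buffer.push(value); // closes over the buffer of the call that registered it
        if (buffer.length === maxMessages) resolve(buffer);
      },
    });
  });
}
```

With this stand-in, the first `consumeOnce` call resolves with its messages, while a second call times out with an empty array even though `deliver` keeps invoking the handler, which matches the log output described above.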
