I'm trying to read all the messages that existed in kafka topic when the consumer connected to it and then terminate consumer.run function and move to execute some other commands, but when I use await consumer.stop inside the condition it causes the program to stuck.
The code:
const kafkaSourceServer = 'some server';
const kafka = new Kafka({
clientId: 'my-app',
brokers: [kafkaSourceServer],
});
async function getMessageCount(topicName) {
try {
const kafka = new Kafka({ brokers: [kafkaSourceServer] });
const admin = kafka.admin();
await admin.connect();
const topicOffsets = await admin.fetchTopicOffsets(topicName);
const messageCount = Object.values(topicOffsets).reduce((sum, offsets) => {
return sum + (offsets.high - offsets.low);
}, 0);
await admin.disconnect();
return messageCount;
} catch (error) {
console.error(`Error occurred while fetching message count for topic '${topicName}': ${error}`);
}
}
const total = await getMessageCount('some topic');
const consumer = kafka.consumer({ groupId: 'my_consumer_group1' , sessionTimeout: 6000 });
await consumer.connect();
await consumer.subscribe({ topic: 'some topic', fromBeginning: true });
let counter = 0;
await consumer.run({
eachMessage: async ({ topic, partition, message }) => {
counter += 1;
console.log('counter: ', counter, 'total: ', total);
if (counter >= total) {
await consumer.stop();
}
},
});````