I encountered a problem. I used the node.js cluster model to register kafka consumers, but the multi-process did not work. Instead, only one process was continuously consuming. What is the reason for this?
In this example, the worker I output is always the same. I don't know why this is happening. Please help me.
const cluster = require('cluster');
const { Kafka } = require('kafkajs');
// Create Kafka client
const kafka = new Kafka({
clientId: 'my-app',
brokers: ['localhost:9092'],
});
// Define multi-process consumer function
function multiProcessConsumer(topicName, numWorkers) {
if (cluster.isMaster) {
// Create multiple worker processes in the master process
for (let i = 0; i < numWorkers; i++) {
cluster.fork();
}
} else {
// Register consumer in the worker process
const consumer = kafka.consumer({ groupId: 'group-id' });
consumer.connect();
consumer.subscribe({ topic: topicName, fromBeginning: true });
console.log(`Worker ${cluster.worker.id} is consuming messages`);
consumer.run({ eachMessage: async ({ topic, partition, message }) => {
console.log(`Worker ${cluster.worker.id} consumed message: ${message.value}`);
// Process message logic
} });
}
}
// Start multi-process consumer, listen to the topicName topic, and register consumers using numWorkers worker processes
multiProcessConsumer('my-topic', 2);