Consuming Kafka under the Node.js cluster multi-process model

34 Views Asked by At

I encountered a problem: I used the Node.js cluster module to register Kafka consumers, but the multi-process setup did not work as expected — only one process was continuously consuming. What is the reason for this?

In this example, the worker ID in the output is always the same. I don't know why this is happening. Please help me.

const cluster = require('cluster');  
const { Kafka } = require('kafkajs');  
  
// Create Kafka client.
// NOTE: when cluster.fork() is used below, each worker process re-executes
// this whole file, so every worker builds its own independent Kafka client
// from this same configuration.
const kafka = new Kafka({  
  clientId: 'my-app',  
  brokers: ['localhost:9092'],  
});  
  
// Define multi-process consumer function.
//
// NOTE(review): every worker joins the SAME consumer group ('group-id'), so
// Kafka assigns the topic's partitions among them. If the topic has only one
// partition, only one worker will ever receive messages — that is the usual
// cause of "only one process is consuming". Create the topic with at least
// `numWorkers` partitions to spread the load across workers.
//
// @param {string} topicName - Kafka topic to subscribe to.
// @param {number} numWorkers - number of worker processes to fork.
function multiProcessConsumer(topicName, numWorkers) {
  if (cluster.isMaster) {
    // Primary process: fork the requested number of workers. Each worker
    // re-executes this file and falls into the `else` branch below.
    for (let i = 0; i < numWorkers; i++) {
      cluster.fork();
    }
  } else {
    // Worker process: register a consumer. connect()/subscribe()/run() all
    // return Promises — the original code fired them without awaiting, so
    // subscribe() could race ahead of connect() and any failure became an
    // unhandled rejection. Sequence them explicitly.
    (async () => {
      const consumer = kafka.consumer({ groupId: 'group-id' });
      await consumer.connect();
      await consumer.subscribe({ topic: topicName, fromBeginning: true });
      console.log(`Worker ${cluster.worker.id} is consuming messages`);
      await consumer.run({
        eachMessage: async ({ topic, partition, message }) => {
          console.log(`Worker ${cluster.worker.id} consumed message: ${message.value}`);
          // Process message logic
        },
      });
    })().catch((err) => {
      // Surface startup/consume failures instead of leaving a floating Promise.
      console.error(`Worker ${cluster.worker.id} failed:`, err);
      process.exit(1);
    });
  }
}
  
// Entry point: consume topic 'my-topic' using 2 forked worker processes.
const TOPIC_NAME = 'my-topic';
const WORKER_COUNT = 2;
multiProcessConsumer(TOPIC_NAME, WORKER_COUNT);
0

There are 0 best solutions below