Kafka MSK connection error when using kafkajs

311 Views Asked by At

I have been trying to connect to msk cluster using kafkajs from my ec2 machine. I am getting the following error:

{"level":"ERROR","timestamp":"2023-11-24T11:40:23.989Z","logger":"kafkajs","message":"[BrokerPool] Closed connection","retryCount":0,"retryTime":351}

if i check the broker url using telnet then it is working fine. It means broker address is working fine. Can anyone help?

I tried everything mentioned in kafkajs documentation for iam on below link.

https://kafka.js.org/docs/configuration#aws-iam-example

My example code is

    import {Kafka} from 'kafkajs';

async function connect(){
    let kafka = new Kafka({
      clientId: 'user_service', // Replace with your desired client ID
      brokers: ['b-2.xxxxxxxxxx.8drmhp.c3.kafka.ap-south-1.amazonaws.com:9098','b-1.xxxxxxxxx.8drmhp.c3.kafka.ap-south-1.amazonaws.com:9098'],
      sasl: {
        mechanism: 'aws',
        authorizationIdentity: 'xxxxxxxxx', // UserId or RoleId
        accessKeyId: 'xxxxxxxxxxxxxxxx',
        secretAccessKey: 'xxxxxxxxxxxxxx',
        // sessionToken: '' // Optional
      },
    });
        console.log(kafka)

        let producer = kafka.producer();
        let consumer = kafka.consumer({groupId: "test-group"})

        await producer.connect()
        await consumer.connect()

        await this.producer.send({
                topic:  "test-topic",
      messages: [{ value: JSON.stringify({test: "this is test message to test-topic" })}],
    });
}

connect();

After running this code, I am getting following error

{"level":"ERROR","timestamp":"2023-11-27T04:23:45.533Z","logger":"kafkajs","message":"[BrokerPool] Closed connection","retryCount":5,"retryTime":12506}
node:internal/process/promises:288
            triggerUncaughtException(err, true /* fromPromise */);
            ^

KafkaJSNonRetriableError
  Caused by: KafkaJSConnectionClosedError: Closed connection
    at Socket.onEnd (/home/ubuntu/new/testkafka/node_modules/kafkajs/src/network/connection.js:197:13)
    ... 2 lines matching cause stack trace ...
    at process.processTicksAndRejections (node:internal/process/task_queues:82:21) {
  name: 'KafkaJSNumberOfRetriesExceeded',
  retriable: false,
  helpUrl: undefined,
  retryCount: 5,
  retryTime: 12506,
  [cause]: KafkaJSConnectionClosedError: Closed connection
      at Socket.onEnd (/home/ubuntu/new/testkafka/node_modules/kafkajs/src/network/connection.js:197:13)
      at Socket.emit (node:events:529:35)
      at endReadableNT (node:internal/streams/readable:1368:12)
      at process.processTicksAndRejections (node:internal/process/task_queues:82:21) {
    retriable: true,
    helpUrl: undefined,
    broker: 'b-2.xxxxxxxxxxxxxx.8drmhp.c3.kafka.ap-south-1.amazonaws.com:9098',
    code: undefined,
    host: 'b-2.xxxxxxxxxxxxxxx.8drmhp.c3.kafka.ap-south-1.amazonaws.com',
    port: 9098,
    [cause]: undefined
  }
}
1

There are 1 best solutions below

0
On

Apparently, as per comment on this blog. kafkajs library's IAM SASL method is not compatible that aws is actually using so u have to use another package alongwith kafkajs to make it work.

The package to be used : @jm18457/kafkajs-msk-iam-authentication-mechanism