I have been trying to connect to an MSK cluster using kafkajs from my EC2 machine. I am getting the following error:
{"level":"ERROR","timestamp":"2023-11-24T11:40:23.989Z","logger":"kafkajs","message":"[BrokerPool] Closed connection","retryCount":0,"retryTime":351}
If I check the broker URL using telnet, it connects fine, so the broker address is reachable. Can anyone help?
I tried everything mentioned in the kafkajs documentation for IAM at the link below.
https://kafka.js.org/docs/configuration#aws-iam-example
My example code is:
import { Kafka } from 'kafkajs';

async function connect() {
  const kafka = new Kafka({
    clientId: 'user_service', // Replace with your desired client ID
    brokers: [
      'b-2.xxxxxxxxxx.8drmhp.c3.kafka.ap-south-1.amazonaws.com:9098',
      'b-1.xxxxxxxxx.8drmhp.c3.kafka.ap-south-1.amazonaws.com:9098',
    ],
    sasl: {
      mechanism: 'aws',
      authorizationIdentity: 'xxxxxxxxx', // UserId or RoleId
      accessKeyId: 'xxxxxxxxxxxxxxxx',
      secretAccessKey: 'xxxxxxxxxxxxxx',
      // sessionToken: '' // Optional
    },
  });
  console.log(kafka);

  const producer = kafka.producer();
  const consumer = kafka.consumer({ groupId: 'test-group' });
  await producer.connect();
  await consumer.connect();

  // `producer` is a local variable, so it is referenced directly (not via `this`)
  await producer.send({
    topic: 'test-topic',
    messages: [{ value: JSON.stringify({ test: 'this is test message to test-topic' }) }],
  });
}

connect();
After running this code, I am getting the following error:
{"level":"ERROR","timestamp":"2023-11-27T04:23:45.533Z","logger":"kafkajs","message":"[BrokerPool] Closed connection","retryCount":5,"retryTime":12506}
node:internal/process/promises:288
triggerUncaughtException(err, true /* fromPromise */);
^
KafkaJSNonRetriableError
Caused by: KafkaJSConnectionClosedError: Closed connection
at Socket.onEnd (/home/ubuntu/new/testkafka/node_modules/kafkajs/src/network/connection.js:197:13)
... 2 lines matching cause stack trace ...
at process.processTicksAndRejections (node:internal/process/task_queues:82:21) {
name: 'KafkaJSNumberOfRetriesExceeded',
retriable: false,
helpUrl: undefined,
retryCount: 5,
retryTime: 12506,
[cause]: KafkaJSConnectionClosedError: Closed connection
at Socket.onEnd (/home/ubuntu/new/testkafka/node_modules/kafkajs/src/network/connection.js:197:13)
at Socket.emit (node:events:529:35)
at endReadableNT (node:internal/streams/readable:1368:12)
at process.processTicksAndRejections (node:internal/process/task_queues:82:21) {
retriable: true,
helpUrl: undefined,
broker: 'b-2.xxxxxxxxxxxxxx.8drmhp.c3.kafka.ap-south-1.amazonaws.com:9098',
code: undefined,
host: 'b-2.xxxxxxxxxxxxxxx.8drmhp.c3.kafka.ap-south-1.amazonaws.com',
port: 9098,
[cause]: undefined
}
}
Apparently, as per a comment on this blog, the kafkajs library's built-in IAM SASL mechanism is not compatible with the one AWS actually uses for MSK, so you have to use another package alongside kafkajs to make it work.
The package to use: @jm18457/kafkajs-msk-iam-authentication-mechanism
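
For illustration, here is a minimal sketch of how that package might be wired into the client from the question. It assumes the package exports a createMechanism function that takes a region (as I understand its README), that credentials come from the default AWS credential chain (for example an instance role on the EC2 machine), and that the project is set up for ES module imports like the question's code. The broker addresses, topic and region are the placeholders from the question; ssl: true is included on the assumption that the IAM listener on port 9098 only accepts TLS connections. Treat it as a starting point, not a verified fix.

```javascript
import { Kafka } from 'kafkajs';
// Assumption: `createMechanism` is the package's exported factory for the SASL config.
import { createMechanism } from '@jm18457/kafkajs-msk-iam-authentication-mechanism';

const kafka = new Kafka({
  clientId: 'user_service',
  brokers: [
    'b-2.xxxxxxxxxx.8drmhp.c3.kafka.ap-south-1.amazonaws.com:9098',
    'b-1.xxxxxxxxx.8drmhp.c3.kafka.ap-south-1.amazonaws.com:9098',
  ],
  // Assumption: the IAM listener (port 9098) requires TLS.
  ssl: true,
  // Replaces the built-in `mechanism: 'aws'` block from the question.
  sasl: createMechanism({ region: 'ap-south-1' }),
});

async function main() {
  const producer = kafka.producer();
  await producer.connect();
  try {
    await producer.send({
      topic: 'test-topic',
      messages: [{ value: JSON.stringify({ test: 'this is test message to test-topic' }) }],
    });
  } finally {
    // Always release the connection, even if send() fails.
    await producer.disconnect();
  }
}

// Catch rejections so a failure is logged instead of crashing with an uncaught exception.
main().catch((err) => {
  console.error('MSK connection test failed:', err);
  process.exitCode = 1;
});
```

If the connection is still closed with this setup, the IAM policy attached to the credentials (cluster connect and topic permissions) would be the next thing to check.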