I have created an MSK Cluster in AWS and trying to connect to it from my local node app. I am using kafkajs for making kafka client but not able to connect to cluster.
I have been trying to establish connection like this as described in kafkajs documentation.
this.kafka = new Kafka({
clientId: 'user_service', // Replace with your desired client ID
brokers: ['b-1-public.xxxxxxxxxx.xxxxx.c3.kafka.xxxxxxx1.amazonaws.com:9196'],
sasl: {
mechanism: 'plain',
username: 'bloom-msk',
password: "bloom-msk-dev-secret"
},
});
While running this I am getting following error:
{"level":"ERROR","timestamp":"2023-11-23T11:43:55.513Z","logger":"kafkajs","message":"[BrokerPool] Closed connection","retryCount":5,"retryTime":6586}
node:internal/process/promises:288
triggerUncaughtException(err, true /* fromPromise */);
^
KafkaJSNonRetriableError
Caused by: KafkaJSConnectionClosedError: Closed connection
For IAM I followed the following steps, and not able to connect. Error is still saying "Closed Connection"
- Made the IAM user named kafka-user
- Created role with MSKFullAccess policy and gave that role to user
- for that user created access keys
- made an inbound rule in the security group for allowing All traffic
Using following code:
this.kafka = new Kafka({
clientId: 'user_service', // Replace with your desired client ID
brokers: ['KAFKA_BROKER_ADDRESS'],
sasl: {
mechanism: 'aws',
authorizationIdentity: 'kafka-user', // UserId or RoleId
accessKeyId: 'ACCESS_KEY',
secretAccessKey: 'ACCESS_SECRET',
// sessionToken: '' // Optional
},
});
I commented the sessionToken as it is optional and I do not know how to get it.