I have a consumer application written in Go that runs as a service in ECS Fargate, and I'm trying to use it to consume from an MSK Serverless cluster. The ECS service is in the same VPC as the MSK cluster, and the security group associated with MSK has been updated to allow inbound connections from the security group associated with the ECS service. A separate Node.js application on EC2 is successfully producing to the same cluster. However, in the ECS application I'm seeing the following logs from Sarama:
Failed to connect to broker 'boot-..kafka-serverless.us-east-2.amazonaws.com:9098: dial tcp: lookup 'boot-..kafka-serverless.us-east-2.amazonaws.com: no such host
The bootstrap broker address is correct for the cluster, and having looked at the Sarama source code, this happens before the bearer auth logic. I have also confirmed that the role assigned to the ECS service has the proper permissions to connect to Kafka and consume.
Below is the relevant source code for dialing the cluster:
import (
	"context"
	"github.com/IBM/sarama"
	"github.com/aws/aws-msk-iam-sasl-signer-go/signer"
	"github.com/<redacted>/log"
)

type MSKConf struct {
	Region string `json:"region"`
}

func (m *MSKConf) Token() (*sarama.AccessToken, error) {
	signer.AwsDebugCreds = true
	token, _, err := signer.GenerateAuthToken(context.Background(), m.Region)
	return &sarama.AccessToken{Token: token}, err
}

type ConsumerConfig struct {
	Brokers  []string `json:"brokers"`
	Topic    string   `json:"topic"`
	ClientID string   `json:"clientID"`
	MSK      *MSKConf `json:"msk"`
}

func (cc *ConsumerConfig) toSaramaConf(ctx context.Context) *sarama.Config {
	config := sarama.NewConfig()
	config.ClientID = cc.ClientID
	if cc.MSK != nil {
		log.Info("Connecting to MSK...")
		config.Net.SASL.Enable = true
		config.Net.SASL.Mechanism = sarama.SASLTypeOAuth
		config.Net.SASL.TokenProvider = cc.MSK
	}
	return config
}

func InitConsumer(ctx context.Context, conf *ConsumerConfig, handler IHandler) error {
	sarama.Logger = log.New(os.Stdout, "[sarama] ", log.LstdFlags)
	consumer, err := setUpConsumer(ctx, conf)
	if err != nil {
		return err
	} else {
		lgr.Infof("Kafka Consumer is up and running!")
	}
	err = consumeMessages(ctx, handler, consumer, conf)
	if err != nil {
		return err
	}
	return nil
}

func setUpConsumer(ctx context.Context, conf *ConsumerConfig) (sarama.Consumer, error) {
	config := conf.toSaramaConf(ctx)
	return sarama.NewConsumer(conf.Brokers, config)
}
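Can you verify that TLS is enabled in the consumer config? MSK Serverless only supports IAM authentication, and the IAM listener on port 9098 expects a TLS connection, but the config you posted never sets config.Net.TLS.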
You can add TLS config like below:
Hope it helps!