TOPIC_AUTHORIZATION_FAILED, val=29, str="Broker: Topic authorization failed" error from MSK


Our team has created an MSK cluster with SASL_SSL as the security protocol. When we try to publish data from our Python client, we get the following error:

Failed to deliver message: <cimpl.Message object at 0x000001E88C23A040>: KafkaError{code=TOPIC_AUTHORIZATION_FAILED,val=29,str="Broker: Topic authorization failed"}

Code:


from confluent_kafka import Producer
import socket

def acked(err, msg):
    if err is not None:
        print("Failed to deliver message: %s: %s" % (str(msg), str(err)))
    else:
        print("Message produced: %s" % (str(msg)))

def produce():
    print("producer.....")
    conf = {'bootstrap.servers': 'b-1-public.mskqapoc01.*****a.us-east-1.amazonaws.com:9196,b-2-public.mskqapoc01.*********.kafka.us-east-1.amazonaws.com:9196',
            'security.protocol': 'SASL_SSL',
            'sasl.mechanism': 'SCRAM-SHA-512',
            'sasl.username': 'user',
            'sasl.password': 'password',
            'client.id': socket.gethostname()}
    
    producer = Producer(conf)
    producer.produce("Kafkapasstopic", key="0", value="first message from mh", callback=acked)

    # Wait up to 1 second for events. Callbacks will be invoked during
    # this method call if the message is acknowledged.
    producer.poll(1)
    # Block until all queued messages have been delivered or have failed.
    producer.flush()

produce()

Any idea what the reason could be? Is it an issue on the client side, or was something missed when creating the MSK cluster?

Below is the security config of the MSK cluster: [image not available]

Thanks, Mahendra

Answer by EdbE:

MHegde,

Your code doesn't appear to be the problem.

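Because you enabled public access on the cluster, you had to set the broker property allow.everyone.if.no.acl.found=false. With that setting, a user that has no matching ACL on a topic is denied access to it, which is what produces TOPIC_AUTHORIZATION_FAILED.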
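To make this failure easier to recognize from the client side, the delivery callback can single out the authorization error instead of printing every error the same way. The following is only a sketch: explain_delivery_error is a hypothetical helper name, and it assumes the err object behaves like confluent_kafka's KafkaError (exposing code() and str()); the value 29 is taken from the error message in the question.

```python
# Error code reported as val=29 in the question's error message.
TOPIC_AUTHORIZATION_FAILED = 29


def explain_delivery_error(err, topic):
    """Return a readable hint for a delivery error, or None on success.

    `err` is assumed to behave like confluent_kafka.KafkaError,
    i.e. it exposes code() and str().
    """
    if err is None:
        return None
    if err.code() == TOPIC_AUTHORIZATION_FAILED:
        # Retrying will not help; the broker is rejecting this user.
        return ("Not authorized for topic '%s': check that an ACL grants "
                "this user Write on the topic." % topic)
    return "Delivery failed: %s" % err.str()


def acked(err, msg):
    # Delivery callback; msg.topic() is the topic the message was sent to.
    hint = explain_delivery_error(err, msg.topic())
    print(hint if hint else "Message produced to %s" % msg.topic())
```

This acked function can be passed as the callback argument to producer.produce() in place of the original one.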

Make sure the user you are using in your code is allowed to access the topic. You can grant that access with the kafka-acls command-line tool:

kafka/bin/kafka-acls.sh --bootstrap-server b-1-public.mskqapoc01.*****a.us-east-1.amazonaws.com:9196 \
  --command-config kafka/config/sasl-scram-config.conf \
  --add \
  --allow-principal User:YOUR_ACTUAL_USERNAME \
  --operation read \
  --operation write \
  --topic Kafkapasstopic

Make sure your file kafka/config/sasl-scram-config.conf has the following definition:

security.protocol=SASL_SSL
sasl.mechanism=SCRAM-SHA-512
sasl.jaas.config=org.apache.kafka.common.security.scram.ScramLoginModule required username="<username>" password="<password>";
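Since the ACL has to be granted to the same user the producer logs in as, it may help to generate this file from the same credentials instead of typing them twice. A minimal sketch, assuming a hypothetical helper named render_scram_properties; it does not escape quotes or backslashes in the password, so such passwords would need extra handling.

```python
def render_scram_properties(username, password):
    """Build the contents of a SASL/SCRAM client properties file.

    Note: quotes or backslashes in the credentials are NOT escaped here.
    """
    jaas = ('org.apache.kafka.common.security.scram.ScramLoginModule required '
            'username="%s" password="%s";' % (username, password))
    lines = [
        "security.protocol=SASL_SSL",
        "sasl.mechanism=SCRAM-SHA-512",
        "sasl.jaas.config=" + jaas,
    ]
    return "\n".join(lines) + "\n"
```

For example, writing render_scram_properties(conf['sasl.username'], conf['sasl.password']) to kafka/config/sasl-scram-config.conf keeps the CLI credentials in step with the producer's conf dictionary.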