Our team has created an MSK cluster with SASL_SSL as the security protocol. When we try to publish data from our Python client, we get the error below:
Failed to deliver message: <cimpl.Message object at 0x000001E88C23A040>: KafkaError{code=TOPIC_AUTHORIZATION_FAILED,val=29,str="Broker: Topic authorization failed"}
Code:
from confluent_kafka import Producer
import socket


def acked(err, msg):
    # Delivery callback: invoked once per message from poll() or flush().
    if err is not None:
        print("Failed to deliver message: %s: %s" % (str(msg), str(err)))
    else:
        print("Message produced: %s" % (str(msg)))


def produce():
    print("producer.....")
    conf = {'bootstrap.servers': 'b-1-public.mskqapoc01.*****a.us-east-1.amazonaws.com:9196,b-2-public.mskqapoc01.*********.kafka.us-east-1.amazonaws.com:9196',
            'security.protocol': 'SASL_SSL',
            'sasl.mechanism': 'SCRAM-SHA-512',
            'sasl.username': 'user',
            'sasl.password': 'password',
            'client.id': socket.gethostname()}
    producer = Producer(conf)
    producer.produce("Kafkapasstopic", key="0", value="first message from mh", callback=acked)
    # Wait up to 1 second for events. Callbacks will be invoked during
    # this method call if the message is acknowledged.
    producer.poll(1)
    producer.flush()


produce()
Any idea what the reason could be? Is it an issue with the client, or was something missed while creating the MSK cluster?
Below is the security config of the MSK cluster:

Thanks, Mahendra
MHegde,
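It doesn't seem like your code is the problem.
Since you enabled public access to the cluster, you had to set this property:
allow.everyone.if.no.acl.found=false. With that setting, a user with no matching ACL is denied access, which is what the TOPIC_AUTHORIZATION_FAILED error indicates. Please make sure the user in your code is allowed to access your topic. You will need to use the kafka-acls command-line tool to grant that access.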
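As a rough illustration only, the Python sketch below assembles a kafka-acls invocation that would grant the producer's user access to the topic. The flags used (--bootstrap-server, --command-config, --add, --allow-principal, --operation, --topic) are standard kafka-acls options, but the script path bin/kafka-acls.sh, the broker placeholder, and the choice of the Write and Describe operations are assumptions, and build_kafka_acls_command is a hypothetical helper name. The user and topic are taken from the question's code, and the config file path is the one mentioned in this answer.

```python
import shlex


def build_kafka_acls_command(bootstrap_server, command_config, principal,
                             topic, operations=("Write", "Describe")):
    """Build the argument list for a kafka-acls call that adds allow ACLs.

    Hypothetical helper: it only assembles the command, it does not run it.
    """
    cmd = [
        "bin/kafka-acls.sh",  # assumed path inside the Kafka installation
        "--bootstrap-server", bootstrap_server,
        "--command-config", command_config,  # client properties used to authenticate
        "--add",
        "--allow-principal", f"User:{principal}",
    ]
    # One --operation flag per permission being granted (assumed set).
    for operation in operations:
        cmd += ["--operation", operation]
    cmd += ["--topic", topic]
    return cmd


cmd = build_kafka_acls_command(
    bootstrap_server="<broker-host>:<port>",  # placeholder, replace with a real broker
    command_config="kafka/config/sasl-scram-config.conf",
    principal="user",         # sasl.username from the question
    topic="Kafkapasstopic",   # topic from the question
)

# Print a shell-safe version of the command to review before running it.
print(shlex.join(cmd))
```

The command has to be run by a principal that is itself allowed to manage ACLs on the cluster. Printing the command rather than executing it keeps the sketch side-effect free; it could be passed to subprocess.run once the placeholders are filled in.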
Make sure your file kafka/config/sasl-scram-config.conf has the following definition: