Connect Python to MSK with IAM role-based authentication


I've written a Python script with aiokafka to produce to and consume from a Kafka cluster in AWS MSK. I'm running the script from an EC2 instance in the same VPC as my cluster, but when I try to connect, the cluster refuses the connection:

The script

from aiokafka import AIOKafkaConsumer
import asyncio
import os
import sys


async def consume():
    bootstrap_server = os.environ.get('BOOTSTRAP_SERVER', 'localhost:9092')
    topic = os.environ.get('TOPIC', 'demo')
    group = os.environ.get('GROUP_ID', 'demo-group')
    consumer = AIOKafkaConsumer(
        topic, bootstrap_servers=bootstrap_server, group_id=group
    )

    await consumer.start()
    try:
        # Consume messages
        async for msg in consumer:
            print("consumed: ", msg.topic, msg.partition, msg.offset,
                  msg.key, msg.value, msg.timestamp)
    finally:
        # Will leave consumer group; perform autocommit if enabled.
        await consumer.stop()


def main():
    try:
        asyncio.run(consume())
    except KeyboardInterrupt:
        print("Bye!")
        sys.exit(0)


if __name__ == "__main__":
    print("Welcome to Kafka test script. ctrl + c to exit")
    main()

The exception

Unable to request metadata from "boot-xxxxxxx.cx.kafka-serverless.us-xxxx-1.amazonaws.com:9098": KafkaConnectionError: Connection at boot-xxxxxxx.cx.kafka-serverless.us-xxxx-1.amazonaws.com:9098 closed
Traceback (most recent call last):
  File "producer.py", line 33, in <module>
    main()
  File "producer.py", line 25, in main
    asyncio.run(produce_message(message))
  File "/usr/lib64/python3.7/asyncio/runners.py", line 43, in run
    return loop.run_until_complete(main)
  File "/usr/lib64/python3.7/asyncio/base_events.py", line 587, in run_until_complete
    return future.result()
  File "producer.py", line 12, in produce_message
    await producer.start()
  File "/home/ec2-user/py-kafka-test/pykafka/lib64/python3.7/site-packages/aiokafka/producer/producer.py", line 296, in start
    await self.client.bootstrap()
  File "/home/ec2-user/py-kafka-test/pykafka/lib64/python3.7/site-packages/aiokafka/client.py", line 250, in bootstrap
    f'Unable to bootstrap from {self.hosts}')
kafka.errors.KafkaConnectionError: KafkaConnectionError: Unable to bootstrap from [('boot-xxxxxxx.cx.kafka-serverless.us-xxxx-1.amazonaws.com', 9098, <AddressFamily.AF_UNSPEC: 0>)]
Unclosed AIOKafkaProducer
producer: <aiokafka.producer.producer.AIOKafkaProducer object at 0x7f76d123a510>

I've already tested the connection with the Kafka shell scripts, and it worked fine:

./kafka-console-producer.sh --bootstrap-server boot-xxxxxxx.cx.kafka-serverless.us-xxxx-1.amazonaws.com:9098 --producer.config client.properties  --topic myTopic
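
For context, a typical client.properties for MSK IAM auth looks like the following (this follows the AWS docs and assumes the aws-msk-iam-auth JAR is on the client classpath; my exact file is omitted):

security.protocol=SASL_SSL
sasl.mechanism=AWS_MSK_IAM
sasl.jaas.config=software.amazon.msk.auth.iam.IAMLoginModule required;
sasl.client.callback.handler.class=software.amazon.msk.auth.iam.IAMClientCallbackHandler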

But whenever I try with Python it just doesn't work. I've investigated a little and found that it might be the authentication protocol: my MSK cluster is protected with IAM role-based authentication, but no matter how much I search, there is no documentation on how to authenticate with IAM in the Python Kafka libraries: aiokafka, kafka-python, faust, etc.

Does anyone have an example of how to successfully connect to an MSK Serverless cluster with IAM role-based authentication using Python?


There are 2 answers below.

Answer 1

While it's not possible to do this via existing Python packages, here is how I managed to make it work: by wrapping the Kafka client shell scripts inside a Python subprocess.

# producer.py
import subprocess as subp


def create_cli_producer(arguments):
    print(f"Initializing kafka producer for servers: {arguments.kafka_servers}")
    print(f"topic: {arguments.pub_topic}")

    kafka_producer_init_cmd = [
        f"{arguments.kafka_path}/bin/kafka-console-producer.sh",
        "--topic", arguments.pub_topic,
        "--bootstrap-server", arguments.kafka_servers
    ]

    if arguments.configs:
        kafka_producer_init_cmd += ["--producer.config", arguments.configs]

    try:
        # Keep stdin open so messages can be piped to the console producer
        proc = subp.Popen(kafka_producer_init_cmd, stdin=subp.PIPE)
        print("kafka producer init done.")
        return proc
    except Exception as e:
        print(f"Error creating producer: {e}")
        return None

# consumer.py
import threading


def consume_messages(consumer, producer):
    print('Listening for new messages...')
    try:
        for line in consumer.stdout:
            rcvd_msg = line.decode().strip()
            print(f"Received: {rcvd_msg}")

            send_msg_thread = threading.Thread(target=send_message, args=(producer, rcvd_msg))
            send_msg_thread.daemon = True
            send_msg_thread.start()
    except KeyboardInterrupt:
        # If the user interrupts the program (e.g., by pressing Ctrl+C),
        # terminate the subprocess gracefully
        consumer.terminate()
        consumer.wait()

    finally:
        # Capture and print any error messages from the consumer's standard error stream
        for error_line in consumer.stderr:
            print("Error:", error_line.decode().strip())


def send_message(producer, msg):
    # Publish the received message to the producer
    try:
        print(f"Publishing message: {msg}")
        producer.stdin.write(msg.encode() + b"\n")
        producer.stdin.flush()
    except Exception as e:
        print(f"Error sending message: {e}")

Get the full code here: https://github.com/maxcotec/aws-IAM-auth-msk-python

Watch a demo here: https://youtu.be/xL5wFLF4nrk

Answer 2

AWS has officially released aws-msk-iam-sasl-signer-python. Here is an example using aiokafka together with aws-msk-iam-sasl-signer-python:

# requirements.txt
aiokafka==0.10.0
aws-msk-iam-sasl-signer-python==1.0.1

import asyncio
import os, sys
from aiokafka import AIOKafkaConsumer
from aiokafka.abc import AbstractTokenProvider
from aws_msk_iam_sasl_signer import MSKAuthTokenProvider
import ssl

def create_ssl_context():
    _ssl_context = ssl.SSLContext(ssl.PROTOCOL_TLS_CLIENT)
    _ssl_context.options |= ssl.OP_NO_SSLv2
    _ssl_context.options |= ssl.OP_NO_SSLv3
    # NOTE: this disables certificate and hostname verification, which is
    # convenient for testing but not recommended for production
    _ssl_context.check_hostname = False
    _ssl_context.verify_mode = ssl.CERT_NONE
    _ssl_context.load_default_certs()

    return _ssl_context

class AWSTokenProvider(AbstractTokenProvider):
    async def token(self):
        return await asyncio.get_running_loop().run_in_executor(None, self._token)

    def _token(self):
        # generate_auth_token returns (token, expiry_ms); only the token is needed
        AWS_REGION = os.getenv('AWS_REGION')
        token, _ = MSKAuthTokenProvider.generate_auth_token(AWS_REGION)
        return token

async def consume():
    KAFKA_TOPIC = os.getenv('KAFKA_TOPIC', 'demo')
    KAFKA_GROUP_ID = os.getenv('KAFKA_GROUP_ID', 'demo-group')
    KAFKA_SERVER = os.getenv('KAFKA_SERVER', 'b-1.mykafka.abcdef.c5.kafka.us-west-2.amazonaws.com:9098')

    consumer = AIOKafkaConsumer(
        KAFKA_TOPIC,
        bootstrap_servers=KAFKA_SERVER,
        group_id=KAFKA_GROUP_ID,
        security_protocol='SASL_SSL',
        ssl_context=create_ssl_context(),
        sasl_mechanism="OAUTHBEARER",
        sasl_oauth_token_provider=AWSTokenProvider()
    )

    await consumer.start()
    try:
        # Consume messages
        async for msg in consumer:
            print("consumed: ", msg.topic, msg.partition, msg.offset,
                  msg.key, msg.value, msg.timestamp)
    finally:
        # Will leave consumer group; perform autocommit if enabled.
        await consumer.stop()


def main():
    try:
        asyncio.run(consume())
    except KeyboardInterrupt:
        print("Bye!")
        sys.exit(0)


if __name__ == "__main__":
    main()
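
The same token provider also works for producing. Here is a minimal producer sketch under the same assumptions, reusing create_ssl_context and AWSTokenProvider from above:

from aiokafka import AIOKafkaProducer

async def produce(message: bytes):
    producer = AIOKafkaProducer(
        bootstrap_servers=os.getenv('KAFKA_SERVER', 'b-1.mykafka.abcdef.c5.kafka.us-west-2.amazonaws.com:9098'),
        security_protocol='SASL_SSL',
        ssl_context=create_ssl_context(),
        sasl_mechanism='OAUTHBEARER',
        sasl_oauth_token_provider=AWSTokenProvider()
    )
    await producer.start()
    try:
        # send_and_wait publishes and waits for broker acknowledgement
        await producer.send_and_wait(os.getenv('KAFKA_TOPIC', 'demo'), message)
    finally:
        await producer.stop()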