Confluent Kafka connection from Databricks

I am trying to connect to Confluent Kafka from Databricks and produce/consume messages. Right now I am just experimenting with the Python clients, not the Spark connectors. If I try to list the topics in the Kafka cluster using the following (with the correct credentials and topic name):

from confluent_kafka.admin import AdminClient

def get_kafka_topics():
    # Create AdminClient with SASL configuration
    admin_conf = {
        'bootstrap.servers': bootstrap_servers,
        'security.protocol': 'SASL_SSL',
        'sasl.mechanism': 'PLAIN',
        'sasl.username': username,
        'sasl.password': password
    }
    admin_client = AdminClient(admin_conf)

    try:
        # Fetch the list of topics
        topics_metadata = admin_client.list_topics(timeout=10)
        topics = topics_metadata.topics
        print("List of Kafka topics:")
        for topic in topics:
            print(topic)
    except Exception as e:
        print(f"Failed to get the list of Kafka topics: {str(e)}")

This gives me the list of topics. When I try to consume messages from the topic using:

from confluent_kafka import Consumer, KafkaError

# Initialize the Kafka consumer with SASL_SSL authentication
consumer = Consumer({
    'bootstrap.servers': bootstrap_servers,
    'sasl.mechanisms': 'PLAIN',
    'security.protocol': 'SASL_SSL',
    'sasl.username': sasl_username,
    'sasl.password': sasl_password,
    'group.id': 'new_group',
    'auto.offset.reset': 'earliest'
})

# Subscribe to the Kafka topic
consumer.subscribe([topic])

try:
    while True:
        msg = consumer.poll(1.0)

        if msg is None:
            continue
        if msg.error():
            if msg.error().code() == KafkaError._PARTITION_EOF:
                continue
            else:
                print(f'Error while consuming: {msg.error()}')
        else:
            # Parse the received message
            value = msg.value().decode('utf-8')
            print(value)

except KeyboardInterrupt:
    pass
finally:
    # Close the consumer gracefully
    consumer.close()

This just keeps running and never prints any messages.
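
To rule out the topic simply being empty, I believe a check of the watermark offsets could be run with the same settings (a minimal sketch; list_topics and get_watermark_offsets are existing confluent_kafka Consumer methods, and bootstrap_servers, sasl_username, sasl_password and topic are the same variables as above):

from confluent_kafka import Consumer, TopicPartition

check_conf = {
    'bootstrap.servers': bootstrap_servers,
    'security.protocol': 'SASL_SSL',
    'sasl.mechanisms': 'PLAIN',
    'sasl.username': sasl_username,
    'sasl.password': sasl_password,
    'group.id': 'debug_group',
}
check_consumer = Consumer(check_conf)

# Fetch the partition list for the topic, then print the low/high watermark
# offsets per partition; if every high watermark is 0, the topic is empty.
metadata = check_consumer.list_topics(topic, timeout=10)
for partition_id in metadata.topics[topic].partitions:
    low, high = check_consumer.get_watermark_offsets(
        TopicPartition(topic, partition_id), timeout=10)
    print(f"partition {partition_id}: low={low}, high={high}")

check_consumer.close()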

Using the kafka-python client:

from kafka import KafkaConsumer

# Kafka consumer configuration
consumer_conf = {
    'bootstrap_servers': bootstrap_servers,
    # 'group_id': 'your_consumer_group_id',  # Specify your consumer group ID
    'auto_offset_reset': 'earliest',  # Start consuming from the beginning of the topic
    'enable_auto_commit': True,  # Enable automatic committing of offsets
}

# If SASL is enabled, add SASL configuration
if kafka_username and kafka_password:
    consumer_conf.update({
        'security_protocol': 'SASL_SSL',
        'sasl_mechanism': 'PLAIN',
        'sasl_plain_username': kafka_username,
        'sasl_plain_password': kafka_password,
    })

# Create Kafka consumer
consumer = KafkaConsumer('Prod.DL.CoreLogic.Deed.SparkETL.main', **consumer_conf)

for message in consumer:
    print(f"Received message: {message.value}")

The above gives me the following error (dummy URL and port):

ERROR:kafka.conn:Connect attempt to <BrokerConnection node_id=5 host=something:1111 <connecting> [IPv4 ('1.1.1.1', 1111)]> returned error 110. Disconnecting.
WARNING:kafka.client:Node 5 connection failed -- refreshing metadata

Error 110 looks like a connection timeout (ETIMEDOUT), so I have also checked whether the port is open from the Databricks cluster (dummy connection details):

import socket
sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
result = sock.connect_ex(('url',1111))
if result == 0:
    print("Port is open")
else:
    print("Port is not open")
sock.close()

This outputs: Port is open
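
Since the failing connection in the kafka-python log is to node 5's host rather than necessarily the bootstrap address, the same kind of check could also be pointed at that address (a sketch reusing the dummy host/port from the error above):

import socket

# Check the broker host reported in the kafka-python error, not just the
# bootstrap server ('something' and 1111 are the dummy values from the log).
sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
result = sock.connect_ex(('something', 1111))
print("Port is open" if result == 0 else "Port is not open")
sock.close()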

What could be the reason for the producer/consumer not working even though the network connection can be established?
