I am trying to connect to a Kafka cluster from Databricks using the confluent-kafka Python client and produce/consume messages. For now I am only experimenting with the Python client, not the Spark connectors. If I list the topics in the cluster using the following (with correct credentials and topic name):
from confluent_kafka.admin import AdminClient

def get_kafka_topics():
    # Create AdminClient with SASL configuration
    admin_conf = {
        'bootstrap.servers': bootstrap_servers,
        'security.protocol': 'SASL_SSL',
        'sasl.mechanism': 'PLAIN',
        'sasl.username': username,
        'sasl.password': password
    }
    admin_client = AdminClient(admin_conf)
    try:
        # Fetch the cluster metadata and list the topics
        topics_metadata = admin_client.list_topics(timeout=10)
        topics = topics_metadata.topics
        print("List of Kafka topics:")
        for topic in topics:
            print(topic)
    except Exception as e:
        print(f"Failed to get the list of Kafka topics: {str(e)}")
This gives me the list of topics. When I try to consume messages from the topic using:
from confluent_kafka import Consumer, KafkaError

# Initialize the Kafka consumer with SASL_SSL authentication
consumer = Consumer({
    'bootstrap.servers': bootstrap_servers,
    'sasl.mechanisms': 'PLAIN',
    'security.protocol': 'SASL_SSL',
    'sasl.username': sasl_username,
    'sasl.password': sasl_password,
    'group.id': 'new_group',
    'auto.offset.reset': 'earliest'
})

# Subscribe to the Kafka topic
consumer.subscribe([topic])

try:
    while True:
        msg = consumer.poll(1.0)
        if msg is None:
            continue
        if msg.error():
            if msg.error().code() == KafkaError._PARTITION_EOF:
                continue
            else:
                print(f'Error while consuming: {msg.error()}')
        else:
            # Decode and print the received message
            value = msg.value().decode('utf-8')
            print(value)
except KeyboardInterrupt:
    pass
finally:
    # Close the consumer gracefully
    consumer.close()
This just keeps running and never prints any messages or errors.
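To narrow down whether the consumer is ever assigned partitions, and whether those partitions actually contain messages, I could add a small diagnostic along these lines (just a sketch, reusing the same connection variables and topic placeholder as above):

from confluent_kafka import Consumer

diag_conf = {
    'bootstrap.servers': bootstrap_servers,
    'security.protocol': 'SASL_SSL',
    'sasl.mechanisms': 'PLAIN',
    'sasl.username': sasl_username,
    'sasl.password': sasl_password,
    'group.id': 'new_group',
    'auto.offset.reset': 'earliest'
}

def on_assign(consumer, partitions):
    # Called once the group coordinator hands partitions to this consumer
    print(f"Assigned partitions: {partitions}")

diag = Consumer(diag_conf)
diag.subscribe([topic], on_assign=on_assign)

# Poll a few times so the group join and partition assignment can complete
for _ in range(10):
    diag.poll(1.0)

# Print the low/high watermark offsets of every assigned partition
for tp in diag.assignment():
    low, high = diag.get_watermark_offsets(tp, timeout=10)
    print(f"{tp.topic} [{tp.partition}] low={low} high={high}")

diag.close()

If on_assign never fires, the group join itself is failing; if it fires but high equals low for every partition, the topic simply has no messages for this consumer to read.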
Using the kafka-python client:
from kafka import KafkaConsumer

# Kafka consumer configuration
consumer_conf = {
    'bootstrap_servers': bootstrap_servers,
    # 'group_id': 'your_consumer_group_id',  # Specify your consumer group ID
    'auto_offset_reset': 'earliest',  # Start consuming from the beginning of the topic
    'enable_auto_commit': True,       # Enable automatic committing of offsets
}

# If SASL is enabled, add SASL configuration
if kafka_username and kafka_password:
    consumer_conf.update({
        'security_protocol': 'SASL_SSL',
        'sasl_mechanism': 'PLAIN',
        'sasl_plain_username': kafka_username,
        'sasl_plain_password': kafka_password,
    })

# Create Kafka consumer
consumer = KafkaConsumer('Prod.DL.CoreLogic.Deed.SparkETL.main', **consumer_conf)

for message in consumer:
    print(f"Received message: {message.value}")
The above gives me the following error (dummy URL and port; error 110 is ETIMEDOUT, i.e. the connection attempt timed out):
ERROR:kafka.conn:Connect attempt to <BrokerConnection node_id=5 host=something:1111 <connecting> [IPv4 ('1.1.1.1', 1111)]> returned error 110. Disconnecting.
WARNING:kafka.client:Node 5 connection failed -- refreshing metadata
I have also checked whether the port is reachable (dummy connection details):
import socket

# Try a plain TCP connection to the broker host/port
sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
result = sock.connect_ex(('url', 1111))
if result == 0:
    print("Port is open")
else:
    print("Port is not open")
sock.close()
This outputs: Port is open
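In case broker-level logs help, I can also enable librdkafka's built-in debug output on the confluent-kafka consumer (a sketch; 'debug' is a standard librdkafka configuration property, and the contexts below log broker connections, the security handshake, and consumer-group activity to stderr):

from confluent_kafka import Consumer

debug_conf = {
    'bootstrap.servers': bootstrap_servers,
    'security.protocol': 'SASL_SSL',
    'sasl.mechanisms': 'PLAIN',
    'sasl.username': sasl_username,
    'sasl.password': sasl_password,
    'group.id': 'new_group',
    'auto.offset.reset': 'earliest',
    # librdkafka debug contexts: broker connections, auth handshake, consumer group
    'debug': 'broker,security,cgrp'
}

debug_consumer = Consumer(debug_conf)
debug_consumer.subscribe([topic])

# A few polls are enough to capture the connection and group-join logs
for _ in range(5):
    debug_consumer.poll(1.0)

debug_consumer.close()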
What could be the reason for the producer/consumer not working even though the network connection to the broker can be established?