Python Kafka Client - No error but not working


I am running the confluent_kafka client in Python. I get no errors when I produce and then consume messages: the producer reports success, but the consumer can't find any messages.

I have created a topic, and this is the class I am using:

from confluent_kafka import Producer, Consumer
from config import config
import json

class Kafka:
    """
    Kafka Handler.
    """

    def __init__(self, kafka_brokers_sasl, api_key):
        """
        Arguments:
            kafka_brokers_sasl {str} -- String containing kafka brokers separated by comma (no spaces)
            api_key {str} -- Kafka Api Key
        """

        self.driver_options = {
            'bootstrap.servers': kafka_brokers_sasl,
            'sasl.mechanisms': 'PLAIN',
            'security.protocol': 'SASL_SSL',
            'sasl.username': 'token',
            'sasl.password': api_key,
            'log.connection.close' : False,
            #'debug': 'all'
        }

        self.producer_options = {
            'client.id': 'kafka-python-console-sample-producer'
        }
        self.producer_options.update(self.driver_options)

        self.consumer_options = {
            'client.id': 'kafka-python-console-sample-consumer',
            'group.id': 'kafka-python-console-sample-group'
        }
        self.consumer_options.update(self.driver_options)

        self.running = None


    def stop(self):
        self.running = False


    def delivery_report(self, err, msg):
        """ Called once for each message produced to indicate delivery result.
            Triggered by poll() or flush(). """
        if err is not None:
            print('Message delivery failed: {}'.format(err))
        else:
            print('Message delivered to {} [{}]'.format(msg.topic(), msg.partition()))


    def produce(self, topic, data): # Function for producing/uploading data to a Kafka topic

        p = Producer(self.producer_options)

        print("Running?")

        # Asynchronously produce a message. The delivery report callback is triggered by
        # flush() below, once the message has been delivered or has failed permanently.
        p.produce(topic, data, callback=self.delivery_report)

        # Wait for any outstanding messages to be delivered and delivery report callbacks to be triggered.
        p.flush()
        print("Done?")


    def consume(self, topic, method_class=None): # Function for consuming/reading data from a Kafka topic. Works as a listener and triggers the run() function on a method_class
        print("raaa")

        kafka_consumer = Consumer(self.consumer_options)

        kafka_consumer.subscribe([topic])

        # Now loop on the consumer to read messages
        print("Running?")
        self.running = True
        while self.running:
            msg = kafka_consumer.poll()

            print(msg)

            if msg is not None and msg.error() is None:
                print('Message consumed: topic={0}, partition={1}, offset={2}, key={3}, value={4}'.format(
                    msg.topic(),
                    msg.partition(),
                    msg.offset(),
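                    # key() is None when no key was sent (produce() above sends none)
                    msg.key().decode('utf-8') if msg.key() is not None else None,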
                    msg.value().decode('utf-8')))
            else:
                print('No messages consumed')

        print("Here?")
        kafka_consumer.unsubscribe()
        kafka_consumer.close()
        print("Ending?")

mock = {'yas': 'yas', 'yas2': 'yas2'}
kafka = Kafka(config['kafka']['kafka_brokers_sasl'], config['kafka']['api_key'])
kafka.produce(config['kafka']['topic'], json.dumps(mock))
kafka.consume(config['kafka']['topic'])

Running this I get the prints:

Running?
Message delivered to DANIEL_TEST [0]
Done?
raaa
Running?
<cimpl.Message object at 0x104e4c390>
No messages consumed
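
A note on reading this output: the loop printed a Message object and then "No messages consumed". Given the `if` condition in consume(), that combination is only possible when msg.error() is not None, so poll() returned an error event and the loop discarded it. Below is a minimal sketch of a helper that reports that case separately. The name describe_poll_result is hypothetical, and it relies only on the Message methods the class above already calls.

```python
def describe_poll_result(msg):
    """Turn the result of Consumer.poll() into a printable line.

    Hypothetical helper: `msg` is whatever poll() returned, i.e. None or an
    object with the Message methods used in the class above.
    """
    if msg is None:
        # poll() returned nothing within its timeout.
        return 'No messages consumed'
    if msg.error() is not None:
        # Error events also arrive as Message objects, so report them
        # instead of treating them as "no message".
        return 'Consumer error: {}'.format(msg.error())
    # key() and value() are None when the field is absent.
    key = msg.key().decode('utf-8') if msg.key() is not None else None
    value = msg.value().decode('utf-8') if msg.value() is not None else None
    return 'Message consumed: topic={0}, partition={1}, offset={2}, key={3}, value={4}'.format(
        msg.topic(), msg.partition(), msg.offset(), key, value)
```

Inside the loop, print(describe_poll_result(msg)) would take the place of the if/else, so the underlying error text becomes visible.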

There are 2 best solutions below


I'm not a Python expert, but it looks like you start your consumer after you've already produced the message:

kafka.produce(config['kafka']['topic'], json.dumps(mock))
kafka.consume(config['kafka']['topic'])

You need to start consuming before you produce, because a new consumer (one whose group has no committed offsets yet) starts at the latest offset by default. For example, if you produced a message at offset 5 and then start a new consumer, it begins at offset 6 and never sees the message at offset 5.
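
To make the offset arithmetic concrete, here is a simplified model of how the starting position is chosen. It is a sketch for illustration only, not the client's actual code: starting_offset and its parameters are hypothetical names, and it ignores cases such as a committed offset that is out of range.

```python
def starting_offset(log_end_offset, committed_offset=None,
                    auto_offset_reset='latest', log_start_offset=0):
    """Simplified model of where a consumer starts reading one partition.

    Hypothetical helper for illustration; not part of any Kafka client.
    """
    if committed_offset is not None:
        # The group has consumed before: resume where it left off.
        return committed_offset
    if auto_offset_reset == 'earliest':
        # No committed offset: start at the oldest retained message.
        return log_start_offset
    # No committed offset and 'latest': only messages produced from now on.
    return log_end_offset


# A message was produced at offset 5, so the next offset to be written is 6.
print(starting_offset(log_end_offset=6))                                # 6
print(starting_offset(log_end_offset=6, auto_offset_reset='earliest'))  # 0
```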

The solution is either to start consuming before producing anything, or to configure the consumer to read from the beginning of the topic by setting auto.offset.reset to earliest. I think the first solution is simpler.
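
For the second option, a minimal sketch of building the consumer options is below. The function name and the example values are hypothetical; 'auto.offset.reset' and 'group.id' are the real configuration keys. Keep in mind that the reset policy only applies when the group has no committed offset, so a group that has already consumed from the topic may need a fresh group.id to see old messages.

```python
def consumer_options_from_beginning(driver_options, group_id):
    """Return consumer options that start at the oldest available message.

    Hypothetical helper: `driver_options` is a dict of connection settings
    such as the driver_options built in the question's class.
    """
    options = dict(driver_options)  # copy, so the caller's dict is untouched
    options['group.id'] = group_id
    # Applies only when this group has no committed offset for the partition.
    options['auto.offset.reset'] = 'earliest'
    return options


# Example values are placeholders, not a real broker address.
opts = consumer_options_from_beginning(
    {'bootstrap.servers': 'broker-1:9093'}, 'my-new-group')
# The resulting dict would then be passed to confluent_kafka.Consumer(opts).
```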


I had the same problem. The driver_options must include the path to the SSL CA certificate, so you must set 'ssl.ca.location': '/etc/pki/tls/cert.pem' or the equivalent location on your system, as documented here: https://github.com/ibm-messaging/event-streams-samples/blob/master/kafka-python-console-sample/app.py#L75

Then it worked!
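
As a rough sketch of how that setting could be added without hard-coding one path: the helper below picks the first CA bundle that exists on the machine. add_ca_location is a hypothetical name, and apart from the path quoted above, the candidate paths are my assumptions about common locations, so check them against your own system.

```python
import os


def add_ca_location(driver_options, candidates):
    """Return a copy of driver_options with 'ssl.ca.location' set to the
    first candidate path that exists; leave it unset if none exist.

    Hypothetical helper for illustration.
    """
    options = dict(driver_options)
    for path in candidates:
        if os.path.isfile(path):
            options['ssl.ca.location'] = path
            break
    return options


CANDIDATE_CA_FILES = [
    '/etc/pki/tls/cert.pem',               # path from the answer above
    '/etc/ssl/certs/ca-certificates.crt',  # assumption: Debian/Ubuntu layout
    '/etc/ssl/cert.pem',                   # assumption: macOS/Alpine layout
]

driver_options = add_ca_location({'security.protocol': 'SASL_SSL'},
                                 CANDIDATE_CA_FILES)
```

If none of the candidates exist, the key is simply left out, and the path has to be supplied by hand.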