How to count the number of records (messages) in a topic using kafka-python

As said in the title, I want to get the number of records in my topic, and I can't find a solution using the kafka-python library. Does anyone have any idea?

There are 4 solutions below.

There is no specific API to count the number of records in a topic. You need to consume the records and count how many you receive from the Kafka consumer.
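With kafka-python itself, a minimal consume-and-count sketch could look like the following (topic and broker names are placeholders; consumer_timeout_ms makes iteration stop after one second with no new messages):

from kafka import KafkaConsumer

# Placeholder topic and broker; adjust to your setup.
consumer = KafkaConsumer(
    'my.kafka.topic',
    bootstrap_servers='localhost:9092',
    auto_offset_reset='earliest',   # start from the beginning of each partition
    enable_auto_commit=False,       # don't move any committed offsets while counting
    consumer_timeout_ms=1000,       # stop iterating after 1s without a new message
)

# Each iteration yields one consumed record.
count = sum(1 for _ in consumer)
consumer.close()
print(count)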

One solution is to produce one message to each partition of the topic and then read back the last offsets. From those offsets you can calculate the total number of messages sent to the topic so far.

But this is not the right approach. You don't know how many messages consumers have already consumed, or how many messages Kafka has already deleted under its retention policy. The only reliable way is to consume the messages and count them.
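If you only need the count of messages currently retained in the topic (between each partition's low and high watermark), kafka-python can read those offsets directly without producing anything. This is a sketch with placeholder broker and topic names, and it shares the caveat above about messages already deleted by retention:

from kafka import KafkaConsumer, TopicPartition

# Placeholder broker and topic; adjust to your setup.
consumer = KafkaConsumer(bootstrap_servers='localhost:9092')
topic = 'my.kafka.topic'

# One TopicPartition object per partition of the topic.
partitions = [TopicPartition(topic, p) for p in consumer.partitions_for_topic(topic)]

# Both calls return {TopicPartition: offset} dictionaries.
beginning = consumer.beginning_offsets(partitions)
end = consumer.end_offsets(partitions)

total = sum(end[tp] - beginning[tp] for tp in partitions)
consumer.close()
print(total)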

The main idea is to count how many messages there are in each partition of the topic and sum those numbers; the result is the total number of messages in the topic. I am using confluent_kafka as the main library.

from confluent_kafka import Consumer, TopicPartition
from concurrent.futures import ThreadPoolExecutor

consumer = Consumer({"bootstrap.servers": "localhost:6667", "group.id": "test"})

def get_partition_size(topic_name: str, partition_key: int) -> int:
    """Return the number of messages currently stored in one partition."""
    topic_partition = TopicPartition(topic_name, partition_key)
    low_offset, high_offset = consumer.get_watermark_offsets(topic_partition)
    return high_offset - low_offset

def get_topic_size(topic_name: str) -> int:
    """Sum the per-partition message counts for the whole topic."""
    topic = consumer.list_topics(topic=topic_name)
    partitions = topic.topics[topic_name].partitions
    workers, max_workers = [], len(partitions) or 1

    # Query each partition's watermark offsets in parallel.
    with ThreadPoolExecutor(max_workers=max_workers) as e:
        for partition_key in partitions:
            workers.append(e.submit(get_partition_size, topic_name, partition_key))

    return sum(w.result() for w in workers)

print(get_topic_size('my.kafka.topic'))

I wasn't able to get this working with kafka-python, but it was fairly easy to do with the confluent-kafka library:

from confluent_kafka import Consumer

topic = "test_topic"
broker = "localhost:9092"

def get_count():
    consumer = Consumer({
        'bootstrap.servers': broker,
        'group.id': 'my-group',
        'auto.offset.reset': 'earliest',
    })

    consumer.subscribe([topic])

    total_message_count = 0
    while True:
        msg = consumer.poll(1.0)

        if msg is None:
            print("No more messages")
            break
        if msg.error():
            print("Consumer error: {}".format(msg.error()))
            continue

        total_message_count += 1
        print('Received message {}: {}'.format(
            total_message_count, msg.value().decode('utf-8')))

    consumer.close()

    print(total_message_count)
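One usage note on the sketch above: the function still has to be invoked to run the loop, and poll(1.0) returns None after one second without a message, so counting stops at the first one-second gap; a larger timeout may be safer over a slow connection.

get_count()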