How to gracefully stop Kafka consumer after processing a specific number of messages in Python?

I have an Airflow DAG with a BashOperator that runs a Kafka producer, generating a random count of messages. These messages are consumed by a Kafka consumer, which writes them to a JSON file. However, I want the consumer to stop gracefully after processing a specific count of messages without causing an error in the Airflow DAG.

I've considered using a timeout, but I'd prefer a cleaner solution. Is there a recommended way to achieve this? I want the DAG to proceed to the next step without errors after the consumer has processed the initial set of messages.

Any help or guidance on the best approach for this scenario would be greatly appreciated!

My producer code is:

import time
import json
import random
from kafka import KafkaProducer


def generate_data() -> dict:
    # Build one random clickstream-style event.
    id_val = random.randint(1, 1000000)
    cur_timestamp = time.strftime("%Y-%m-%d %H:%M:%S")
    platform_type = random.choice(['ios', 'android', 'web', 'mobile-web'])
    return {
        'id': id_val,
        'cur_timestamp': cur_timestamp,
        'type': platform_type,
    }


def serializer(messages):
    # Encode each dict as UTF-8 JSON bytes for Kafka.
    return json.dumps(messages).encode('utf-8')


topic = "new_topic"
producer = KafkaProducer(
    bootstrap_servers=['localhost:9092'],
    value_serializer=serializer,
    api_version=(0, 11, 5),
)

record_cnt = random.randint(10, 100)


def produce_msg():
    for _ in range(record_cnt):
        send_msg = generate_data()
        producer.send(topic, send_msg)
        print(f'Producing message {send_msg}')
    producer.flush()
    producer.close()


produce_msg()
print('produce finished')

My consumer code is:

import json
from kafka import KafkaConsumer

topic = "new_topic"
json_path = '/home/airflow/clickstream.json'
consumer = KafkaConsumer(
    topic,
    bootstrap_servers="localhost:9092",
    enable_auto_commit=True,
    auto_offset_reset='earliest',
)


def consumer_to_json():
    with open(json_path, 'w') as json_file:
        for send_msg in consumer:
            message_value = send_msg.value.decode("utf-8")
            json_data = json.loads(message_value)
            print(json_data)
            json.dump(json_data, json_file)
            json_file.write('\n')
        # Never reached: with no consumer_timeout_ms set, the loop above
        # blocks forever waiting for new messages.
        consumer.close()
        print('finish')


if __name__ == "__main__":
    consumer_to_json()

1 Answer

You could just count them?

# Drop this in place of the loop inside consumer_to_json(); json_file still
# comes from the enclosing `with open(json_path, 'w') as json_file:` block.
num_messages = 0
for send_msg in consumer:
    message_value = send_msg.value.decode("utf-8")
    json_data = json.loads(message_value)
    print(json_data)
    json.dump(json_data, json_file)
    json_file.write('\n')
    num_messages += 1
    if num_messages >= MESSAGES_LIMIT:  # TODO: define MESSAGES_LIMIT
        break
consumer.close()
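
Alternatively, since you mentioned timeouts: kafka-python has a built-in, graceful version of that idea, consumer_timeout_ms, which simply ends the for-loop once no message has arrived within the given window, so the script exits cleanly instead of blocking forever. A minimal sketch (the 5-second window is an arbitrary choice; tune it to your producer's cadence):

import json
from kafka import KafkaConsumer

# consumer_timeout_ms makes iteration stop cleanly (the for-loop just ends)
# after 5 s without a new message, instead of blocking forever.
consumer = KafkaConsumer(
    "new_topic",
    bootstrap_servers="localhost:9092",
    enable_auto_commit=True,
    auto_offset_reset='earliest',
    consumer_timeout_ms=5000,
)

with open('/home/airflow/clickstream.json', 'w') as json_file:
    for send_msg in consumer:
        json.dump(json.loads(send_msg.value.decode("utf-8")), json_file)
        json_file.write('\n')
consumer.close()
print('finish')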

Worth pointing out that Kafka Connect can already write line-delimited JSON files, and it runs continuously. Otherwise, writing to a searchable system like Mongo / Elasticsearch / an RDBMS over JDBC (since you need a database anyway for Airflow) might be a better option than plain files.
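
For reference, a standalone FileStreamSink setup along those lines might look like the following properties file (a sketch: the connector class ships with Kafka, but the name and file path here are just examples):

# clickstream-sink.properties (hypothetical example)
name=clickstream-file-sink
connector.class=org.apache.kafka.connect.file.FileStreamSinkConnector
tasks.max=1
topics=new_topic
file=/home/airflow/clickstream.json
# keep each record value as the raw JSON string when writing lines
value.converter=org.apache.kafka.connect.storage.StringConverter

You would run it with bin/connect-standalone.sh together with a worker config pointing at localhost:9092.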