I have an Airflow DAG with a BashOperator that runs a Kafka producer, which generates a random number of messages. These messages are consumed by a Kafka consumer that writes them to a JSON file. However, I want the consumer to stop gracefully after processing that specific number of messages, without causing an error in the Airflow DAG.
I've considered using a timeout, but I'd prefer a cleaner solution. Is there a recommended way to achieve this? I want the DAG to proceed to the next step without errors once the consumer has processed the initial set of messages.
Any help or guidance on the best approach for this scenario would be greatly appreciated!
My producer code is:
import time
import json
import random
from kafka import KafkaProducer

def generate_data() -> dict:
    id_val = random.randint(1, 1000000)
    cur_timestamp = time.strftime("%Y-%m-%d %H:%M:%S")
    platform_type = random.choice(['ios', 'android', 'web', 'mobile-web'])
    messages = {
        'id': id_val,
        'cur_timestamp': cur_timestamp,
        'type': platform_type
    }
    return messages

def serializer(messages):
    return json.dumps(messages).encode('utf-8')

topic = "new_topic"
producer = KafkaProducer(
    bootstrap_servers=['localhost:9092'],
    value_serializer=serializer,
    api_version=(0, 11, 5)
)

record_cnt = random.randint(10, 100)

def produce_msg():
    for _ in range(record_cnt):
        send_msg = generate_data()
        producer.send(topic, send_msg)
        print(f'Producing message {send_msg}')
    producer.flush()
    producer.close()

produce_msg()
print('produce finished')
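Since record_cnt is random, the consumer has no way of knowing the target count unless the producer publishes it somewhere. A minimal sketch of one possible hand-off (the file path and helper name here are illustrative assumptions, not part of the original DAG; pushing the count as an Airflow XCom would work equally well):

```python
import random

record_cnt = random.randint(10, 100)  # same random count as in the producer

def save_record_cnt(count: int, path: str = '/tmp/record_cnt.txt') -> None:
    # Persist the count so a downstream task (or the consumer itself) can
    # read how many messages to expect. The path is illustrative only.
    with open(path, 'w') as f:
        f.write(str(count))

save_record_cnt(record_cnt)  # call this after produce_msg() finishes
```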
My consumer code is:
import json
from kafka import KafkaConsumer

topic = "new_topic"
json_path = '/home/airflow/clickstream.json'

consumer = KafkaConsumer(
    topic,
    bootstrap_servers="localhost:9092",
    enable_auto_commit=True,
    auto_offset_reset='earliest'
)

def consumer_to_json():
    with open(json_path, 'w') as json_file:
        for send_msg in consumer:
            message_value = send_msg.value.decode("utf-8")
            json_data = json.loads(message_value)
            print(json_data)
            json.dump(json_data, json_file)
            json_file.write('\n')
    consumer.close()
    print('finish')

if __name__ == "__main__":
    consumer_to_json()
You could just count them?
Worth pointing out that Kafka Connect can already write line-delimited JSON files, and it runs continuously. Otherwise, writing to a searchable system like Mongo / Elasticsearch / an RDBMS over JDBC (since you need a database anyway for Airflow) might be a better option than plain files.