Unable to consume asynchronous messages exchanged between producer and consumer

181 Views Asked by At

Kafka and ZooKeeper are both running successfully.

This is my producer.py

async def publish():
    """Send 'from producer' to ``topic`` and print what arrives on ``topicAKG``.

    Starts an idempotent producer and a consumer (group ``'test'``) against
    ``localhost:9092``. The outer loop is written to run for i = 1..5; after
    each send it iterates over the consumer, printing and committing every
    message received. Both clients are stopped in the ``finally`` clause.

    NOTE(review): ``topic`` and ``topicAKG`` are not defined in this snippet;
    presumably they match the module-level values in consumer.py — confirm.
    """
    producer = AIOKafkaProducer(bootstrap_servers='localhost:9092',
    enable_idempotence=True)  
    await producer.start()

    # The consumer listens on the topic the other script replies on.
    consumer = AIOKafkaConsumer(
    topicAKG,
    bootstrap_servers='localhost:9092',group_id='test',
    max_poll_interval_ms=60000,
    max_poll_records=50)
    await consumer.start()

    try:
        for i in range(1, 6):
            await producer.send_and_wait(topic, value='from producer'.encode())
            print(f"Iteration: {i}")
            # NOTE(review): this inner loop has no break/return, so the outer
            # for-loop can only advance if iteration over the consumer ends.
            # The reported producer output stops after "Iteration: 1".
            async for message in consumer:
                print("Received ========== ", message.value.decode())
                await consumer.commit()
    finally:
        await producer.stop()
        await consumer.stop()

This is my consumer.py

import asyncio

from aiokafka import AIOKafkaConsumer, AIOKafkaProducer

# Topic this script consumes from (the producer script sends to it).
topic = 'app'
# Topic this script publishes its replies to (the producer script reads it).
topicAKG = 'back'

async def consume():
    """Consume messages from ``topic`` and answer each one on ``topicAKG``.

    Starts a consumer (group ``"test"``) and an idempotent producer against
    ``localhost:9092``. For every message received it prints the decoded
    value, waits two seconds, commits the offset and then publishes
    ``'from consumer'`` to ``topicAKG``.

    Both clients are stopped when the loop exits or an exception propagates.
    Each client's ``stop()`` is guarded by its own ``finally`` so that a
    failure while starting or stopping one does not leak the other.
    """
    consumer = AIOKafkaConsumer(
        topic,
        bootstrap_servers='localhost:9092',
        group_id="test",
        max_poll_interval_ms=60000,
        max_poll_records=50,
    )
    await consumer.start()

    try:
        producer = AIOKafkaProducer(
            bootstrap_servers='localhost:9092',
            enable_idempotence=True,
        )
        # Started inside the consumer's try block: if the producer cannot
        # start, the already-running consumer is still stopped below.
        await producer.start()
        try:
            async for message in consumer:
                print("Received", message.value.decode())
                await asyncio.sleep(2)  # Delay for 2 seconds
                await consumer.commit()  # Commit the offset to avoid re-consuming the same message
                await producer.send_and_wait(topicAKG, value='from consumer'.encode())
        finally:
            await producer.stop()
    finally:
        await consumer.stop()

# asyncio.run() creates a fresh event loop, runs consume() to completion and
# closes the loop afterwards. Calling asyncio.get_event_loop() when no loop
# is running is deprecated.
asyncio.run(consume())

output from producer

Iteration: 1

Received ========== from consumer

output from consumer

Received from producer

So the producer gets stuck on iteration 1 and hangs.

1

There is 1 best solution below

0
On

Use `break` in the producer to get out of the inner loop after receiving the first message:

 try:
        for i in range(1, 6):
            await producer.send_and_wait(topic, value='from producer'.encode())
            print(f"Iteration: {i}")
            async for message in consumer:
                print("Received ========== ", message.value.decode())
                await consumer.commit()
                # Leave the inner loop after the first received message so
                # the outer for-loop can move on to the next iteration.
                break

Modify this to fit your producer.py.