Confluent Kafka Python Library blocking asyncio consumer poll

I have a function which I call via asyncio.create_task():

async def runthis():
    c = Consumer({
        'bootstrap.servers': 'server:9092',
        'group.id': 'mygroup',
        'auto.offset.reset': 'earliest',
        'message.max.bytes': 500000,
        'fetch.wait.max.ms': 1000,
        'fetch.min.bytes': 500000,
        'fetch.max.bytes': 500000
    })

    c.subscribe(['queuestore'])
    print("Started")
    while True:
        msgs = c.poll(timeout=2000)

Unfortunately, I do not receive any messages with this code. Is there an issue with using asyncio together with the Confluent Kafka Python library? The same setup works well with aiokafka.

There is 1 answer below

Well, you could complete your example with the imports and minimal boilerplate so we can check what each element is.

But from the snippet as presented, Consumer.poll is called synchronously, so it blocks the asyncio event loop for as long as it waits. Check whether there are async versions of these Kafka methods; otherwise, run them in a thread with loop.run_in_executor(...) (or asyncio.to_thread(...) on Python 3.9+) and await the result.
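
To illustrate the executor approach, here is a minimal sketch rather than a definitive implementation. FakeBlockingConsumer, poll_in_executor, consume and heartbeat are hypothetical names invented for this example; the fake class only imitates a consumer whose poll() blocks the calling thread, so the sketch runs without a broker. With the real library you would pass a confluent_kafka.Consumer instead. Note also that Consumer.poll takes its timeout in seconds, so timeout=2000 in the question can block for a very long time.

```python
import asyncio
import time


class FakeBlockingConsumer:
    """Hypothetical stand-in for a consumer whose poll() blocks the thread."""

    def __init__(self, messages):
        self._messages = list(messages)

    def poll(self, timeout=None):
        time.sleep(0.05)  # simulate a blocking wait on the network
        return self._messages.pop(0) if self._messages else None


async def poll_in_executor(consumer, timeout):
    # Run the blocking poll() in the default thread pool so the
    # event loop stays free to schedule other tasks meanwhile.
    loop = asyncio.get_running_loop()
    return await loop.run_in_executor(None, consumer.poll, timeout)


async def consume(consumer, count, timeout=1.0):
    # Collect up to `count` messages, stopping at the first empty poll.
    received = []
    while len(received) < count:
        msg = await poll_in_executor(consumer, timeout)
        if msg is None:
            break
        received.append(msg)
    return received


async def heartbeat(ticks):
    # A second task; it only makes progress if the loop is not blocked.
    for _ in range(3):
        ticks.append(time.monotonic())
        await asyncio.sleep(0.01)


async def main():
    ticks = []
    consumer = FakeBlockingConsumer(["m1", "m2"])
    msgs, _ = await asyncio.gather(consume(consumer, 2), heartbeat(ticks))
    return msgs, len(ticks)


if __name__ == "__main__":
    print(asyncio.run(main()))
```

The heartbeat task is there only to show that other coroutines keep running while poll() waits in a worker thread. If poll() were called directly inside the coroutine, as in the question, every other task on the loop would stall until it returned.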