I have a function which I call via asyncio.create_task():
async def runthis():
    c = Consumer({
        'bootstrap.servers': 'server:9092',
        'group.id': 'mygroup',
        'auto.offset.reset': 'earliest',
        'message.max.bytes': 500000,
        'fetch.wait.max.ms': 1000,
        'fetch.min.bytes': 500000,
        'fetch.max.bytes': 500000
    })
    c.subscribe(['queuestore'])
    print("Started")
    while True:
        msgs = c.poll(timeout=2000)
Unfortunately, I do not receive any messages this way. Is there a problem with using asyncio together with the confluent-kafka Python library? The same logic works well with aiokafka.
Well, you could complete your example with the imports and minimal boilerplate so we can check what each element is.
But from the snippet as presented, the Consumer.poll call is synchronous, so it blocks the asyncio event loop. Check whether the library offers async versions of these Kafka methods; otherwise, run the blocking calls in an executor with loop.run_in_executor(...) and await the result.
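To illustrate the executor approach, here is a minimal sketch. It does not use the real Kafka client: FakeConsumer is a hypothetical stand-in whose poll() simply sleeps and returns canned values, and consume() and heartbeat() are illustrative names of my own. The point is only to show a blocking poll being moved to a worker thread so that other tasks on the loop keep running.

```python
import asyncio
import time


class FakeConsumer:
    """Hypothetical stand-in for a blocking consumer (not the real Kafka client)."""

    def __init__(self, messages):
        self._messages = list(messages)

    def poll(self, timeout):
        # Simulate a blocking wait, as a synchronous poll would do.
        time.sleep(min(timeout, 0.05))
        return self._messages.pop(0) if self._messages else None


async def consume(consumer, max_polls):
    loop = asyncio.get_running_loop()
    received = []
    for _ in range(max_polls):
        # Run the blocking call in the default thread pool so the
        # event loop stays free for other tasks while we wait.
        msg = await loop.run_in_executor(None, consumer.poll, 1.0)
        if msg is not None:
            received.append(msg)
    return received


async def heartbeat(ticks):
    # A second task that only makes progress if the loop is not blocked.
    count = 0
    for _ in range(ticks):
        await asyncio.sleep(0.01)
        count += 1
    return count


async def main():
    consumer = FakeConsumer(["a", "b", "c"])
    consumer_task = asyncio.create_task(consume(consumer, max_polls=4))
    heartbeat_task = asyncio.create_task(heartbeat(5))
    return await asyncio.gather(consumer_task, heartbeat_task)


if __name__ == "__main__":
    print(asyncio.run(main()))
```

With a real consumer you would presumably pass its poll method to run_in_executor in the same way. Also note that, as far as I know, the timeout of confluent-kafka's Consumer.poll is given in seconds, so timeout=2000 would wait far longer than two seconds.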