For Kafka I'm using `getmany` to read consumer messages. Out of a total of 650 messages (which take around 3 days to process), processing happens for around 100–150 records (sometimes 12 hrs, sometimes 24 hrs) and then no further processing happens. The consumer stream is not closed — when I produce a new message it does get processed — but I can't figure out why this happens.
I thought some messages might be getting skipped, so to check that I produced only 25 messages — all were processed (3 hrs); then I produced 100 messages — all were processed (12 hrs). In these cases no messages are skipped. Only when a large number of messages (650) is produced does this issue occur.
Code snippet :
async def consume(loop, lock):
    """Consume records from KAFKA_TOPIC one at a time and dispatch matches.

    Args:
        loop: asyncio event loop the consumer is bound to.
        lock: asyncio.Lock passed through to process() to serialize handling.

    Runs forever under normal operation. The consumer is now stopped in a
    ``finally`` block so an exception no longer leaks the broker connection
    and group membership.
    """
    logger.info('Inside Consume')
    consumer = AIOKafkaConsumer(
        KAFKA_TOPIC,
        loop=loop,
        bootstrap_servers=bootstrap_servers,
        group_id=group_id,
        enable_auto_commit=enable_auto_commit,
        auto_commit_interval_ms=auto_commit_interval_ms,
        auto_offset_reset=auto_offset_reset,
        max_poll_records=1,
        # NOTE(review): both timeouts are 25 minutes, but the reported
        # per-message processing time is far longer (650 msgs ~ 3 days).
        # If handling a record exceeds max_poll_interval_ms the broker
        # evicts this consumer from the group and fetching silently stalls
        # until a rejoin — a likely cause of the "stops after ~100-150
        # records" symptom. TODO: confirm per-message time vs. this limit.
        max_poll_interval_ms=1500000,
        rebalance_timeout_ms=1500000)
    await consumer.start()
    try:
        while True:
            result = await consumer.getmany(timeout_ms=1500000, max_records=1)
            for tp, messages in result.items():
                if not messages:
                    continue
                # max_records=1 => at most one record per partition, so
                # only messages[0] can exist. (Was previously bound to a
                # variable named `msg` that was then shadowed by the
                # decoded content string — renamed for clarity.)
                message_json = json.loads(messages[0].value.decode())
                data_encoded = message_json['payload']
                content = data_encoded.get('content')
                if content is None:
                    continue
                # Payload content is base64-encoded JSON.
                message_dict = json.loads(base64.b64decode(content))
                # Only records whose key1 equals "something" are processed.
                if message_dict.get("key1") != "something":
                    continue
                logger.info('Hurray new message to process')
                logger.info(message_dict["id"])
                await process(content, lock)
    finally:
        # Always release group membership and the TCP connection on exit.
        await consumer.stop()
async def process(msg, lock):
    """Run processmessage(msg) while holding *lock*.

    The lock serializes processing so only one message is handled at a
    time, even if several consume tasks dispatch concurrently.
    """
    await lock.acquire()
    try:
        logger.info('processing... ->{}'.format(msg))
        await processmessage(msg)
        logger.info('done processing')
    finally:
        lock.release()
if __name__ == "__main__":
    logger.info('Inside main')

    async def main():
        # Create the lock inside the running loop: the ``loop=`` argument
        # to asyncio primitives was deprecated in Python 3.8 and REMOVED in
        # 3.10, so ``asyncio.Lock(loop=loop)`` raises TypeError on modern
        # Python. A lock created inside a coroutine binds to the running
        # loop automatically.
        lock = asyncio.Lock()
        await consume(asyncio.get_running_loop(), lock)

    # asyncio.run() owns loop creation and teardown; consume() loops
    # forever, so this blocks exactly like the old loop.run_forever().
    asyncio.run(main())