Kafka consumer records - processing

I'm using getmany to read messages from a Kafka consumer. Out of a total of 650 messages (which should take around 3 days to process), only about 100-150 records get processed (this takes anywhere from 12 to 24 hours), and then no further processing happens. The consumer stream is not closed, though: when I produce a new message, it gets processed. I couldn't figure out why this happens.

I thought some messages might be getting skipped, so to check that I produced only 25 messages, and all of them were processed (in 3 hours). Then I produced 100 messages, and all of them were processed (in 12 hours). In these cases no messages were skipped. The issue only occurs when a large number of messages (650) is produced.
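As a rough sanity check on the numbers above, here is a small sketch of the arithmetic. The helper name minutes_per_message is only illustrative, and the result is an average, so individual messages may take longer.

```python
# Average processing time per message, using the figures from the runs above.
# minutes_per_message is a hypothetical helper, not part of the consumer code.

MAX_POLL_INTERVAL_MS = 1_500_000  # value passed to AIOKafkaConsumer


def minutes_per_message(messages: int, hours: float) -> float:
    """Average minutes spent on one message in a run."""
    return hours * 60 / messages


per_msg_small = minutes_per_message(25, 3)     # 25 messages in 3 hours
per_msg_medium = minutes_per_message(100, 12)  # 100 messages in 12 hours

# Extrapolate the 100-message rate to the full batch of 650 messages.
estimated_days_650 = 650 * per_msg_medium / 60 / 24

# The poll interval expressed in minutes, for comparison.
max_poll_minutes = MAX_POLL_INTERVAL_MS / 60_000

print(f"small run:  {per_msg_small:.1f} min/message")
print(f"medium run: {per_msg_medium:.1f} min/message")
print(f"650 messages: about {estimated_days_650:.2f} days")
print(f"max_poll_interval_ms: {max_poll_minutes:.0f} min")
```

Both smaller runs work out to about 7.2 minutes per message, which extrapolates to roughly 3.25 days for 650 messages. On average that is well under the 25 minutes allowed by max_poll_interval_ms.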

Code snippet:

import asyncio
import base64
import json

from aiokafka import AIOKafkaConsumer


async def consume(loop, lock):
    logger.info('Inside Consume')

    consumer = AIOKafkaConsumer(
        KAFKA_TOPIC,
        loop=loop,
        bootstrap_servers=bootstrap_servers,
        group_id=group_id,
        enable_auto_commit=enable_auto_commit,
        auto_commit_interval_ms=auto_commit_interval_ms,
        auto_offset_reset=auto_offset_reset,
        max_poll_records=1,
        max_poll_interval_ms=1500000,
        rebalance_timeout_ms=1500000,
    )

    await consumer.start()
    while True:
        # Fetch at most one record; wait up to 25 minutes if none is available.
        result = await consumer.getmany(timeout_ms=1500000, max_records=1)
        for tp, records in result.items():
            if records:
                # The record value is JSON with the data under 'payload'.
                message = records[0].value.decode()
                message_json = json.loads(message)
                data_encoded = message_json['payload']

                if data_encoded.get('content') is not None:
                    # 'content' is base64-encoded JSON.
                    content = data_encoded['content']
                    message_dict = json.loads(base64.b64decode(content))

                    # Only messages with key1 == "something" are processed.
                    if message_dict.get("key1") == "something":
                        logger.info('Hurray new message to process')
                        logger.info(message_dict["id"])
                        await process(content, lock)


async def process(msg, lock):
    async with lock:
        logger.info('processing... ->{}'.format(msg))
        await processmessage(msg)
        logger.info('done processing')        


if __name__ == "__main__":
    logger.info('Inside main')

    loop = asyncio.new_event_loop()
    lock = asyncio.Lock(loop=loop)

    loop.create_task(consume(loop, lock))
    loop.run_forever()
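To rule out the decoding and filtering step as the place where records disappear, that logic can be pulled into a plain function and exercised without Kafka. This is only a sketch: extract_work_item is a hypothetical name, and the sample record is made up to match the shape the consumer above expects.

```python
import base64
import json
from typing import Optional


def extract_work_item(raw_value: bytes) -> Optional[dict]:
    """Return the decoded inner message if it should be processed, else None.

    Mirrors the checks in consume(): JSON envelope -> 'payload' -> base64
    'content' -> inner JSON whose "key1" must equal "something".
    """
    envelope = json.loads(raw_value.decode())
    content = envelope["payload"].get("content")
    if content is None:
        return None  # no content, the consumer ignores this record
    inner = json.loads(base64.b64decode(content))
    if inner.get("key1") != "something":
        return None  # filtered out, the consumer ignores this record
    return inner


# Build made-up sample records in the same shape as the real ones.
def make_record(inner: dict) -> bytes:
    content = base64.b64encode(json.dumps(inner).encode()).decode()
    return json.dumps({"payload": {"content": content}}).encode()


accepted = extract_work_item(make_record({"key1": "something", "id": 42}))
filtered = extract_work_item(make_record({"key1": "other", "id": 43}))
empty = extract_work_item(json.dumps({"payload": {}}).encode())

print(accepted, filtered, empty)
```

With a function like this, the consumer loop could log every record that returns None, which would show whether records are being silently filtered or never fetched at all.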