aiokafka exits when running in a multiprocessing class


I've been banging my head against the wall today trying to figure out why this isn't working. I created this multiprocessing class:

class Consumer(multiprocessing.Process):
    def __init__(self, topic, **kwargs):
        self.topic = topic
        super(Consumer, self).__init__(**kwargs)

    def _deserializer(serialized):
        return json.loads(serialized)

    async def _consume(self):
        consumer = AIOKafkaConsumer(
            self.topic,
            # group_id=None,
            group_id="Deployment",
            value_deserializer=self._deserializer,
            bootstrap_servers='localhost:30322',
        )

        await consumer.start()

        tasks = []
        try:
            async for msg in consumer:
                logging.info("***** reading message *****")
                tasks.append(asyncio.create_task(process_msg(msg, 1)))
        finally:
            await consumer.stop()
        await asyncio.gather(*tasks)

    def run(self):
        asyncio.run(self._consume())

And my main file does this:

num_procs = 1
processes = [Consumer("deployment_requests") for _ in range(num_procs)]

for p in processes:
    p.start()

for p in processes:
    logging.info(f'pid is {p.pid}')

for p in processes:
    p.join()
    logging.info(f'pid is {p.pid}')

And the output:

2023-05-02 16:03:58 - INFO    - pid is 23520
2023-05-02 16:04:03 - INFO    - Updating subscribed topics to: frozenset({'deployment_requests'})
2023-05-02 16:04:03 - INFO    - Discovered coordinator 0 for group Deployment
2023-05-02 16:04:03 - INFO    - Revoking previously assigned partitions set() for group Deployment
2023-05-02 16:04:03 - INFO    - (Re-)joining group Deployment
2023-05-02 16:04:03 - INFO    - Joined group 'Deployment' (generation 182) with member_id aiokafka-0.8.0-e331a252-b6cd-4521-9140-6bb70cf9e838
2023-05-02 16:04:03 - INFO    - Elected group leader -- performing partition assignments using roundrobin
2023-05-02 16:04:03 - INFO    - Successfully synced group Deployment with generation 182
2023-05-02 16:04:03 - INFO    - Setting newly assigned partitions {TopicPartition(topic='deployment_requests', partition=0)} for group Deployment
2023-05-02 16:04:03 - INFO    - LeaveGroup request succeeded
Process Consumer-1:
2023-05-02 16:04:04 - INFO    - pid is 23520

If I take the code out of the class, it works as expected. But as written, it never even prints "***** reading message *****", so it isn't even waiting on messages. My guess is that it has something to do with p.start() not invoking the run() method correctly for the asyncio call, but it could also be something completely different :)
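
To sanity-check that part in isolation, here's a minimal probe that only tests whether start() reaches run() and whether asyncio.run works inside the child process (the Probe class and its log messages are just for this experiment, not part of the app):

import asyncio
import logging
import multiprocessing
import os

class Probe(multiprocessing.Process):
    """Minimal check: does start() call run(), and does asyncio.run work in the child?"""

    async def _main(self):
        logging.info(f"asyncio loop running in child pid {os.getpid()}")
        await asyncio.sleep(0.1)

    def run(self):
        logging.info(f"run() invoked in child pid {os.getpid()}")
        asyncio.run(self._main())

if __name__ == "__main__":
    logging.basicConfig(level=logging.INFO)
    p = Probe()
    p.start()
    p.join()

If both lines show up, the start()/run()/asyncio.run plumbing is fine and the problem is somewhere inside _consume().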

Here are the Kafka broker logs (the group coordinator side), but there aren't any issues on the producer side.

[2023-05-02 21:04:03,943] INFO [GroupCoordinator 0]: Stabilized group Deployment generation 182 (__consumer_offsets-15) (kafka.coordinator.group.GroupCoordinator)
[2023-05-02 21:04:03,947] INFO [GroupCoordinator 0]: Assignment received from leader for group Deployment for generation 182 (kafka.coordinator.group.GroupCoordinator)
[2023-05-02 21:04:03,966] INFO [GroupCoordinator 0]: Member[group.instance.id None, member.id aiokafka-0.8.0-e331a252-b6cd-4521-9140-6bb70cf9e838] in group Deployment has left, removing it from the group (kafka.coordinator.group.GroupCoordinator)
[2023-05-02 21:04:03,966] INFO [GroupCoordinator 0]: Preparing to rebalance group Deployment in state PreparingRebalance with old generation 182 (__consumer_offsets-15) (reason: removing member aiokafka-0.8.0-e331a252-b6cd-4521-9140-6bb70cf9e838 on LeaveGroup) (kafka.coordinator.group.GroupCoordinator)

1 Answer


Finally figured it out. When I turned the consumer into a class, the deserializer broke it. Once I removed the deserializer, it started working as expected.
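
For anyone who hits the same thing, my best guess at why: _deserializer was defined without self, so self._deserializer is a bound method, and when aiokafka calls it with the raw message bytes the function receives two arguments instead of one and raises a TypeError inside the consumer loop (which would also explain the immediate LeaveGroup in the logs). A minimal sketch of keeping the deserializer with a signature that lines up; treat it as a starting point rather than a drop-in fix:

import json
import multiprocessing

class Consumer(multiprocessing.Process):
    def __init__(self, topic, **kwargs):
        self.topic = topic
        super().__init__(**kwargs)

    # Accept self so the bound-method call passes the raw bytes as `serialized`.
    def _deserializer(self, serialized):
        return json.loads(serialized)

    # Alternatively, decorate it with @staticmethod and keep the
    # one-argument signature: then no self gets bound at call time.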