How to run unit test with in memory Kombu with an infinite loop running drain events?

157 Views Asked by At

Hi I am new to Python & I got issue with unit test involving Kombu & inifinite while loop.

I use Python 3.9 with the latest Kombu library & PyTest.

I got a microservice class which has private Queue, Consumer attributes. It also has a public method called start, which runs an infinite while loop calling drain_events to pool for incoming messages from RabbitMq.

The Consumer has a callback method which receives an incoming message from a queue called test-queue, then publishes another message to topic called processed-message

When I use in-memory connection in unit test, after I call start method from the microservice class, the unit test does not continue as the thread blocks at drain_events method.

  1. How do I write unit test with a microservice running an infinite loop with drain-events?
  2. How do I check whether there is a message published to topic processed-message when in-memory connection is used in unit test?
  3. How do I put a message in test-queue in unit test if the microservice class does not expose Queue, Consumer attribute?
0

There are 0 best solutions below