Hi I am new to Python & I got issue with unit test involving Kombu & inifinite while loop.
I use Python 3.9 with the latest Kombu library & PyTest.
I got a microservice class which has private Queue, Consumer attributes. It also has a public method called start
, which runs an infinite while loop calling drain_events
to pool for incoming messages from RabbitMq.
The Consumer has a callback method which receives an incoming message from a queue called test-queue
, then publishes another message to topic called processed-message
When I use in-memory connection in unit test, after I call start
method from the microservice class, the unit test does not continue as the thread blocks at drain_events
method.
- How do I write unit test with a microservice running an infinite loop with drain-events?
- How do I check whether there is a message published to topic
processed-message
when in-memory connection is used in unit test? - How do I put a message in
test-queue
in unit test if the microservice class does not expose Queue, Consumer attribute?