How to detect that an exchange does not exist with py-amqp

I want to be able to detect whether an exchange does not exist when submitting a message to AMQP.

Consider the following example.

#!/usr/bin/python

import amqp
from time import sleep

conn = amqp.Connection(host="localhost:5672", userid="guest", password="guest", virtual_host="/")
outgoing = conn.channel()
message = amqp.Message("x")


while True:
    print("publish message.")
    outgoing.basic_publish(message, exchange="non-existing", routing_key="fubar")
    sleep(1)

This script keeps publishing to the exchange but does not raise any errors if the exchange does not exist. When the exchange exists, the messages arrive.

#!/usr/bin/python

import amqp
from time import sleep

conn = amqp.Connection(host="localhost:5672", userid="guest", password="guest", virtual_host="/")
outgoing = conn.channel()
message = amqp.Message("x")


while True:
    print("publish message.")
    outgoing.basic_publish(message, exchange="non-existing", routing_key="fubar")
    outgoing.wait()
    sleep(1)

When I add outgoing.wait(), an amqp.exceptions.NotFound is raised, which is what I want. The problem, however, is that when the exchange does exist, the message arrives but outgoing.wait() blocks my loop. (I could run outgoing.wait() in a separate thread, but I do not want to do that.)

How to deal with this?

Any advice, tips, or pointers are welcome.

Thanks,

Jay

There are 2 best solutions below

Unfortunately you need a blocking call to check for exceptions from basic_publish(). What you can do, however, is run the blocking call once before entering the asynchronous loop:

# send a test message synchronously to see if the exchange exists
test_message = amqp.Message('exchange_test')
outgoing.basic_publish(test_message, exchange="non-existing", routing_key="fubar")
try:
    # blocks until the broker sends something back on this channel
    outgoing.wait()
except amqp.exceptions.NotFound:
    # could not find the exchange, so do something about it
    raise SystemExit("exchange not found")

while True:
    # fairly certain the exchange exists now, run the async loop
    print("publish message.")
    outgoing.basic_publish(message, exchange="non-existing", routing_key="fubar")
    sleep(1)
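
One way to keep such a check from blocking indefinitely is to wait with a timeout instead. The sketch below assumes that the connection's drain_events(timeout=...) raises socket.timeout when nothing arrives in time, and that a channel error such as NotFound is raised from that same call. publish_checked and its 0.5 second default are hypothetical, chosen only for illustration, so check the behaviour against the py-amqp version you run.

```python
import socket


def publish_checked(conn, channel, message, exchange, routing_key, timeout=0.5):
    """Publish one message, then wait up to `timeout` seconds for a broker error.

    Hypothetical helper: `conn` is the amqp.Connection that owns `channel`.
    """
    channel.basic_publish(message, exchange=exchange, routing_key=routing_key)
    try:
        # Assumption: a channel error (e.g. amqp.exceptions.NotFound) sent by
        # the broker is raised from this call and propagates to the caller.
        conn.drain_events(timeout=timeout)
    except socket.timeout:
        # Nothing arrived within the timeout, so no error was reported.
        pass
```

In the publishing loop, publish_checked(conn, outgoing, message, "non-existing", "fubar") would take the place of the bare basic_publish call. The trade-off is up to `timeout` seconds of added latency per message, and an error that arrives later than that is still missed.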

If you want to find out whether an exchange exists, use the exchange_declare method and set the passive flag to True. With passive set to True, the server does not attempt to create the exchange; it reports an error instead if the exchange does not exist.

import amqp
from amqp.exceptions import NotFound

conn = amqp.Connection(host="localhost:5672", userid="guest", password="guest",
                       virtual_host="/")
outgoing = conn.channel()
try:
    outgoing.exchange_declare("fubar", "", passive=True)
except NotFound:
    print("Exchange 'fubar' does not exist!")
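
The passive check can be wrapped in a small helper. This is only a sketch: exchange_exists and its not_found parameter are hypothetical names, and it probes on a throwaway channel on the assumption that a failed passive declare is a channel error that closes the channel it happened on.

```python
def exchange_exists(conn, name, not_found=None):
    """Return True if exchange `name` exists, probing with a passive declare.

    Hypothetical helper: `conn` is an open amqp.Connection.
    """
    if not_found is None:
        # Imported lazily so the helper can be exercised with a stub connection.
        from amqp.exceptions import NotFound as not_found
    # Use a throwaway channel so a failed probe does not take down the
    # channel used for publishing.
    probe = conn.channel()
    try:
        probe.exchange_declare(name, "", passive=True)
    except not_found:
        return False
    probe.close()
    return True
```

With the connection from the example above, exchange_exists(conn, "fubar") could be called once before the sending loop starts.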

If what you're really interested in is ensuring that the exchange exists before publishing to it, just declare it before you drop into your sending loop. If the exchange already exists with the same type and properties, nothing will happen; if it exists with different ones, the broker rejects the declaration. If the exchange does not exist, it will be created.

import amqp

conn = amqp.Connection(host="localhost:5672", userid="guest", password="guest",
                       virtual_host="/")
outgoing = conn.channel()
outgoing.exchange_declare("fubar", "direct")
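
Because a redeclaration has to match the existing exchange, it can help to spell the properties out rather than rely on the library defaults. The helper below is a sketch under that assumption: ensure_exchange is a hypothetical name, and durable=True with auto_delete=False is just one plausible choice, not something the question specifies.

```python
def ensure_exchange(channel, name, exchange_type="direct"):
    """Declare `name` so it exists before publishing (hypothetical helper)."""
    # The properties are stated explicitly as an illustration; they must
    # match the existing exchange if one with this name is already there.
    channel.exchange_declare(name, exchange_type, durable=True, auto_delete=False)
    return name
```

Calling ensure_exchange(outgoing, "fubar") once before the loop would then stand in for the bare exchange_declare call.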

Here's a link to the method declaration for exchange_declare in the amqp library you're using:

https://github.com/celery/py-amqp/blob/master/amqp/channel.py#L460-L461