rabbitMQ return queue is empty when i trying to get message by using outgoing AMQP from zato

3.4k Views Asked by At

I have a service that is invoked by ESB (zato) the role of this service is to publish a message in rabbitMQ through an AMQP outgoing but when i consult rabbitMQ and make get message the answer is queue is empty.this is service in zato

from zato.server.service import Service

class HelloService(Service):
    def handle(self):

        # Request parameters
        msg = 'Hello AMQP broker!'
        out_name = 'My CRM connection'
        exchange = 'My exchange'
        routing_key = ''
        properties = {'app_id': 'ESB'}
        headers = {'X-Foo': 'bar'}

        # Send a message to the broker
        self.outgoing.amqp.send(msg, out_name, exchange, routing_key,
            properties, headers)
1

There are 1 best solutions below

0
On

A full working example to consume from a rabbit queue from a zato service would be as follows:

In rabbit

  1. Create an exchange
  2. Create a queue
  3. Bind queue to the exchange
  4. Create a connection definition in zato
  5. Create an outgoing AMQP connection defintion in zato
  6. Write the zato service to publish or consume

The first three steps can be done in multiple ways, here is a simple python script you can use to do that (just install kombu, and click):

import click
import os
import sys
import settings
from kombu import Connection, Exchange, Queue


BROKER_URL = 'amqp://{user}:{password}@{server}:{port}/{vhost}'.format(user=settings.RABBIT_USER,
                                                                       password=settings.RABBIT_PASS,
                                                                       server=settings.RABBIT_SERVER,
                                                                       port=settings.RABBIT_PORT,
                                                                       vhost=settings.RABBIT_VHOST)


@click.command()
@click.option('--remove/--no-remove', default=False, help='Remove current Queues/Exchanges.')
@click.option('--create/--no-create', default=False, help='Create needed Queues/Exchanges')
def job(remove, create):
    exchanges = {'dead_letter': Exchange(name=settings.DEAD_LETTER_EXCHANGE,
                                         type=settings.DEAD_LETTER_EXCHANGE_TYPE,
                                         durable=settings.DEAD_LETTER_EXCHANGE_DURABLE),
                 'results': Exchange(name=settings.RESULTS_EXCHANGE_NAME,
                                     type=settings.RESULTS_EXCHANGE_TYPE,
                                     durable=settings.RESULTS_EXCHANGE_DURABLE)}

    queues = {'dead_letter': Queue(name=settings.DEAD_LETTER_QUEUE,
                                   exchange=exchanges['dead_letter'],
                                   routing_key=settings.DEAD_LETTER_ROUTING,
                                   durable=settings.DEAD_LETTER_EXCHANGE_DURABLE),
              'results': Queue(name=settings.RESULTS_QUEUE_NAME,
                               exchange=exchanges['results'],
                               routing_key=settings.RESULTS_QUEUE_ROUTING,
                               durable=settings.RESULTS_EXCHANGE_DURABLE),
              'task': Queue(name=settings.TASK_QUEUE_NAME,
                            exchange=exchanges['results'],
                            routing_key=settings.TASK_ROUTING_KEY,
                            queue_arguments={
                                "x-message-ttl": settings.TASK_QUEUE_TTL,
                                "x-dead-letter-exchange": settings.DEAD_LETTER_EXCHANGE,
                                "x-dead-letter-routing-key": settings.DEAD_LETTER_ROUTING})}

    print 'using broker: {}'.format(BROKER_URL)

    with Connection(BROKER_URL) as conn:
        channel = conn.channel()
        if remove:
            # remove exchanges
            for (key, exchange) in exchanges.items():
                print 'removing exchange: {}'.format(exchange.name)
                bound_exchange = exchange(channel)
                bound_exchange.delete()

            # remove queues
            for (key, queue) in queues.items():
                print 'removing queue {} '.format(queues[key].name)
                bound_queue = queues[key](channel)
                bound_queue.delete()

        if create:
            # create exchanges
            for (key, exchange) in exchanges.items():
                print 'creating exchange: {}'.format(exchange.name)
                bound_exchange = exchange(channel)
                bound_exchange.declare()

            # add queues
            for (key, queue) in queues.items():
                # if key in exchanges:
                print 'binding queue {} to exchange {} with routing key {}'.format(queue.name,
                                                                                   queue.exchange.name,
                                                                                   queue.routing_key)
                bound_queue = queue(channel)
                bound_queue.declare()


if __name__ == '__main__':
    job()

And the settings file:

# rabbit stuff
RABBIT_SERVER = 'localhost'
RABBIT_USER = 'guest'
RABBIT_PASS = 'guest'
RABBIT_PORT = 5672
RABBIT_VHOST = '/'

# default task queue
TASK_EXCHANGE_NAME = 'test.service.request'
TASK_EXCHANGE_TYPE = 'direct'
TASK_EXCHANGE_DURABLE = True
TASK_QUEUE_NAME = 'test.service.request'
TASK_ROUTING_KEY = 'request'
TASK_QUEUE_TTL = 604800000

# dead letter settings
DEAD_LETTER_EXCHANGE = 'test.service.deadletter'
DEAD_LETTER_EXCHANGE_TYPE = 'direct'
DEAD_LETTER_EXCHANGE_DURABLE = True
DEAD_LETTER_QUEUE = 'test.service.deadletter'
DEAD_LETTER_ROUTING = 'deadletter'

# results settings
RESULTS_EXCHANGE_NAME = 'test.service.results'
RESULTS_EXCHANGE_TYPE = 'direct'
RESULTS_EXCHANGE_DURABLE = True
RESULTS_QUEUE_NAME = 'test.service.results'
RESULTS_QUEUE_ROUTING = 'results'

Now lets create the queues running the above script on a fresh virtualenv with python 2.7:

$ virtualenv rabbit_test
New python executable in /home/ivan/rabbit_test/bin/python
Installing setuptools, pip, wheel...done.

$ source /home/ivan/rabbit_test/bin/activate

$ pip install kombu
Collecting kombu
...
$ pip install click
Collecting click
...

copy the scripts above

$ mkdir ~/rabbit_test/app
$ vi ~/rabbit_test/app/create_queues.py
$ vi ~/rabbit_test/app/settings.py

and run create_queues.py.

$ cd ~/rabbit_test/app
$ python create_queues.py --create
using broker: amqp://guest:guest@localhost:5672//
creating exchange: test.service.results
creating exchange: test.service.deadletter
binding queue test.service.request to exchange test.service.results with routing key request
binding queue test.service.results to exchange test.service.results with routing key results
binding queue test.service.deadletter to exchange test.service.deadletter with routing key deadletter

You can verify the exchanges and queues are on rabbit with the cli tool or with the management plugin:

$ rabbitmqadmin list exchanges
+-------------------------+---------+
|          name           |  type   |
+-------------------------+---------+
| test.service.deadletter | direct  |
| test.service.results    | direct  |
+-------------------------+---------+

$ rabbitmqadmin list queues
+-------------------------+----------+
|          name           | messages |
+-------------------------+----------+
| test.service.deadletter | 0        |
| test.service.request    | 0        |
| test.service.results    | 0        |
+-------------------------+----------+

$ rabbitmqadmin list bindings
+-------------------------+-------------------------+-------------------------+
|         source          |       destination       |       routing_key       |
+-------------------------+-------------------------+-------------------------+
|                         | test.service.deadletter | test.service.deadletter |
|                         | test.service.request    | test.service.request    |
|                         | test.service.results    | test.service.results    |
| test.service.deadletter | test.service.deadletter | deadletter              |
| test.service.results    | test.service.request    | request                 |
| test.service.results    | test.service.results    | results                 |
+-------------------------+-------------------------+-------------------------+

Now the zato part (Steps 4,5 and 6) can be done using the public api, or the webadmin, I'll show you how to do it with the public api, but is easier to do it trough the UI as this is only done very few times.

Create AMQP Connection Definition doc

$ curl -X POST -H "Authorization: Basic cHViYXBpOjEyMw==" -d '{
    "cluster_id": 1,
    "name": "SO_Test",
    "host": "127.0.0.1",
    "port": "5672",
    "vhost": "/",
    "username": "guest",
    "frame_max": 131072,
    "heartbeat": 10
}' "http://localhost:11223/zato/json/zato.definition.amqp.create"

{
  "zato_env": {
    "details": "",
    "result": "ZATO_OK",
    "cid": "K04DWBPMYF8A7768C7N482E75YM3"
  },
  "zato_definition_amqp_create_response": {
    "id": 2,
    "name": "SO_Test"
  }
}

Set Password for our AMQP Connection doc

$ curl -X POST -H "Authorization: Basic cHViYXBpOjEyMw=="  -d '{
    "id": 2,
    "password1": "guest",
    "password2": "guest"
}' "http://localhost:11223/zato/json/zato.definition.amqp.change-password"

{
  "zato_env": {
    "details": "",
    "result": "ZATO_OK",
    "cid": "K07K9YY21XZAX4QKWJB3ZFXN2ZFT"
  }
}

Create an outgoing AMQP connection defintion doc

curl -X POST -H "Authorization: Basic cHViYXBpOjEyMw==" -d '{
    "cluster_id": 1,
    "name": "SO Test",
    "is_active": true,
    "def_id": 2,
    "delivery_mode": 1,
    "priority": 6,
    "content_type": "application/json",
    "content_encoding": "utf-8",
    "expiration": 30000
}' "http://localhost:11223/zato/json/zato.outgoing.amqp.create"

{
  "zato_outgoing_amqp_create_response": {
    "id": 1,
    "name": "SO Test"
  },
  "zato_env": {
    "details": "",
    "result": "ZATO_OK",
    "cid": "K05F2CR954BFNBP14KGTM26V47PC"
  }
}

Finally the service that is going to send the message

from zato.server.service import Service

class HelloService(Service):
    def handle(self):
        # Request parameters 
        msg = 'Hello AMQP broker!'
        out_name = 'SO Test'
        exchange = 'test.service.results'
        routing_key = 'request'
        properties = {'app_id': 'ESB', 'user_id': 'guest'}
        headers = {'X-Foo': 'bar'}

        # Send a message to the broker
        info = self.outgoing.amqp.send(msg, out_name, exchange, routing_key,
            properties, headers)
        self.logger.info(info)

If you are going to use the properties user_id it must match the connection user_id or else the request will fail.

Also please note that here I've created a deadletter exchange and the message will be sent here after 30 seconds if its still in the test.service.request queue

The final step is to test

To verify that the message is delivered to our queue, we can create an http/soap channel or invoke the service directly, I'm doing the latter using the public api.

curl -X POST -H "Authorization: Basic cHViYXBpOjEyMw==" -d '{
   "name": "test.hello-service",
   "data_format": "json"
}' "http://localhost:11223/zato/json/zato.service.invoke"

{
  "zato_env": {
    "details": "",
    "result": "ZATO_OK",
    "cid": "K050J64QQ8FXASXHKVCAQNC4JC4N"
  },
  "zato_service_invoke_response": {
    "response": ""
  }
}

and after that we check the queue for the message we just sent:

$ rabbitmqadmin get queue=test.service.request requeue=true
+-------------+----------------------+---------------+--------------------+---------------+------------------+-------------+
| routing_key |       exchange       | message_count |      payload       | payload_bytes | payload_encoding | redelivered |
+-------------+----------------------+---------------+--------------------+---------------+------------------+-------------+
| request     | test.service.results | 0             | Hello AMQP broker! | 18            | string           | False       |
+-------------+----------------------+---------------+--------------------+---------------+------------------+-------------+

Remember to check the rabbit and zato server logs in case you still have any problem.