Celery + reverse topic exchange (x-rtopic) not working


I am trying to create a Celery app using Alvaro Videla's reverse topic exchange RabbitMQ plugin. The workers seem to connect to the broker fine using this exchange, but when I reverse-topic-route my tasks, the '#' and '*' wildcards are not picked up; it behaves like a direct exchange.

So this is my queue:

from kombu import Exchange, Queue

Queue(name='cluster',
      exchange=Exchange(name='cluster',
                        type='x-rtopic',
                        delivery_mode='persistent',
                        durable=True),
      routing_key='intel.%d.%s' % (n_cores, hostname),
      durable=True)

Now picture two workers using the following routing keys:

  • Worker1 : intel.8.host1
  • Worker2 : amd.2.host2

These are the routing keys I tried on the tasks and what I observed:

Routing key     | Works? | Result               | Expected
----------------+--------+----------------------+-----------------------
'intel'         | OK     | nobody receives      |
'intel.*'       | OK     | nobody receives      |
'intel.#'       | WRONG  | everyone receives    | only Worker1 receives
'#.host1'       | WRONG  | everyone receives    | only Worker1 receives
'intel.*.*'     | WRONG  | everyone receives    | only Worker1 receives
'intel.*.host1' | WRONG  | everyone receives    | only Worker1 receives
'*.2.*'         | WRONG  | everyone receives    | only Worker2 receives
'intel.8.host1' | OK     | like direct exchange |
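For reference, the behaviour I expected follows the reverse-topic rule: the message's routing key carries the wildcards, and it is matched against the queue's concrete binding key ('*' matches exactly one word, '#' matches zero or more). This is a minimal pure-Python sketch of that matching rule, not the plugin's actual Erlang implementation:

```python
def rtopic_match(pattern, binding_key):
    """Reverse-topic matching: `pattern` (on the message) may contain
    wildcards; `binding_key` (on the queue) is concrete.
    '*' matches exactly one word, '#' matches zero or more words."""
    p = pattern.split('.')
    b = binding_key.split('.')

    def match(i, j):
        if i == len(p):
            # Pattern exhausted: succeed only if the key is too.
            return j == len(b)
        if p[i] == '#':
            # '#' may consume zero or more of the remaining words.
            return any(match(i + 1, k) for k in range(j, len(b) + 1))
        if j == len(b):
            return False
        return (p[i] == '*' or p[i] == b[j]) and match(i + 1, j + 1)

    return match(0, 0)


# Checking against the table above, with the two workers' binding keys:
assert rtopic_match('intel.#', 'intel.8.host1')       # only Worker1
assert not rtopic_match('intel.#', 'amd.2.host2')
assert rtopic_match('*.2.*', 'amd.2.host2')           # only Worker2
assert not rtopic_match('intel', 'intel.8.host1')     # nobody ('intel' is too short)
```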

To identify where the problem was, I tested the plugin with simple messaging using pika and with plain kombu, and both worked exactly as expected. So I figured it must be a problem with the way Celery exchanges (routes) the messages. Maybe I should create a custom routing class?

Thanks in advance.

1 Answer

After digging a while I found out that the reverse topic exchange plugin works fine with Celery; I was misinterpreting how RabbitMQ queues work. To make it fully work I had to define a custom router that routes the task to the exchange containing those queues, specifying only the routing_key and the exchange name. That way the tasks still round-robin across the nodes connected to that exchange and can use wildcards in the task's routing key.

So the queue settings would be something like this:

routed_queue = 'intel.8.pchost'

CELERY_QUEUES = (
    Queue(name='cluster.%s' % routed_queue,
          exchange=Exchange(name='cluster', type='x-rtopic'),
          routing_key=routed_queue),
)

The router would be something like this:

class MyRouter(object):

    def route_for_task(self, task, args=None, kwargs=None):
        kwargs = kwargs or {}
        # Use the wildcard pattern passed in the task kwargs,
        # falling back to '#' (match every queue).
        routing_key = kwargs.get('routing_key') or '#'
        return {'exchange': 'cluster',
                'exchange_type': 'x-rtopic',  # must match the declared exchange type
                'routing_key': routing_key}

Then I pass the routing_key as a kwarg to the task, being able to set it to 'intel.#', meaning the task will be executed by any worker whose queue routing key starts with 'intel'.

The only gotcha here is that, for some reason, I had to execute the tasks using .apply_async rather than .delay.
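Dispatching would then look something like the following sketch. The task name and module are hypothetical; it assumes a Celery app wired up with MyRouter (via CELERY_ROUTES) and the CELERY_QUEUES shown above:

```python
# Hypothetical task import; assumes a configured Celery app.
from proj.tasks import process_on_cluster

# 'routing_key' in the task kwargs is read by MyRouter, so the message
# is published with the wildcard pattern 'intel.#' and picked up by any
# worker whose queue binding key starts with 'intel'.
process_on_cluster.apply_async(kwargs={'routing_key': 'intel.#'})
```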

The whole idea is to be able to route my tasks according to the machine specs available in the cluster: some tasks should run only on Intel processors and others only on AMD, or be targeted by the number of cores in a node or by hostname.

Hope this can help anyone trying to do the same in the future.