Is it possible to call Bunny::Exchange publish from Delayed Job?


I have a Rails app that sometimes publishes messages to a RabbitMQ queue, using the gem "Bunny". Here's the setup:

# config/initializers/bunny.rb
$mq_connection = Bunny.new
$mq_connection.start
$mq_channel = $mq_connection.create_channel

Anywhere in the app I can then call:

exchange = $mq_channel.default_exchange
exchange.publish(msg.to_json, persistent: true, routing_key: '...')

This works great if I call it from the app or from the console, but it doesn't work if it's called from a DelayedJob job. No exception is raised, but the message is never sent.

Trying with a singleton:

It looked like global variables like $mq_channel couldn't be found by DelayedJob, so I created a singleton model to store it:

class RabbitMq
  include Singleton

  attr_accessor :connection, :channel

  def exchange
    channel.default_exchange
  end

  def setup
    self.connection = Bunny.new
    self.connection.start
    self.channel = connection.create_channel
  end

end

And I call the setup from my initializer:

# config/initializers/bunny.rb
RabbitMq.instance.setup

But that doesn't work either. The job terminates without error, but nothing is published.

Any idea how to do that? It should be quite common to publish messages to RabbitMQ from a background worker like DJ.

Best answer:

Here's how I do it:

class Messaging::Publisher

  class << self

    def publish(message)
      new(message).publish
    end

  end # Class Methods

  #=========================================================================
  # Instance Methods      
  #=========================================================================

    def initialize(message)
      @message = message
    end

    def publish
      connection = Bunny.new(ENV['CLOUDAMQP_URL'])
      connection.start
      channel = connection.create_channel
      queue_name = "#{ENV['app_name']}.#{message.keys.first.to_s.pluralize}_queue"
      queue = channel.queue(queue_name, durable: true)
      channel.default_exchange.publish(message.to_json, :routing_key => queue.name)
      channel.close
      connection.stop
      true
    end

  private

    attr_reader :message

end
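
One gap in the publisher above is that the connection stays open if publishing raises. Here is a minimal sketch of the same flow with an ensure clause. The class name EnsuredPublisher and the connection_factory argument are hypothetical, introduced only so the example runs without a broker; in the app the factory would presumably be something like -> { Bunny.new(ENV['CLOUDAMQP_URL']) }.

```ruby
require 'json'

# Hypothetical variant of the publisher above, not the answer's actual code.
# `connection_factory` is an assumption: any callable returning an object that
# responds to start, create_channel and stop (a Bunny session does).
class EnsuredPublisher
  def initialize(connection_factory)
    @connection_factory = connection_factory
  end

  def publish(message, routing_key)
    connection = @connection_factory.call
    connection.start
    channel = connection.create_channel
    channel.default_exchange.publish(message.to_json, routing_key: routing_key)
    channel.close
    true
  ensure
    # Runs whether or not publishing raised, so the connection is always released.
    connection.stop if connection
  end
end
```

The method still returns true on success; any exception raised while publishing propagates to the caller after the connection has been stopped.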

I call this both from within my app (synchronous) and from background jobs (asynchronous). Something like this:

class ServiceRequests::CreateManager < ServiceRequests::ManagerBase

  class << self

  private

  end # Class Methods

  #=========================================================================
  # Instance Methods
  #=========================================================================

    def manage
      Messaging::Publisher.publish service_request_message
    end

  private

    def service_request_message
      {
        service_request: {
          provider: {
            name: "Foo::Bar"
          },
          params: {
            baz: 'qux'
          }
        }
      }
    end

end
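
Opening a connection on every publish works, but it costs a connect and disconnect per message. If the Delayed Job worker is forked after the initializers run (an assumption about the setup in the question, not something confirmed here), a connection opened at boot would lose its background threads in the child process, since Ruby threads do not survive a fork. One alternative is to build the channel lazily and rebuild it when the process ID changes. The ForkSafeHolder class below is a hypothetical helper, not part of Bunny or Delayed Job.

```ruby
# Hypothetical helper: caches one value per process and rebuilds it after a fork.
class ForkSafeHolder
  def initialize(&factory)
    @factory = factory
    @mutex = Mutex.new
    @pid = nil
    @value = nil
  end

  # Returns the cached value, calling the factory again if this process
  # is not the one that created it.
  def value
    @mutex.synchronize do
      if @value.nil? || @pid != Process.pid
        @value = @factory.call
        @pid = Process.pid
      end
      @value
    end
  end
end

# Assumed wiring in an initializer (needs the bunny gem and a reachable broker):
#   MQ_CHANNEL = ForkSafeHolder.new do
#     connection = Bunny.new
#     connection.start
#     connection.create_channel
#   end
#
# Then, from the app or from a job's perform method:
#   MQ_CHANNEL.value.default_exchange.publish(msg.to_json, persistent: true, routing_key: '...')
```

Nothing is opened at boot with this approach; the first publish in each process pays the connection cost and later publishes reuse the channel.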