How can I use Sneakers (for RabbitMQ) to requeue a message for up to 5 failed attempts?

Older versions of the gem (around 2015) supported defining this sort of thing in the Sneakers.configure params, but this is no longer the case. I'm trying to find alternatives, but the Sneakers wiki does not have any details, and I can't figure out a way to do it from reading the Sneakers source code either.

I need my worker to be able to "know" how many times a message has already failed, so it can choose whether to requeue or reject the message.

config

Sneakers.configure(
  heartbeat: 60,
  amqp: "amqp://#{ENV['RABBITMQ_USER']}:#{ENV['RABBITMQ_PASS']}@#{ENV['RABBITMQ_HOST']}",
  daemonize: ENV['RACK_ENV'] == 'production',
  pid_path: './sneakers.pid',
  vhost: '/',
  exchange: 'sneakers',
  exchange_type: :direct,
  timeout_job_after: 2.minutes,
  workers: 1,
  properties: {
    connection_name: 'worker'
  }
)
Sneakers.logger.level = Logger::DEBUG

workers = Sneakers::Runner.new([Workers::BB])
workers.run

worker

module Workers
  ##
  class BB
    include Sneakers::Worker
    from_queue :backbone

    def work(msg)
      failed_attempts = 0 ## ! need to find out the failed attempt count

      ## ... handle message ...

      ack!
    rescue StandardError => e
      log :fatal, e.message
      log :fatal, e.backtrace
      
      requeue! if failed_attempts <= 5
      reject! if failed_attempts > 5
    end
  end
end
There is 1 solution below.

Currently this is the only way I've figured out how to do it: I used work_with_params instead of work, and then manually requeue the message by calling publish with a custom retry_count header.

class Workman
  include Sneakers::Worker

  from_queue :my_queue

  def work_with_params(msg, _delivery_info, metadata)
    max_retry = 5
    # Headers are nil when the message was published without any, so default to {}.
    headers = metadata[:headers] || {}
    retry_count = headers['retry_count'] || 0

    ## ...

    ack!
  rescue StandardError => e
    # retry_count is still nil if the failure happened before it was assigned.
    retry_count = retry_count.to_i + 1

    if retry_count > max_retry
      ## ... do something to stash failed messages for later processing
      stash_failed_message(@queue.name, msg, metadata, e)
    else
      # Re-publish a copy carrying the updated count; the original is acked below.
      publish msg, routing_key: @queue.name, persistent: true, headers: { retry_count: retry_count }
    end

    ack!
  end
end
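
The retry decision in the worker above can also be pulled out into a plain Ruby method, which makes it easy to check without a running broker. This is only a minimal sketch: it assumes the same custom retry_count header, and RetryPolicy and next_step are hypothetical names, not part of Sneakers.

```ruby
# Hypothetical helper, not part of Sneakers: decides what to do with a failed
# message based on the custom 'retry_count' header used in the worker above.
module RetryPolicy
  MAX_RETRY = 5

  # headers: the message headers hash, or nil when the message has none.
  # Returns [:republish, new_headers] or [:stash, new_headers].
  def self.next_step(headers, max_retry: MAX_RETRY)
    headers ||= {}
    # Accept either a string or a symbol key for the counter.
    previous = headers['retry_count'] || headers[:retry_count] || 0
    count = previous.to_i + 1
    action = count > max_retry ? :stash : :republish
    [action, { 'retry_count' => count }]
  end
end
```

In the rescue branch of the worker, the returned pair could then drive the choice between calling publish with the new headers and stashing the message, followed by ack! in both cases.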