Limit tasks dequeued per second for rabbitmq

364 Views Asked by At

I have a queue in RabbitMQ in the consumer-producer fashion, which works properly as a basic round robin queue.

My issue is that I am trying to limit the number of requests that are processed per second, because when I dequeue an item I make a request to a DO space that will block my IP if I make 750 or more requests in a second. I use goroutines to dequeue items concurrently, but I only want to dequeue 500 items per second to avoid hitting that limit. This needs to factor in items that are currently being dequeued (i.e., I can't just pull 500 items from the queue and then delay until the next second); basically, before it runs the dequeue code, it needs to wait until it is sure that there are not already over 500 requests being dequeued within that second. I have this code so far, but it doesn't seem to be working properly (note that I am testing with 2 requests per second instead of 500 for now). Every once in a while it has very long delays (20+ seconds), and I am not sure it is calculating the limit properly. Note that I am pretty sure the prefetch option is not what I need here, because that limits the number of messages coming in per second; here I just want to limit the number of messages being dequeued concurrently per second.

import (
	"context"
	"fmt"
	"os"
	"sync"

	"github.com/streadway/amqp"
	"golang.org/x/time/rate"
)


// workers is the target processing rate, in requests per second (2 req/s).
const workers = 2

// failOnErrorWorker reports a non-nil err by printing msg and then err, each
// on its own line, to standard output. A nil err is ignored.
//
// Despite its name, it neither panics nor exits: control always returns to
// the caller, which has to decide for itself whether to carry on.
func failOnErrorWorker(err error, msg string) {
	if err == nil {
		return
	}

	fmt.Println(msg)
	fmt.Println(err)
}

// main consumes deliveries from the RabbitMQ queue named by the QUEUE_NAME
// environment variable, on the broker addressed by CONNECTION_STRING, and
// hands each delivery to DeQueue in its own goroutine. A rate limiter paces
// how often a new goroutine is started.
//
// A failure to connect, open a channel, or register the consumer is reported
// and ends the program, rather than continuing with a nil connection or
// channel. main returns once the delivery channel has been closed and every
// worker that was already started has finished.
func main() {
	// Get the env variables for the queue name and connection string.
	queueName := os.Getenv("QUEUE_NAME")
	connectionString := os.Getenv("CONNECTION_STRING")

	// Allow `workers` dequeues per second. The burst of 1 spaces them evenly
	// instead of letting a whole second's budget fire at once.
	limiter := rate.NewLimiter(workers, 1)
	ctx := context.Background()

	// Connect to the RabbitMQ instance.
	conn, err := amqp.Dial(connectionString)
	if err != nil {
		failOnErrorWorker(err, "Failed to connect to RabbitMQ")
		return
	}
	defer conn.Close()

	// Open a channel for the queue.
	ch, err := conn.Channel()
	if err != nil {
		failOnErrorWorker(err, "Failed to open a channel")
		return
	}
	defer ch.Close()

	// Consume the messages from this queue.
	msgs, err := ch.Consume(
		queueName, // queue
		"",        // consumer
		false,     // auto-ack
		false,     // exclusive
		false,     // no-local
		false,     // no-wait
		nil,       // args
	)
	if err != nil {
		failOnErrorWorker(err, "Failed to register a consumer")
		return
	}

	fmt.Println(" [*] Waiting for messages. To exit press CTRL+C")

	// Tracks started workers so main does not return while one is mid-flight.
	var wg sync.WaitGroup

	for d := range msgs {
		// Block until the limiter grants the next slot.
		if err := limiter.Wait(ctx); err != nil {
			// Without a granted slot the rate limit cannot be honoured, so
			// stop consuming instead of processing unthrottled.
			failOnErrorWorker(err, "Failed to wait for the rate limiter")
			break
		}

		wg.Add(1)
		// Pass the delivery as an argument: before Go 1.22 the loop variable
		// is shared between iterations, so a closure capturing it could
		// process and acknowledge a later delivery than the one intended.
		go func(d amqp.Delivery) {
			defer wg.Done()

			// Dequeue the item and acknowledge the message.
			DeQueue(d.Body)
			failOnErrorWorker(d.Ack(false), "Failed to acknowledge the message")
		}(d)
	}

	// The delivery channel is closed (or the limiter failed); let the
	// in-flight workers finish before the deferred Close calls run.
	wg.Wait()
}
0

There are 0 best solutions below