I have a queue in RabbitMQ in the consumer-producer fashion, which works properly as a basic round robin queue.
My issue is that I need to limit the number of requests processed per second. Each time I dequeue an item, I make a request to a DO space that will block my IP if I make 750 or more requests in a second. I use goroutines to dequeue items concurrently, but I want to dequeue at most 500 items per second to stay under that limit.

This has to account for items that are still being dequeued (i.e. I can't just pull 500 items from the queue and then delay until the next second). Before the dequeue code runs, it needs to wait until fewer than 500 dequeues have been started within the current second.

I have the code below so far, but it doesn't seem to be working properly (note that I am testing with 2 requests per second instead of 500 for now). Every once in a while there are very long delays (20+ seconds), and I am not sure it is calculating the limit properly.

I am fairly sure the prefetch option is not what I need here because, as I understand it, it limits how many messages are delivered to the consumer, whereas I just want to limit how many messages are dequeued concurrently per second.
package main

import (
	"context"
	"fmt"
	"os"

	"github.com/streadway/amqp"
	"golang.org/x/time/rate"
)
// Rate limit: 2 requests per second
const (
	workers = 2
)
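// failOnErrorWorker prints the message and the error, then exits,
// so later code never runs with a nil connection or channel.
func failOnErrorWorker(err error, msg string) {
	if err != nil {
		fmt.Println(msg)
		fmt.Println(err)
		os.Exit(1)
	}
}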
func main() {
	// Get the env variables for the queue name and connection string
	queueName := os.Getenv("QUEUE_NAME")
	connectionString := os.Getenv("CONNECTION_STRING")

	// Set up rate limiter (2 events per second, burst of 1) and context
	limiter := rate.NewLimiter(2, 1)
	ctx := context.Background()

	// Connect to the rabbitmq instance
	conn, err := amqp.Dial(connectionString)
	failOnErrorWorker(err, "Failed to connect to RabbitMQ")
	defer conn.Close()

	// Open a channel for the queue
	ch, err := conn.Channel()
	failOnErrorWorker(err, "Failed to open a channel")
	defer ch.Close()

	// Consume the messages from this queue
	msgs, err := ch.Consume(
		queueName, // queue
		"",        // consumer
		false,     // auto-ack
		false,     // exclusive
		false,     // no-local
		false,     // no-wait
		nil,       // args
	)
	failOnErrorWorker(err, "Failed to register a consumer")
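	forever := make(chan bool)

	go func() {
		for d := range msgs {
			// Block until the limiter allows another dequeue to start
			failOnErrorWorker(limiter.Wait(ctx), "Failed to wait for the rate limiter")

			// Pass d as an argument: before Go 1.22 the loop variable is shared
			// across iterations, so the closure could otherwise see a later delivery
			go func(d amqp.Delivery) {
				// Dequeue the item and acknowledge the message
				DeQueue(d.Body)
				failOnErrorWorker(d.Ack(false), "Failed to acknowledge the message")
			}(d)
		}
	}()

	fmt.Println(" [*] Waiting for messages. To exit press CTRL+C")

	// Continually run the worker
	<-forever
}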