How to improve a simple connection pool to be less blocking

I have this simple connection pool, which seems to work fine, but if any one of the connections disconnects, it blocks every caller of the method until that connection is re-dialed:

type RabbitMQConnectionPool struct {
    mu          sync.Mutex
    counter     int
    connections []*amqp.Connection
    connString  string
}


func (p *RabbitMQConnectionPool) GetConnection() (*amqp.Connection, int) {
    // Get the next connection index

    p.mu.Lock()   // blocks here until***
    defer func() {
        p.mu.Unlock()
    }()

    for {

        var size = len(p.connections)
        p.counter = p.counter + 1

        if p.counter > size-1 {
            // no modulus, just logic
            p.counter = 0
        }

        if p.connections[p.counter] != nil {
            // check for nil just in case fml
            if !p.connections[p.counter].IsClosed() {
                // if conn is still alive, return it
                return p.connections[p.counter], p.counter
            }
        }

        // otherwise, create a new conn
        conn, err := amqp.Dial(p.connString)
        if err != nil {
            vibelog.Stdout.Error("67eea0ee-ae1a-436b-8490-c6f232e6e033", err)
            continue
        }
        p.connections[p.counter] = conn
        return conn, p.counter   // blocks until here***
    }

}

Is there a way to revamp this so that it blocks separately for each connection instead of blocking all callers?

There are 1 best solutions below

To reduce blocking, use a unique mutex for each pool element:

type RabbitMQConnectionPool struct {
    mu          sync.Mutex
    counter     int
    connections []muconn
    connString  string
}

type muconn struct {
    mu         sync.Mutex
    connection *amqp.Connection
}

func (p *RabbitMQConnectionPool) GetConnection() (*amqp.Connection, int) {
    for {
        p.mu.Lock()
        p.counter = (p.counter + 1) % len(p.connections)
        counter := p.counter
        p.mu.Unlock()

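        c := &p.connections[counter]
        // Lock only this slot. Unlock explicitly on every path: a defer inside
        // the loop would not run until the function returns, so a failed dial
        // would keep the slot locked across iterations.
        c.mu.Lock()

        if c.connection != nil && !c.connection.IsClosed() {
            connection := c.connection
            c.mu.Unlock()
            return connection, counter
        }

        // otherwise, create a new conn
        connection, err := amqp.Dial(p.connString)
        if err != nil {
            c.mu.Unlock()
            vibelog.Stdout.Error("67eea0ee-ae1a-436b-8490-c6f232e6e033", err)
            continue
        }
        c.connection = connection
        c.mu.Unlock()
        return connection, counter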
    }
}

The code in the question dials repeatedly in a tight loop when the server is down. The code in this answer replicates that problem.