Retry connection in case of pgbouncer client login timeout error for gevent celery worker


Platform: GNU/Linux Celery, gevent(24.2.1), sqlalchemy, psycogreen, psycopg2, pgbouncer, postgresql

I am getting occasional client login timeout errors in celery workers; the connection works fine after retries. I want a generic way to retry on such client errors. Connections are made via the sqlalchemy ORM and Core.

I tried using gevent_wait_callback to retry the connection as follows:

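import logging
import traceback

from psycogreen.gevent import gevent_wait_callback
from psycopg2 import extensions

# Assumed logger setup; the original snippet did not show how `log` is defined.
log = logging.getLogger(__name__)

def internal_gevent_wait_callback(conn, timeout=None):
    """A wait callback to retry in case of connection failures."""
    retries = 0
    while True:
        try:
            return gevent_wait_callback(conn, timeout=timeout)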

        except Exception as e:
            log.error(traceback.format_exc())

            exponential_backoff_wait(retries, e)
            retries += 1

extensions.set_wait_callback(internal_gevent_wait_callback)

def exponential_backoff_wait(retries, e):
   ...

To simulate a connection failure, I kept pgbouncer running and stopped postgresql (so pgbouncer cannot connect to postgresql, which results in an operational error). The retries did happen, but once I started postgresql again, the connection continued to fail.

I also tried the following:

import psycopg2

def set_gevent_wait_callback():
    extensions.set_wait_callback(internal_gevent_wait_callback)


def unset_gevent_wait_callback():
    extensions.set_wait_callback(None)

def internal_gevent_wait_callback(conn, timeout=None):
    """A wait callback to retry in case of connection failures."""
    retries = 0
    while True:
        try:
            database = conn.info.dbname
            user = conn.info.user
            password = conn.info.password
            host = conn.info.host
            port = conn.info.port

            dsn = f"host='{host}' port='{port}' dbname='{database}' user='{user}' password='{password}'"
            unset_gevent_wait_callback()
            conn = psycopg2.connect(dsn=dsn)
            conn.set_client_encoding('utf8')
          
            set_gevent_wait_callback()
            break

        except Exception as e:
            log.error(traceback.format_exc())

            exponential_backoff_wait(retries, e)
            retries += 1

This creates a new connection (as required in my use case), but it fails with a "server didn't send client encoding" error, even though the conn object was created without any issue.

I would appreciate it if anyone could suggest a way to retry connecting to the db via the callback.

PS: I already use a decorator to retry a few functions. Handling retries in a generic place such as the wait callback would cover all such errors, whereas a decorator only handles them at the specific place where it is applied.
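For reference, the decorator approach mentioned in the PS looks roughly like the sketch below. This is a minimal illustration, not the exact code in use: the name retry_with_backoff and all of its parameters are hypothetical, and the backoff policy (delay doubling per attempt, capped) is an assumption.

```python
import functools
import time


def retry_with_backoff(exceptions=(Exception,), max_retries=5, base_delay=0.5,
                       max_delay=30.0, sleep=time.sleep):
    """Hypothetical decorator: retry the wrapped call with exponential backoff."""
    def decorator(func):
        @functools.wraps(func)
        def wrapper(*args, **kwargs):
            attempt = 0
            while True:
                try:
                    return func(*args, **kwargs)
                except exceptions:
                    if attempt >= max_retries:
                        raise  # give up and surface the last error
                    # Delay doubles on each attempt, capped at max_delay.
                    sleep(min(max_delay, base_delay * (2 ** attempt)))
                    attempt += 1
        return wrapper
    return decorator
```

With psycopg2, one would presumably pass exceptions=(psycopg2.OperationalError,) and decorate the function that opens the session or runs the query. The limitation is the one described above: every call site that can hit the login timeout has to be decorated individually.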
