Creating an async connection with psycopg3

3.5k Views Asked by At

I am trying to create an async connection using psycopg3. I was using psycopg2 without async and need to move to async database functions. The docs do not give much information.

So this is what I was using with psycopg2. It worked good.

con = psycopg2.connect(host="HOSTNAME", port="PORT", database=("DATABASE", user="USER", password="PASSWORD")
cursor = con.cursor()

Then when I needed to run a query I would just use

cursor.execute(query, params)
cursor.fetchall() # or con.commit() depending on insert or select statement.

Now that I am moving to async functions, I have tried this

con = await psycopg.AsyncConnection.connect(host="HOSTNAME", port="PORT", database="DATABASE", user="USER", password="PASSWORD")
cursor = await con.cursor()

But I get the error that I cannot use await outside of a function.

The docs tell me to do this

async with await psycopg.AsyncConnection.connect() as aconn:
    async with aconn.cursor() as cur:
        await cur.execute(...)

So do I need to write this in every function that I want to either read or write records with?

Couple examples in my code using psycopg2 currently

async def check_guild(guild_id):
    cursor.execute("SELECT guild_id, guild_name, su_id FROM guild WHERE guild_id = %s", [guild_id])
    guild = cursor.fetchone()
    return guild

async def config_raffle(guild_id, channel_id, channel_name, channel_cat_id, token, token_id, default_address, su_id, fee):
    try:
        cursor.execute("""INSERT INTO raffle_config (guild_id, channel_id, channel_name, channel_cat_id, token, default_token, default_address, su_id, fee) VALUES (%s, %s, %s, %s, %s, %s, %s, %s, %s)
        ON CONFLICT (guild_id) DO UPDATE SET channel_id = EXCLUDED.channel_id, channel_name = EXCLUDED.channel_name, channel_cat_id = EXCLUDED.channel_cat_id, token = EXCLUDED.token,
        default_token = EXCLUDED.default_token, default_address = EXCLUDED.default_address, su_id = EXCLUDED.su_id, fee = EXCLUDED.fee""",
                       (guild_id, channel_id, channel_name, channel_cat_id, token, token_id, default_address, su_id, fee))
        con.commit()
    except:
        logging.exception("Exception", exc_info=True)
        con.rollback()
        print("Error: 25")
    return True

So I am thinking maybe my better option is to use the AsyncConnectionPool. I have a db.py file setup like this:

import psycopg_pool
import os
import dotenv

dotenv.load_dotenv()

conninfo = f'host={os.getenv("HOSTNAME")} port={os.getenv("PORT")} dbname={os.getenv("DATABASE")} user={os.getenv("USER")} password={os.getenv("PASSWORD")}'
pool = psycopg_pool.AsyncConnectionPool(conninfo=conninfo, open=False)


async def open_pool():
    await pool.open()

I open the pool when my program runs the on_ready function. I created new tables this way just fine, but when I try to retrieve records I get this error.

discord.ext.commands.errors.CommandInvokeError: Command raised an exception: AttributeError: 'AsyncConnection' object has no attribute 'fetchone'
1

There are 1 best solutions below

0
On

Ended up sorting this out this way:

import psycopg_pool
import os
import dotenv

dotenv.load_dotenv()

conninfo = f'host={os.getenv("HOSTNAME")} port={os.getenv("PORT")} dbname={os.getenv("DATABASE")} user={os.getenv("USER")} password={os.getenv("PASSWORD")}'
pool = psycopg_pool.AsyncConnectionPool(conninfo=conninfo, open=False)


async def open_pool():
    await pool.open()
    await pool.wait()
    print("Connection Pool Opened")

async def select_fetchall(query, args):
    async with pool.connection() as conn:
        async with conn.cursor() as cursor:
            await cursor.execute(query, args)
            results = await cursor.fetchall()
            return results


async def write(query, args):
    async with pool.connection() as conn:
        async with conn.cursor() as cursor:
            await cursor.execute(query, args)
            if 'RETURNING' in query:
                results = await cursor.fetchone()
                return results
            else:
                return

Then I just call the functions when I need to read or write to the database and pass the query and args.