How to use aiopg together with aiohttp

797 Views Asked by At

I have an application that loops through batches of URL:s from a Postgres table, downloads the URL, runs a processing function on each download and saves back the result of the processing to the table.

I have written it using aiopg and aiohttp to get it to run asynchronously. In simplified form it looks like:

import asyncio
import aiopg
from aiohttp import ClientSession, TCPConnector

BATCH_SIZE = 100
dsn = "dbname=events user={} password={} host={}".format(DB_USER, DB_PASSWORD, DB_HOST)    

async def run():
    async with ClientSession(connector=TCPConnector(ssl=False, limit=100)) as session:
        async with aiopg.create_pool(dsn) as pool:
            while True:
                count = await run_batch(session, pool)
                if count == 0:
                    break

async def run_batch(session, db_pool):
    tasks = []
    async for url in get_batch(db_pool):
        task = asyncio.ensure_future(process_url(url, session, db_pool))
        tasks.append(task)
    await asyncio.gather(*tasks)

async def get_batch(db_pool):
    sql = "SELECT id, url FROM db.urls ... LIMIT %s"
    async with db_pool.acquire() as conn:
        async with conn.cursor() as cur:
            await cur.execute(sql, (BATCH_SIZE,))
            for row in cur:
                yield row

async def process_url(url, session, db_pool):
    async with session.get(url, timeout=15) as response:
        body = await response.read()
        data = process_body(body)
        await save_data(db_pool, data)

async def process_body(body):
    ...
    return data

async def save_data(db_pool, data):
    sql = "UPDATE db.urls ..."
    async with db_pool.acquire() as conn:
        async with conn.cursor() as cur:
            await cur.execute(sql, (data,))

But something is wrong. The script runs slower and slower the longer it goes, also with more and more exceptions thrown from call to session.get. My guess is that there is something wrong with how I use the Postgres connection but I cannot figure out what's wrong. Any help would be much appreciated!

0

There are 0 best solutions below