I have an application that loops through batches of URLs from a Postgres table, downloads each URL, runs a processing function on the downloaded body, and saves the result of the processing back to the table.
I have written it with aiopg and aiohttp so that it runs asynchronously. In simplified form it looks like this:
import asyncio

import aiopg
from aiohttp import ClientSession, TCPConnector

BATCH_SIZE = 100

dsn = "dbname=events user={} password={} host={}".format(DB_USER, DB_PASSWORD, DB_HOST)


async def run():
    async with ClientSession(connector=TCPConnector(ssl=False, limit=100)) as session:
        async with aiopg.create_pool(dsn) as pool:
            while True:
                count = await run_batch(session, pool)
                if count == 0:
                    break


async def run_batch(session, db_pool):
    tasks = []
    async for url in get_batch(db_pool):
        task = asyncio.ensure_future(process_url(url, session, db_pool))
        tasks.append(task)
    await asyncio.gather(*tasks)
    # run() stops when a batch comes back empty, so report the batch size
    return len(tasks)


async def get_batch(db_pool):
    sql = "SELECT id, url FROM db.urls ... LIMIT %s"
    async with db_pool.acquire() as conn:
        async with conn.cursor() as cur:
            await cur.execute(sql, (BATCH_SIZE,))
            # aiopg cursors are async iterators, so this needs async for
            async for row in cur:
                yield row


async def process_url(url, session, db_pool):
    async with session.get(url, timeout=15) as response:
        body = await response.read()
        # process_body is a coroutine, so it has to be awaited
        data = await process_body(body)
        await save_data(db_pool, data)


async def process_body(body):
    ...
    return data


async def save_data(db_pool, data):
    sql = "UPDATE db.urls ..."
    async with db_pool.acquire() as conn:
        async with conn.cursor() as cur:
            await cur.execute(sql, (data,))
But something is wrong: the script runs slower and slower the longer it runs, and more and more exceptions are thrown from the call to session.get. My guess is that there is something wrong with how I use the Postgres connections, but I cannot figure out what.
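In case it helps, this is the direction I was thinking of trying: fetch the whole batch up front so the pool connection is released before any downloads start, and cap the number of in-flight requests with a semaphore. This is an untested sketch, and MAX_CONCURRENT_REQUESTS is just a number I made up:

MAX_CONCURRENT_REQUESTS = 20  # made-up cap, not tuned


async def run_batch(session, db_pool):
    # One semaphore per batch, so at most MAX_CONCURRENT_REQUESTS downloads
    # run at the same time instead of all 100 at once
    sem = asyncio.Semaphore(MAX_CONCURRENT_REQUESTS)
    rows = await get_batch(db_pool)  # plain await now, not async for
    tasks = [asyncio.ensure_future(process_url(url, session, db_pool, sem))
             for url in rows]
    await asyncio.gather(*tasks)
    return len(tasks)


async def get_batch(db_pool):
    sql = "SELECT id, url FROM db.urls ... LIMIT %s"
    async with db_pool.acquire() as conn:
        async with conn.cursor() as cur:
            await cur.execute(sql, (BATCH_SIZE,))
            # fetch everything at once, so the connection goes back to the
            # pool before any task starts running
            return await cur.fetchall()


async def process_url(url, session, db_pool, sem):
    async with sem:  # hold the semaphore only while downloading
        async with session.get(url, timeout=15) as response:
            body = await response.read()
    data = await process_body(body)
    await save_data(db_pool, data)

Would holding the cursor open while the tasks are created, or running all 100 downloads at once against the default-sized connection pool (10 connections, if I read the aiopg docs right), explain the slowdown? Or am I looking in the wrong place? Any help would be much appreciated!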