When I use coroutines in Scrapy, I have a scenario where I need aiomysql to store item data, but occasionally a "Task was destroyed but it is pending" error is reported. Sometimes the spider runs quickly and normally, but most runs report the error. I don't know much about coroutines, so I can't tell whether the problem is in the aiomysql library, in the Scrapy code I wrote, or somewhere else.

The following is sample code; it is just a rough example:

# TWISTED_REACTOR has been enabled
import asyncio

import aiomysql
from twisted.internet.defer import Deferred


def as_deferred(f):
    """
    Wrap an asyncio coroutine/awaitable into a Twisted Deferred
    Args:
        f: coroutine or awaitable

    Returns:
        Deferred
    """
    return Deferred.fromFuture(asyncio.ensure_future(f))


class AsyncMysqlPipeline:
    def __init__(self):
        self.loop = asyncio.get_event_loop()

    def open_spider(self, spider):
        return as_deferred(self._open_spider(spider))

    async def _open_spider(self, spider):
        self.pool = await aiomysql.create_pool(
            host="localhost",
            port=3306,
            user="root",
            password="pwd",
            db="db",
            loop=self.loop,
        )
    
    async def process_item(self, item, spider):
        async with self.pool.acquire() as aiomysql_conn:
            async with aiomysql_conn.cursor() as aiomysql_cursor:
                # `sql` and `new_item` are placeholders; please ignore this
                # line, it is only here as an example of an awaited query
                await aiomysql_cursor.execute(sql, tuple(new_item.values()) * 2)
                await aiomysql_conn.commit()
        return item

    async def _close_spider(self):
        await self.pool.wait_closed()

    def close_spider(self, spider):
        self.pool.close()
        return as_deferred(self._close_spider())

From other similar problems I found while searching, a task created with asyncio.create_task can be garbage-collected if nothing keeps a strong reference to it, which then randomly causes "Task was destroyed but it is pending" exceptions (see the sketch after the links below). These are the corresponding reference links:

  1. asyncio: Use strong references for free-flying tasks · Issue #91887
  2. Incorrect Context in corotine's except and finally blocks · Issue #93740
  3. fix: prevent undone task be killed by gc by ProgramRipper · Pull Request #48
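
To illustrate my understanding of those issues (a minimal sketch of the pattern, not code from my project): the event loop only keeps a weak reference to a running task, so a fire-and-forget task with no other reference may be garbage-collected before it finishes.

import asyncio


async def write_item():
    await asyncio.sleep(1)  # stands in for an aiomysql write


async def main():
    # Fire-and-forget: nothing keeps a strong reference to this task,
    # so (per the linked issues) the garbage collector may destroy it
    # while it is still pending
    asyncio.ensure_future(write_item())
    await asyncio.sleep(0)


asyncio.run(main())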

I don't know whether this is the cause, and I haven't been able to solve my problem. Has anyone encountered a similar error? I would also appreciate an example of using coroutines to store data in a pipeline, with no restriction on which library or method is used.

Here is my operating environment:

  • scrapy version: 2.8.0
  • aiomysql version: 0.1.1
  • os: Win10 and Centos 7.5
  • python version: 3.8.5

My English is poor; I hope I have described my problem clearly.

I tried to use aiomysql coroutines in Scrapy pipelines to store data, expecting them to run normally, but in practice the "Task was destroyed but it is pending" error occasionally appears.

There is 1 answer below.


The reason is as described in the reference links I listed: it can be solved by keeping a strong reference to the coroutine task.

The solution is similar to the following:

# Keep a strong reference to every in-flight task so the garbage
# collector cannot destroy it while it is still pending
running_tasks = set()
# [...]
task = asyncio.create_task(some_function())
running_tasks.add(task)
# Drop the reference once the task has finished
task.add_done_callback(lambda t: running_tasks.remove(t))
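
Applied to the pipeline from the question, the same pattern can be folded into the as_deferred helper. Here is a sketch (assuming the rest of the pipeline stays as posted) that keeps a strong reference to each wrapped task until it completes:

import asyncio

from twisted.internet.defer import Deferred

# Strong references to in-flight tasks, so they cannot be
# garbage-collected while still pending
_running_tasks = set()


def as_deferred(f):
    """Wrap an asyncio coroutine/awaitable in a Twisted Deferred while
    holding a strong reference to the underlying task until it is done."""
    task = asyncio.ensure_future(f)
    _running_tasks.add(task)
    task.add_done_callback(_running_tasks.discard)
    return Deferred.fromFuture(task)

Using set.discard as the done callback simply removes the reference once the task finishes, without raising if it has already been removed.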