Python aioredis "await wasn't used with future" error

152 Views Asked by At

I have some problem with async functions, here is my code:

def get_redis_connection(connection_string: str):
    hostport, *options = connection_string.split(",")
    host, _, port = hostport.partition(":")
    redis_retry = Retry(ExponentialBackoff(), 5)
    arguments = {
        "retry": redis_retry,
        "retry_on_error": [BusyLoadingError, ConnectionError, TimeoutError],
    }
    for option in options:
        opt: str
        value: Union[int, str]
        opt, _, value = option.partition("=")
        if opt == "port":
            value = int(value)
        elif opt == "ssl":
            value = value.lower() == "true"
        elif opt == "abortConnect":
            continue
        arguments[opt] = value

    return Redis(host, int(port), **arguments)


async def get_async_redis_connection(connection_string: str):
    hostport, *options = connection_string.split(",")
    host, _, port = hostport.partition(":")
    _, _, password = options[0].partition("=")
    redis_url = f'rediss://:{password}@{host}:{port}'
    pool = aioredis.ConnectionPool.from_url(redis_url)
    return aioredis.Redis(connection_pool=pool, ssl=True)


def put_redis_data(redis_connection, hash, hash_name, payload):
    ttl = int(os.environ.get("TTL", 12))
    redis_connection.hset(hash_key, hash_name, json.dumps(payload))
    redis_connection.expire(hash_key, timedelta(hours=ttl))

class Worker():
    
    def __init__(self, azure_connection_string: str, redis_connection_string: str):
        super().__init__()
        self._redis_connection = get_redis_connection(redis_connection_string)
        self._redis_async_connection = asyncio.run(get_async_redis_connection(redis_connection_string))
        self._connection_string = azure_connection_string
        

class MyFirstWorker(Worker):

    def __init__(self, azure_connection_string: str, redis_connection_string: str):
        super().__init__(azure_connection_string, redis_connection_string)

    async def async_get_redis_data(self, key, opt):
        async with self._redis_async_connection.client() as redis:
            windows = await redis.hget(key, "windows")
            doors = await redis.hget(key, "doors")
        return windows, doors

    def _process(self, opt):
        key = str(os.getenv("KEY"))
        windows, doors = asyncio.run(self.async_get_redis_data(key, opt))
        matching_dataclass = MatchingPayload(json.loads(windows)["PayloadFields"])
        logger.info(f"Matching completed")
        put_redis_data(
            self._redis_connection, key, "match:results", matching_dataclass
        )

I have similar workers in separate threads (like MyFirstWorker, MySecondWorker, MyThirdWorker), one of them has an async redis data getter (async def async_get_redis_data(self, key, opt)) other operations with redis are performed synchronously throughout the project.

Here's the error I get:

RuntimeError: await wasn't used with future

I understand that the problem is due to the way I use aioredis, it is important for me to keep the _process method sync, but still perform the redis.hget operations async Please help me understand how to fix this error?

I thought it was because of this line:

windows, doors = asyncio.run(self.async_get_redis_data(key, opt))

I tried replacing it with

windows, doors = await self.async_get_redis_data(key, opt)

But in this case, I get another error related to the fact that the _process method is synchronous (TypeError: cannot unpack non-iterable coroutine object), but it should remain so.

Here is full traceback messages for my error:

 File "/usr/lib/python3.8/asyncio/runners.py", line 44, in run
    return loop.run_until_complete(main)
           │    │                  └ <coroutine object FirstWorker.async_get_redis_data at 0x7f5acc1893c0>
           │    └ <function BaseEventLoop.run_until_complete at 0x7f5ae568e550>
           └ <_UnixSelectorEventLoop running=False closed=True debug=False>
  File "/usr/lib/python3.8/asyncio/base_events.py", line 616, in run_until_complete
    return future.result()
           │      └ <method 'result' of '_asyncio.Task' objects>
           └ <Task finished name='Task-109' coro=<FirstWorker.async_get_redis_data() done, defined at /app/workers/matching_worker.py:2...

  File "/app/workers/first_worker.py", line 22, in async_get_redis_data
    windows = await redis.hget(key, "windows")
                       │     │    │    └ {'Action': 29, 'service': 0, 'ObjectType': 0, 'ObjectId': 'e8742fds-9b49-4cff-adaf-f4rg542d7da079', 'TaskId': 'c73605fb-403a-41...
                       │     │    └ 'e8742fds-9b49-4cff-adaf-f4rg542d7da079_c73605fb-403a-41cc-9c0a-a32f4c81f2f4_a9d8b4e0-fca9-e982-00f1-3c05c2cc207d'
                       │     └ <function Redis.hget at 0x7f5afs52160>
                       └ <unprintable Redis object>

  File "/usr/local/lib/python3.8/dist-packages/aioredis/client.py", line 1085, in execute_command
    return await self.parse_response(conn, command_name, **options)
                 │    │              │     │               └ {}
                 │    │              │     └ 'HGET'
                 │    │              └ SSLConnection<host=my-host.com,port=6380,db=0>
                 │    └ <function Redis.parse_response at 0x7f5ges6bd310>
                 └ <unprintable Redis object>

  File "/usr/local/lib/python3.8/dist-packages/aioredis/client.py", line 1101, in parse_response
    response = await connection.read_response()
                     │          └ <function Connection.read_response at 0x7f5acf7ab1f0>
                     └ SSLConnection<host=my-host.com,port=6380,db=0>

  File "/usr/local/lib/python3.8/dist-packages/aioredis/connection.py", line 910, in read_response
    await self.disconnect()
          │    └ <function Connection.disconnect at 0x7f5acf4e50>
          └ SSLConnection<host=my-host.com,port=6380,db=0>

  File "/usr/local/lib/python3.8/dist-packages/aioredis/connection.py", line 806, in disconnect
    await self._writer.wait_closed()  # type: ignore[union-attr]
          │    └ <member '_writer' of 'Connection' objects>
          └ SSLConnection<host=my-host.com,port=6380,db=0>

  File "/usr/lib/python3.8/asyncio/streams.py", line 359, in wait_closed
    await self._protocol._get_close_waiter(self)
          │    │         │                 └ <StreamWriter transport=<asyncio.sslproto._SSLProtocolTransport object at 0x7f5bnn56772e0> reader=<StreamReader transport=<asy...
          │    │         └ <function StreamReaderProtocol._get_close_waiter at 0x7f5ae5696ca0>
          │    └ <asyncio.streams.StreamReaderProtocol object at 0x7f5acc369ac0>
          └ <StreamWriter transport=<asyncio.sslproto._SSLProtocolTransport object at 0x7f5acc3772e0> reader=<StreamReader transport=<asy...

RuntimeError: await wasn't used with future
0

There are 0 best solutions below