Python asyncio with cloud function and global async initialization


I have come across this answer on using asyncio with Google Cloud Functions.

However, I would also like to do some async initialization at global scope:


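import asyncio

import functions_framework
from flask import Request

# Runs once at import time, i.e. on cold start
asyncio.run(some_async_global_init())

@functions_framework.http
def cloudFunction(request: Request):
    asyncio.run(async_task())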

However, I get an "Event loop is closed" error when I do that.

I have also tried opening a new event loop instead of calling asyncio.run, but then I got a "Task attached to a different loop" error. That attempt looked like this:

loop = asyncio.new_event_loop()
asyncio.set_event_loop(loop)
loop.run_until_complete(async_global_init())


@functions_framework.http
def cloudFunction(request: Request):
    loop = asyncio.new_event_loop()
    asyncio.set_event_loop(loop)
    loop.run_until_complete(async_task())
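
Both errors can be reproduced without Cloud Functions. The following is a minimal sketch, assuming the global init creates an object that stays bound to the loop it was created in; `make_resource`, `use_resource`, and the bare future are hypothetical stand-ins, not the real initialization code.

```python
import asyncio


async def make_resource():
    # Hypothetical stand-in for a client created during global init:
    # it keeps a reference to the loop that was running at creation time.
    loop = asyncio.get_running_loop()
    return loop, loop.create_future()


# asyncio.run() creates a loop, runs the coroutine, then closes that loop.
init_loop, pending = asyncio.run(make_resource())


async def use_resource():
    # Awaiting a future that belongs to another loop is rejected by the task.
    await pending


def try_closed_loop():
    # Scheduling work on the loop that asyncio.run() already closed.
    try:
        init_loop.call_soon(print, "never runs")
    except RuntimeError as exc:
        return str(exc)
    return None


def try_other_loop():
    # A second asyncio.run() uses a fresh loop, not the one from init.
    try:
        asyncio.run(use_resource())
    except RuntimeError as exc:
        return str(exc)
    return None


closed_error = try_closed_loop()
other_loop_error = try_other_loop()
print(closed_error)
print(other_loop_error)
```

If the real init behaves like this, anything it creates is tied to the first loop, so a handler that starts its own loop would hit one of these two errors.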

A workaround is to do the initialization as part of the task, but ideally I would like to take advantage of the global initialization tip given in the docs.
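
For reference, the workaround would look roughly like the sketch below. The bodies of `async_global_init` and `async_task` are placeholders, `init_then_task` is a hypothetical helper, and `cloud_function` is a plain function standing in for the `@functions_framework.http` handler so the snippet runs on its own.

```python
import asyncio


async def async_global_init():
    # Placeholder for the real initialization (hypothetical body).
    await asyncio.sleep(0)
    return {"ready": True}


async def async_task(resource):
    # Placeholder for the real per-request work (hypothetical body).
    await asyncio.sleep(0)
    return f"ready={resource['ready']}"


async def init_then_task():
    # Init and task share one event loop, so nothing crosses loops.
    resource = await async_global_init()
    return await async_task(resource)


def cloud_function(request=None):
    # Stand-in for the HTTP handler: one asyncio.run() per invocation.
    return asyncio.run(init_then_task())


print(cloud_function())
```

This avoids both errors, but the initialization runs again on every invocation, which is exactly the cost that global initialization is meant to avoid.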
