Azure function app python v2 model - unclosed aiohttp session

I am writing an Azure Function App using Python (v2 programming model) that calls an API multiple times to get some data and writes the responses to blob storage. Those operations happen asynchronously, and I am using aiohttp to make the API calls. This is the code of my function app:

import azure.functions as func
from azure.storage.blob.aio import BlobServiceClient
import asyncio
import aiohttp
import datetime
import logging
import requests
import json
import sys
import os
import ssl
import certifi

_KV_SECRET_OUTPUT_BLOB_CONNECTION_STRING = os.environ["KV_SECRET_OUTPUT_BLOB_CONNECTION_STRING"]
_KV_SECRET_API_KEY = os.environ["KV_SECRET_API_KEY"]
CRON_SCHEDULE = os.environ["CRON_SCHEDULE"]
STORAGE_CONTAINER_NAME = os.environ["STORAGE_CONTAINER_NAME"]



def timestamp_utc():
    return datetime.datetime.now(datetime.timezone.utc)

def timestamp_for_logs():
    return timestamp_utc().isoformat()

def timestamp_for_blob_path():
    return timestamp_utc().strftime("%Y/%m/%d/T/%H/%M/%S_%Z")


async def makeAPIcall(api_endpoint, session, request_params):
    logging.info(f'attempting to make an API call... with request_params: {request_params}')
    try:
        async with session.get(api_endpoint, params = request_params) as response:
            response.raise_for_status()
            response_content = await response.read()
            logging.info("Response from the API call: "+str(response_content))
            return response_content
                
    except Exception as e:
        logging.error(f'Error calling API:')
        logging.error(str(sys.exc_info()))
        return None


async def writeResponseToBlob(data, container_name, blob_name, connection_string):
    logging.info(f'attempting to write to blob with container name: {container_name} and blob name: {blob_name}')
    try:
        # Create the BlobServiceClient object using the connection string
        blob_service_client =  BlobServiceClient.from_connection_string(connection_string)

        # Get a ContainerClient object for the specified container
        container_client = blob_service_client.get_container_client(container_name)

        # Create a blob in the container with the specified name
        blob_client = container_client.get_blob_client(blob_name)

        # Write the API response to the blob
        await blob_client.upload_blob(data, overwrite=True)

    except:
        logging.error("ERROR during blob upload : "+str(sys.exc_info()))
        sys.exit(1)


async def chain(base_url, satellite_id, request_params, context, container_name, session) -> None:
    # part A of the chain
    api_endpoint = f"{base_url}/{satellite_id}"
    response_content = await makeAPIcall(api_endpoint, session, request_params=request_params)
    
    # part B of the chain
    output_blob_name = timestamp_for_blob_path()+'_SatID_'+str(satellite_id)+'_InvocID_'+context.invocation_id+'.json'
    
    await writeResponseToBlob(data=response_content, 
                              container_name=container_name, 
                              blob_name=output_blob_name, 
                              connection_string=_KV_SECRET_OUTPUT_BLOB_CONNECTION_STRING)
    
    logging.info(f"Finished entire chain for SatID {satellite_id}")


async def main(args, session):

    base_url = args["base_url"]
    satellite_ids =  args["satellite_ids"]
    request_params =  args["request_params"]
    context = args["context"]
    container_name = args["container_name"]
    
    # version A
    # await asyncio.gather(*(chain(base_url, satellite_id, request_params, context, container_name, session) for satellite_id in satellite_ids))

    # version B
    for satellite_id in satellite_ids:
        await chain(base_url, satellite_id, request_params, context, container_name, session)



#===============
# CORE FUNCTION:
#===============

app = func.FunctionApp()
@app.function_name(name="sendReqToApiWriteToBlobTimeTrigger")
@app.schedule(schedule=CRON_SCHEDULE, arg_name="mytimer", run_on_startup=False, use_monitor=True)
async def eventHandler(mytimer: func.TimerRequest, context: func.Context) -> None:

    logging.info('ctx_func_name:\t'+context.function_name)
    logging.info('ctx_func_dir:\t'+context.function_directory)
    logging.info('ctx_invocation_id:\t'+context.invocation_id)

    logging.info("Cron schedule UTC:\t"+CRON_SCHEDULE)

    # satellite_ids = [25544, 25544]
    satellite_ids = ["Amsterdam", "London"]
    # base_url = f"https://api.wheretheiss.at/v1/satellites"
    base_url = f"https://worldtimeapi.org/api/timezone/Europe"

    request_params = {}

    if mytimer.past_due:
        logging.info('The timer is past due!')


    args = {
        "base_url": base_url,
        "request_params": request_params,
        "satellite_ids" : satellite_ids,
        "context" : context,
        "container_name" : STORAGE_CONTAINER_NAME,
    }

    ssl_context = ssl.create_default_context(cafile=certifi.where())
    async with aiohttp.TCPConnector(ssl=ssl_context) as connector:
        async with aiohttp.ClientSession(connector=connector) as session:
            await main(args, session)
    
    # ensure session is closed
    await asyncio.sleep(1)
    await session.close()
    await asyncio.sleep(1)
    await connector.close()
    logging.info(f"Entire function execution finished")

Now the problem is that, at least when I run it locally in VS Code, the function gives an error (or rather a warning, since debugging does not stop on it) saying that the client session and connector are unclosed:

[2023-12-12T22:41:00.524Z] Unclosed client session
client_session: <aiohttp.client.ClientSession object at 0x000001E9A492CCD0>
[2023-12-12T22:41:00.526Z] Unclosed connector
connections: ['[(<aiohttp.client_proto.ResponseHandler object at 0x000001E9A48D7520>, 982955.203)]']
connector: <aiohttp.connector.TCPConnector object at 0x000001E9A492C7F0>

I did follow aiohttp's recommendation for graceful shutdown, as well as the recommendation to re-use the session, but strangely, on every function execution I make 2 requests to the API and I seem to get this warning for each request. What could be the issue, and how can I resolve it?
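
For reference, here is a minimal standalone sketch of the graceful-shutdown pattern I tried to follow (the URL and the 0.250 s delay are just illustrative, not values from my app):

import asyncio
import aiohttp

async def fetch(url: str) -> str:
    # "async with" closes the session when the block exits
    async with aiohttp.ClientSession() as session:
        async with session.get(url) as response:
            response.raise_for_status()
            return await response.text()

async def main():
    body = await fetch("https://example.com")
    print(len(body))
    # the aiohttp docs suggest sleeping briefly before the event loop shuts down,
    # so that the underlying (SSL) connections can close cleanly
    await asyncio.sleep(0.250)

asyncio.run(main())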

EDIT 2024.01.15:

When I remove the await in this function:

async def writeResponseToBlob(data, container_name, blob_name, connection_string):
       ...
        # Write the API response to the blob
        # await blob_client.upload_blob(data, overwrite=True)
        blob_client.upload_blob(data, overwrite=True)

then the problem disappears, and instead I get a warning:

...\function_app.py: RuntimeWarning: coroutine 'BlobClient.upload_blob' was never awaited

But how can I make sure that this function works with await on both the API call and the blob upload?
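
For context, my understanding of the warning is that calling an async method without awaiting it only creates a coroutine object that never actually runs. A minimal standalone illustration (upload() here is a hypothetical stand-in, not the real BlobClient method):

import asyncio

async def upload(data: bytes) -> None:
    await asyncio.sleep(0)  # placeholder for real I/O

async def main():
    upload(b"payload")        # RuntimeWarning: coroutine 'upload' was never awaited
    await upload(b"payload")  # awaited: the coroutine actually runs

asyncio.run(main())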

There are 2 answers below.

Accepted answer (average.everyman):

It turns out the problem was with the writeResponseToBlob method: what was missing was an async with for the blob service client.

So this method now looks like this:

async def writeResponseToBlob(data, connection_string, container_name, blob_name):
    logging.info(f'attempting to write to blob with container name: {container_name} and blob name: {blob_name} ...')
    try:
        async with BlobServiceClient.from_connection_string(connection_string) as blob_service_client:
            # Get a ContainerClient object for the specified container
            container_client = blob_service_client.get_container_client(container_name)
            # Write the API response to the blob
            blob_client = await container_client.upload_blob(name=blob_name, data=data, overwrite=True)
        return blob_client.blob_name
    except:
        logging.error("ERROR during blob upload : "+str(sys.exc_info()))
        return None

and that fixed the issues with the unclosed aiohttp session/connector.
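
If wrapping everything in async with is inconvenient, an alternative sketch is to close the async client explicitly in a finally block (this assumes the aio BlobServiceClient exposes an awaitable close(), as recent azure-storage-blob versions do):

from azure.storage.blob.aio import BlobServiceClient

async def writeResponseToBlob(data, connection_string, container_name, blob_name):
    blob_service_client = BlobServiceClient.from_connection_string(connection_string)
    try:
        container_client = blob_service_client.get_container_client(container_name)
        # upload_blob on the container client creates the blob and uploads the data
        await container_client.upload_blob(name=blob_name, data=data, overwrite=True)
        return blob_name
    finally:
        # explicitly release the client's underlying transport so nothing leaks
        await blob_service_client.close()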

Answer from RithwikBojja:

I am writing an Azure Function App using Python (v2 programming model) that calls an API multiple times to get some data and write the responses to a blob storage.

You can use the code below to do the same:

import azure.functions as func
import logging
import aiohttp
import asyncio
import ssl
import certifi
from azure.storage.blob import BlobServiceClient

app = func.FunctionApp(http_auth_level=func.AuthLevel.FUNCTION)
async def rith_fetch_data(url, rith_session):
    async with rith_session.get(url) as rith_response:
        return await rith_response.text()
@app.route(route="http_trigger")
async def http_trigger(req: func.HttpRequest) -> func.HttpResponse:
        rith_base_url = "https://jsonplaceholder.typicode.com"
        endpoints = ["/posts/1", "/todos/1"]
        urls = [rith_base_url + endpoint for endpoint in endpoints]
        ssl_context = ssl.create_default_context(cafile=certifi.where())    
        async with aiohttp.TCPConnector(ssl=ssl_context) as connector:
            async with aiohttp.ClientSession(connector=connector) as rith_session:
                tasks = [rith_fetch_data(rith_url, rith_session) for rith_url in urls]
                responses = await asyncio.gather(*tasks)
 
                for rith_url, response in zip(urls, responses):
                    logging.info(f"Response from {rith_url}: {response}")
                rith_conn_string = "DefaultEndpointsProtocol=https;AccountName=rithwik;AccountKey=pPBW9jWuhgf+ASt5NLWkA==;EndpointSuffix=core.windows.net"
                rith_cont_name = "rithwik"
                rith_blob_name = "test.txt"
                rith_bsc = BlobServiceClient.from_connection_string(rith_conn_string)
                container_client = rith_bsc.get_container_client(rith_cont_name)
                rith_bc = container_client.get_blob_client(rith_blob_name)
                rith_bc.upload_blob("\n".join(responses))   
        await asyncio.sleep(1)
        await connector.close()
        logging.info("Hello Rithwik, Connector closed")

        return func.HttpResponse("Hello Rithwik Bojja, Data fetched and sent")

Output:

(screenshots omitted)

In Blob storage:

(screenshot omitted)

The above code worked for me without any errors or warnings.