How to gather task results in Trio?


I wrote a script that uses a nursery and the asks module to loop over a list of network IDs and call an API for each one. I get responses, but I don't know how to return the data the way you would with asyncio.

I also have a question about limiting the API calls to 5 per second.

from datetime import datetime
import asks
import time
import trio

asks.init("trio")
s = asks.Session(connections=4)

async def main():
    start_time = time.time()

    api_key = 'API-KEY'
    org_id = 'ORG-ID'
    networkIds = ['id1','id2','idn']

    url = 'https://api.meraki.com/api/v0/networks/{0}/airMarshal?timespan=3600'
    headers = {'X-Cisco-Meraki-API-Key': api_key, 'Content-Type': 'application/json'}

    async with trio.open_nursery() as nursery:
        for i in networkIds:
            nursery.start_soon(fetch, url.format(i), headers)

    print("Total time:", time.time() - start_time)



async def fetch(url, headers):
    print("Start: ", url)
    response = await s.get(url, headers=headers)
    print("Finished: ", url, len(response.content), response.status_code)




if __name__ == "__main__":
    trio.run(main)

When I run nursery.start_soon(fetch, ...), I can print data within fetch, but how do I return the data? I didn't see anything similar to the asyncio.gather(*tasks) function.

Also, I can limit the number of sessions to 1-4, which helps keep me under the 5-calls-per-second API limit, but I was wondering if there is a built-in way to ensure that no more than 5 API calls are made in any given second?


There are 5 answers below.

BEST ANSWER

Returning data: pass the networkID and a dict to the fetch tasks:

async def main():
    …
    results = {}
    async with trio.open_nursery() as nursery:
        for i in networkIds:
            nursery.start_soon(fetch, url.format(i), headers, results, i)
    ## results are available here

async def fetch(url, headers, results, i):
    print("Start: ", url)
    response = await s.get(url, headers=headers)
    print("Finished: ", url, len(response.content), response.status_code)
    results[i] = response

Alternatively, create a trio.Queue to which you put the results; your main task can then read the results from the queue.

API limit: create a trio.Queue(10) and start a task along these lines:

async def limiter(queue):
    while True:
        await trio.sleep(0.2)
        await queue.put(None)

Pass that queue to fetch as another argument, and call await queue.get() on it before each API call. Since the limiter adds one token every 0.2 seconds, at most five calls can start in any given second.
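For reference, here is a minimal sketch of that token-dripping idea written against current Trio, using a memory channel in place of the now-removed trio.Queue. The names (limiter, token_sender, urls) and the fake request body are illustrative, not part of the original answer:

import trio

async def limiter(token_sender):
    # Drip one token every 0.2 s, i.e. allow at most 5 new API calls per second.
    while True:
        await trio.sleep(0.2)
        await token_sender.send(None)

async def fetch(url, headers, token_receiver):
    await token_receiver.receive()   # wait for a rate-limit token
    print("calling", url)            # replace with: await s.get(url, headers=headers)

async def main():
    urls = ["https://example.invalid/a", "https://example.invalid/b"]
    headers = {}
    # A buffer of 10 tokens caps how large a burst can be after an idle period.
    token_sender, token_receiver = trio.open_memory_channel(10)
    async with trio.open_nursery() as nursery:
        nursery.start_soon(limiter, token_sender)
        async with trio.open_nursery() as fetch_nursery:
            for u in urls:
                fetch_nursery.start_soon(fetch, u, headers, token_receiver)
        nursery.cancel_scope.cancel()  # all fetches are done; stop the limiter

trio.run(main)

Note that trio.CapacityLimiter bounds how many tasks run at the same time rather than how many start per second, so a token-dripping task like this is still the simplest way to express a per-second rate.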

ANSWER

When I run nursery.start_soon(fetch, ...), I can print data within fetch, but how do I return the data? I didn't see anything similar to the asyncio.gather(*tasks) function.

You're asking two different questions, so I'll just answer this one. Matthias already answered your other question.

When you call start_soon(), you are asking Trio to run the task in the background and then keep going. This is why Trio is able to run fetch() several times concurrently. But because Trio keeps going, there is no way to "return" the result the way a Python function normally would: where would it even return to?

You can use a queue to let fetch() tasks send results to another task for additional processing.

To create a queue:

response_queue = trio.Queue(capacity=len(networkIds))

When you start your fetch tasks, pass the queue as an argument, and send a sentinel to the queue when you're done:

async with trio.open_nursery() as nursery:
    for i in networkIds:
        nursery.start_soon(fetch, url.format(i), headers, response_queue)
await response_queue.put(None)

After you download a URL, put the response into the queue:

async def fetch(url, headers, response_queue):
    print("Start: ", url)
    response = await s.get(url, headers=headers)
    # Add responses to queue
    await response_queue.put(response)
    print("Finished: ", url, len(response.content), response.status_code)

With the changes above, your fetch tasks will put responses into the queue. Now you need to read responses from the queue so you can process them. You might add a new function to do this:

async def process(response_queue):
    async for response in response_queue:
        if response is None:
            break
        # Do whatever processing you want here.

You should start this process function as a background task before you start any fetch tasks so that it will process responses as soon as they are received.
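Put together, main() might look roughly like this. This is a sketch assuming a pre-0.9 Trio where trio.Queue still exists (the answers below show the modern channel-based replacement); the fetches run in an inner nursery so the sentinel is only sent once they have all finished:

async def main():
    ...
    response_queue = trio.Queue(capacity=len(networkIds))
    async with trio.open_nursery() as nursery:
        # Start the consumer first so it handles responses as they arrive.
        nursery.start_soon(process, response_queue)
        # Run the fetches in an inner nursery so we know when they are all done.
        async with trio.open_nursery() as fetch_nursery:
            for i in networkIds:
                fetch_nursery.start_soon(fetch, url.format(i), headers, response_queue)
        # All fetches have finished; tell the consumer to stop.
        await response_queue.put(None)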

Read more in the Synchronizing and Communicating Between Tasks section of the Trio documentation.

ANSWER

Technically, trio.Queue has been deprecated in trio 0.9. It has been replaced by trio.open_memory_channel.

Short example:

sender, receiver = trio.open_memory_channel(len(networkIds))
async with trio.open_nursery() as nursery:
    for i in networkIds:
        nursery.start_soon(fetch, sender, url.format(i), headers)

# Close the send side so the loop below ends once every response has been read.
await sender.aclose()

async for value in receiver:
    # Do your job here
    pass

And in your fetch function you should call await sender.send(value) somewhere.
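For example, the question's fetch could be adapted along these lines (a sketch reusing the asks session s defined in the question):

async def fetch(sender, url, headers):
    print("Start: ", url)
    response = await s.get(url, headers=headers)
    await sender.send(response)   # hand the response back through the channel
    print("Finished: ", url, len(response.content), response.status_code)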

ANSWER

As @Adrien Clerc's answer states, trio.Queue has been deprecated: https://trio.readthedocs.io/en/stable/history.html?highlight=trio.Queue#id40

For task communication in Trio see: https://trio.readthedocs.io/en/latest/reference-core.html#using-channels-to-pass-values-between-tasks

Here's a minimal, fully working example for your use case using open_memory_channel (the async URL GET request is replaced with a sleep):

import datetime
import trio


async def main():
    network_ids = ["id1", "id2", "idn"]
    url = "https://api.meraki.com/api/v0/networks/{0}/airMarshal?timespan=3600"
    send_channel, receive_channel = trio.open_memory_channel(len(network_ids))
    async with trio.open_nursery() as nursery:
        nursery.start_soon(producer, send_channel, url, network_ids)
        nursery.start_soon(consumer, receive_channel)


async def producer(send_channel, url, network_ids):
    async with send_channel:
        async with trio.open_nursery() as nursery:
            for i in network_ids:
                nursery.start_soon(fetch, send_channel, url.format(i))


async def consumer(receive_channel):
    async with receive_channel:
        async for value in receive_channel:
            # Do your job here
            print(f"value received: {value} at time {datetime.datetime.utcnow()}")


async def fetch(send_channel, url):
    print(f"Start: {datetime.datetime.utcnow()}")
    await trio.sleep(1)
    response = f"response for {url}"
    await send_channel.send(response)
    print(f"Finished: {datetime.datetime.utcnow()}")


if __name__ == "__main__":
    trio.run(main)


This prints:

Start: 2023-03-03 10:21:24.883787
Start: 2023-03-03 10:21:24.883787
Start: 2023-03-03 10:21:24.883787
Finished: 2023-03-03 10:21:25.887040
Finished: 2023-03-03 10:21:25.887040
Finished: 2023-03-03 10:21:25.887040
value received: response for https://api.meraki.com/api/v0/networks/id1/airMarshal?timespan=3600 at time 2023-03-03 10:21:25.887040
value received: response for https://api.meraki.com/api/v0/networks/id2/airMarshal?timespan=3600 at time 2023-03-03 10:21:25.887040
value received: response for https://api.meraki.com/api/v0/networks/idn/airMarshal?timespan=3600 at time 2023-03-03 10:21:25.887040
ANSWER

Based on these answers, you can define the following function:

async def gather(*tasks):

    async def collect(index, task, results):
        task_func, *task_args = task
        # Run the task and record its result under its original index.
        results[index] = await task_func(*task_args)

    results = {}
    async with trio.open_nursery() as nursery:
        for index, task in enumerate(tasks):
            nursery.start_soon(collect, index, task, results)
    # The nursery has finished, so every result is present; return them in order.
    return [results[i] for i in range(len(tasks))]

You can then use trio much like you would asyncio by simply patching trio (adding the gather function):

import trio
trio.gather = gather

Here is a practical example:

async def child(x):
    print(f"Child sleeping {x}")
    await trio.sleep(x)
    return 2*x

async def parent():
    tasks = [(child, t) for t in range(3)]
    return await trio.gather(*tasks)

print("results:", trio.run(parent))