I am getting into async operations in Python and would like to use them properly, but I have found several ways to retrieve data from multiple coroutines and am not sure which one to use.
The idea is as follows: for multiple records in the form of input-output objects, take the input, run the API queries asynchronously, gather the results, and update the original objects with them. Also, could you please suggest how to limit the number of tasks running concurrently to a predefined value? I am afraid of spamming the server with thousands of requests at once and getting flagged because of that.
Currently I use a dict of object: asyncio.create_task(...) pairs, wait for all the tasks to complete, and then for each pair take the result and update the object that serves as the dict key.
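For the concurrency limit, the approach I have seen mentioned is wrapping each call in an asyncio.Semaphore. Below is a minimal sketch of what I have in mind; api_query here is a hypothetical stand-in for my real request, limited_query and run_limited are names I made up, and the limit of 3 is arbitrary. I am not sure this is the recommended way.

```python
import asyncio

async def api_query(text: str) -> str:
    # Hypothetical stand-in for the real API call.
    await asyncio.sleep(0.01)
    return text[::-1]

async def limited_query(sem: asyncio.Semaphore, text: str) -> str:
    # At most `limit` coroutines are inside this block at the same time.
    async with sem:
        return await api_query(text)

async def run_limited(texts: list[str], limit: int = 3) -> list[str]:
    sem = asyncio.Semaphore(limit)  # created inside the running event loop
    # gather returns the results in the same order as `texts`.
    return await asyncio.gather(*(limited_query(sem, t) for t in texts))

if __name__ == "__main__":
    print(asyncio.run(run_limited(["lorem", "ipsum", "dolor", "sit"])))
```

As far as I understand, all the coroutines are still scheduled up front with this pattern; only the number of queries in flight at once is capped.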
Something along the following lines:
import asyncio

class myobj:
    text: str = ''
    result: str = ''

    def __init__(self, text: str) -> None:
        self.text = text

# The lines below run inside a coroutine; api_query is my coroutine that calls the API.
items: list[myobj] = [myobj('lorem'), myobj('ipsum')]
tasks = {}
for item in items:
    tasks[item] = asyncio.create_task(api_query(item.text))
await asyncio.wait(tasks.values())
for item, query in tasks.items():
    item.result = query.result()
It feels awkward though, as if I were doing something wrong, although it seems to work properly.
I also tried multiple variants of gathering results of multiple tasks and wondered which one (if not all) is correct. Below is my code:
import asyncio
import time

async def reverse(text: str) -> str:
    await asyncio.sleep(len(text) / 10)
    return text[::-1]

async def main() -> None:
    names = [
        "Rowan Adams",
        "Leo Ferrell",
        "Lottie Mccullough",
        "Lina Riggs",
        "Hope Garrett",
        "Humza Galvan",
        "Felix Carr",
        "Sam English",
        "Trinity Moyer",
        "Lennon Osborn",
        "Nicolas Chapman",
        "Bushra Bowers",
        "Izabella Banks",
        "Malik Fitzgerald",
        "Maisha Donovan",
        "Kathleen Patel",
        "Kiara Odling",
        "Abby Larsen",
        "Kye Joyce",
        "Jimmy Davidson",
    ]

    # Example 1: gather over tasks created inline
    t1_s = time.perf_counter()
    results1 = await asyncio.gather(*[asyncio.create_task(reverse(name)) for name in names])
    t1_e = time.perf_counter()
    print(results1)
    print(t1_e - t1_s, '\n')

    # Example 2: gather over bare coroutines created inline
    t2_s = time.perf_counter()
    results2 = await asyncio.gather(*[reverse(name) for name in names])
    t2_e = time.perf_counter()
    print(results2)
    print(t2_e - t2_s, '\n')

    # Example 3: wait on a list of tasks, then read each task's result
    t3_s = time.perf_counter()
    tasks1 = [asyncio.create_task(reverse(name)) for name in names]
    await asyncio.wait(tasks1)
    t3_e = time.perf_counter()
    print([task.result() if task.done() else 'waiting' for task in tasks1])
    print(t3_e - t3_s, '\n')

    # Example 4: gather over a stored list of coroutines
    t4_s = time.perf_counter()
    tasks2 = [reverse(name) for name in names]
    results4 = await asyncio.gather(*tasks2)
    t4_e = time.perf_counter()
    print(results4)
    print(t4_e - t4_s, '\n')

    # Example 5: gather over a stored list of tasks, results read both ways
    t5_s = time.perf_counter()
    tasks3 = [asyncio.create_task(reverse(name)) for name in names]
    results5 = await asyncio.gather(*tasks3)
    t5_e = time.perf_counter()
    print([task.result() for task in tasks3])
    print('-----')
    print(results5)
    print(t5_e - t5_s, '\n')

if __name__ == "__main__":
    asyncio.run(main())
From this testing I conclude that:
- asyncio.wait returns two sets of tasks (done and pending), so it could be used to collect results from tasks that have already completed
- asyncio.gather returns the results regardless of whether it was given coroutines or tasks
- when working with tasks, you can access the results by calling the .result() method on each completed task (examples 3 and 5)
- I assume that keeping both the coroutine/task variables and separate result variables after completion is pointless, but it may be reasonable to keep the tasks and read the results from them instead of creating a dedicated variable to store the results
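A small sketch of what the two calls return, as far as I can tell from the documentation. It reuses a shortened reverse with an arbitrary delay; compare is a name I made up for this check.

```python
import asyncio

async def reverse(text: str) -> str:
    # Longer strings sleep longer, so they finish later.
    await asyncio.sleep(len(text) / 100)
    return text[::-1]

async def compare(names: list[str]):
    # asyncio.wait returns two sets of tasks: (done, pending).
    tasks = [asyncio.create_task(reverse(n)) for n in names]
    done, pending = await asyncio.wait(tasks)
    # Sets are unordered, so read the results from the original list instead.
    from_wait = [t.result() for t in tasks]

    # asyncio.gather returns a list of results in input order.
    from_gather = await asyncio.gather(*(reverse(n) for n in names))
    return done, pending, from_wait, from_gather

if __name__ == "__main__":
    done, pending, from_wait, from_gather = asyncio.run(compare(["Kye Joyce", "Sam English"]))
    print(len(done), len(pending))
    print(from_wait)
    print(from_gather)
```

With the default return_when setting and no timeout, the pending set should come back empty, which is why reading .result() from every task in example 3 works.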
I have seen solutions where the results from gather were zipped with the original list. I also considered using a for loop like:
results = await asyncio.gather(*[api_query(item.text) for item in items])
for i in range(len(items)):
    items[i].result = results[i]
But it seems even more cumbersome than the dict solution.
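For comparison, the zip variant I have seen would look roughly like this. It is only a sketch: Item is a hypothetical stand-in for my class, api_query is a placeholder for the real request, and fill_results is a name I made up. It relies on gather returning results in the same order as its inputs.

```python
import asyncio
from dataclasses import dataclass

@dataclass
class Item:
    # Hypothetical stand-in for the input-output object.
    text: str
    result: str = ""

async def api_query(text: str) -> str:
    # Hypothetical stand-in for the real API call.
    await asyncio.sleep(0.01)
    return text.upper()

async def fill_results(items: list[Item]) -> None:
    # gather preserves input order, so results line up with items.
    results = await asyncio.gather(*(api_query(item.text) for item in items))
    for item, result in zip(items, results):
        item.result = result

if __name__ == "__main__":
    items = [Item("lorem"), Item("ipsum")]
    asyncio.run(fill_results(items))
    print(items)
```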