Asyncio as_completed in AnyIO


How can I implement the functionality of asyncio.as_completed with anyio?

I have a message bus that picks up a user command and directs it to the appropriate handler. That may generate a new domain event that should be picked up by the bus too. Using asyncio.as_completed, I can run multiple tasks concurrently, get the result of each task as it completes, check whether a new event was generated, and then handle that new event. I would like to use anyio instead, but I don't know how.

This is kind of what I am doing with asyncio:

import asyncio
import itertools

import anyio


async def handle(event: str):
    await handle_event(event)


async def handle_event(event: str):
    if event == "event":
        coros = [slow_2(), slow_5()]
    else:
        coros = [slow_1(), slow_1()]
    for coro in asyncio.as_completed(coros):
        result = await coro
        new_events = []
        if result == "event":
            new_events.extend(["", ""])
        if new_events:
            async with anyio.create_task_group() as tg:
                for new_event in new_events:
                    tg.start_soon(handle_event, new_event)


async def spin(msg: str) -> None:
    status = ""
    try:
        for char in itertools.cycle(r"\|/-"):
            status = f"\r{char} {msg}"
            print(status, flush=True, end="")
            await anyio.sleep(0.1)
    finally:
        # Runs when the task is cancelled: clear the spinner line.
        blanks = " " * len(status)
        print(f"\r{blanks}\r", end="")


async def slow_1():
    await anyio.sleep(1)
    print("slow_1")


async def slow_2():
    await anyio.sleep(2)
    print("slow_2")
    return "event"


async def slow_5():
    await anyio.sleep(5)
    print("slow_5")


async def supervisor():
    async with anyio.create_task_group() as tg:
        tg.start_soon(spin, "thinking!")
        await handle("event")
        # All events handled; cancel the task group to stop the spinner.
        tg.cancel_scope.cancel()


if __name__ == "__main__":
    anyio.run(supervisor)

There are 2 best solutions below

BEST ANSWER

There are a few ways you could do this, but here's one that should have almost the same API:

from collections.abc import Awaitable, Iterable
from typing import TypeVar

import anyio
from anyio import create_memory_object_stream
from anyio.abc import TaskGroup

T = TypeVar("T")


def as_completed(tg: TaskGroup, aws: Iterable[Awaitable[T]]) -> Iterable[Awaitable[T]]:
    # Materialize the iterable so it can be walked twice (start tasks, then yield waiters).
    aws = list(aws)
    send_stream, receive_stream = create_memory_object_stream()

    async def populate_result(a: Awaitable[T]) -> None:
        # Push each result into the stream the moment its awaitable finishes.
        await send_stream.send(await a)

    async def wait_for_result() -> T:
        return await receive_stream.receive()

    for a in aws:
        tg.start_soon(populate_result, a)

    # One awaitable per input; awaiting them yields results in completion order.
    return (wait_for_result() for _ in aws)

async def main():
    async with anyio.create_task_group() as tg:
        # Using the question's slow_* coroutines as examples.
        coroutines = [slow_1(), slow_2(), slow_5()]
        for coroutine in as_completed(tg, coroutines):
            result = await coroutine
            print(result)  # do stuff with result

anyio.run(main)

If you don't mind changing the API slightly, we can simplify a bit more:

from collections.abc import Awaitable, Iterable, AsyncIterable
from typing import TypeVar

import anyio
from anyio import create_memory_object_stream
from anyio.abc import TaskGroup

T = TypeVar("T")


def as_completed(tg: TaskGroup, aws: Iterable[Awaitable[T]]) -> AsyncIterable[T]:
    send_stream, receive_stream = create_memory_object_stream()

    async def populate_result(a: Awaitable[T]) -> None:
        await send_stream.send(await a)

    async def run_all() -> None:
        # Run every awaitable in a nested task group, then close the send
        # stream so that iterating the receive stream terminates.
        async with anyio.create_task_group() as inner_tg:
            for a in aws:
                inner_tg.start_soon(populate_result, a)
        await send_stream.aclose()

    tg.start_soon(run_all)
    return receive_stream

async def main():
    async with anyio.create_task_group() as tg:
        coroutines = [slow_1(), slow_2(), slow_5()]
        async for result in as_completed(tg, coroutines):
            print(result)  # do stuff with result

anyio.run(main)

I chose to use a TypeVar named T everywhere, but you could use Any instead, which would let you pass a mix of coroutines with different result types.
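
For instance, the second version's signature would become something like this (just a sketch of the typing change; the function body stays the same as above):

from collections.abc import AsyncIterable, Awaitable, Iterable
from typing import Any

from anyio.abc import TaskGroup


def as_completed(tg: TaskGroup, aws: Iterable[Awaitable[Any]]) -> AsyncIterable[Any]:
    ...  # body unchanged from the version above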

DISCLAIMER: I haven't actually run this code, but the approach should work just fine with minor modifications if necessary.


(Disclaimer: I am the author of the aioresult library.)

If you have a fixed list of tasks whose results you want, like the original asyncio.as_completed() function, you can now use the aioresult.results_to_channel() function:

import anyio
from aioresult import ResultCapture, results_to_channel

# Inside an async function, where foo is whatever async task function you want to run:
async with anyio.create_task_group() as tg:
    results = [ResultCapture.start_soon(tg, foo, i) for i in range(10)]

    send_channel, receive_channel = anyio.create_memory_object_stream(1)
    tg.start_soon(results_to_channel, results, send_channel)

    async for r in receive_channel:
        print("task completed:", r.args[0], "result:", r.result())

There are three steps going on here:

  1. Within a task group, you start all of your tasks and arrange for their results to be captured with aioresult.ResultCapture. In this case, the code is running foo(0), foo(1), ... foo(9). (If you want to carry on even if foo() raises an exception, you can pass suppress_exception=True here.)
  2. Create a memory stream, and call aioresult.results_to_channel() in the background so that results are sent to that stream.
  3. Use an async for loop over the receive end of the stream to get the results.

However, it seems like you want a dynamic list of tasks, with more tasks added after the first ones have already started and sent their results to the stream. This is also possible, but it is not directly supported in aioresult. The code can be found in this gist: https://gist.github.com/arthur-tacca/5c717ae68ac037e72ae45fd1e9ca1345
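
If you would rather roll a small helper yourself, here is a minimal sketch of one way to do it (my own illustration, not the gist's code and not part of aioresult; the CompletionStream name is made up, and the slow_* helpers come from the question): count how many tasks have been started and keep reading from a memory stream until that many results have been consumed. The count may grow while you iterate, which is what lets a result schedule follow-up work.

import math

import anyio


class CompletionStream:
    """Yields results in completion order; tasks may be added while iterating."""

    def __init__(self, tg):
        self._tg = tg
        # Unbounded buffer so finished tasks never block waiting for the consumer.
        self._send, self._recv = anyio.create_memory_object_stream(math.inf)
        self._started = 0

    def start_soon(self, func, *args):
        self._started += 1
        self._tg.start_soon(self._run, func, args)

    async def _run(self, func, args):
        # Push the result as soon as the task finishes.
        await self._send.send(await func(*args))

    async def results(self):
        consumed = 0
        # Re-check the counter on every pass: more tasks may have been added,
        # including from inside the consuming loop itself.
        while consumed < self._started:
            yield await self._recv.receive()
            consumed += 1


async def main():
    async with anyio.create_task_group() as tg:
        stream = CompletionStream(tg)
        # Using the question's slow_2 and slow_5 helpers as the initial tasks.
        stream.start_soon(slow_2)
        stream.start_soon(slow_5)
        async for result in stream.results():
            print("got:", result)
            if result == "event":
                # A result generated a new event: schedule more work on the fly.
                stream.start_soon(slow_1)


anyio.run(main)

Because results() re-checks the started counter on each iteration, a start_soon() call made from inside the loop simply extends the iteration, which matches the message-bus pattern of handling events that are generated while earlier handlers are still running.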