I just isolated a strange behavior I've been observing, but now I'm out of ideas.

My script asynchronously iterates over the stdout stream of a process which has been spawned via SSH (asyncssh) in a somewhat complex way. Sometimes that stream gets truncated, i.e. I get StopAsyncIteration when I expect more lines to come.

Now I've found the culprit: the following lines read from the stream line by line in separate tasks (this is done in order to be able to implement a timeout; I've removed the corresponding code here):
async def add_next(generator, collector) -> bool:
    with suppress(StopAsyncIteration):
        collector.append(await anext(generator))
        return True
    return False

async def thrtl(generator):
    bucket = []
    while True:
        if not await asyncio.create_task(add_next(generator, bucket)):
            break
        yield bucket
(Please note this code does not make much sense on its own - it just reproduces my observation.)

The code above will truncate generator, because awaiting the next element takes place in separate tasks. If I just await add_next directly, everything works as expected:
async def thrtl(generator):
    bucket = []
    while True:
        if not await add_next(generator, bucket):
            break
        yield bucket
This problem does not occur with every async iterable, e.g. streams created with create_subprocess_exec work fine.
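For comparison, here is a quick, self-contained counter-check (a sketch, assuming Python 3.10+ for aiter/anext and a Unix df) that exercises the same "bad" task-based reading against a local subprocess stream:

import asyncio
from contextlib import suppress

async def add_next(generator, collector) -> bool:
    with suppress(StopAsyncIteration):
        collector.append(await anext(generator))
        return True
    return False

async def thrtl(generator):
    bucket = []
    while True:
        # same "bad" variant: await the next element in a separate task
        if not await asyncio.create_task(add_next(generator, bucket)):
            break
        yield bucket

async def main():
    # local subprocess instead of an asyncssh channel
    process = await asyncio.create_subprocess_exec(
        "df", "-P", stdout=asyncio.subprocess.PIPE
    )
    collected = []
    async for rawlines in thrtl(aiter(process.stdout)):
        collected = rawlines
    await process.wait()
    print(b"".join(collected).decode())

asyncio.run(main())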
If I connect to a remote machine with a slow connection, I even get more lines through, and the log shows
INFO:asyncssh:[conn=0, chan=1] Received exit status 0
INFO:asyncssh:[conn=0, chan=1] Received channel close
INFO:asyncssh:[conn=0, chan=1] Channel closed
directly before I get the StopAsyncIteration. So reading from the stream after the process has terminated seems to be part of the problem.
I could accept this and try to read from the stream differently, but why does it work without the tasks being created?
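For example, one way to read differently (a sketch, assuming asyncssh's SSHReader.read() with no size argument reads until EOF, analogous to asyncio's StreamReader) would be to collect the whole output in one go instead of iterating:

async def streamhandler(stream):
    # read everything up to EOF in a single call instead of line by line
    data = await stream.read()
    return data.splitlines(keepends=True)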
Here is a full script for reference - please note that the effect is easier to observe with a remote connection (rather than connecting to 'localhost'):
import asyncio, asyncssh, logging
from contextlib import suppress

async def add_next(generator, collector) -> bool:
    with suppress(StopAsyncIteration):
        collector.append(await anext(generator))
        return True
    return False

async def thrtl(generator):
    bucket = []
    while True:
        # good:
        #if not await add_next(generator, bucket):
        # bad:
        if not await asyncio.create_task(add_next(generator, bucket)):
            break
        yield bucket

async def streamhandler(stream):
    result = []
    async for rawlines in thrtl(aiter(stream)):
        result += rawlines
    return result

async def main():
    ssh_connection = await asyncssh.connect("localhost")
    while True:
        ssh_process = await ssh_connection.create_process("df -P")
        stdout, stderr, completed = await asyncio.gather(
            streamhandler(ssh_process.stdout),
            streamhandler(ssh_process.stderr),
            asyncio.ensure_future(ssh_process.wait()),
        )
        print(stdout)
        print(stderr)
        await asyncio.sleep(2)

logging.basicConfig(level=logging.DEBUG)
asyncio.run(main())
As jsbueno pointed out, this behavior can be avoided by having only one task (async context) which actually reads from generator, and using an asyncio.Queue to hand the read items over to other tasks/contexts. One caveat of this approach is exception handling, especially StopAsyncIteration, since reading from the Queue alone will never let you know that your generator has been eaten up already. Still hoping there is a more straightforward approach - my full function is more meaningful but also more verbose (with type hints), so here is just the essence of how I did it:
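(A sketch of the essence; the names pump and _EXHAUSTED are illustrative, not my actual identifiers.)

_EXHAUSTED = object()  # sentinel signalling the generator raised StopAsyncIteration

async def pump(generator, queue):
    # the only task that ever touches the generator
    try:
        async for item in generator:
            await queue.put(item)
        await queue.put(_EXHAUSTED)
    except Exception as exc:
        # smuggle exceptions over to the consuming side
        await queue.put(exc)

async def thrtl(generator):
    queue = asyncio.Queue()
    reader = asyncio.create_task(pump(generator, queue))
    bucket = []
    try:
        while True:
            # the timeout I need would go here, e.g. asyncio.wait_for(queue.get(), ...)
            item = await queue.get()
            if item is _EXHAUSTED:
                break
            if isinstance(item, Exception):
                raise item
            bucket.append(item)
            yield bucket
    finally:
        reader.cancel()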
Yet, I haven't read anywhere that I shouldn't access an async generator from more than one context. And taking into account how exceptions have to be smuggled the way I did, I consider it a (design) bug.
hth