Trio: multiple tasks reading from the same fd


I have a file descriptor, and I would like to read from it with multiple tasks. Each read() request on the fd is going to return a full, independent packet of data (as long as data is available).

My naive implementation was to have each worker run the following loop:

import os
import trio

async def work_loop(fd):
    while True:
        await trio.hazmat.wait_readable(fd)
        buf = os.read(fd, BUFSIZE)
        if not buf:
            break
        await do_work(buf)

Unfortunately, this does not work, because Trio raises ResourceBusyError when a task tries to wait on an fd that another task is already waiting on. So my next iteration was to write a custom wait function:

async def work_loop(fd):
    while True:
        await my_wait_readable(fd)
        buf = os.read(fd, BUFSIZE)
        if not buf:
            break
        await do_work(buf)

where

import logging

log = logging.getLogger(__name__)
read_queue = trio.hazmat.ParkingLot()

async def my_wait_readable(fd, name=None):
    if name is None:
        name = trio.hazmat.current_task().name
    while True:
        try:
            log.debug('%s: Waiting for fd to become readable...', name)
            await trio.hazmat.wait_readable(fd)
        except trio.ResourceBusyError:
            log.debug('%s: Resource busy, parking in read queue.', name)
            await read_queue.park()
            continue
        log.debug('%s: fd readable, unparking next task.', name)
        read_queue.unpark()
        break

However, in tests I get log messages like these:

2018-09-18 13:09:17.219 pyfuse3-worker-37: Waiting for fd to become readable...
2018-09-18 13:09:17.219 pyfuse3-worker-47: Waiting for fd to become readable...
2018-09-18 13:09:17.220 pyfuse3-worker-53: Waiting for fd to become readable...
2018-09-18 13:09:17.220 pyfuse3-worker-51: fd readable, unparking next task.
2018-09-18 13:09:17.220 pyfuse3-worker-51: doing work
2018-09-18 13:09:17.221 pyfuse3-worker-47: Resource busy, parking in read queue.
2018-09-18 13:09:17.221 pyfuse3-worker-37: Resource busy, parking in read queue.
2018-09-18 13:09:17.221 pyfuse3-worker-53: Resource busy, parking in read queue.

In other words:

  1. All tasks enter trio.hazmat.wait_readable.
  2. One task returns successfully and tries to unpark the next task (but there is none).
  3. The other tasks receive ResourceBusyError and park themselves.
  4. Nothing happens anymore, since all workers are parked.

What's the proper way to solve this problem?

Best answer:

Multiple readers on the same fd don't make sense, and using Trio (or not) doesn't change that basic fact. Why are you trying to do that in the first place?

If for some reason you really do need multiple tasks to post-process your data in parallel, use a single reader task that adds the data to a queue, and let your processing tasks get their data from that queue.

Alternatively, you could serialize the reads with a lock:

import os
import trio

read_lock = trio.Lock()

async def work_loop(fd):
    while True:
        # Only the task holding the lock ever waits on fd.
        async with read_lock:
            await trio.hazmat.wait_readable(fd)
            buf = os.read(fd, BUFSIZE)
        if not buf:
            break
        await do_work(buf)