`ValueError: I/O operation on closed file` when trying to connect Python pipeline


I've tried to simplify piping Python Popen processes into each other with the following code. The idea is that Process represents a command with its arguments but no stdin or stdout yet, and the pipe function then connects the processes together.

import functools
import subprocess


def Process(parameters, *args, **kwargs):
    """
    Represents a process that can be piped into another
    """
    parameters = [str(p) for p in parameters]

    # Partially apply the constructor, so we can handle the piping later
    return functools.partial(subprocess.Popen, parameters, *args, **kwargs)


def pipe(commands, stdin=None):
    """
    Pipes a series of commands into each other
    :param commands: A list of commands, each of which is the result of calling Process
    :param stdin: stdin to the first command
    :return: The (stdout, stderr) tuple from the final command
    """
    # Keep track of previous processes
    processes = []

    # Each process's stdin is the stdout of the previous stage
    for i, cmd in enumerate(commands):
        if i == 0:
            process = cmd(stdin=subprocess.PIPE, stdout=subprocess.PIPE)
        else:
            previous = processes[-1]
            process = cmd(stdin=previous.stdout, stdout=subprocess.PIPE)

            # Close the parent's copy of the previous stdout so that stage gets SIGPIPE if this one exits
            previous.stdout.close()

        processes.append(process)

    first = processes[0]
    final = processes[-1]

    if first == final:
        # If we only have one process, return its output
        return first.communicate(stdin)
    else:
        # Pipe input into first process
        first.communicate(stdin)

        # Return Final process
        return final.communicate()
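The Process helper depends on functools.partial merging keyword arguments supplied at call time with the ones bound up front, so pipe() can attach stdin and stdout later. A tiny standalone check of that behavior, using the current interpreter (sys.executable) as a stand-in command purely for portability:

```python
import functools
import subprocess
import sys

# Same idea as Process(): bind the command now, supply stdin/stdout later.
make_hello = functools.partial(
    subprocess.Popen, [sys.executable, "-c", "print('hello')"]
)

# stdout is only decided here, when the partial is finally called.
proc = make_hello(stdout=subprocess.PIPE)
out, _ = proc.communicate()
print(out.strip())
```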

However, if I run the pipe function as follows:

stdout, stderr = pipe([
    Process(['tr', 'n', '\\n']),
    Process(['rev']),
    Process(['wc', '-l']),
], text)

I get the error:

ValueError: I/O operation on closed file
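Assuming the traceback points into first.communicate(stdin), a likely cause is that by the time the loop finishes, the parent has already closed first.stdout (it was handed to the second stage), yet communicate() on the first process still tries to read from it. On some Python versions that read begins by registering the pipe with a selector, which calls fileno(). The sketch below reproduces only that failure in isolation, with a child that writes nothing:

```python
import subprocess
import sys

# A child that produces no output; its stdout is still a pipe to the parent.
proc = subprocess.Popen([sys.executable, "-c", "pass"], stdout=subprocess.PIPE)

# Mimic what pipe() does to every stage except the last.
proc.stdout.close()

error = None
try:
    # A selector needs the file descriptor; asking a closed pipe for it fails.
    proc.stdout.fileno()
except ValueError as exc:
    error = exc
    print("ValueError:", exc)

proc.wait()
```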

Notably, the error goes away if I omit previous.stdout.close(). But the subprocess docs recommend closing it, so that the earlier process receives SIGPIPE if a later one exits.
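The pattern from the docs' "Replacing shell pipeline" section can be checked on its own. A minimal sketch, assuming a POSIX system with `yes` and `head` available: `yes` writes forever, `head` stops after three lines, and closing the parent's copy of `yes`'s stdout is what lets `yes` die of SIGPIPE instead of blocking.

```python
import subprocess

# Producer that writes forever; consumer that exits after three lines.
producer = subprocess.Popen(["yes"], stdout=subprocess.PIPE)
consumer = subprocess.Popen(
    ["head", "-n", "3"], stdin=producer.stdout, stdout=subprocess.PIPE
)

# Drop the parent's copy of the read end. The consumer now holds the only
# reader, so once it exits, the producer's next write raises SIGPIPE.
producer.stdout.close()

output, _ = consumer.communicate()

# Without the close() above, this wait would time out: the parent would
# still hold the pipe open, and `yes` would block once the pipe filled up.
producer.wait(timeout=10)
print(output)
```

Note that the docs never call communicate() on the producer afterwards; they only read from the last process.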

What am I doing wrong?

Answer

You should not close stdout immediately; close the pipes at the end instead.

if first == final:
    # If we only have one process, return its output
    result = first.communicate(stdin)
    first.stdout.close()
    return result
else:
    # Pipe input into first process
    first.communicate(stdin)

    # Return Final process
    result = final.communicate()
    for process in processes:
        process.stdout.close()
    return result

Because Popen returns immediately, your commands have not finished when the loop closes their stdout, so those pipes are still in use.
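A different route, offered as a sketch rather than as the answerer's code: keep the early previous.stdout.close() that the docs recommend, but never call communicate() on the first process, since the parent no longer owns its stdout. Instead, write the input to first.stdin and read only from the final process. The helper thread that feeds stdin is an assumption of this sketch, added so that a large input cannot deadlock against a full output pipe. It also assumes stdin is bytes and, for the checks, a POSIX system with tr, cat and wc.

```python
import functools
import subprocess
import threading


def Process(parameters, *args, **kwargs):
    """Defer Popen construction so pipe() can attach stdin/stdout later."""
    parameters = [str(p) for p in parameters]
    return functools.partial(subprocess.Popen, parameters, *args, **kwargs)


def pipe(commands, stdin=None):
    """
    Pipe a series of Process commands into each other.
    :return: The (stdout, stderr) tuple from the final command
    """
    processes = []
    for cmd in commands:
        if not processes:
            # Only the first stage gets a writable stdin pipe from the parent.
            process = cmd(stdin=subprocess.PIPE, stdout=subprocess.PIPE)
        else:
            previous = processes[-1]
            process = cmd(stdin=previous.stdout, stdout=subprocess.PIPE)
            # The new child holds the read end now; closing the parent's copy
            # lets `previous` receive SIGPIPE if `process` exits early.
            previous.stdout.close()
        processes.append(process)

    first, final = processes[0], processes[-1]
    if first is final:
        # Single stage: communicate() both writes stdin and reads stdout.
        return first.communicate(stdin)

    def feed():
        # Write the input in the background, then close stdin to signal EOF.
        try:
            if stdin is not None:
                first.stdin.write(stdin)
        except BrokenPipeError:
            pass  # first stage exited early; communicate() ignores this too
        finally:
            try:
                first.stdin.close()
            except BrokenPipeError:
                pass

    writer = threading.Thread(target=feed)
    writer.start()
    # Read only from the final stage; every earlier stdout is already closed.
    result = final.communicate()
    writer.join()
    for process in processes[:-1]:
        process.wait()  # reap the earlier stages
    return result
```

With this version, the call from the question returns the final stage's (stdout, stderr); stderr is None because it is never piped.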