I've attempted to simplify piping Python Popen processes into each other using the following code. The idea is that Process represents a process with arguments (but without a stdout or stdin), and then the pipe function connects them up.

import functools
import subprocess


def Process(parameters, *args, **kwargs):
    """
    Represents a process that can be piped into another
    """
    parameters = [str(p) for p in parameters]
    # Partially apply the constructor, so we can handle the piping later
    return functools.partial(subprocess.Popen, parameters, *args, **kwargs)


def pipe(commands, stdin=None):
    """
    Pipes a series of commands into each other
    :param commands: An array of commands, each of which is an instance of Process
    :param stdin: stdin to the first command
    :param kwargs: Any extra arguments to pass to subprocess.Popen
    :return:
    """
    # Keep track of previous processes
    processes = []

    # Each process's stdin is the stdout of the previous stage
    for i, cmd in enumerate(commands):
        if i == 0:
            process = cmd(stdin=subprocess.PIPE, stdout=subprocess.PIPE)
        else:
            previous = processes[-1]
            process = cmd(stdin=previous.stdout, stdout=subprocess.PIPE)
            # Close stdout of previous command so we get SIGPIPE
            previous.stdout.close()
        processes.append(process)

    first = processes[0]
    final = processes[-1]

    if first == final:
        # If we only have one process, return its output
        return first.communicate(stdin)
    else:
        # Pipe input into first process
        first.communicate(stdin)
        # Return Final process
        return final.communicate()

However, if I run the pipe function as follows:

stdout, stderr = pipe([
    Process(['tr', 'n', '\\n']),
    Process(['rev']),
    Process(['wc', '-l']),
], text)

I get the error:
ValueError: I/O operation on closed file
Notably, this error goes away if I omit the previous.stdout.close(). But the subprocess docs strongly recommend against that, if I want SIGPIPEs to work.
What am I doing wrong?
You should not close each stdout immediately; close them all at the end instead. Because Popen returns at once, your commands have not finished yet, so their stdout is still in use.
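
One way to apply this suggestion is sketched below, assuming Python 3. It is not the only possible fix. The name run_pipeline is hypothetical, and the sketch takes plain argument lists instead of the Process wrapper from the question. It defers closing the intermediate stdout handles until the last stage has finished. It also writes the input straight to first.stdin from a helper thread rather than calling first.communicate(), since communicate() would itself try to use first.stdout, which is the likely source of the ValueError once that handle has been closed.

```python
import subprocess
import sys
import threading


def run_pipeline(commands, data=None):
    """Hypothetical helper: run argument lists as one pipeline.

    Returns the (stdout, stderr) pair of the last stage.
    """
    processes = []
    for argv in commands:
        # The first stage reads from a new pipe; later stages read from
        # the stdout of the stage before them.
        source = processes[-1].stdout if processes else subprocess.PIPE
        processes.append(
            subprocess.Popen(argv, stdin=source, stdout=subprocess.PIPE)
        )

    first, final = processes[0], processes[-1]
    if first is final:
        # A single process can simply use communicate().
        return first.communicate(data)

    def feed():
        # Write to the first stage directly; first.communicate() would
        # also try to read first.stdout, which belongs to the next stage.
        try:
            if data:
                first.stdin.write(data)
        except OSError:
            pass  # the first stage exited before reading everything
        finally:
            try:
                first.stdin.close()
            except OSError:
                pass

    feeder = threading.Thread(target=feed)
    feeder.start()
    try:
        # final.stdin is None here, so this only reads final's stdout.
        result = final.communicate()
    finally:
        # Close the parent's copies of the intermediate pipes at the end.
        for process in processes[:-1]:
            process.stdout.close()
        feeder.join()
        for process in processes[:-1]:
            process.wait()
    return result


if __name__ == "__main__":
    # Portable stand-ins for shell tools, so the demo needs only Python.
    upper = [sys.executable, "-c",
             "import sys; sys.stdout.write(sys.stdin.read().upper())"]
    reverse = [sys.executable, "-c",
               "import sys; sys.stdout.write(sys.stdin.read()[::-1])"]
    out, err = run_pipeline([upper, reverse], b"abc")
    print(out)
```

With the commands from the question, the call would look like run_pipeline([['tr', 'n', '\\n'], ['rev'], ['wc', '-l']], text), assuming text is a bytes object, because Popen pipes are binary by default. The helper thread is a design choice of this sketch: it keeps a large input from blocking the main thread while the pipeline's output is still unread.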