I want to detect, on the writer (producer) end of a named pipe, when the reader (consumer) disconnects. In particular, this should work on Linux and macOS (Windows already requires a separate method anyway, so it's out of scope here).
The general idea of using Python's select.poll() is shown, among other places, in an answer to "Detect when reader closes named pipe (FIFO)". The following is a fleshed-out test (pype.py) that first creates a named pipe and then forks to run consumer and producer in separate processes. The consumer connects and eventually disconnects, while the producer polls the writing end of the named pipe for the consumer disconnecting. At least that's the theory...
Linux
Running python3 pype.py on Linux works as expected: the reader/consumer disconnecting is duly reported:
named pipe /tmp/pype-ewocntg9/fifo
reader: start...
writer: start...
sleeping...
reader closed
waiting for writer to terminate...
reader has disconnected [(3, 8)]
macOS
Running python3 pype.py on macOS (12.x, 13.x) instead gives this:
named pipe /var/folders/...../pype-j33ergsn/fifo
reader: start...
writer: start...
sleeping...
reader closed
waiting for writer to terminate...
ERROR: timed out, disconnect not detected
What am I doing (holding?) wrong here? Or is macOS broken? We see the same behavior on macOS 12.x as well as 13.x.
pype.py
# pype.py
import os
import tempfile
import atexit
import shutil
import time
import select

def writer(fifoname):
    print("writer: start...")
    w = open(fifoname, 'w')
    poller = select.poll()
    poller.register(w, select.POLLERR | select.POLLHUP)
    poll = poller.poll(4.0*1000)
    if len(poll) == 0:
        print("ERROR: timed out, disconnect not detected")
        return
    print("reader has disconnected", poll)

def reader(fifoname):
    print("reader: start...")
    r = open(fifoname, 'r')
    print("sleeping...")
    time.sleep(2.0)
    r.close()
    print("reader closed")

tmpdir = tempfile.mkdtemp(prefix="pype-")
fifoname = os.path.join(tmpdir, "fifo")
print("named pipe", fifoname)
os.mkfifo(fifoname, 0o600)
atexit.register(shutil.rmtree, tmpdir)
childpid = os.fork()
if childpid == 0:
    writer(fifoname)
else:
    atexit.unregister(shutil.rmtree)
    reader(fifoname)
    print("waiting for writer to terminate...")
    os.waitpid(childpid, 0)
Nota bene: this is a minimal Python-based test to show the issue. It mirrors the issue first seen as part of a more complex Go module; see also "named pipe (mkfifo): how to detect on macos that the reading end (consumer) has disconnected, without consumer writing to the write end".
Having an independent and more minimal test compared to the Go+Ginkgo/Gomega based unit test excludes mistakes specific to the Go runtime and/or Go test harness.
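The check can be reduced further to a single process, which also takes fork out of the picture. The following is only a sketch under assumptions: the helper name reader_gone is hypothetical, and the read end is opened with O_NONBLOCK first so that neither open() blocks. On Linux the second call is expected to report the disconnect; whether it does so on macOS is exactly what is in question here.

```python
import os
import select
import shutil
import sys
import tempfile


def reader_gone(wfd, timeout_ms):
    """Return True if poll() reports POLLERR or POLLHUP on wfd within timeout_ms."""
    poller = select.poll()
    poller.register(wfd, select.POLLERR | select.POLLHUP)
    events = poller.poll(timeout_ms)
    return any(ev & (select.POLLERR | select.POLLHUP) for _, ev in events)


tmpdir = tempfile.mkdtemp(prefix="pype-")
try:
    fifoname = os.path.join(tmpdir, "fifo")
    os.mkfifo(fifoname, 0o600)
    # Open the read end non-blocking first so that neither open() blocks.
    rfd = os.open(fifoname, os.O_RDONLY | os.O_NONBLOCK)
    wfd = os.open(fifoname, os.O_WRONLY)
    before = reader_gone(wfd, 100)    # reader still connected
    os.close(rfd)
    after = reader_gone(wfd, 1000)    # reader has disconnected
    os.close(wfd)
finally:
    shutil.rmtree(tmpdir)

print(sys.platform, "before close:", before, "after close:", after)
```

Running this on both platforms and comparing the printed values shows whether the difference is tied to the two-process setup or to poll() on the FIFO itself.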
fstat and st_nlink
Rewriting the detection based on a suggestion by Marcin Orlowski as follows doesn't work on macOS either, as st_nlink is always 1 and never changes during the lifespan of the producer.
def writer(fifoname):
    print("writer: start...")
    wfd = os.open(fifoname, os.O_WRONLY)
    wait = 0.250 # 250ms
    t = 0
    while t <= 4:
        stat = os.fstat(wfd)
        print("nlink", stat.st_nlink)
        t += wait
        time.sleep(wait)
    print("ERROR: timed out, disconnect not detected")
kqueue
At least on macOS 12.7.2, using kqueue doesn't work either, despite the macOS documentation stating that, for pipes, EV_EOF should be set when filtering for EVFILT_WRITE.
# https://developer.apple.com/library/archive/documentation/System/Conceptual/ManPages_iPhoneOS/man2/kqueue.2.html
# says:
#
# EVFILT_WRITE Takes a file descriptor as the identifier, and returns
# whenever it is possible to write to the descriptor. For
# sockets, pipes and fifos, data will contain the amount of
# space remaining in the write buffer. The filter will set
# EV_EOF when the reader disconnects, and for the fifo case,
# this may be cleared by use of EV_CLEAR.
#
# However, the example below never sees an EV_EOF despite the macOS
# documentation claiming so (at least on 12.7.2).
import os
import tempfile
import atexit
import shutil
import time
import select

def writer(fifoname):
    print("writer: start...")
    wfd = os.open(fifoname, os.O_WRONLY)
    kq = select.kqueue()
    change_list = [
        select.kevent(
            flags=select.KQ_EV_ADD|select.KQ_EV_EOF,
            filter=select.KQ_FILTER_WRITE,
            ident=wfd,
        )
    ]
    events = kq.control(change_list, 0, 0.0)
    if len(events) != 0:
        print("didn't expect the Spanish Inquisition", events)
        return
    wait = 0.250 # 250 ms, expressed in seconds
    t = 0
    while t <= 4.0:
        events = kq.control(None, 42, 0)
        if len(events) != 0 and events[0].flags & select.KQ_EV_EOF != 0:
            print("reader has disconnected", events)
            return
        print(events)
        t += wait
        time.sleep(wait)
    print("ERROR: timed out, disconnect not detected")

def reader(fifoname):
    print("reader: start...")
    rfd = os.open(fifoname, os.O_RDONLY)
    print("sleeping...")
    time.sleep(2.0)
    os.close(rfd)
    print("reader closed")

tmpdir = tempfile.mkdtemp(prefix="pype-")
fifoname = os.path.join(tmpdir, "fifo")
print("named pipe", fifoname)
os.mkfifo(fifoname, 0o600)
atexit.register(shutil.rmtree, tmpdir)
childpid = os.fork()
if childpid == 0:
    writer(fifoname)
else:
    atexit.unregister(shutil.rmtree)
    reader(fifoname)
    print("waiting for writer to terminate...")
    os.waitpid(childpid, 0)
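As a baseline for comparison only: an actual write to the FIFO after the reader has gone fails with EPIPE, which Python raises as BrokenPipeError because the interpreter ignores SIGPIPE by default. This is not the passive detection asked for above, since it pushes a byte into the pipe. The sketch below assumes a single process and a hypothetical helper name, write_detects_disconnect.

```python
import os
import shutil
import tempfile


def write_detects_disconnect(wfd):
    """Try to write one byte; return True if the write fails with EPIPE."""
    try:
        os.write(wfd, b"x")
    except BrokenPipeError:
        return True
    return False


tmpdir = tempfile.mkdtemp(prefix="pype-")
try:
    fifoname = os.path.join(tmpdir, "fifo")
    os.mkfifo(fifoname, 0o600)
    # Open the read end non-blocking first so that neither open() blocks.
    rfd = os.open(fifoname, os.O_RDONLY | os.O_NONBLOCK)
    wfd = os.open(fifoname, os.O_WRONLY)
    before = write_detects_disconnect(wfd)   # reader still connected
    os.close(rfd)
    after = write_detects_disconnect(wfd)    # no reader left
    os.close(wfd)
finally:
    shutil.rmtree(tmpdir)

print("before close:", before, "after close:", after)
```

If this baseline behaves the same on Linux and macOS while the poll and kqueue variants do not, the difference is confined to event notification on the write end rather than to the FIFO's disconnect state itself.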