Potential race condition within Python I/O Multiplexer


Problem Statement

We are using tail and subprocess.Popen to ingest log data into a custom application. This application is expected to run on a variety of *NIX systems, including OpenBSD, Red Hat Linux, and Debian 10 and 11.

We spawn a subprocess with subprocess.Popen that runs the system-installed tail binary to collect log data from files. Using the system-installed tail is deemed most efficient, as it can use inotify (on Linux) or polling to discover changes to the file.

The child process writes its output to STDOUT, and an I/O multiplexer created with DefaultSelector handles communication with our Python application. Per the selectors module documentation, DefaultSelector is an alias for the most efficient selector implementation available on the current platform, which is why it is the recommended choice on heterogeneous *NIX systems.
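A minimal sketch of the registration pattern used in tailp.py, reduced to a plain os.pipe() so it runs without tail. The helper name poll_pipe_once is hypothetical; the selectors calls it uses (DefaultSelector, register, select, close) are the standard library API.

```python
import os
import selectors


def poll_pipe_once(payload: bytes, timeout: float = 1.0) -> bytes:
    """Write payload into a fresh pipe and read it back via DefaultSelector."""
    read_fd, write_fd = os.pipe()
    # Typically EpollSelector on Linux and KqueueSelector on the BSDs.
    selector = selectors.DefaultSelector()
    try:
        selector.register(read_fd, selectors.EVENT_READ)
        os.write(write_fd, payload)
        received = b""
        # select() returns (SelectorKey, events) pairs for ready objects;
        # an empty list means the timeout expired with nothing to read.
        for key, _events in selector.select(timeout=timeout):
            received += os.read(key.fd, 4096)
        return received
    finally:
        selector.close()
        os.close(read_fd)
        os.close(write_fd)
```

If nothing has been written, select() simply returns an empty list after the timeout, which is the case the `while not stop_event.is_set()` loop in tailp.py relies on.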

Through testing we've noticed that read1() calls can return data in which a single line is split across two reads. For example, a line containing 29834 in the file may be returned as two messages: 2983 and 4.
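The split can be reproduced without tail or the selector. A pipe carries an unstructured byte stream, and BufferedReader.read1() returns whatever bytes a single underlying read yields, so a chunk can end mid-line. The sketch below (read1_chunks is a hypothetical helper; calling read1() without an argument requires Python 3.7+) writes `29833\n2983` and then `4\n29835\n` into a pipe and calls read1() after each write. Each call returns exactly the bytes written before it, so splitting the first chunk on newlines yields the broken `2983`.

```python
import os
from typing import List


def read1_chunks(writes: List[bytes]) -> List[bytes]:
    """Feed each bytes object into a pipe, calling read1() after every write."""
    read_fd, write_fd = os.pipe()
    # os.fdopen(..., "rb") wraps the pipe in an io.BufferedReader.
    with os.fdopen(read_fd, "rb") as reader:
        try:
            chunks = []
            for payload in writes:
                os.write(write_fd, payload)
                # read1() performs at most one read() on the pipe and returns
                # whatever bytes are available, with no regard for newlines.
                chunks.append(reader.read1())
            return chunks
        finally:
            os.close(write_fd)
```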

Question

We are curious whether this is a race condition in Python's selectors module or on the system. Slowing our writes to the log file reduces the anomaly but does not eliminate it. I have created a POC for testing and further explanation. We need a solution that fixes this issue at the application or system level. We would also accept that this is the wrong approach for tailing a file in this manner.

Example and Setup Instructions

Setup

Two scripts can be used to recreate the problem. The first, tailp.py, tails the file and prints the output to the parent process's STDOUT. The second, write_logs.py, simply writes 30K log lines to the target file ./log.log.

  1. Create a target log file in the local directory: touch log.log.

  2. Run python3 tailp.py within a terminal.

  3. Within a separate terminal run python3 write_logs.py.

  4. In the terminal session for tailp.py, log lines should be emitted. There will likely be broken numbers such as:

29829
29830
29831
29832
29833
2983     <-- Should be a single line
4        <--
29835
29836
29837
29838
29839
29840
29841

Scripts

tailp.py

import selectors
import logging
import subprocess
from shlex import quote
from shutil import which
from threading import Event


def intake_gateway_push(datum):
    """
    Function used to push data into a mocked component.
    """
    print(datum)


def tail(filename: str, stop_event: Event):
    """
    Tails a file and pushes data into intake gateway.

    :param filename: full path to file
    :param stop_event: event used to send stop signal
    """
    with subprocess.Popen(
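        # Popen receives an argument list (no shell), so the filename is passed
        # as-is; shlex.quote() would add literal quotes to names with spaces.
        [which("tail"), "-f", "-n", "0", filename],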
        stdout=subprocess.PIPE,
        stderr=subprocess.PIPE
    ) as tail_process:
        logging.info("Creating tail process with PID '%s'", tail_process.pid)

        # Using 'DefaultSelector' as this is expected to run
        # on Linux and BSD systems
        selector = selectors.DefaultSelector()
        selector.register(tail_process.stdout, selectors.EVENT_READ)

        # Loops when 'selector.select' times out, exits
        # when stop event is sent.
        while not stop_event.is_set():
            for key, _ in selector.select(timeout=5.0):

                # Decoding file handler data into utf-8 and separating data
                # by newlines.
                data = key.fileobj.read1().decode('utf-8').split("\n")

                # Removing all empty strings produced by 'split(\n)'.
                data = [ datum for datum in data if datum != '' ]

                # Pushing each log line into mocked gateway function.
                for datum in data:
                    intake_gateway_push(datum)
        logging.info("Unregistering selector from tail PID '%s'", tail_process.pid)
        selector.unregister(tail_process.stdout)
        
        # Tail process is usually defunct and does not exit on OpenBSD 7.3;
        # explicitly calling 'kill' solves this. Need to investigate how
        # the context manager is cleaning up child processes.
        tail_process.kill()


if __name__ == "__main__":
    stop_event = Event()
    tail("log.log", stop_event)
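On the cleanup note in tailp.py: when a Popen object is used as a context manager, leaving the with block closes the pipes and then waits for the child, but it does not terminate it. Because tail -f never exits on its own, it has to be stopped explicitly. A minimal sketch, assuming a hypothetical helper stop_process(), that sends SIGTERM, waits for a grace period, escalates to SIGKILL, and always reaps the child:

```python
import subprocess


def stop_process(process: subprocess.Popen, grace_period: float = 2.0) -> int:
    """Ask a child to exit, escalate to SIGKILL if needed, and reap it."""
    if process.poll() is None:
        process.terminate()  # SIGTERM on POSIX
        try:
            process.wait(timeout=grace_period)
        except subprocess.TimeoutExpired:
            process.kill()  # SIGKILL on POSIX
            process.wait()  # reap the child so it does not linger as a zombie
    return process.returncode
```

In tailp.py this would take the place of the bare `tail_process.kill()` call before the with block exits.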

write_logs.py

import time

def write_logs(count: int):
    """
    Write specified quantity of logs to file.

    :param count: number of log lines to write
    """
    with open("./log.log", mode="w", encoding="utf-8") as file:
        for i in range(count):
            file.write(f"{i}\n")
            time.sleep(0.001)


if __name__ == "__main__":
    write_logs(30000)
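A related detail on the writer side: write_logs.py opens the file with Python's default block buffering, so data reaches the file whenever the buffer fills, and that boundary can fall in the middle of a line. A hedged sketch of a line-buffered variant (write_logs_line_buffered is a hypothetical name) is below. It only makes whole lines more likely to reach tail; reader-side reassembly is still needed, because the pipe between tail and Python does not preserve line boundaries either.

```python
import time


def write_logs_line_buffered(path: str, count: int, delay: float = 0.001) -> None:
    """Variant of write_logs() that flushes each line as soon as it is written."""
    # buffering=1 selects line buffering (text mode only): the buffer is
    # flushed whenever a write contains "\n", so each line typically reaches
    # the OS in its own write() call instead of in arbitrary block-sized pieces.
    with open(path, mode="w", encoding="utf-8", buffering=1) as file:
        for i in range(count):
            file.write(f"{i}\n")
            time.sleep(delay)
```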


Answer by NetPenguin21

The problem appears when a complete line is not received in a single read operation. This is not a race condition in the selectors module: a pipe carries an unstructured byte stream, so read1() returns whatever bytes are available at that moment, and a chunk can end in the middle of a line. If the last character of the received data is not a newline, the last element produced by split("\n") is an incomplete line, and the original code pushes it as if it were complete. The fix is to store that incomplete line and prepend it to the data returned by the next read.

Example 1.

The incomplete line is stored in the incomplete_line variable, which is updated on each iteration. New data is appended to it, the combined data is split on newlines, and every element except the last is pushed as a complete line. The last element, which may be incomplete (or an empty string if the data ended with a newline), is kept for the next iteration.

def tail(filename: str, stop_event: Event):
    """
    Tails a file and pushes data into intake gateway.

    :param filename: full path to file
    :param stop_event: event used to send stop signal
    """
    with subprocess.Popen(
        [which("tail"), "-f", "-n", "0", filename],
        stdout=subprocess.PIPE,
        stderr=subprocess.PIPE
    ) as tail_process:
        logging.info("Creating tail process with PID '%s'", tail_process.pid)

        # Using 'DefaultSelector' as this is expected to run
        # on Linux and BSD systems
        selector = selectors.DefaultSelector()
        selector.register(tail_process.stdout, selectors.EVENT_READ)

        incomplete_line = ''
        # Loops when 'selector.select' times out, exits
        # when stop event is sent.
        while not stop_event.is_set():
            for key, _ in selector.select(timeout=5.0):
                # Read data from fileobj
                data = key.fileobj.read1().decode('utf-8')

                # Combine the incomplete line with the new data
                data = incomplete_line + data

                # Split the data into lines
                lines = data.split("\n")

                # Process all complete lines except the last one
                for line in lines[:-1]:
                    # Pushing each log line into mocked gateway function.
                    intake_gateway_push(line)

                # The last line might be incomplete, so store it for the next iteration
                incomplete_line = lines[-1]
                
        logging.info("Unregistering selector from tail PID '%s'", tail_process.pid)
        selector.unregister(tail_process.stdout)
        
        # Tail process is usually defunct and does not exit on OpenBSD 7.3;
        # explicitly calling 'kill' solves this. Need to investigate how
        # the context manager is cleaning up child processes.
        tail_process.kill()
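One caveat with Example 1: it decodes each chunk before reassembling lines, so a multi-byte UTF-8 character split across two reads would raise UnicodeDecodeError. A minimal sketch that buffers raw bytes instead and decodes only complete lines (LineBuffer is a hypothetical helper, not part of any library). Splitting on b"\n" is safe for UTF-8 because the byte 0x0A never occurs inside a multi-byte sequence.

```python
from typing import List


class LineBuffer:
    """Reassemble newline-terminated lines from arbitrary byte chunks."""

    def __init__(self, encoding: str = "utf-8") -> None:
        self._pending = b""
        self._encoding = encoding

    def feed(self, chunk: bytes) -> List[str]:
        """Return every complete line in pending + chunk; keep the remainder."""
        data = self._pending + chunk
        # split() always returns at least one element; the last one is the
        # (possibly empty) partial line carried over to the next call.
        *complete, self._pending = data.split(b"\n")
        # Only complete lines are decoded, so a character split across two
        # reads is never decoded half-way.
        return [line.decode(self._encoding) for line in complete]
```

Inside the selector loop this would be used as `for line in buffer.feed(key.fileobj.read1()): intake_gateway_push(line)`, with `buffer = LineBuffer()` created before the loop.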

Example 2.

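As an alternative, you can use the same loop but, instead of splitting the combined data, scan it for newline characters and push each complete line as soon as its terminator is found; the trailing partial line is still carried over to the next read. Note that a character-by-character loop in pure Python is generally slower than str.split(), which runs in C, so measure before assuming this variant is more efficient.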

NOTE: This solution assumes lines are terminated by \n; adjust the delimiter to your needs.

def tail(filename: str, stop_event: Event):
    """
    Tails a file and pushes data into intake gateway.

    :param filename: full path to file
    :param stop_event: event used to send stop signal
    """
    with subprocess.Popen(
        [which("tail"), "-f", "-n", "0", filename],
        stdout=subprocess.PIPE,
        stderr=subprocess.PIPE
    ) as tail_process:
        logging.info("Creating tail process with PID '%s'", tail_process.pid)

        # Using 'DefaultSelector' as this is expected to run
        # on Linux and BSD systems
        selector = selectors.DefaultSelector()
        selector.register(tail_process.stdout, selectors.EVENT_READ)

        incomplete_line = ''
        # Loops when 'selector.select' times out, exits
        # when stop event is sent.
        while not stop_event.is_set():
            for key, _ in selector.select(timeout=5.0):
                # Read data from fileobj
                data = key.fileobj.read1().decode('utf-8')

                # Combine the incomplete line with the new data
                data = incomplete_line + data
                
                # Process each character in the data
                start = 0
                for i in range(len(data)):
                    if data[i] == '\n':
                        # Process the complete line
                        line = data[start:i]
                        # Pushing the log line into mocked gateway function.
                        intake_gateway_push(line)
                        start = i + 1

                # The last line might be incomplete, so store it for the next iteration
                incomplete_line = data[start:]
                
        logging.info("Unregistering selector from tail PID '%s'", tail_process.pid)
        selector.unregister(tail_process.stdout)
        
        # Tail process is usually defunct and does not exit on OpenBSD 7.3;
        # explicitly calling 'kill' solves this. Need to investigate how
        # the context manager is cleaning up child processes.
        tail_process.kill()
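A variant of Example 2's scanning step, sketched under the assumption that a pure helper is acceptable: split_on_separator (a hypothetical name) uses str.find() to locate each terminator and takes the separator as a parameter, which covers the NOTE about adjusting the delimiter. Because the remainder is prepended to the next chunk, a multi-character separator such as "\r\n" that is itself split across two reads is still recognised once the pieces are joined.

```python
from typing import List, Tuple


def split_on_separator(data: str, sep: str = "\n") -> Tuple[List[str], str]:
    """Split data into complete records and a trailing partial record.

    Returns (records, remainder): every record that was followed by sep, and
    the text after the last sep, which should be carried over to the next read.
    """
    if not sep:
        raise ValueError("sep must be a non-empty string")
    records = []
    start = 0
    # str.find() searches in C instead of testing each character in a
    # Python-level loop.
    end = data.find(sep, start)
    while end != -1:
        records.append(data[start:end])
        start = end + len(sep)
        end = data.find(sep, start)
    return records, data[start:]
```

In the selector loop this would replace the character scan with `lines, incomplete_line = split_on_separator(incomplete_line + data)`, followed by pushing each element of `lines`.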